Text-to-SQL lets users query databases in plain English. Instead of writing SELECT AVG(salary) FROM employees JOIN dept_emp ON ..., a user types "What is the average salary in the Sales department?" and the agent figures out the rest. This lesson builds that agent with a safety-first approach: queries are validated before execution, destructive operations are blocked, and failed queries are automatically corrected and retried.
The dataset is the employees database — a SQLite port of the classic MySQL employees test database with 300,024 employee records across 6 tables.
Prerequisites: langchain, langchain-ollama, langchain-community, langchain-core, python-dotenv installed. Ollama running with qwen3. The employees_db-full-1.0.6.db SQLite file in a db/ subdirectory.
Step 1: Database Setup
import re
from langchain_ollama import ChatOllama
from langchain_community.utilities import SQLDatabase
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage
from langchain.agents import create_agent
from dotenv import load_dotenv
load_dotenv()
True
Connect to the SQLite employees database:
print("Setting up employees database...")
db = SQLDatabase.from_uri("sqlite:///db/employees_db-full-1.0.6.db")
try:
tables = db.get_usable_table_names()
print(f"✔ Database connected successfully")
print(f"✔ Found {len(tables)} tables: {', '.join(tables)}")
except Exception as e:
print(f"✘ Database connection failed: {e}")
SCHEMA = db.get_table_info()
print("✔ Connected to employees database")
Setting up employees database...
✔ Database connected successfully
✔ Found 6 tables: departments, dept_emp, dept_manager, employees, salaries, titles
✔ Connected to employees database
tables
['departments', 'dept_emp', 'dept_manager', 'employees', 'salaries', 'titles']
Verify with raw SQL:
db.run("SELECT name FROM sqlite_master WHERE type='table';")
"[('departments',), ('dept_emp',), ('dept_manager',), ('employees',), ('salaries',), ('titles',)]"
db.run("select count(*) from employees")
'[(300024,)]'
300,024 employee records. SCHEMA (from db.get_table_info()) contains CREATE TABLE statements with sample rows — it is passed to the LLM for context when generating queries.
Step 2: Models Setup
llm = ChatOllama(
model="qwen3",
base_url="http://localhost:11434",
temperature=0
)
response = llm.invoke("Hello, how are you?")
response.pretty_print()
print("✔ Initialized Ollama chat model")
================================== Ai Message ==================================
Hello! I'm just a chatbot, so I don't have feelings, but I'm here and ready to help! 😊 How can I assist you today?
✔ Initialized Ollama chat model
temperature=0 is essential for SQL generation — deterministic output produces consistent, reproducible queries.
Step 3: SQL Tools
Tool 1 — get_database_schema
The first tool the agent calls for any new question. It retrieves the schema for a specific table or all tables at once:
@tool
def get_database_schema(table_name: str = None) -> str:
"""Get database schema information for SQL query generation.
Use this first to understand table structure before creating queries."""
print(f"🔍 Getting schema for: {table_name if table_name else 'all tables'}")
if table_name:
try:
tables = db.get_usable_table_names()
if table_name.lower() in [t.lower() for t in tables]:
result = db.get_table_info([table_name])
print(f"✔ Retrieved schema for table: {table_name}")
return result
else:
return f"Error: Table '{table_name}' not found. Available tables: {', '.join(tables)}"
except Exception as e:
return f"Error getting table info: {e}"
else:
print("✔ Retrieved full database schema")
return SCHEMA
Test it:
result = get_database_schema.invoke({'table_name': 'departments'})
print(result)
🔍 Getting schema for: departments
✔ Retrieved schema for table: departments
CREATE TABLE departments (
dept_no CHAR(4) NOT NULL,
dept_name VARCHAR(40) NOT NULL,
PRIMARY KEY (dept_no),
UNIQUE (dept_name)
)
/*
3 rows from departments table:
dept_no dept_name
d009 Customer Service
d005 Development
d002 Finance
*/
Tool 2 — generate_sql_query
Generates a SELECT query from a natural language question using the database schema as context:
@tool
def generate_sql_query(question: str, schema_info: str = None) -> str:
"""Generate a SQL SELECT query from a natural language question using database schema.
Always use this after getting schema information."""
print(f"🔺 Generating SQL for: {question[:100]}...")
schema_to_use = schema_info if schema_info else SCHEMA
prompt = f"""
Based on this database schema:
{schema_to_use}
Generate a SQL query to answer this question: {question}
Rules:
- Use only SELECT statements
- Include only existing columns and tables
- Add appropriate WHERE, GROUP BY, ORDER BY clauses as needed
- Limit results to 10 rows unless specified otherwise
- Use proper SQL syntax for SQLite
Return only the SQL query, nothing else.
"""
try:
response = llm.invoke(prompt)
query = response.content.strip()
print(f"✔ Generated SQL query")
return query
except Exception as e:
return f"Error generating query: {e}"
Test it:
result = generate_sql_query.invoke({"question": "what is maximum salary in employees"})
print(result)
🔺 Generating SQL for: what is maximum salary in employees...
✔ Generated SQL query
SELECT MAX(salary) FROM salaries;
Verify directly:
db.run("SELECT MAX(salary) FROM salaries;")
'[(158220,)]'
The maximum salary in the database is 158,220.
Tool 3 — validate_sql_query
Validates generated SQL for safety before execution. Blocks all destructive operations:
@tool
def validate_sql_query(query: str) -> str:
"""Validate SQL query for safety and syntax before execution.
Returns 'Valid: <query>' if safe or 'Error: <message>' if unsafe."""
print(f"🔍 Validating SQL: {query[:100]}...")
clean_query = query.strip()
# Remove markdown SQL code block markers (```sql ... ```)
clean_query = re.sub(r'```sql\s*', '', clean_query, flags=re.IGNORECASE)
clean_query = re.sub(r'```\s*', '', clean_query)
clean_query = clean_query.strip().rstrip(";")
# Check 1: Must be a SELECT statement
if not clean_query.lower().startswith("select"):
return "Error: Only SELECT statements are allowed"
# Check 2: Block dangerous SQL keywords
dangerous_keywords = ['INSERT', 'UPDATE', 'DELETE', 'ALTER', 'DROP', 'CREATE', 'TRUNCATE']
query_upper = clean_query.upper()
for keyword in dangerous_keywords:
if keyword in query_upper:
return f"Error: {keyword} operations are not allowed"
print("✔ Query validation passed")
return f"Valid: {clean_query}"
Test both safe and unsafe queries:
result = validate_sql_query.invoke({"query": "SELECT MAX(salary) FROM salaries;"})
print(result)
result = validate_sql_query.invoke({"query": "Update MAX(salary) FROM salaries;"})
print(result)
🔍 Validating SQL: SELECT MAX(salary) FROM salaries;...
✔ Query validation passed
Valid: SELECT MAX(salary) FROM salaries
🔍 Validating SQL: Update MAX(salary) FROM salaries;...
Error: Only SELECT statements are allowed
Tool 4 — execute_sql_query
Executes a validated query against the database. Strips the "Valid: " prefix from the validator output and handles exceptions:
@tool
def execute_sql_query(query: str) -> str:
"""Execute a validated SQL query and return results.
Only use this after validating the query for safety."""
print(f"🚀 Executing SQL: {query[:100]}...")
try:
clean_query = query.strip()
if clean_query.startswith("Valid: "):
clean_query = clean_query[7:] # Remove "Valid: " prefix
clean_query = re.sub(r'```sql\s*', '', clean_query, flags=re.IGNORECASE)
clean_query = re.sub(r'```\s*', '', clean_query)
clean_query = clean_query.strip().rstrip(";")
result = db.run(clean_query)
print("✔ Query executed successfully")
if result:
return f"Query Results:\n{result}"
else:
return "Query executed successfully but returned no results."
except Exception as e:
error_msg = f"Execution Error: {str(e)}"
print(f"✘ {error_msg}")
return error_msg
Test it:
execute_sql_query.invoke({"query": "SELECT MAX(salary) FROM salaries;"})
🚀 Executing SQL: SELECT MAX(salary) FROM salaries;...
✔ Query executed successfully
'Query Results:\n[(158220,)]'
Tool 5 — fix_sql_error
When a query fails validation or execution, the agent calls this tool to generate a corrected version:
@tool
def fix_sql_error(original_query: str, error_message: str, question: str) -> str:
"""Fix a failed SQL query by analyzing the error and generating a corrected version.
Use this when validation or execution fails."""
print(f"🔺 Fixing SQL error: {error_message[:100]}...")
fix_prompt = f"""
The following SQL query failed:
Query: {original_query}
Error: {error_message}
Original Question: {question}
Database Schema:
{SCHEMA}
Analyze the error and provide a corrected SQL query that:
1. Fixes the specific error mentioned
2. Still answers the original question
3. Uses only valid table and column names from the schema
4. Follows SQLite syntax rules
Return only the corrected SQL query, nothing else.
"""
try:
response = llm.invoke(fix_prompt)
fixed_query = response.content.strip()
print("✔ Generated fixed SQL query")
return fixed_query
except Exception as e:
return f"Error generating fix: {e}"
Test on a deliberately wrong query:
fix_sql_error.invoke({
"original_query": "Update MAX(salary) FROM salaries;",
"error_message": "Error: Only SELECT statements are allowed",
"question": "what is maximum salary in employees"
})
🔺 Fixing SQL error: Error: Only SELECT statements are allowed...
✔ Generated fixed SQL query
'SELECT MAX(salary) FROM salaries;'
The fix tool corrects the destructive UPDATE query into a proper SELECT.
Step 4: System Prompt
The system prompt defines the agent's workflow explicitly — each of the five tools maps to a numbered step:
SQL_SYSTEM_PROMPT = f"""You are an expert SQL analyst working with an employees database.
Database Schema:
{SCHEMA}
Your workflow for answering questions:
1. Use `get_database_schema` first to understand available tables and columns (if needed)
2. Use `generate_sql_query` to create SQL based on the question
3. Use `validate_sql_query` to check the query for safety and syntax
4. Use `execute_sql_query` to run the validated query
5. If there's an error, use `fix_sql_error` to correct it and try again (up to 3 times)
6. Provide a clear answer based on the query results
Rules:
- Always follow the workflow step by step
- If a query fails, use the fix tool and try again
- Provide clear, informative answers
- Be precise with table and column names
- Handle errors gracefully and try to fix them
- If you fail after 3 attempts, explain what went wrong
Available tools for each step:
- get_database_schema: Get table structure info
- generate_sql_query: Create SQL from question
- validate_sql_query: Check query safety/syntax
- execute_sql_query: Run the query
- fix_sql_error: Fix failed queries
Remember: Always validate queries before executing them for safety.
"""
Step 5: Create the SQL Agent
tools = [
get_database_schema,
generate_sql_query,
validate_sql_query,
execute_sql_query,
fix_sql_error
]
sql_agent = create_agent(
llm,
tools,
system_prompt=SQL_SYSTEM_PROMPT
)
print("✔ Created SQL agent with create_agent")
✔ Created SQL agent with create_agent
sql_agent
<langgraph.graph.state.CompiledStateGraph object at 0x000001D28BDDBE90>
Step 6: Query Functions
Streaming ask_sql() Helper
def ask_sql(question: str):
"""Ask the SQL agent a question using the full workflow."""
print(f"\n{'='*60}")
print(f"SQL AGENT - Question: {question}")
print('='*60)
for event in sql_agent.stream({"messages": question}, stream_mode="values"):
msg = event["messages"][-1]
if hasattr(msg, 'tool_calls') and msg.tool_calls:
for tc in msg.tool_calls:
print(f"\n🔺 Using Tool: {tc['name']}")
print(f"Args: {str(tc['args'])[:200]}")
elif hasattr(msg, 'content') and msg.content:
print(f"\n💼 Answer:\n{msg.content}")
End-to-End Demo with Self-Healing
Run a complex multi-table join query:
ask_sql("What is the average salary of employees in the Sales department?")
============================================================
SQL AGENT - Question: What is the average salary of employees in the Sales department?
============================================================
💼 Answer:
What is the average salary of employees in the Sales department?
🔺 Using Tool: get_database_schema
Args: {'table_name': 'departments'}
🔍 Getting schema for: departments
✔ Retrieved schema for table: departments
💼 Answer:
CREATE TABLE departments (
dept_no CHAR(4) NOT NULL,
dept_name VARCHAR(40) NOT NULL,
...
🔺 Using Tool: generate_sql_query
Args: {'question': 'What is the average salary of employees in the Sales department?', 'schema_info': '...'}
🔺 Generating SQL for: What is the average salary of employees in the Sales department?...
✔ Generated SQL query
💼 Answer:
SELECT AVG(salary) AS average_salary
FROM employees
JOIN departments ON employees.dept_no = departments.dept_no
WHERE departments.dept_name = 'Sales';
🔺 Using Tool: validate_sql_query
Args: {'query': 'SELECT AVG(salary) AS average_salary\nFROM employees\nJOIN departments ON...'}
🔍 Validating SQL: SELECT AVG(salary) AS average_salary...
✔ Query validation passed
💼 Answer:
Valid: SELECT AVG(salary) AS average_salary
FROM employees
JOIN departments ON employees.dept_no = departments.dept_no
WHERE departments.dept_name = 'Sales'
🔺 Using Tool: execute_sql_query
Args: {'query': 'SELECT AVG(salary) AS average_salary\nFROM employees\nJOIN dept_emp...'}
🚀 Executing SQL: SELECT AVG(salary) AS average_salary
FROM employees
JOIN dept_emp ON employees.emp_no = dept_emp.emp_no...
✘ Execution Error: (sqlite3.OperationalError) near "JOIN": syntax error
[SQL: SELECT AVG(salary)...JOIN salaries ON employees.emp_no = salaries.emp_no]
💼 Answer:
Execution Error: near "JOIN": syntax error...
🔺 Using Tool: fix_sql_error
Args: {'error_message': 'near "JOIN": syntax error', 'original_query': '...'}
Note
This trace shows the self-healing loop in action. The first generated query had a JOIN ordering error. The agent detected the execution failure, called fix_sql_error, generated a corrected query with proper JOIN ordering, and retried — all without any manual intervention.
The Agent's Five-Tool Workflow
User Question: "What is the average salary in Sales?"
│
▼
get_database_schema("departments") ← Understand table structure
│
▼
generate_sql_query(question, schema) ← LLM writes the SELECT query
│
▼
validate_sql_query(query) ← Safety check — must be SELECT only
│ │
│ [FAIL] └──── Error: dangerous keyword → STOP
│
▼
execute_sql_query(query) ← Run against SQLite
│ │
│ [FAIL] └──── Execution error → fix_sql_error() → retry (max 3x)
│
▼
LLM synthesizes final answer from query results
What You Built
In this lesson you built a production-safe Text-to-SQL agent:
| Tool | Role | Safety |
|---|---|---|
get_database_schema |
Inspect table structure before generating queries | Read-only |
generate_sql_query |
LLM generates SELECT query from natural language | SELECT only (instructed) |
validate_sql_query |
Regex validation — blocks 7 destructive keywords | Enforced by code, not LLM |
execute_sql_query |
Runs validated query against SQLite | Only receives pre-validated queries |
fix_sql_error |
LLM rewrites failed queries with error context | SELECT only (instructed) |
Key design decisions:
- Safety is enforced in code —
validate_sql_queryuses regex, not LLM judgment - Self-healing — the
fix_sql_error+ retry loop handles LLM SQL generation mistakes automatically - Schema in system prompt — the agent always has full schema context, reducing hallucinated column names