# chronos / Mark-1 / phas1_v2.py
import sqlite3
import spacy
import re
from thefuzz import process
import numpy as np
from transformers import pipeline
# Load the zero-shot intent classifier and the spaCy pipelines
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
nlp = spacy.load("en_core_web_sm")
nlp_vectors = spacy.load("en_core_web_md")
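# Note: en_core_web_md ships static word vectors (used for the similarity matching
# below); the small model does not, so it is only used for tokenization here.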
# Define operator mappings. Longer phrases are listed before the shorter phrases
# they contain (e.g. "not equal to" before "equal to") so that the first match
# found in extract_conditions is the most specific one.
operator_mappings = {
    "greater than": ">",
    "more than": ">",
    "not equal to": "!=",
    "equal to": "=",
    "less than": "<",
    "starts with": "LIKE",
    "ends with": "LIKE",
    "contains": "LIKE",
    "above": ">",
    "below": "<",
    "<": "<",
    ">": ">"
}
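# Example (hypothetical column/value): in the clause "stock less than 5", the phrase
# "less than" maps to "<", which together with a matched column yields "stock < 5".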
# Connect to SQLite database
def connect_to_db(db_path):
try:
conn = sqlite3.connect(db_path)
return conn
except sqlite3.Error as e:
print(f"Error connecting to database: {e}")
return None
# Fetch database schema
def fetch_schema(conn):
cursor = conn.cursor()
query = """
SELECT name
FROM sqlite_master
WHERE type='table';
"""
cursor.execute(query)
tables = cursor.fetchall()
schema = {}
for table in tables:
table_name = table[0]
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
schema[table_name] = [{"name": col[1], "type": col[2], "not_null": col[3], "default": col[4], "pk": col[5]} for col in columns]
return schema
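# Example of the returned shape, assuming a hypothetical "products" table:
# {"products": [{"name": "id", "type": "INTEGER", "not_null": 0, "default": None, "pk": 1}, ...]}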
# Match token to schema columns using vector similarity and fuzzy matching
def find_best_match(token_text, table_schema):
"""Return the best-matching column from table_schema."""
    # Bail out early if the table has no columns to match against
    if not table_schema:
        return None
    token_vec = nlp_vectors(token_text).vector
    token_norm = np.linalg.norm(token_vec)
    best_col = None
    best_score = 0.0
    for col in table_schema:
        col_vec = nlp_vectors(col).vector
        col_norm = np.linalg.norm(col_vec)
        # Skip out-of-vocabulary tokens/columns whose vectors are all zeros
        if token_norm == 0 or col_norm == 0:
            continue
        score = token_vec.dot(col_vec) / (token_norm * col_norm)
        if score > best_score:
            best_score = score
            best_col = col
if best_score > 0.65:
return best_col
# Fallback to fuzzy matching if vector similarity fails
best_fuzzy_match, fuzzy_score = process.extractOne(token_text, table_schema)
if fuzzy_score > 80:
return best_fuzzy_match
return None
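# e.g. find_best_match("cost", ["id", "price", "name"]) would likely return "price"
# via vector similarity (hypothetical columns; 0.65 / 80 are the acceptance thresholds).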
# Extract conditions from user query
def extract_conditions(question, schema, table):
table_schema = [col["name"].lower() for col in schema.get(table, [])]
# Detect whether the user used 'AND' / 'OR'
use_and = " and " in question.lower()
use_or = " or " in question.lower()
condition_parts = re.split(r"\band\b|\bor\b", question, flags=re.IGNORECASE)
conditions = []
for part in condition_parts:
part = part.strip()
tokens = [token.text.lower() for token in nlp(part)]
current_col = None
for token in tokens:
possible_col = find_best_match(token, table_schema)
if possible_col:
current_col = possible_col
break
if current_col:
            part_lower = part.lower()
            for phrase, sql_operator in operator_mappings.items():
                if phrase in part_lower:
                    value_start = part_lower.find(phrase) + len(phrase)
                    remainder = part[value_start:].strip()
                    if not remainder:
                        continue  # operator phrase matched, but no value followed it
                    value = remainder.split()[0]
if sql_operator == "LIKE":
if "starts with" in phrase:
value = f"'{value}%'"
elif "ends with" in phrase:
value = f"'%{value}'"
elif "contains" in phrase:
value = f"'%{value}%'"
conditions.append(f"{current_col} {sql_operator} {value}")
break
    if not conditions:
        return None
    # Prefer AND when both conjunctions (or neither) appear in the question
    joiner = " OR " if (use_or and not use_and) else " AND "
    return joiner.join(conditions)
# Main interpretation and execution
def interpret_question(question, schema):
intents = {
"describe_table": "Provide information about the columns and structure of a table.",
"list_table_data": "Fetch and display all data stored in a table.",
"count_records": "Count the number of records in a table.",
"fetch_column": "Fetch a specific column's data from a table.",
"fetch_all_data": "Fetch all records from a table without filters.",
"filter_data_with_conditions": "Fetch records based on specific conditions."
}
    labels = list(intents.keys())
    descriptions = list(intents.values())
    # Classify against the human-readable descriptions (more informative for a
    # zero-shot NLI model than raw snake_case keys), then map back to the key.
    # The pipeline returns labels sorted by score, so the top prediction is
    # result["labels"][0], not np.argmax over the input order.
    result = classifier(question, descriptions, multi_label=True)
    top_description = result["labels"][0]
    predicted_intent = labels[descriptions.index(top_description)]
# Extract table name using schema and fuzzy matching
table, score = process.extractOne(question, schema.keys())
if score > 80:
return {"intent": predicted_intent, "table": table}
return {"intent": predicted_intent, "table": None}
def handle_intent(intent_data, schema, conn, question):
intent = intent_data["intent"]
table = intent_data["table"]
if not table:
return "I couldn't identify which table you're referring to."
if intent == "describe_table":
return schema.get(table, "No such table found.")
elif intent in ["list_table_data", "fetch_all_data"]:
conditions = extract_conditions(question, schema, table) if intent == "list_table_data" else None
query = f"SELECT * FROM {table}"
if conditions:
query += f" WHERE {conditions}"
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
elif intent == "count_records":
query = f"SELECT COUNT(*) FROM {table}"
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchone()
elif intent == "fetch_column":
column = extract_conditions(question, schema, table)
if column:
query = f"SELECT {column} FROM {table}"
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
else:
return "I couldn't identify which column you're referring to."
elif intent == "filter_data_with_conditions":
conditions = extract_conditions(question, schema, table)
query = f"SELECT * FROM {table} WHERE {conditions}"
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
return "Unsupported intent."
# Entry point
def answer_question(question, conn, schema):
intent_data = interpret_question(question, schema)
return handle_intent(intent_data, schema, conn, question)
if __name__ == "__main__":
db_path = "./ecommerce.db"
conn = connect_to_db(db_path)
if conn:
schema = fetch_schema(conn)
while True:
question = input("\nAsk a question about the database: ")
if question.lower() in ["exit", "quit"]:
break
print(answer_question(question, conn, schema))
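# Example questions this REPL is meant to handle (hypothetical, assuming ecommerce.db
# contains tables such as "products" and "orders"):
#   "describe the products table"
#   "count records in orders"
#   "show products where price is greater than 100"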