import re
import sqlite3

import numpy as np
import spacy
from thefuzz import process
from transformers import pipeline

# Load intent classification model.
# Use Hugging Face's zero-shot pipeline for flexibility.
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

nlp = spacy.load("en_core_web_sm")          # tokenization
nlp_vectors = spacy.load("en_core_web_md")  # word vectors for similarity

# Define operator mappings (natural-language phrase -> SQL operator)
operator_mappings = {
    "greater than": ">",
    "less than": "<",
    "equal to": "=",
    "not equal to": "!=",
    "starts with": "LIKE",
    "ends with": "LIKE",
    "contains": "LIKE",
    "above": ">",
    "below": "<",
    "more than": ">",
    "<": "<",
    ">": ">",
}


# Connect to SQLite database
def connect_to_db(db_path):
    conn = sqlite3.connect(db_path)
    return conn


# Fetch database schema
def fetch_schema(conn):
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()

    schema = {}
    for table in tables:
        table_name = table[0]
        cursor.execute(f"PRAGMA table_info({table_name});")
        columns = cursor.fetchall()
        schema[table_name] = [
            {
                "name": col[1],
                "type": col[2],
                "not_null": col[3],
                "default": col[4],
                "pk": col[5],
            }
            for col in columns
        ]
    return schema


def find_ai_synonym(token_text, table_schema):
    """Return the best-matching column from table_schema based on vector similarity."""
    token_vec = nlp_vectors(token_text)[0].vector
    # Out-of-vocabulary tokens have an all-zero vector, which would divide by zero.
    if not np.any(token_vec):
        return None

    best_col = None
    best_score = 0.0
    for col in table_schema:
        col_vec = nlp_vectors(col)[0].vector
        if not np.any(col_vec):
            continue
        # Cosine similarity
        score = token_vec.dot(col_vec) / (np.linalg.norm(token_vec) * np.linalg.norm(col_vec))
        if score > best_score:
            best_score = score
            best_col = col

    # Apply threshold
    if best_score > 0.65:
        return best_col
    return None


def identify_table(question, schema_tables):
    # schema_tables = ["products", "users", "orders", ...]
    table, score = process.extractOne(question, schema_tables)
    if score > 80:  # a comfortable threshold
        return table
    return None


def identify_columns(question, columns_for_table):
    # columns_for_table = ["id", "price", "stock", "name", ...]
    # For each token in the question, fuzzy-match against the column names.
    matched_cols = []
    tokens = question.lower().split()
    for token in tokens:
        col, score = process.extractOne(token, columns_for_table)
        if score > 80:
            matched_cols.append(col)
    return matched_cols


def find_closest_column(token, table_schema):
    # table_schema is a list of column names, e.g. ["price", "stock", "name"]
    best_match, score = process.extractOne(token, table_schema)
    # Tune this threshold as needed (e.g. 70, 80, ...).
    if score > 90:
        return best_match
    return None
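
# A quick illustration of how the matching helpers above behave. The table
# and column names here are assumptions for demonstration only; swap in your
# own schema. This function is not called anywhere by default.
def _demo_matching():
    tables = ["products", "users", "orders"]
    columns = ["id", "price", "stock", "name"]
    # Whole-question fuzzy match against table names:
    print(identify_table("how many products do we have", tables))    # -> "products"
    # Token-by-token fuzzy match against column names:
    print(identify_columns("price and stock of products", columns))  # -> e.g. ["price", "stock"]
    # Vector similarity can catch synonyms that fuzzy matching misses:
    print(find_ai_synonym("cost", ["price", "stock", "name"]))       # -> likely "price"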

# Condition extraction with NLP
def extract_conditions(question, schema, table):
    table_schema = [col["name"].lower() for col in schema.get(table, [])]

    # Detect whether the user used 'AND' / 'OR'
    # (case-insensitive, hence the .lower() checks)
    use_and = " and " in question.lower()
    use_or = " or " in question.lower()

    last_column = None

    # Split on 'and' or 'or' to handle multiple conditions
    condition_parts = re.split(r"\band\b|\bor\b", question, flags=re.IGNORECASE)
    print(condition_parts)  # debug output
    conditions = []

    # Check longer phrases first so that e.g. "not equal to" wins over "equal to".
    sorted_operators = sorted(operator_mappings.items(), key=lambda kv: -len(kv[0]))

    for part in condition_parts:
        part = part.strip()

        # Use spaCy to tokenize each part
        doc = nlp(part.lower())
        tokens = [token.text for token in doc]

        # Skip the recognized table name if it appears in the tokens,
        # so it won't be matched as a column.
        tokens = [t for t in tokens if t != table.lower()]

        part_conditions = []
        current_part_column = None
        print(tokens)  # debug output

        for token in tokens:
            # Try synonyms / vector similarity to find a column
            possible_col = find_ai_synonym(token, table_schema)
            if possible_col:
                current_part_column = possible_col
                last_column = possible_col  # update last_column

        # Check for a matching operator phrase in this part
        for phrase, sql_operator in sorted_operators:
            if phrase in part.lower():
                # Extract the value after the phrase
                value_index = part.lower().find(phrase) + len(phrase)
                value = part[value_index:].strip().split(" ")[0]
                value = value.replace("'", "").replace('"', "").strip()

                # Special handling for LIKE operators
                if sql_operator == "LIKE":
                    if "starts with" in phrase:
                        value = f"'{value}%'"
                    elif "ends with" in phrase:
                        value = f"'%{value}'"
                    elif "contains" in phrase:
                        value = f"'%{value}%'"

                # If we did not find a new column, fall back to last_column
                column_to_use = current_part_column or last_column
                if column_to_use:
                    # Add this condition to the list for this part
                    part_conditions.append(f"{column_to_use} {sql_operator} {value}")
                # Stop after the first (longest) matching phrase so that
                # overlapping phrases don't produce contradictory conditions.
                break

        # Join any conditions found within the same part with AND
        if part_conditions:
            conditions.append(" AND ".join(part_conditions))

    # Finally, combine the parts with AND or OR, depending on the user query
    if use_and:
        return " AND ".join(conditions)
    elif use_or:
        return " OR ".join(conditions)
    # If there's only one part or no explicit 'and'/'or', default to AND.
    return " AND ".join(conditions)


# Interpret the user question using intent recognition
def interpret_question(question, schema):
    # Define potential intents
    intents = {
        "describe_table": "Provide information about the columns and structure of a table.",
        "list_table_data": "Fetch and display all data stored in a table.",
        "count_records": "Count the number of records in a table.",
        "fetch_column": "Fetch a specific column's data from a table.",
    }

    # Use the zero-shot classifier to predict the intent
    labels = list(intents.keys())
    result = classifier(question, labels)
    predicted_intent = result["labels"][0]
    table = identify_table(question, list(schema.keys()))

    # Rule-based fallback for conditional queries
    condition_keywords = list(operator_mappings.keys())
    if any(keyword in question.lower() for keyword in condition_keywords):
        predicted_intent = "list_table_data"

    return {"intent": predicted_intent, "table": table}


# Handle different intents
def handle_intent(intent_data, schema, conn, question):
    intent = intent_data["intent"]
    table = intent_data["table"]

    if not table:
        return "I couldn't identify which table you're referring to."

    if intent == "describe_table":
        # Describe table structure
        table_schema = schema[table]
        description = [f"Table '{table}' has the following columns:"]
        for col in table_schema:
            col_details = f"- {col['name']} ({col['type']})"
            if col["not_null"]:
                col_details += " [NOT NULL]"
            if col["default"] is not None:
                col_details += f" [DEFAULT: {col['default']}]"
            if col["pk"]:
                col_details += " [PRIMARY KEY]"
            description.append(col_details)
        return "\n".join(description)

    elif intent == "list_table_data":
        # Check for conditions
        condition = extract_conditions(question, schema, table)
        cursor = conn.cursor()
        query = f"SELECT * FROM {table}"
        if condition:
            query += f" WHERE {condition}"
        query += ";"
        print(query)  # debug output
        cursor.execute(query)
        return cursor.fetchall()

    elif intent == "count_records":
        # Count records in the table
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table};")
        return cursor.fetchone()

    elif intent == "fetch_column":
        return "Fetching specific column data is not yet implemented."

    else:
        return "I couldn't understand your question."
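
# The queries above interpolate the table name into SQL with f-strings.
# SQLite cannot parameterize identifiers, so one defensive option (a sketch,
# not part of the original flow) is to whitelist the name against the schema
# fetched at startup before building any query.
def validate_table_name(table, schema):
    """Return the table name only if it came from the fetched schema."""
    if table in schema:
        return table
    raise ValueError(f"Unknown table: {table!r}")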

# Main function
def answer_question(question, conn, schema):
    intent_data = interpret_question(question, schema)
    print(intent_data)  # debug output
    return handle_intent(intent_data, schema, conn, question)


# Example usage
if __name__ == "__main__":
    db_path = "./ecommerce.db"  # Replace with your SQLite database path
    conn = connect_to_db(db_path)
    schema = fetch_schema(conn)
    print("Schema:", schema)

    while True:
        question = input("\nAsk a question about the database: ")
        if question.lower() in ["exit", "quit"]:
            break
        answer = answer_question(question, conn, schema)
        print("Answer:", answer)
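

# A minimal end-to-end smoke test, assuming you don't have ecommerce.db at
# hand. It builds a tiny in-memory "products" table (schema and rows are
# illustrative assumptions) and asks one conditional question. Call it in
# place of the interactive loop above to try the pipeline.
def _smoke_test():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL, stock INTEGER);"
    )
    conn.executemany(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?);",
        [("widget", 120.0, 30), ("gadget", 80.0, 200)],
    )
    schema = fetch_schema(conn)
    # Expected to route to list_table_data and build something like
    # "SELECT * FROM products WHERE price > 100;"
    print(answer_question("show products with price greater than 100", conn, schema))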