diff --git a/README.md b/README.md index f7600f3..1bdc168 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,16 @@ A vector store table has the following columns: - `embedding`: VECTOR type (indexed for similarity search) - `metadata`: JSON (optional metadata) +## Available Prompts + +- **explain_table** + - Provides a detailed explanation of a database table's structure, relationships, and usage. + - Parameters: `table_name` (string, required) + +- **query_tuning** + - Analyzes a SQL query for performance optimization opportunities. + - Parameters: `original_query` (string, required) + --- ## Configuration & Environment Variables diff --git a/pyproject.toml b/pyproject.toml index 68e7d8e..09edf27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" requires-python = ">=3.11" dependencies = [ "asyncmy>=0.2.10", - "fastmcp[cli]==2.2.8", + "fastmcp[cli]==2.11.3", "google-genai>=1.15.0", "google-generativeai>=0.8.5", "openai>=1.78.1", diff --git a/src/prompt.py b/src/prompt.py new file mode 100644 index 0000000..5298645 --- /dev/null +++ b/src/prompt.py @@ -0,0 +1,103 @@ +""" +Prompt template collection for MariaDB MCP Server + +This module provides various prompt templates for database analysis and optimization. +""" + +def get_explain_table_prompt(table_name: str) -> str: + """ + Returns a prompt template for table structure analysis. + + Args: + table_name: Name of the table to analyze + + Returns: + Formatted prompt string + """ + return f"""Using DB MCP, analyze the schema and relationships of the {table_name} table in the database, +and create documentation in an easy-to-read Markdown format. +Before proceeding with this task, call List_databases to confirm which database to work with. +If backend code exists in the user's codebase, please reference it to explain the role of this table. + +### Output Format (Markdown) (Write in the following format) + +# 📋 Table Analysis: {table_name} + +## 1. Table Overview +- **Name**: {table_name} +- **Type**: General +- **Role**: Store billing details and manage payment status + +## 2. Column Analysis +| Column Name | Type | PK/FK | NULL Allowed | Default | Description | +|----------------|--------------|-------|--------------|---------|-------------| +| id | BIGINT | PK | NO | - | Unique identifier | +| member_id | BIGINT | FK | NO | - | Member ID (member.id) | +| amount | DECIMAL(10,2)| | NO | 0.00 | Billing amount | +| status | TINYINT | | NO | 0 | Payment status (0:pending, 1:completed, 2:cancelled) | +... + +## 3. Relationship Analysis +### Parent Tables +- member (member_id) → ON DELETE CASCADE / ON UPDATE CASCADE, billing target member information + +### Child Tables +- payment (charge_id) → payment history reference + +## 4. Key Indexes +- PK Index: PRIMARY KEY (id) +- Secondary Index: idx_member_id (member_id) → improve performance for member-based queries""" + +def get_query_tuning_prompt(original_query: str) -> str: + """ + Returns a prompt template for query performance analysis. + + Args: + original_query: Original query to analyze + database_name: Database name + optimization_focus: Optimization focus area + + Returns: + Formatted prompt string + """ + return f"""You are a MariaDB/MySQL query performance optimization expert (DBA). +Using MariaDB MCP, provide **execution plan analysis, bottleneck causes, and specific improvement suggestions** for the query below. +Before proceeding with this task, call List_databases to confirm which database to work with. + +--- + +### Target Query for Analysis +```sql +{original_query} +``` + +### 1. Execution Plan Analysis +- Interpret each column (type, rows, key, Extra) based on EXPLAIN +- Specifically identify bottleneck sections and causes (full scan, filesort, temporary tables, etc.) +- Explain potential impacts in large table environments + +### 2. Optimization Suggestions +- Remove unnecessary operations/conditions +- Suggest appropriate index creation SQL +- Improve JOIN, WHERE, GROUP BY, ORDER BY structure +- Assess feasibility of converting subqueries to JOINs +- Propose LIMIT or data range limitation methods +- If there are specific columns that absolutely need index settings, you may suggest them +- Before making suggestions, make sure to execute the improved query to confirm there are no issues + +### 3. Improved Query Example +- Maintain same results as original +- Present improved query that can expect performance enhancement + +```sql +[Improved Query] +``` + +### 4. Expected Performance Impact +- Expected execution time change +- Change in number of rows examined +- Change in index usage + +*Output format must follow the above structure (Analysis → Suggestions → Improved Query → Impact), +use concise technical terms and explanations that both DBAs and developers can understand.* +""" diff --git a/src/server.py b/src/server.py index 54c0ef9..b456421 100644 --- a/src/server.py +++ b/src/server.py @@ -8,6 +8,8 @@ import asyncmy import anyio from fastmcp import FastMCP, Context +from fastmcp.tools import FunctionTool +from prompt import get_explain_table_prompt, get_query_tuning_prompt # Import configuration settings from config import ( @@ -238,15 +240,20 @@ async def _is_vector_store(self, database_name: str, table_name: str) -> bool: # --- MCP Tool Definitions --- - async def list_databases(self) -> List[str]: + async def list_databases(self) -> Dict[str, Any]: """Lists all accessible databases on the connected MariaDB server.""" logger.info("TOOL START: list_databases called.") sql = "SHOW DATABASES" try: results = await self._execute_query(sql) db_list = [row['Database'] for row in results if 'Database' in row] + data = { + "databases": db_list, + "notice": f"This MCP server is configured for '{DB_NAME}' database by default. Use {DB_NAME} if there are no other specific requirements." + } + logger.info(f"TOOL END: list_databases completed. Databases found: {len(db_list)}.") - return db_list + return data except Exception as e: logger.error(f"TOOL ERROR: list_databases failed: {e}", exc_info=True) raise @@ -267,119 +274,174 @@ async def list_tables(self, database_name: str) -> List[str]: logger.error(f"TOOL ERROR: list_tables failed for database_name={database_name}: {e}", exc_info=True) raise - async def get_table_schema(self, database_name: str, table_name: str) -> Dict[str, Any]: + async def get_table_schema(self, database_name: str, table_name: str, include_foreign_keys: bool = True) -> Dict[str, Any]: """ - Retrieves the schema (column names, types, nullability, keys, default values) - for a specific table in a database. + Get table schema including column definitions, constraints, comments, and foreign key relationships. + + Args: + database_name: Target database name + table_name: Target table name + include_foreign_keys: Whether to include foreign key relationship information (default: True) + + Returns: + Dictionary with table schema information: + { + 'table_info': { + 'database_name': str, + 'table_name': str, + 'table_comment': str or None, + 'total_columns': int, + 'foreign_key_count': int (only if include_foreign_keys=True) + }, + 'columns': { + 'column_name': { + 'type': 'SQL data type', + 'nullable': bool, + 'key': 'Key type (PRI/UNI/MUL)', + 'default': 'Default value or None', + 'extra': 'Additional info (auto_increment, etc.)', + 'comment': 'Column comment if any', + 'foreign_key': {...} or None (only if include_foreign_keys=True) + } + }, + 'foreign_keys_summary': [...] (only if include_foreign_keys=True) + } """ - logger.info(f"TOOL START: get_table_schema called. database_name={database_name}, table_name={table_name}") if not database_name or not database_name.isidentifier(): - logger.warning(f"TOOL WARNING: get_table_schema called with invalid database_name: {database_name}") raise ValueError(f"Invalid database name provided: {database_name}") if not table_name or not table_name.isidentifier(): - logger.warning(f"TOOL WARNING: get_table_schema called with invalid table_name: {table_name}") raise ValueError(f"Invalid table name provided: {table_name}") - sql = f"DESCRIBE `{database_name}`.`{table_name}`" try: - schema_results = await self._execute_query(sql) + # Get basic schema using DESCRIBE + describe_sql = f"DESCRIBE `{database_name}`.`{table_name}`" + + # Get detailed column information including comments from information_schema + column_info_sql = """ + SELECT + COLUMN_NAME, + COLUMN_COMMENT, + IS_NULLABLE, + COLUMN_DEFAULT, + DATA_TYPE, + COLUMN_TYPE, + EXTRA + FROM information_schema.COLUMNS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + ORDER BY ORDINAL_POSITION + """ + + # Get table comment + table_comment_sql = """ + SELECT TABLE_COMMENT + FROM information_schema.TABLES + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + """ + + # Execute basic queries + schema_results = await self._execute_query(describe_sql) + column_details = await self._execute_query(column_info_sql, params=(database_name, table_name)) + table_comment_result = await self._execute_query(table_comment_sql, params=(database_name, table_name)) + + # Build basic schema info dictionary schema_info = {} + + # Create lookup dictionary for column comments and details + column_lookup = {col['COLUMN_NAME']: col for col in column_details} + if not schema_results: exists_sql = "SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = %s AND table_name = %s" exists_result = await self._execute_query(exists_sql, params=(database_name, table_name)) if not exists_result or exists_result[0]['count'] == 0: - logger.warning(f"TOOL WARNING: Table '{database_name}'.'{table_name}' not found or inaccessible.") raise FileNotFoundError(f"Table '{database_name}'.'{table_name}' not found or inaccessible.") - else: - logger.warning(f"Could not describe table '{database_name}'.'{table_name}'. It might be a view or lack permissions.") + # Process DESCRIBE results with enhanced information for row in schema_results: col_name = row.get('Field') if col_name: + col_details = column_lookup.get(col_name, {}) schema_info[col_name] = { 'type': row.get('Type'), 'nullable': row.get('Null', '').upper() == 'YES', 'key': row.get('Key'), 'default': row.get('Default'), - 'extra': row.get('Extra') + 'extra': row.get('Extra'), + 'comment': col_details.get('COLUMN_COMMENT', '') or None } - logger.info(f"TOOL END: get_table_schema completed. Columns found: {len(schema_info)}. Keys: {list(schema_info.keys())}") - return schema_info - except FileNotFoundError as e: - logger.warning(f"TOOL WARNING: get_table_schema table not found: {e}") - raise e - except Exception as e: - logger.error(f"TOOL ERROR: get_table_schema failed for database_name={database_name}, table_name={table_name}: {e}", exc_info=True) - raise RuntimeError(f"Could not retrieve schema for table '{database_name}.{table_name}'.") - - async def get_table_schema_with_relations(self, database_name: str, table_name: str) -> Dict[str, Any]: - """ - Retrieves table schema with foreign key relationship information. - Includes all basic schema info plus foreign key relationships and referenced tables. - """ - logger.info(f"TOOL START: get_table_schema_with_relations called. database_name={database_name}, table_name={table_name}") - if not database_name or not database_name.isidentifier(): - logger.warning(f"TOOL WARNING: get_table_schema_with_relations called with invalid database_name: {database_name}") - raise ValueError(f"Invalid database name provided: {database_name}") - if not table_name or not table_name.isidentifier(): - logger.warning(f"TOOL WARNING: get_table_schema_with_relations called with invalid table_name: {table_name}") - raise ValueError(f"Invalid table name provided: {table_name}") - - try: - # 1. Get basic schema information - basic_schema = await self.get_table_schema(database_name, table_name) - - # 2. Retrieve foreign key information - fk_sql = """ - SELECT - kcu.COLUMN_NAME as column_name, - kcu.CONSTRAINT_NAME as constraint_name, - kcu.REFERENCED_TABLE_NAME as referenced_table, - kcu.REFERENCED_COLUMN_NAME as referenced_column, - rc.UPDATE_RULE as on_update, - rc.DELETE_RULE as on_delete - FROM information_schema.KEY_COLUMN_USAGE kcu - INNER JOIN information_schema.REFERENTIAL_CONSTRAINTS rc - ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME - AND kcu.CONSTRAINT_SCHEMA = rc.CONSTRAINT_SCHEMA - WHERE kcu.TABLE_SCHEMA = %s - AND kcu.TABLE_NAME = %s - AND kcu.REFERENCED_TABLE_NAME IS NOT NULL - ORDER BY kcu.CONSTRAINT_NAME, kcu.ORDINAL_POSITION - """ - - fk_results = await self._execute_query(fk_sql, params=(database_name, table_name)) + + # Initialize foreign_key field if requested + if include_foreign_keys: + schema_info[col_name]['foreign_key'] = None - # 3. Add foreign key information to the basic schema - enhanced_schema = {} - for col_name, col_info in basic_schema.items(): - enhanced_schema[col_name] = col_info.copy() - enhanced_schema[col_name]['foreign_key'] = None - - # 4. Add foreign key information to the corresponding columns - for fk_row in fk_results: - column_name = fk_row['column_name'] - if column_name in enhanced_schema: - enhanced_schema[column_name]['foreign_key'] = { - 'constraint_name': fk_row['constraint_name'], - 'referenced_table': fk_row['referenced_table'], - 'referenced_column': fk_row['referenced_column'], - 'on_update': fk_row['on_update'], - 'on_delete': fk_row['on_delete'] - } + # Get table comment + table_comment = table_comment_result[0].get('TABLE_COMMENT') if table_comment_result else None - # 5. Return the enhanced schema with foreign key relations + # Build result structure result = { - 'table_name': table_name, - 'columns': enhanced_schema + 'table_info': { + 'database_name': database_name, + 'table_name': table_name, + 'table_comment': table_comment if table_comment else None, + 'total_columns': len(schema_info) + }, + 'columns': schema_info } - logger.info(f"TOOL END: get_table_schema_with_relations completed. Columns: {len(enhanced_schema)}, Foreign keys: {len(fk_results)}") + # Add foreign key information if requested + if include_foreign_keys: + # Query foreign key relationships from information_schema + fk_sql = """ + SELECT + kcu.COLUMN_NAME as column_name, + kcu.CONSTRAINT_NAME as constraint_name, + kcu.REFERENCED_TABLE_SCHEMA as referenced_database, + kcu.REFERENCED_TABLE_NAME as referenced_table, + kcu.REFERENCED_COLUMN_NAME as referenced_column, + rc.UPDATE_RULE as on_update, + rc.DELETE_RULE as on_delete + FROM information_schema.KEY_COLUMN_USAGE kcu + LEFT JOIN information_schema.REFERENTIAL_CONSTRAINTS rc + ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME + AND kcu.CONSTRAINT_SCHEMA = rc.CONSTRAINT_SCHEMA + WHERE kcu.TABLE_SCHEMA = %s + AND kcu.TABLE_NAME = %s + AND kcu.REFERENCED_TABLE_NAME IS NOT NULL + ORDER BY kcu.ORDINAL_POSITION + """ + + fk_results = await self._execute_query(fk_sql, params=(database_name, table_name)) + + # Add foreign key details to matching columns + for fk_row in fk_results: + column_name = fk_row['column_name'] + if column_name in schema_info: + schema_info[column_name]['foreign_key'] = { + 'constraint_name': fk_row['constraint_name'], + 'referenced_database': fk_row['referenced_database'], + 'referenced_table': fk_row['referenced_table'], + 'referenced_column': fk_row['referenced_column'], + 'on_update': fk_row['on_update'], + 'on_delete': fk_row['on_delete'], + 'full_reference': f"{fk_row['referenced_database']}.{fk_row['referenced_table']}.{fk_row['referenced_column']}" + } + + # Add foreign key summary and count to result + result['table_info']['foreign_key_count'] = len(fk_results) + result['foreign_keys_summary'] = [ + { + 'column': fk['column_name'], + 'references': f"{fk['referenced_database']}.{fk['referenced_table']}.{fk['referenced_column']}", + 'constraint': fk['constraint_name'] + } + for fk in fk_results + ] + return result + except FileNotFoundError as e: + raise e except Exception as e: - logger.error(f"TOOL ERROR: get_table_schema_with_relations failed for database_name={database_name}, table_name={table_name}: {e}", exc_info=True) - raise RuntimeError(f"Could not retrieve schema with relations for table '{database_name}.{table_name}': {str(e)}") + raise RuntimeError(f"Could not retrieve schema for table '{database_name}.{table_name}'.") async def execute_sql(self, sql_query: str, database_name: str, parameters: Optional[List[Any]] = None) -> List[Dict[str, Any]]: @@ -400,6 +462,34 @@ async def execute_sql(self, sql_query: str, database_name: str, parameters: Opti except Exception as e: logger.error(f"TOOL ERROR: execute_sql failed for database_name={database_name}, sql_query={sql_query[:100]}, parameters={parameters}: {e}", exc_info=True) raise + + async def explain_query(self, sql_query: str, database_name: str, parameters: Optional[List[Any]] = None) -> List[Dict[str, Any]]: + """ + Get detailed query execution plan with extended information. + + Provides more comprehensive analysis including + filtered row percentages and detailed optimizer information. + + Args: + sql_query: Query to analyze (without EXPLAIN EXTENDED prefix) + database_name: Target database context + parameters: Optional parameters for query placeholders + + Returns: + Extended execution plan with additional optimization details + """ + if database_name and not database_name.isidentifier(): + raise ValueError(f"Invalid database name provided: {database_name}") + + # Prefix query with EXPLAIN EXTENDED for detailed analysis + explain_sql = f"EXPLAIN EXTENDED {sql_query.strip()}" + param_tuple = tuple(parameters) if parameters is not None else None + + try: + results = await self._execute_query(explain_sql, params=param_tuple, database=database_name) + return results + except Exception as e: + raise async def create_database(self, database_name: str) -> Dict[str, Any]: """ @@ -758,6 +848,56 @@ async def search_vector_store(self, user_query: str, database_name: str, vector_ except Exception as e: logger.error(f"Failed to search vector store {database_name}.{vector_store_name}: {e}", exc_info=True) return [] + + # --- MCP Prompt Methods --- + # These methods are exposed as MCP prompts + + async def explain_table( + self, + table_name: str, + ) -> str: + """ + Returns a prompt for table structure analysis. + + Args: + table_name: Name of the table to analyze + + Returns: + Table structure analysis prompt + """ + return get_explain_table_prompt(table_name) + + async def query_tuning( + self, + original_query: str, + ) -> str: + """ + Returns a prompt for query performance analysis. + + Args: + original_query: Original query to analyze + database_name: Database name + optimization_focus: Optimization focus area + + Returns: + Query performance analysis prompt + """ + return get_query_tuning_prompt(original_query) + + # --- Tool Registration --- + + def register_tools(self): + """ + Register all MCP tool methods with the FastMCP server. + + Creates FunctionTool instances for each public method and adds them + to the MCP server. Read-only mode affects which tools are registered. + + Note: Database pool must be initialized before calling this method. + """ + if self.pool is None: + logger.error("Cannot register tools: Database pool is not initialized.") + raise RuntimeError("Database pool must be initialized before registering tools.") # --- Tool Registration (Synchronous) --- def register_tools(self): @@ -766,20 +906,23 @@ def register_tools(self): logger.error("Cannot register tools: Database pool is not initialized.") raise RuntimeError("Database pool must be initialized before registering tools.") - self.mcp.add_tool(self.list_databases) - self.mcp.add_tool(self.list_tables) - self.mcp.add_tool(self.get_table_schema) - self.mcp.add_tool(self.get_table_schema_with_relations) - self.mcp.add_tool(self.execute_sql) - self.mcp.add_tool(self.create_database) + self.mcp.add_tool(FunctionTool.from_function(self.list_databases, name="list_databases")) + self.mcp.add_tool(FunctionTool.from_function(self.list_tables, name="list_tables")) + self.mcp.add_tool(FunctionTool.from_function(self.get_table_schema, name="get_table_schema")) + self.mcp.add_tool(FunctionTool.from_function(self.execute_sql, name="execute_sql")) + self.mcp.add_tool(FunctionTool.from_function(self.explain_query, name="explain_query")) + self.mcp.add_tool(FunctionTool.from_function(self.create_database, name="create_database")) if EMBEDDING_PROVIDER is not None: - self.mcp.add_tool(self.create_vector_store) - self.mcp.add_tool(self.list_vector_stores) - self.mcp.add_tool(self.delete_vector_store) - self.mcp.add_tool(self.insert_docs_vector_store) - self.mcp.add_tool(self.search_vector_store) + self.mcp.add_tool(FunctionTool.from_function(self.create_vector_store, name="create_vector_store")) + self.mcp.add_tool(FunctionTool.from_function(self.list_vector_stores, name="list_vector_stores")) + self.mcp.add_tool(FunctionTool.from_function(self.delete_vector_store, name="delete_vector_store")) + self.mcp.add_tool(FunctionTool.from_function(self.insert_docs_vector_store, name="insert_docs_vector_store")) + self.mcp.add_tool(FunctionTool.from_function(self.search_vector_store, name="search_vector_store")) logger.info("Registered MCP tools explicitly.") + self.mcp.prompt()(self.query_tuning) + self.mcp.prompt()(self.explain_table) + # --- Async Main Server Logic --- async def run_async_server(self, transport="stdio", host="127.0.0.1", port=9001): """