diff --git a/README.md b/README.md
index 4e589bb..471d0a5 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,7 @@
 
 A Model Context Protocol (MCP) server for Kubernetes that enables AI assistants like Claude, Cursor, and others to interact with Kubernetes clusters through natural language.
 
+[![Trust Score](https://archestra.ai/mcp-catalog/api/badge/quality/rohitg00/kubectl-mcp-server)](https://archestra.ai/mcp-catalog/rohitg00__kubectl-mcp-server)
 [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
 [![Python](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/)
 [![Kubernetes](https://img.shields.io/badge/kubernetes-%23326ce5.svg?style=flat&logo=kubernetes&logoColor=white)](https://kubernetes.io/)
diff --git a/compatible_servers 2/cursor/cursor_compatible_mcp_server.py b/compatible_servers 2/cursor/cursor_compatible_mcp_server.py
new file mode 100644
index 0000000..49e5ea7
--- /dev/null
+++ b/compatible_servers 2/cursor/cursor_compatible_mcp_server.py
@@ -0,0 +1,837 @@
+#!/usr/bin/env python3
+"""
+Cursor-compatible MCP server implementation for kubectl-mcp-tool.
+
+This script provides a specialized MCP server implementation for Cursor integration
+with improved error handling and real kubectl command execution.
+"""
+
+import sys
+import json
+import logging
+import asyncio
+import signal
+import os
+import subprocess
+from typing import Dict, Any, List, Optional, AsyncGenerator, Callable
+
+# Configure environment-based settings
+DEBUG_MODE = os.environ.get("MCP_DEBUG", "false").lower() in ("true", "1", "yes")
+LOG_LEVEL_NAME = os.environ.get("KUBECTL_MCP_LOG_LEVEL", "INFO").upper()
+LOG_LEVEL = getattr(logging, LOG_LEVEL_NAME, logging.INFO)
+
+# Create log directory if it doesn't exist
+LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
+os.makedirs(LOG_DIR, exist_ok=True)
+
+# Configure logging
+logging.basicConfig(
+    level=LOG_LEVEL,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    filename=os.path.join(LOG_DIR, "cursor_compatible_mcp_server.log")
+)
+logger = logging.getLogger("cursor-compatible-mcp-server")
+
+# Add console handler for important messages
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.INFO if not DEBUG_MODE else logging.DEBUG)
+console_formatter = logging.Formatter('%(levelname)s: %(message)s')
+console_handler.setFormatter(console_formatter)
+logger.addHandler(console_handler)
+
+# Log startup information
+logger.info(f"Starting kubectl-mcp-tool with DEBUG={DEBUG_MODE}, LOG_LEVEL={LOG_LEVEL_NAME}")
+logger.info(f"Python version: {sys.version}")
+logger.info(f"Platform: {sys.platform}")
+logger.info(f"Current directory: {os.getcwd()}")
+
+# Import from the correct location
+try:
+    from kubectl_mcp_tool.mcp_server import MCPServer
+    from kubectl_mcp_tool.natural_language import process_query
+    logger.info("Successfully imported kubectl_mcp_tool modules")
+except ImportError as e:
+    logger.error(f"Failed to import kubectl_mcp_tool modules: {e}")
+    # Create a debug error file
+    with open(os.path.join(LOG_DIR, "import_error.log"), "w") as f:
+        f.write(f"Import error: {str(e)}\n")
+        f.write(f"Python path: {sys.path}\n")
+
+    # Try to continue anyway by using a relative import
+    try:
+        sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+        from kubectl_mcp_tool.mcp_server import MCPServer
+        from kubectl_mcp_tool.natural_language import process_query
+        logger.info("Successfully imported kubectl_mcp_tool modules after path adjustment")
+    except ImportError as e:
+        logger.critical(f"Failed to import kubectl_mcp_tool modules even with path adjustment: {e}")
+        sys.exit(1)
+
+class StdioTransport:
+    """A class that implements the stdio transport for MCP."""
+
+    def __init__(self):
+        """Initialize the stdio transport."""
+        self.input_queue = asyncio.Queue()
+        self.running = True
+        self.debug_file = open(os.path.join(LOG_DIR, "cursor_mcp_debug.log"), "w")
+        self.debug_file.write(f"StdioTransport initialized at {os.path.abspath(__file__)}\n")
+        self.debug_file.write(f"Current directory: {os.getcwd()}\n")
+        self.debug_file.write(f"Python path: {sys.path}\n")
+        self.debug_file.flush()
+        logger.info("StdioTransport initialized")
+
+        # Make stdin non-blocking on Unix systems
+        if sys.platform != 'win32':
+            import fcntl
+            import os
+            fd = sys.stdin.fileno()
+            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+            self.debug_file.write("Set stdin to non-blocking mode\n")
+            self.debug_file.flush()
+
+    async def start_reader(self):
+        """Start reading from stdin in a non-blocking way."""
+        loop = asyncio.get_running_loop()
+        logger.info("Starting stdin reader")
+        self.debug_file.write("Starting stdin reader\n")
+        self.debug_file.flush()
+
+        while self.running:
+            try:
+                # Read from stdin with a timeout to allow for checking self.running
+                try:
+                    # Use run_in_executor with a short timeout to prevent blocking indefinitely
+                    line = await asyncio.wait_for(
+                        loop.run_in_executor(None, sys.stdin.readline),
+                        timeout=0.5
+                    )
+                except asyncio.TimeoutError:
+                    # Timeout is expected, just continue the loop
+                    await asyncio.sleep(0.1)
+                    continue
+
+                if not line:
+                    logger.debug("End of stdin stream")
+                    self.debug_file.write("End of stdin stream\n")
+                    self.debug_file.flush()
+                    # Don't exit immediately, sleep and check again
+                    await asyncio.sleep(0.5)
+                    continue
+
+                # Log the raw input for debugging
+                self.debug_file.write(f"STDIN: {line.strip()}\n")
+                self.debug_file.flush()
+
+                logger.debug(f"Read from stdin: {line.strip()}")
+
+                try:
+                    # Parse the JSON message
+                    message = json.loads(line)
+                    logger.debug(f"Parsed JSON message: {message}")
+
+                    # Put the message in the queue
+                    await self.input_queue.put(message)
+                    logger.debug(f"Put message in queue: {message.get('id', 'unknown-id')}")
+                except json.JSONDecodeError as e:
+                    logger.error(f"Failed to parse JSON: {line.strip()} - {e}")
+                    self.debug_file.write(f"JSON ERROR: {e} for input: {line.strip()}\n")
+                    self.debug_file.flush()
+
+                    # Try to send an error response for malformed JSON
+                    error_response = {
+                        "jsonrpc": "2.0",
+                        "id": None,  # We don't know the ID since JSON parsing failed
+                        "error": {
+                            "code": -32700,
+                            "message": "Parse error: Invalid JSON was received"
+                        }
+                    }
+                    await self.write_message(error_response)
+            except Exception as e:
+                logger.error(f"Error reading from stdin: {e}")
+                self.debug_file.write(f"STDIN ERROR: {str(e)}\n")
+                self.debug_file.flush()
+                # Don't break the loop on error, just continue after a short delay
+                await asyncio.sleep(0.1)
+
+    async def read_message(self):
+        """Read messages from the input queue and yield them."""
+        logger.info("Starting message reader")
+
+        while self.running:
+            try:
+                # Get a message from the queue
+                message = await self.input_queue.get()
+
+                # Log the message
+                logger.debug(f"Yielding message: {message}")
+                self.debug_file.write(f"YIELD: {json.dumps(message)}\n")
+                self.debug_file.flush()
+
+                # Yield the message
+                yield message
+
+                # Mark the task as done
+                self.input_queue.task_done()
except asyncio.CancelledError: + logger.info("Message reader cancelled") + self.debug_file.write("Message reader cancelled\n") + self.debug_file.flush() + break + except Exception as e: + logger.error(f"Error in read_message: {e}") + self.debug_file.write(f"READ ERROR: {str(e)}\n") + self.debug_file.flush() + # Sleep a bit on error to avoid busy waiting + await asyncio.sleep(0.1) + + async def write_message(self, message: Dict[str, Any]) -> None: + """Write a message to stdout.""" + try: + # Convert the message to a JSON string with a newline + json_str = json.dumps(message) + "\n" + + # Log the message + logger.debug(f"Writing to stdout: {json_str.strip()}") + self.debug_file.write(f"STDOUT: {json_str}") + self.debug_file.flush() + + # Use run_in_executor to prevent blocking the event loop + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, lambda: self._write_to_stdout(json_str)) + except Exception as e: + logger.error(f"Error writing to stdout: {e}") + self.debug_file.write(f"WRITE ERROR: {str(e)}\n") + self.debug_file.flush() + + def _write_to_stdout(self, json_str: str) -> None: + """Helper method to write to stdout and handle potential errors.""" + try: + # Write the message to stdout and flush + sys.stdout.write(json_str) + sys.stdout.flush() + except (BrokenPipeError, IOError) as e: + # Handle broken pipe errors specifically + self.debug_file.write(f"STDOUT PIPE ERROR: {str(e)}\n") + self.debug_file.flush() + # Don't re-raise, as this would kill the process + # Just log and continue + except Exception as e: + self.debug_file.write(f"STDOUT UNEXPECTED ERROR: {str(e)}\n") + self.debug_file.flush() + +class KubectlTools: + """Kubectl tools implementation for Cursor integration.""" + + def __init__(self): + """Initialize the kubectl tools.""" + self.current_namespace = "default" + self.mock_data = self._get_mock_data() + + def _get_mock_data(self) -> Dict[str, Dict[str, str]]: + """Get mock data for kubectl commands.""" + return { + "get_pods": { + "command": "kubectl get pods", + "result": """NAME READY STATUS RESTARTS AGE +nginx-pod 1/1 Running 0 1h +web-deployment-abc123 1/1 Running 0 45m +db-statefulset-0 1/1 Running 0 30m""" + }, + "get_namespaces": { + "command": "kubectl get namespaces", + "result": """NAME STATUS AGE +default Active 1d +kube-system Active 1d +kube-public Active 1d +kube-node-lease Active 1d""" + }, + "switch_namespace": { + "command": "kubectl config set-context --current --namespace={}", + "result": "Switched to namespace {}" + }, + "get_current_namespace": { + "command": "kubectl config view --minify --output 'jsonpath={..namespace}'", + "result": "default" + }, + "get_contexts": { + "command": "kubectl config get-contexts", + "result": """CURRENT NAME CLUSTER AUTHINFO NAMESPACE +* docker-desktop docker-desktop docker-desktop default + minikube minikube minikube default""" + }, + "get_deployments": { + "command": "kubectl get deployments", + "result": """NAME READY UP-TO-DATE AVAILABLE AGE +web-deployment 3/3 3 3 1h +api-deployment 2/2 2 2 45m""" + }, + "get_services": { + "command": "kubectl get services", + "result": """NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE +kubernetes ClusterIP 10.96.0.1 443/TCP 1d +web-service NodePort 10.96.123.45 80:30080/TCP 1h +api-service ClusterIP 10.96.234.56 8080/TCP 45m""" + } + } + + def _run_kubectl_command(self, command: str) -> Dict[str, Any]: + """Run a kubectl command and return the result.""" + try: + result = subprocess.run( + command, + shell=True, + check=True, + capture_output=True, + 
text=True + ) + return { + "command": command, + "result": result.stdout.strip(), + "success": True + } + except subprocess.CalledProcessError as e: + logger.warning(f"kubectl command failed: {command} - {e}") + return { + "command": command, + "result": e.stderr.strip(), + "success": False, + "error": str(e) + } + except Exception as e: + logger.error(f"Error running kubectl command: {command} - {e}") + return { + "command": command, + "result": f"Error: {str(e)}", + "success": False, + "error": str(e) + } + + async def process_query(self, query: str) -> Dict[str, Any]: + """Process a natural language query for kubectl operations.""" + logger.info(f"Processing query: {query}") + + # Convert query to lowercase for easier matching + query_lower = query.lower() + + try: + # Determine the kubectl command based on the query + if "get all pods" in query_lower or "get pods" in query_lower: + if "namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get pods -n {namespace}" + else: + command = "kubectl get pods --all-namespaces" + else: + command = "kubectl get pods" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + mock_result = self.mock_data["get_pods"] + result = { + "command": command, + "result": mock_result["result"], + "mock": True + } + + elif "show namespaces" in query_lower or "get namespaces" in query_lower: + command = "kubectl get namespaces" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + mock_result = self.mock_data["get_namespaces"] + result = { + "command": command, + "result": mock_result["result"], + "mock": True + } + + elif "switch to namespace" in query_lower or "change namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl config set-context --current --namespace={namespace}" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + mock_result = self.mock_data["switch_namespace"] + result = { + "command": command, + "result": mock_result["result"].format(namespace), + "mock": True + } + + # Update current namespace + self.current_namespace = namespace + else: + result = { + "command": "kubectl config set-context", + "result": "Error: No namespace specified", + "success": False + } + + elif "current namespace" in query_lower or "what namespace" in query_lower: + command = "kubectl config view --minify --output 'jsonpath={..namespace}'" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + result = { + "command": command, + "result": self.current_namespace, + "mock": True + } + + elif "get contexts" in query_lower or "show contexts" in query_lower: + command = "kubectl config get-contexts" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock 
data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + mock_result = self.mock_data["get_contexts"] + result = { + "command": command, + "result": mock_result["result"], + "mock": True + } + + elif "get deployments" in query_lower or "show deployments" in query_lower: + if "namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get deployments -n {namespace}" + else: + command = "kubectl get deployments --all-namespaces" + else: + command = f"kubectl get deployments -n {self.current_namespace}" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + mock_result = self.mock_data["get_deployments"] + result = { + "command": command, + "result": mock_result["result"], + "mock": True + } + + elif "get services" in query_lower or "show services" in query_lower: + if "namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get services -n {namespace}" + else: + command = "kubectl get services --all-namespaces" + else: + command = f"kubectl get services -n {self.current_namespace}" + + # Try to run the real command + result = self._run_kubectl_command(command) + + # If the command failed, use mock data + if not result.get("success", False): + logger.warning(f"Using mock data for: {command}") + mock_result = self.mock_data["get_services"] + result = { + "command": command, + "result": mock_result["result"], + "mock": True + } + + else: + # For unknown commands, try to run the query as a kubectl command + command = f"kubectl {query}" + result = self._run_kubectl_command(command) + + # If the command failed, return a helpful message + if not result.get("success", False): + result = { + "command": command, + "result": "Could not parse natural language query. 
Try commands like 'get all pods', 'show namespaces', or 'switch to namespace kube-system'.", + "success": False + } + + return result + except Exception as e: + logger.error(f"Error processing query: {query} - {e}") + return { + "command": "Error", + "result": f"Failed to process query: {str(e)}", + "success": False, + "error": str(e) + } + +class MCPServer: + """MCP server implementation for kubectl-mcp-tool.""" + + def __init__(self, name: str): + """Initialize the MCP server.""" + self.name = name + self.version = "0.1.0" + self.tools = KubectlTools() + logger.info(f"MCPServer initialized: {name} v{self.version}") + + async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: + """Handle an MCP request.""" + logger.info(f"Handling request: {request.get('method', 'unknown')} (ID: {request.get('id', 'unknown')})") + + # Check if the request is valid + if "jsonrpc" not in request or request["jsonrpc"] != "2.0" or "method" not in request: + logger.error(f"Invalid request: {request}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32600, + "message": "Invalid Request" + } + } + + # Handle the request based on the method + method = request["method"] + + if method == "mcp.initialize": + return await self.handle_initialize(request) + elif method == "mcp.tools.list": + return await self.handle_tools_list(request) + elif method == "mcp.tool.call": + return await self.handle_tool_call(request) + else: + logger.error(f"Method not found: {method}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32601, + "message": f"Method not found: {method}" + } + } + + async def handle_initialize(self, request: Dict[str, Any]) -> Dict[str, Any]: + """Handle an initialize request.""" + logger.info("Handling initialize request") + + # Return the server capabilities + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": { + "capabilities": { + "tools": { + "supported": True, + "properties": { + "arguments": True, + "descriptions": True + } + } + }, + "server_info": { + "name": self.name, + "version": self.version + } + } + } + + async def handle_tools_list(self, request: Dict[str, Any]) -> Dict[str, Any]: + """Handle a tools/list request.""" + logger.info("Handling tools/list request") + + # Return the list of available tools + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": { + "tools": [ + { + "name": "process_natural_language", + "description": "Process natural language queries for kubectl operations", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Natural language query for kubectl operations" + } + }, + "required": ["query"] + } + }, + { + "name": "get_pods", + "description": "Get pods in the specified namespace", + "parameters": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to get pods from" + } + } + } + }, + { + "name": "get_namespaces", + "description": "Get all namespaces", + "parameters": { + "type": "object", + "properties": {} + } + }, + { + "name": "switch_namespace", + "description": "Switch to the specified namespace", + "parameters": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to switch to" + } + }, + "required": ["namespace"] + } + }, + { + "name": "get_current_namespace", + "description": "Get the current namespace", + "parameters": { + "type": "object", + "properties": {} + } + }, + { + "name": 
"get_deployments", + "description": "Get deployments in the specified namespace", + "parameters": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to get deployments from" + } + } + } + } + ] + } + } + + async def handle_tool_call(self, request: Dict[str, Any]) -> Dict[str, Any]: + """Handle a tool/call request.""" + logger.info(f"Handling tool/call request: {request.get('params', {}).get('name', 'unknown')}") + + # Check if the request is valid + if "params" not in request or "name" not in request["params"]: + logger.error(f"Invalid tool/call request: {request}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32602, + "message": "Invalid params" + } + } + + # Get the tool name and input + tool_name = request["params"]["name"] + tool_input = request["params"].get("input", {}) + + # Handle the tool call based on the tool name + if tool_name == "process_natural_language": + if "query" not in tool_input: + logger.error(f"Missing required parameter 'query' for tool: {tool_name}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32602, + "message": "Missing required parameter: query" + } + } + + # Process the query + result = await self.tools.process_query(tool_input["query"]) + + # Return the result + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + elif tool_name == "get_pods": + namespace = tool_input.get("namespace", "") + query = f"get pods {f'in namespace {namespace}' if namespace else ''}" + result = await self.tools.process_query(query) + + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + elif tool_name == "get_namespaces": + result = await self.tools.process_query("show namespaces") + + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + elif tool_name == "switch_namespace": + if "namespace" not in tool_input: + logger.error(f"Missing required parameter 'namespace' for tool: {tool_name}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32602, + "message": "Missing required parameter: namespace" + } + } + + result = await self.tools.process_query(f"switch to namespace {tool_input['namespace']}") + + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + elif tool_name == "get_current_namespace": + result = await self.tools.process_query("what is my current namespace") + + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + elif tool_name == "get_deployments": + namespace = tool_input.get("namespace", "") + query = f"get deployments {f'in namespace {namespace}' if namespace else ''}" + result = await self.tools.process_query(query) + + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + else: + logger.error(f"Tool not found: {tool_name}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32601, + "message": f"Tool not found: {tool_name}" + } + } + +async def run_server(): + """Run the MCP server.""" + logger.info("Starting kubectl MCP server with stdio transport") + + # Create the server + server = MCPServer("kubectl-mcp-tool") + + # Create the stdio transport + stdio = StdioTransport() + + # Start the reader task + reader_task = asyncio.create_task(stdio.start_reader()) + + # Set up signal handlers for graceful shutdown + loop = asyncio.get_running_loop() + for signal_name in ('SIGINT', 'SIGTERM'): + try: + 
loop.add_signal_handler( + getattr(signal, signal_name), + lambda: asyncio.create_task(shutdown(stdio, reader_task)) + ) + except (NotImplementedError, AttributeError): + # Windows doesn't support SIGINT/SIGTERM + pass + + try: + # Process messages + async for message in stdio.read_message(): + # Handle the message + response = await server.handle_request(message) + + # Send the response + await stdio.write_message(response) + except Exception as e: + logger.error(f"Error in run_server: {e}", exc_info=True) + finally: + # Clean up + await shutdown(stdio, reader_task) + + logger.info("MCP server shutdown complete") + +async def shutdown(stdio, reader_task): + """Shutdown the server gracefully.""" + logger.info("Shutting down MCP server") + stdio.running = False + reader_task.cancel() + try: + await reader_task + except asyncio.CancelledError: + pass + +if __name__ == "__main__": + try: + # Open a separate log file for startup debugging + with open(os.path.join(LOG_DIR, "cursor_mcp_startup.log"), "w") as startup_log: + startup_log.write(f"Starting MCP server at {os.path.abspath(__file__)}\n") + startup_log.write(f"Current directory: {os.getcwd()}\n") + startup_log.write(f"Python path: {sys.path}\n") + startup_log.flush() + + # Set stdin and stdout to binary mode on Windows + if sys.platform == 'win32': + import os, msvcrt + msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) + + # Run the server in a way that catches all exceptions + asyncio.run(run_server()) + except KeyboardInterrupt: + logger.info("Server stopped by keyboard interrupt") + # Write to debug file in case logging isn't working + with open(os.path.join(LOG_DIR, "cursor_mcp_error.log"), "a") as f: + f.write("Server stopped by keyboard interrupt\n") + except Exception as e: + # Log the error and also write to a separate file in case logging is broken + logger.error(f"Server error: {e}", exc_info=True) + with open(os.path.join(LOG_DIR, "cursor_mcp_error.log"), "a") as f: + f.write(f"Server error: {str(e)}\n") + import traceback + traceback.print_exc(file=f) diff --git a/compatible_servers 2/generic/kubectl_jsonrpc_server.py b/compatible_servers 2/generic/kubectl_jsonrpc_server.py new file mode 100755 index 0000000..aabc352 --- /dev/null +++ b/compatible_servers 2/generic/kubectl_jsonrpc_server.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +""" +Simple JSON-RPC based MCP server for kubectl operations. +This server uses standard JSON-RPC 2.0 to handle MCP protocol requests. 
+""" + +import asyncio +import json +import os +import sys +import subprocess +import traceback +from datetime import datetime + +# Create logs directory +LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") +os.makedirs(LOG_DIR, exist_ok=True) + +# Open debug log file +debug_file = open(os.path.join(LOG_DIR, "kubectl_jsonrpc_debug.log"), "w") + +def log(message): + """Write a log message to the debug file and stderr.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + debug_msg = f"{timestamp} - {message}" + print(f"DEBUG: {debug_msg}", file=sys.stderr) + debug_file.write(f"{debug_msg}\n") + debug_file.flush() + +log(f"Starting kubectl JSON-RPC server") +log(f"Python version: {sys.version}") +log(f"Current directory: {os.getcwd()}") + +# Server information +SERVER_NAME = "kubectl-mcp-tool" +SERVER_VERSION = "0.1.0" + +async def read_line(): + """Read a line from stdin asynchronously.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, sys.stdin.readline) + +def write_response(response): + """Write a response to stdout.""" + json_str = json.dumps(response) + log(f"Sending response: {json_str}") + print(json_str, flush=True) + +def run_kubectl_command(command): + """Run a kubectl command and return the result.""" + log(f"Running kubectl command: {command}") + try: + result = subprocess.run( + command, shell=True, check=True, capture_output=True, text=True + ) + return { + "command": command, + "output": result.stdout.strip(), + "success": True + } + except subprocess.CalledProcessError as e: + log(f"kubectl command failed: {e}") + return { + "command": command, + "output": e.stderr.strip(), + "success": False, + "error": str(e) + } + except Exception as e: + log(f"Error running kubectl command: {e}") + return { + "command": command, + "output": f"Error: {str(e)}", + "success": False, + "error": str(e) + } + +def process_natural_language(query): + """Process a natural language query for kubectl operations.""" + log(f"Processing natural language query: {query}") + + query_lower = query.lower() + + if "get pods" in query_lower: + command = "kubectl get pods" + if "namespace" in query_lower: + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get pods -n {namespace}" + return run_kubectl_command(command) + + elif "namespaces" in query_lower: + command = "kubectl get namespaces" + return run_kubectl_command(command) + + elif "current context" in query_lower: + command = "kubectl config current-context" + return run_kubectl_command(command) + + else: + # Try as direct kubectl command + command = f"kubectl {query}" + return run_kubectl_command(command) + +async def run_server(): + """Run the JSON-RPC server for MCP.""" + log("Starting server loop") + + try: + while True: + log("Waiting for message") + line = await read_line() + if not line: + log("Empty line received, exiting") + break + + log(f"Received: {line.strip()}") + + try: + request = json.loads(line) + request_id = request.get("id", 0) + method = request.get("method") + + if method == "initialize": + # MCP initialization + log("Handling initialize method") + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": { + "name": SERVER_NAME, + "version": SERVER_VERSION, + "capabilities": { + "tools": [ + { + "name": "process_natural_language", + "description": "Process natural language queries for kubectl operations", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": 
"string", + "description": "Natural language query" + } + }, + "required": ["query"] + } + }, + { + "name": "get_pods", + "description": "Get all pods in the specified namespace", + "parameters": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Optional namespace (default: current namespace)" + } + } + } + }, + { + "name": "get_namespaces", + "description": "Get all namespaces", + "parameters": { + "type": "object", + "properties": {} + } + }, + { + "name": "run_kubectl_command", + "description": "Run a raw kubectl command", + "parameters": { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "The kubectl command to run" + } + }, + "required": ["command"] + } + } + ] + } + } + } + write_response(response) + + elif method == "callTool": + # Tool call handling + log("Handling callTool method") + params = request.get("params", {}) + tool_name = params.get("name") + tool_params = params.get("parameters", {}) + + log(f"Tool call: {tool_name} with params {tool_params}") + + if tool_name == "process_natural_language": + query = tool_params.get("query", "") + result = process_natural_language(query) + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": result + } + + elif tool_name == "get_pods": + namespace = tool_params.get("namespace", "") + cmd = f"kubectl get pods" + (f" -n {namespace}" if namespace else "") + result = run_kubectl_command(cmd) + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": result + } + + elif tool_name == "get_namespaces": + result = run_kubectl_command("kubectl get namespaces") + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": result + } + + elif tool_name == "run_kubectl_command": + cmd = tool_params.get("command", "") + result = run_kubectl_command(cmd) + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": result + } + + else: + response = { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": -32601, + "message": f"Unknown tool: {tool_name}" + } + } + + write_response(response) + + elif method == "shutdown": + # Handle shutdown request + log("Received shutdown request") + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": None + } + write_response(response) + break + + else: + # Unknown method + log(f"Unknown method: {method}") + response = { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": -32601, + "message": f"Method not found: {method}" + } + } + write_response(response) + + except json.JSONDecodeError as e: + log(f"Failed to parse JSON: {e}") + error_response = { + "jsonrpc": "2.0", + "id": None, + "error": { + "code": -32700, + "message": f"Parse error: {str(e)}" + } + } + write_response(error_response) + + except Exception as e: + log(f"Error processing request: {e}\n{traceback.format_exc()}") + error_response = { + "jsonrpc": "2.0", + "id": request.get("id") if "request" in locals() else None, + "error": { + "code": -32603, + "message": f"Internal error: {str(e)}" + } + } + write_response(error_response) + + except Exception as e: + log(f"Fatal error: {e}\n{traceback.format_exc()}") + finally: + log("Server shutting down") + +if __name__ == "__main__": + try: + # Set env var for unbuffered output + os.environ["PYTHONUNBUFFERED"] = "1" + + # Run the server + asyncio.run(run_server()) + except KeyboardInterrupt: + log("Server stopped by keyboard interrupt") + except Exception as e: + log(f"Fatal error: {e}\n{traceback.format_exc()}") + finally: + debug_file.close() \ No newline at 
end of file diff --git a/compatible_servers 2/generic/kubectl_mcp_server.py b/compatible_servers 2/generic/kubectl_mcp_server.py new file mode 100755 index 0000000..ed3ec37 --- /dev/null +++ b/compatible_servers 2/generic/kubectl_mcp_server.py @@ -0,0 +1,619 @@ +#!/usr/bin/env python3 +""" +A simple MCP server for kubectl that follows the Model Context Protocol specification +""" + +import asyncio +import json +import logging +import os +import subprocess +import sys +import uuid +import select +import time +from typing import Any, Dict, List, Optional, TextIO, Tuple, Union + +# Set up logging +LOG_DIR = "logs" +os.makedirs(LOG_DIR, exist_ok=True) +logging.basicConfig( + level=logging.DEBUG if os.environ.get("MCP_DEBUG") else logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler(os.path.join(LOG_DIR, "kubectl_mcp_server.log")), + logging.StreamHandler(sys.stderr), + ], +) +logger = logging.getLogger("kubectl-mcp-server") + +# Set default kubeconfig path +DEFAULT_KUBECONFIG = os.path.expanduser("~/.kube/config") +KUBECONFIG = os.environ.get("KUBECONFIG", DEFAULT_KUBECONFIG) + +logger.info(f"Using kubeconfig from: {KUBECONFIG}") + +class KubectlMcpServer: + """ + A simple MCP server that executes kubectl commands based on natural language input. + """ + + def __init__(self): + self.initialized = False + self.request_id_counter = 0 + self.tools = [ + { + "name": "get_pods", + "description": "Get information about pods in the current namespace or all namespaces", + "inputSchema": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "The namespace to get pods from, or 'all' for all namespaces" + }, + "label_selector": { + "type": "string", + "description": "Label selector to filter pods" + } + } + } + }, + { + "name": "get_namespaces", + "description": "Get available namespaces", + "inputSchema": { + "type": "object", + "properties": {} + } + }, + { + "name": "describe", + "description": "Describe a Kubernetes resource", + "inputSchema": { + "type": "object", + "required": ["resource_type", "name"], + "properties": { + "resource_type": { + "type": "string", + "description": "Type of resource (pod, deployment, service, etc.)" + }, + "name": { + "type": "string", + "description": "Name of the resource" + }, + "namespace": { + "type": "string", + "description": "Namespace of the resource" + } + } + } + }, + { + "name": "kubectl", + "description": "Run a raw kubectl command", + "inputSchema": { + "type": "object", + "required": ["command"], + "properties": { + "command": { + "type": "string", + "description": "The kubectl command to run" + } + } + } + }, + { + "name": "switch_namespace", + "description": "Switch to a different namespace", + "inputSchema": { + "type": "object", + "required": ["namespace"], + "properties": { + "namespace": { + "type": "string", + "description": "The namespace to switch to" + } + } + } + }, + { + "name": "get_current_context", + "description": "Get information about the current Kubernetes context and cluster", + "inputSchema": { + "type": "object", + "properties": {} + } + }, + { + "name": "check_cluster_status", + "description": "Check if the Kubernetes cluster is accessible and running", + "inputSchema": { + "type": "object", + "properties": {} + } + } + ] + + # Check cluster connection on startup + asyncio.get_event_loop().create_task(self.check_cluster_connection()) + + async def check_cluster_connection(self) -> None: + """Check if we can connect to the Kubernetes 
cluster.""" + try: + # First make sure kubectl is available + try: + version_cmd = ["kubectl", "version", "--client"] + env = os.environ.copy() + env["KUBECONFIG"] = KUBECONFIG + + result = subprocess.run( + version_cmd, + capture_output=True, + text=True, + env=env + ) + + if result.returncode == 0: + logger.info(f"kubectl client version: {result.stdout.strip()}") + else: + logger.warning(f"kubectl client check failed: {result.stderr}") + except Exception as e: + logger.error(f"Error checking kubectl client: {str(e)}") + + # Simple command to check if cluster is accessible + max_retries = 3 + retry_count = 0 + connected = False + + while retry_count < max_retries and not connected: + try: + result = await self.run_kubernetes_command(["kubectl", "cluster-info"]) + if "Kubernetes control plane" in result and "Error" not in result: + logger.info("Successfully connected to Kubernetes cluster") + connected = True + else: + logger.warning(f"Cluster connection check returned: {result}") + retry_count += 1 + await asyncio.sleep(2) # Wait before retrying + except Exception as e: + logger.error(f"Failed to connect to Kubernetes cluster (attempt {retry_count+1}): {str(e)}") + retry_count += 1 + await asyncio.sleep(2) # Wait before retrying + + # Log cluster status + if connected: + logger.info("Kubernetes cluster is ready") + else: + logger.warning("Could not connect to Kubernetes cluster after multiple attempts") + + except Exception as e: + logger.error(f"Unexpected error checking cluster connection: {str(e)}") + + async def run_kubernetes_command(self, cmd: List[str]) -> str: + """Run a kubectl command and return the output.""" + logger.debug(f"Running kubectl command: {' '.join(cmd)}") + try: + env = os.environ.copy() + env["KUBECONFIG"] = KUBECONFIG + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + env=env + ) + return result.stdout + except subprocess.CalledProcessError as e: + error_msg = f"Command failed with exit code {e.returncode}: {e.stderr}" + logger.error(error_msg) + + # Provide more helpful error messages for common issues + if "connection refused" in e.stderr.lower(): + return f"Error: Could not connect to the Kubernetes cluster. Please check if the cluster is running and accessible.\n\nDetails: {e.stderr}" + elif "current-context is not set" in e.stderr.lower(): + return f"Error: No Kubernetes context is set. Please set a context using 'kubectl config use-context '.\n\nDetails: {e.stderr}" + elif "unauthorized" in e.stderr.lower(): + return f"Error: Unauthorized access to the Kubernetes API. 
Please check your credentials.\n\nDetails: {e.stderr}" + else: + return f"Error: {error_msg}" + except Exception as e: + error_msg = f"Unexpected error running command: {str(e)}" + logger.error(error_msg) + return f"Error: {error_msg}" + + async def handle_get_pods(self, args: Dict[str, Any]) -> str: + """Handle the get_pods tool call.""" + cmd = ["kubectl", "get", "pods"] + + namespace = args.get("namespace") + if namespace: + if namespace.lower() == "all": + cmd.append("--all-namespaces") + else: + cmd.extend(["-n", namespace]) + + label_selector = args.get("label_selector") + if label_selector: + cmd.extend(["-l", label_selector]) + + return await self.run_kubernetes_command(cmd) + + async def handle_get_namespaces(self, args: Dict[str, Any]) -> str: + """Handle the get_namespaces tool call.""" + cmd = ["kubectl", "get", "namespaces"] + return await self.run_kubernetes_command(cmd) + + async def handle_describe(self, args: Dict[str, Any]) -> str: + """Handle the describe tool call.""" + cmd = ["kubectl", "describe", args["resource_type"], args["name"]] + + namespace = args.get("namespace") + if namespace: + cmd.extend(["-n", namespace]) + + return await self.run_kubernetes_command(cmd) + + async def handle_kubectl(self, args: Dict[str, Any]) -> str: + """Handle the kubectl tool call.""" + # Split the command, respecting quoted strings + cmd_string = args["command"].strip() + logger.debug(f"Raw kubectl command: {cmd_string}") + + if not cmd_string.startswith("kubectl"): + cmd_string = "kubectl " + cmd_string + + # Process command into appropriate pieces + try: + import shlex + parts = shlex.split(cmd_string) + # Ensure the first part is kubectl + if parts[0] != "kubectl": + return f"Error: First command must be kubectl, got: {parts[0]}" + + logger.debug(f"Parsed command parts: {parts}") + return await self.run_kubernetes_command(parts) + except Exception as e: + error_msg = f"Error parsing kubectl command: {str(e)}" + logger.error(error_msg) + return error_msg + + async def handle_switch_namespace(self, args: Dict[str, Any]) -> str: + """Handle the switch_namespace tool call.""" + namespace = args["namespace"] + cmd = ["kubectl", "config", "set-context", "--current", "--namespace", namespace] + result = await self.run_kubernetes_command(cmd) + return f"Switched to namespace {namespace}. 
{result}" + + async def handle_get_current_context(self, args: Dict[str, Any]) -> str: + """Handle the get_current_context tool call.""" + # Get current context + context_cmd = ["kubectl", "config", "current-context"] + context_result = await self.run_kubernetes_command(context_cmd) + + # Get cluster info + cluster_cmd = ["kubectl", "cluster-info"] + cluster_result = await self.run_kubernetes_command(cluster_cmd) + + # Get current namespace + namespace_cmd = ["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"] + namespace_result = await self.run_kubernetes_command(namespace_cmd) + + if not namespace_result or "Error:" in namespace_result: + namespace_result = "default" + + return f"Current context: {context_result}\nCurrent namespace: {namespace_result}\nCluster info:\n{cluster_result}" + + async def handle_check_cluster_status(self, args: Dict[str, Any]) -> str: + """Handle the check_cluster_status tool call.""" + # Check if the cluster is accessible + try: + # Check if we can connect to the cluster + cluster_cmd = ["kubectl", "cluster-info"] + cluster_result = await self.run_kubernetes_command(cluster_cmd) + + # Check the status of nodes + nodes_cmd = ["kubectl", "get", "nodes"] + nodes_result = await self.run_kubernetes_command(nodes_cmd) + + # Check the status of core services + core_services_cmd = ["kubectl", "get", "pods", "-n", "kube-system"] + core_services_result = await self.run_kubernetes_command(core_services_cmd) + + return f"Cluster Status: Connected\n\nCluster Info:\n{cluster_result}\n\nNodes:\n{nodes_result}\n\nCore Services:\n{core_services_result}" + + except Exception as e: + return f"Cluster Status: Error - Not connected\n\nError Details: {str(e)}" + + async def handle_tool_call(self, tool_name: str, args: Dict[str, Any]) -> str: + """Dispatch to the appropriate tool handler.""" + handlers = { + "get_pods": self.handle_get_pods, + "get_namespaces": self.handle_get_namespaces, + "describe": self.handle_describe, + "kubectl": self.handle_kubectl, + "switch_namespace": self.handle_switch_namespace, + "get_current_context": self.handle_get_current_context, + "check_cluster_status": self.handle_check_cluster_status, + } + + if tool_name in handlers: + return await handlers[tool_name](args) + else: + return f"Error: Unknown tool '{tool_name}'" + + def get_next_request_id(self) -> str: + """Generate a unique request ID.""" + self.request_id_counter += 1 + return str(self.request_id_counter) + + async def handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: + """Handle the initialize request.""" + logger.info(f"Handling initialize request: {params}") + self.initialized = True + + return { + "id": params.get("id", self.get_next_request_id()), + "jsonrpc": "2.0", + "result": { + "serverInfo": { + "name": "kubectl-mcp-server", + "version": "0.1.0" + }, + "capabilities": { + "tools": {} + } + } + } + + async def handle_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]: + """Handle the tools/list request.""" + logger.info("Handling tools/list request") + + return { + "id": params.get("id", self.get_next_request_id()), + "jsonrpc": "2.0", + "result": { + "tools": self.tools + } + } + + async def handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]: + """Handle the tools/call request.""" + logger.info(f"Handling tools/call request: {params}") + + tool_name = params["params"]["name"] + arguments = params["params"]["arguments"] + + result = await self.handle_tool_call(tool_name, arguments) + + return { + "id": 
params.get("id", self.get_next_request_id()), + "jsonrpc": "2.0", + "result": { + "result": [ + { + "type": "text", + "text": result + } + ] + } + } + + async def handle_message(self, message: Dict[str, Any]) -> Dict[str, Any]: + """Handle an incoming JSON-RPC message.""" + method = message.get("method") + + if not self.initialized and method != "initialize": + return { + "id": message.get("id", self.get_next_request_id()), + "jsonrpc": "2.0", + "error": { + "code": -32002, + "message": "Server not initialized" + } + } + + handlers = { + "initialize": self.handle_initialize, + "tools/list": self.handle_list_tools, + "tools/call": self.handle_call_tool, + } + + if method in handlers: + return await handlers[method](message) + else: + return { + "id": message.get("id", self.get_next_request_id()), + "jsonrpc": "2.0", + "error": { + "code": -32601, + "message": f"Method not found: {method}" + } + } + + async def process_stdin_line(self, line: str) -> None: + """Process a single line from stdin and send response to stdout.""" + if not line.strip(): + logger.debug("Received empty line, ignoring") + return + + logger.debug(f"Received: {line}") + + try: + message = json.loads(line) + response = await self.handle_message(message) + + if response: + response_str = json.dumps(response) + logger.debug(f"Sending: {response_str}") + try: + print(response_str, flush=True) + except (BrokenPipeError, IOError) as e: + logger.error(f"Failed to write to stdout: {str(e)}") + # Don't exit, just log the error + + except json.JSONDecodeError: + logger.error(f"Failed to parse JSON: {line}") + try: + error_response = { + "jsonrpc": "2.0", + "error": { + "code": -32700, + "message": "Parse error" + }, + "id": None + } + print(json.dumps(error_response), flush=True) + except (BrokenPipeError, IOError) as e: + logger.error(f"Failed to write error to stdout: {str(e)}") + + except Exception as e: + logger.exception(f"Error handling message: {str(e)}") + try: + error_response = { + "jsonrpc": "2.0", + "error": { + "code": -32603, + "message": f"Internal error: {str(e)}" + }, + "id": None + } + print(json.dumps(error_response), flush=True) + except (BrokenPipeError, IOError) as e: + logger.error(f"Failed to write error to stdout: {str(e)}") + + def _handle_stdin(self) -> None: + """Read and process a line from stdin.""" + try: + line = sys.stdin.readline() + if not line: # EOF + logger.info("Received EOF, client may have disconnected") + # Don't exit immediately on EOF, give a chance for reconnection + asyncio.get_event_loop().call_later(5, self._check_stdin_status) + return + + asyncio.create_task(self.process_stdin_line(line)) + except Exception as e: + logger.error(f"Error reading from stdin: {str(e)}") + # Don't exit, try to continue reading + + def _check_stdin_status(self) -> None: + """Check if stdin is still available after EOF.""" + try: + # Try to peek at stdin to see if we can read anything + if sys.stdin.isatty() or select.select([sys.stdin], [], [], 0)[0]: + logger.info("stdin is still available, continuing to read") + # Re-add the reader if it was removed + loop = asyncio.get_event_loop() + try: + loop.add_reader(sys.stdin.fileno(), self._handle_stdin) + except Exception: + # Reader might already be registered + pass + else: + logger.info("stdin appears to be closed, but keeping server running") + except Exception as e: + logger.error(f"Error checking stdin status: {str(e)}") + + async def run(self) -> None: + """Run the MCP server, processing commands from stdin and responding to stdout.""" + 
logger.info("Starting kubectl MCP server") + + loop = asyncio.get_event_loop() + + # Set up non-blocking stdin reading + loop.add_reader(sys.stdin.fileno(), self._handle_stdin) + + # Set up a heartbeat to keep the connection alive + heartbeat_task = asyncio.create_task(self._send_heartbeat()) + + try: + # Just wait indefinitely until interrupted + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down kubectl MCP server") + finally: + heartbeat_task.cancel() + try: + await heartbeat_task + except asyncio.CancelledError: + pass + loop.remove_reader(sys.stdin.fileno()) + + async def _send_heartbeat(self) -> None: + """Send a heartbeat every 30 seconds to keep the connection alive.""" + while True: + try: + await asyncio.sleep(30) + # Log a heartbeat but don't send to stdout to avoid interfering with MCP protocol + logger.debug("Heartbeat: MCP server is alive") + except asyncio.CancelledError: + logger.debug("Heartbeat task cancelled") + break + except Exception as e: + logger.error(f"Error in heartbeat: {str(e)}") + +def main() -> None: + """Main entry point.""" + logger.info(f"Starting kubectl MCP server, Python version: {sys.version}") + logger.info(f"Current directory: {os.getcwd()}") + + # Check if we can access kubectl before starting + try: + result = subprocess.run( + ["kubectl", "version", "--client"], + capture_output=True, + text=True, + env=os.environ.copy() + ) + if result.returncode == 0: + logger.info(f"kubectl client check passed: {result.stdout.strip()}") + else: + logger.warning(f"kubectl client check warning: {result.stderr}") + except Exception as e: + logger.warning(f"kubectl availability check failed: {str(e)}") + + # Create server instance + server = KubectlMcpServer() + + # Set up signal handlers for graceful shutdown + try: + import signal + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}, shutting down gracefully") + asyncio.get_event_loop().stop() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + except Exception as e: + logger.warning(f"Failed to set up signal handlers: {str(e)}") + + # Run the server with retry logic + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + try: + logger.info(f"Starting server (attempt {retry_count + 1}/{max_retries})") + asyncio.run(server.run()) + break # If we get here without an exception, break the loop + except KeyboardInterrupt: + logger.info("Interrupted by user, shutting down") + break + except Exception as e: + retry_count += 1 + logger.exception(f"Unhandled error (attempt {retry_count}/{max_retries}): {str(e)}") + if retry_count < max_retries: + logger.info(f"Retrying in 3 seconds...") + time.sleep(3) + else: + logger.error("Maximum retry attempts reached, shutting down") + return 1 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/compatible_servers 2/generic/mcp_kubectl_server.py b/compatible_servers 2/generic/mcp_kubectl_server.py new file mode 100755 index 0000000..71708d9 --- /dev/null +++ b/compatible_servers 2/generic/mcp_kubectl_server.py @@ -0,0 +1,449 @@ +#!/usr/bin/env python3 +""" +MCP-compliant server for kubectl operations. + +This server implements the Model Context Protocol (MCP) to provide +Kubernetes operations through kubectl to AI assistants. 
+""" + +import asyncio +import json +import logging +import os +import subprocess +import sys +import traceback +from typing import Any, Dict, List, Optional + +# Create log directory +LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") +os.makedirs(LOG_DIR, exist_ok=True) + +# Configure logging with more detailed format +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(filename)s:%(lineno)d", + filename=os.path.join(LOG_DIR, "mcp_kubectl_server.log"), +) +logger = logging.getLogger("mcp-kubectl-server") + +# Add console handler +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +formatter = logging.Formatter("%(levelname)s: %(message)s") +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + +# Log startup information +logger.info("Starting MCP kubectl server") +logger.info(f"Python version: {sys.version}") +logger.info(f"Current directory: {os.getcwd()}") + +# Create a debug log file +debug_file = open(os.path.join(LOG_DIR, "debug.log"), "w") +debug_file.write(f"Starting MCP kubectl server at {os.path.abspath(__file__)}\n") +debug_file.write(f"Current directory: {os.getcwd()}\n") +debug_file.flush() + +try: + # Import MCP SDK - use specific version for compatibility + debug_file.write("Attempting to import MCP SDK...\n") + debug_file.flush() + + # Try to directly install the MCP package first + subprocess.run([sys.executable, "-m", "pip", "install", "mcp>=1.4.0"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + from mcp.server import Server + from mcp.server import stdio + import mcp.types as types + from mcp.server.models import InitializationOptions + from mcp.server.lowlevel import NotificationOptions + + debug_file.write("Successfully imported MCP SDK\n") + debug_file.flush() + logger.info("Successfully imported MCP SDK") +except ImportError as e: + error_msg = f"Failed to import MCP SDK: {e}" + logger.error(error_msg) + debug_file.write(f"{error_msg}\n") + debug_file.write(f"Python path: {sys.path}\n") + debug_file.flush() + + logger.info("Attempting to install MCP SDK...") + try: + # Try more direct installation methods + debug_file.write("Installing MCP from git repo...\n") + debug_file.flush() + subprocess.check_call([ + sys.executable, "-m", "pip", "install", + "git+https://github.com/modelcontextprotocol/python-sdk.git" + ]) + + from mcp.server import Server + from mcp.server import stdio + import mcp.types as types + from mcp.server.models import InitializationOptions + from mcp.server.lowlevel import NotificationOptions + + logger.info("Successfully installed and imported MCP SDK") + debug_file.write("Successfully installed MCP from git\n") + debug_file.flush() + except Exception as e: + critical_error = f"Failed to install MCP SDK: {e}" + logger.critical(critical_error) + debug_file.write(f"{critical_error}\n") + debug_file.write(traceback.format_exc()) + debug_file.flush() + sys.exit(1) + +class KubectlServer: + """Implementation of kubectl operations for MCP.""" + + def __init__(self): + """Initialize the kubectl server.""" + self.current_namespace = "default" + # Create server with version but remove description parameter which is no longer supported + self.server = Server( + name="kubectl-mcp-tool", + version="0.1.0" + ) + self.setup_tools() + logger.info("KubectlServer initialized") + debug_file.write("KubectlServer initialized\n") + debug_file.flush() + + def setup_tools(self): + """Set up the tools for the server.""" + 
debug_file.write("Setting up tools...\n") + debug_file.flush() + + @self.server.call_tool("process_natural_language") + async def process_natural_language(query: str) -> Dict[str, Any]: + """Process natural language queries for kubectl operations.""" + logger.info(f"Processing natural language query: {query}") + debug_file.write(f"Processing query: {query}\n") + debug_file.flush() + return await self._process_query(query) + + @self.server.call_tool("get_pods") + async def get_pods(namespace: Optional[str] = None) -> Dict[str, Any]: + """Get pods in the specified namespace.""" + query = f"get pods {f'in namespace {namespace}' if namespace else ''}" + debug_file.write(f"get_pods called with namespace={namespace}\n") + debug_file.flush() + return await self._process_query(query) + + @self.server.call_tool("get_namespaces") + async def get_namespaces() -> Dict[str, Any]: + """Get all namespaces.""" + debug_file.write("get_namespaces called\n") + debug_file.flush() + return await self._process_query("show namespaces") + + @self.server.call_tool("switch_namespace") + async def switch_namespace(namespace: str) -> Dict[str, Any]: + """Switch to the specified namespace.""" + debug_file.write(f"switch_namespace called with namespace={namespace}\n") + debug_file.flush() + return await self._process_query(f"switch to namespace {namespace}") + + @self.server.call_tool("get_current_namespace") + async def get_current_namespace() -> Dict[str, Any]: + """Get the current namespace.""" + debug_file.write("get_current_namespace called\n") + debug_file.flush() + return await self._process_query("what is my current namespace") + + @self.server.call_tool("get_deployments") + async def get_deployments(namespace: Optional[str] = None) -> Dict[str, Any]: + """Get deployments in the specified namespace.""" + query = f"get deployments {f'in namespace {namespace}' if namespace else ''}" + debug_file.write(f"get_deployments called with namespace={namespace}\n") + debug_file.flush() + return await self._process_query(query) + + @self.server.call_tool("get_services") + async def get_services(namespace: Optional[str] = None) -> Dict[str, Any]: + """Get services in the specified namespace.""" + query = f"get services {f'in namespace {namespace}' if namespace else ''}" + debug_file.write(f"get_services called with namespace={namespace}\n") + debug_file.flush() + return await self._process_query(query) + + @self.server.call_tool("describe_resource") + async def describe_resource(resource_type: str, name: str, namespace: Optional[str] = None) -> Dict[str, Any]: + """Describe a Kubernetes resource.""" + query = f"describe {resource_type} {name} {f'in namespace {namespace}' if namespace else ''}" + debug_file.write(f"describe_resource called with type={resource_type}, name={name}, namespace={namespace}\n") + debug_file.flush() + return await self._process_query(query) + + @self.server.call_tool("run_kubectl_command") + async def run_kubectl_command(command: str) -> Dict[str, Any]: + """Run a raw kubectl command.""" + debug_file.write(f"run_kubectl_command called with command={command}\n") + debug_file.flush() + return await self._process_query(command) + + logger.info("Tools setup complete") + debug_file.write("Tools setup complete\n") + debug_file.flush() + + def _run_kubectl_command(self, command: str) -> Dict[str, Any]: + """Run a kubectl command and return the result.""" + debug_file.write(f"Running kubectl command: {command}\n") + debug_file.flush() + + try: + result = subprocess.run( + command, shell=True, check=True, 
capture_output=True, text=True + ) + output = result.stdout.strip() + debug_file.write(f"Command success, output length: {len(output)}\n") + debug_file.flush() + + return { + "command": command, + "result": output, + "success": True, + } + except subprocess.CalledProcessError as e: + error_msg = f"kubectl command failed: {command} - {e}" + logger.warning(error_msg) + debug_file.write(f"{error_msg}\nError output: {e.stderr}\n") + debug_file.flush() + + return { + "command": command, + "result": e.stderr.strip(), + "success": False, + "error": str(e), + } + except Exception as e: + error_msg = f"Error running kubectl command: {command} - {e}" + logger.error(error_msg) + debug_file.write(f"{error_msg}\n{traceback.format_exc()}\n") + debug_file.flush() + + return { + "command": command, + "result": f"Error: {str(e)}", + "success": False, + "error": str(e), + } + + async def _process_query(self, query: str) -> Dict[str, Any]: + """Process a natural language query for kubectl operations.""" + logger.info(f"Processing query: {query}") + debug_file.write(f"Processing query: {query}\n") + debug_file.flush() + + # Convert query to lowercase for easier matching + query_lower = query.lower() + + try: + # Determine the kubectl command based on the query + if "get all pods" in query_lower or "get pods" in query_lower: + if "namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get pods -n {namespace}" + else: + command = "kubectl get pods --all-namespaces" + else: + command = "kubectl get pods" + + elif "show namespaces" in query_lower or "get namespaces" in query_lower: + command = "kubectl get namespaces" + + elif "switch to namespace" in query_lower or "change namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl config set-context --current --namespace={namespace}" + # Update current namespace if command succeeds + self.current_namespace = namespace + else: + return { + "command": "kubectl config set-context", + "result": "Error: No namespace specified", + "success": False + } + + elif "current namespace" in query_lower or "what namespace" in query_lower: + command = "kubectl config view --minify --output 'jsonpath={..namespace}'" + + elif "get contexts" in query_lower or "show contexts" in query_lower: + command = "kubectl config get-contexts" + + elif "get deployments" in query_lower or "show deployments" in query_lower: + if "namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get deployments -n {namespace}" + else: + command = "kubectl get deployments --all-namespaces" + else: + command = f"kubectl get deployments -n {self.current_namespace}" + + elif "get services" in query_lower or "show services" in query_lower: + if "namespace" in query_lower: + # Extract namespace name + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl get services -n {namespace}" + else: + command = "kubectl get services --all-namespaces" + else: + command = f"kubectl get services -n {self.current_namespace}" + + elif "describe" in query_lower: + # Try to parse a describe command + words = query_lower.split() + if len(words) >= 3 and words[0] == "describe": + resource_type = words[1] + name = words[2] + if "namespace" 
in query_lower: + parts = query_lower.split("namespace") + if len(parts) > 1: + namespace = parts[1].strip() + command = f"kubectl describe {resource_type} {name} -n {namespace}" + else: + command = f"kubectl describe {resource_type} {name}" + else: + command = f"kubectl describe {resource_type} {name} -n {self.current_namespace}" + else: + command = f"kubectl {query}" + else: + # For unknown commands, try to run the query as a kubectl command + command = f"kubectl {query}" + + debug_file.write(f"Determined command: {command}\n") + debug_file.flush() + + # Run the command + result = self._run_kubectl_command(command) + return result + except Exception as e: + error_msg = f"Error processing query: {query} - {e}" + logger.error(error_msg) + debug_file.write(f"{error_msg}\n{traceback.format_exc()}\n") + debug_file.flush() + + return { + "command": "Error", + "result": f"Failed to process query: {str(e)}", + "success": False, + "error": str(e) + } + + async def run(self): + """Run the server with manual initialization.""" + logger.info("Starting KubectlServer run") + debug_file.write("Starting KubectlServer run method\n") + debug_file.flush() + + try: + # Use the lower-level stdio server for more control + debug_file.write("Initializing stdio transport\n") + debug_file.flush() + + async with stdio.stdio_server() as (read_stream, write_stream): + debug_file.write("Stdio transport started\n") + debug_file.flush() + + # Create initialization options + init_options = InitializationOptions( + server_name="kubectl-mcp-tool", + server_version="0.1.0", + capabilities=self.server.get_capabilities( + notification_options=NotificationOptions(), + experimental_capabilities={}, + ), + ) + + debug_file.write(f"Running server with initialization options: {init_options}\n") + debug_file.flush() + + # Run the server with explicit initialization + await self.server.run( + read_stream, + write_stream, + initialization_options=init_options + ) + + logger.info("Server stopped normally") + debug_file.write("Server stopped normally\n") + debug_file.flush() + + except Exception as e: + error_msg = f"Error running server: {e}" + logger.error(error_msg, exc_info=True) + debug_file.write(f"{error_msg}\n{traceback.format_exc()}\n") + debug_file.flush() + + # Log to a separate file in case logging is broken + with open(os.path.join(LOG_DIR, "server_error.log"), "a") as f: + f.write(f"Server error: {str(e)}\n") + traceback.print_exc(file=f) + + +async def main(): + """Run the MCP kubectl server.""" + debug_file.write("Entering main function\n") + debug_file.flush() + + server = KubectlServer() + debug_file.write("KubectlServer instance created\n") + debug_file.flush() + + await server.run() + debug_file.write("Server run completed\n") + debug_file.flush() + + +if __name__ == "__main__": + try: + # Open a separate log file for startup debugging + with open(os.path.join(LOG_DIR, "startup.log"), "w") as startup_log: + startup_log.write(f"Starting MCP kubectl server at {os.path.abspath(__file__)}\n") + startup_log.write(f"Current directory: {os.getcwd()}\n") + startup_log.write(f"Python path: {sys.path}\n") + startup_log.flush() + + # Set env var for unbuffered output + os.environ["PYTHONUNBUFFERED"] = "1" + + debug_file.write("About to run main async function\n") + debug_file.flush() + + # Run the server + asyncio.run(main()) + + debug_file.write("Main function completed\n") + debug_file.flush() + + except KeyboardInterrupt: + logger.info("Server stopped by keyboard interrupt") + debug_file.write("Server stopped by keyboard 
interrupt\n") + debug_file.flush() + + except Exception as e: + error_msg = f"Fatal error: {e}" + logger.critical(error_msg, exc_info=True) + debug_file.write(f"{error_msg}\n{traceback.format_exc()}\n") + debug_file.flush() + + with open(os.path.join(LOG_DIR, "fatal_error.log"), "a") as f: + f.write(f"Fatal error: {str(e)}\n") + traceback.print_exc(file=f) + finally: + debug_file.write("Exiting script\n") + debug_file.close() \ No newline at end of file diff --git a/compatible_servers 2/generic/simple_kubectl_mcp.py b/compatible_servers 2/generic/simple_kubectl_mcp.py new file mode 100755 index 0000000..1e22cde --- /dev/null +++ b/compatible_servers 2/generic/simple_kubectl_mcp.py @@ -0,0 +1,757 @@ +#!/usr/bin/env python3 +""" +Simple Kubernetes MCP Server with enhanced features. +""" + +import json +import sys +import logging +import threading +import time +from typing import Dict, Any, Optional +from kubectl_mcp_tool.core.kubernetes_ops import KubernetesOperations +from kubectl_mcp_tool.security.security_ops import KubernetesSecurityOps +from kubectl_mcp_tool.monitoring.diagnostics import KubernetesDiagnostics +from kubernetes import client + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stderr) + ] +) +logger = logging.getLogger(__name__) + +class SimpleKubectlMcpServer: + """Simple MCP server for Kubernetes operations.""" + + def __init__(self, input_stream=sys.stdin, output_stream=sys.stdout): + """Initialize the server.""" + self.input_stream = input_stream + self.output_stream = output_stream + self.k8s_ops = KubernetesOperations() + self.security_ops = KubernetesSecurityOps() + self.diagnostics = KubernetesDiagnostics() + self.current_namespace = "default" # Store the current namespace + self.heartbeat_interval = 5 + self.running = True + self.heartbeat_thread = threading.Thread(target=self.send_heartbeat) + self.heartbeat_thread.daemon = True + self.heartbeat_thread.start() + + def send_heartbeat(self): + """Send periodic heartbeat messages.""" + while self.running: + try: + self.write_message({ + "jsonrpc": "2.0", + "method": "heartbeat", + "params": {} + }) + time.sleep(self.heartbeat_interval) + except Exception as e: + logging.error(f"Error sending heartbeat: {str(e)}") + + def set_current_namespace(self, namespace: str) -> Dict[str, Any]: + """Set the current namespace for future operations.""" + old_namespace = self.current_namespace + self.current_namespace = namespace + + return { + "status": "success", + "message": f"Namespace changed from '{old_namespace}' to '{namespace}'", + "current_namespace": namespace + } + + def handle_init(self, params: Dict[str, Any]) -> Dict[str, Any]: + """Handle initialization request.""" + try: + return { + "name": "kubectl-mcp-server-simple", + "version": "0.1.0", + "capabilities": ["tools/list", "tools/call"] + } + except Exception as e: + logger.error(f"Error during initialization: {e}") + return { + "error": str(e) + } + + def handle_tools_list(self) -> Dict[str, Any]: + """List all the available tools.""" + return { + "tools": [ + { + "name": "init", + "description": "Initialize the server with a config file.", + "input_schema": { + "type": "object", + "properties": { + "config_path": {"type": "string", "description": "Path to the config file."}, + }, + "required": ["config_path"] + } + }, + # Pod Management + { + "name": "create_pod", + "description": "Create a new pod in the Kubernetes cluster.", + 
"input_schema": { + "type": "object", + "properties": { + "pod_name": {"type": "string", "description": "Name for the pod."}, + "namespace": {"type": "string", "description": "Namespace to create the pod in."}, + "image": {"type": "string", "description": "Container image to use for the pod."}, + "command": {"type": "array", "items": {"type": "string"}, "description": "Command to run in the container."}, + "args": {"type": "array", "items": {"type": "string"}, "description": "Arguments for the command."}, + "env_vars": {"type": "object", "description": "Environment variables to set in the container."}, + "ports": {"type": "array", "items": {"type": "integer"}, "description": "Ports to expose from the container."}, + "volume_mounts": {"type": "array", "items": {"type": "object"}, "description": "Volumes to mount in the container."}, + }, + "required": ["pod_name", "namespace", "image"] + } + }, + { + "name": "delete_pod", + "description": "Delete a pod", + "inputSchema": { + "type": "object", + "required": ["pod_name"], + "properties": { + "pod_name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + # Deployment Management + { + "name": "create_deployment", + "description": "Create a new deployment", + "inputSchema": { + "type": "object", + "required": ["deployment_spec"], + "properties": { + "deployment_spec": {"type": "object"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "scale_deployment", + "description": "Scale a deployment", + "inputSchema": { + "type": "object", + "required": ["name", "replicas"], + "properties": { + "name": {"type": "string"}, + "replicas": {"type": "integer"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "rollback_deployment", + "description": "Rollback a deployment", + "inputSchema": { + "type": "object", + "required": ["name"], + "properties": { + "name": {"type": "string"}, + "revision": {"type": "integer"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + # Service Management + { + "name": "create_service", + "description": "Create a new service", + "inputSchema": { + "type": "object", + "required": ["service_spec"], + "properties": { + "service_spec": {"type": "object"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "delete_service", + "description": "Delete a service", + "inputSchema": { + "type": "object", + "required": ["name"], + "properties": { + "name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + # Configuration Management + { + "name": "create_config_map", + "description": "Create a ConfigMap", + "inputSchema": { + "type": "object", + "required": ["name", "data"], + "properties": { + "name": {"type": "string"}, + "data": {"type": "object"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "create_secret", + "description": "Create a Secret", + "inputSchema": { + "type": "object", + "required": ["name", "data"], + "properties": { + "name": {"type": "string"}, + "data": {"type": "object"}, + "secret_type": {"type": "string", "default": "Opaque"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + # Network Operations + { + "name": "create_network_policy", + "description": "Create a NetworkPolicy", + "inputSchema": { + "type": "object", + "required": ["policy_spec"], + "properties": { + "policy_spec": {"type": "object"}, + "namespace": {"type": "string", "default": "default"} + } + 
} + }, + { + "name": "create_ingress", + "description": "Create an Ingress", + "inputSchema": { + "type": "object", + "required": ["ingress_spec"], + "properties": { + "ingress_spec": {"type": "object"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + # Security Operations + { + "name": "create_role", + "description": "Create a Role", + "inputSchema": { + "type": "object", + "required": ["name", "rules"], + "properties": { + "name": {"type": "string"}, + "rules": {"type": "array"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "create_cluster_role", + "description": "Create a ClusterRole", + "inputSchema": { + "type": "object", + "required": ["name", "rules"], + "properties": { + "name": {"type": "string"}, + "rules": {"type": "array"} + } + } + }, + { + "name": "create_service_account", + "description": "Create a ServiceAccount", + "inputSchema": { + "type": "object", + "required": ["name"], + "properties": { + "name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"}, + "annotations": {"type": "object"} + } + } + }, + { + "name": "audit_rbac", + "description": "Audit RBAC permissions", + "inputSchema": { + "type": "object", + "properties": { + "namespace": {"type": "string"}, + "audit_type": {"type": "string"} + } + } + }, + # Monitoring and Diagnostics + { + "name": "get_pod_logs", + "description": "Get logs from a pod", + "inputSchema": { + "type": "object", + "required": ["pod_name"], + "properties": { + "pod_name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"}, + "container": {"type": "string"}, + "tail_lines": {"type": "integer", "default": 100}, + "since_seconds": {"type": "integer"} + } + } + }, + { + "name": "analyze_pod_logs", + "description": "Analyze pod logs for patterns and issues", + "inputSchema": { + "type": "object", + "required": ["pod_name"], + "properties": { + "pod_name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"}, + "container": {"type": "string"}, + "tail_lines": {"type": "integer", "default": 1000} + } + } + }, + { + "name": "get_pod_events", + "description": "Get events related to a pod", + "inputSchema": { + "type": "object", + "required": ["pod_name"], + "properties": { + "pod_name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "check_pod_health", + "description": "Check the health status of a pod", + "inputSchema": { + "type": "object", + "required": ["pod_name"], + "properties": { + "pod_name": {"type": "string"}, + "namespace": {"type": "string", "default": "default"} + } + } + }, + { + "name": "get_resource_usage", + "description": "Get resource usage metrics for pods", + "inputSchema": { + "type": "object", + "properties": { + "namespace": {"type": "string"} + } + } + }, + { + "name": "validate_resources", + "description": "Validate resource configurations and usage", + "inputSchema": { + "type": "object", + "properties": { + "namespace": {"type": "string"} + } + } + }, + { + "name": "analyze_network_policies", + "description": "Analyze NetworkPolicies for security gaps", + "inputSchema": { + "type": "object", + "properties": { + "namespace": {"type": "string"} + } + } + }, + { + "name": "check_pod_security", + "description": "Check pod security context for best practices", + "inputSchema": { + "type": "object", + "required": ["pod_spec"], + "properties": { + "pod_spec": {"type": "object"} + } + } + }, + # Context Management + { + "name": 
"get_contexts", + "description": "Get available contexts", + "inputSchema": { + "type": "object", + "properties": {} + } + }, + { + "name": "switch_context", + "description": "Switch to a different context", + "inputSchema": { + "type": "object", + "required": ["context_name"], + "properties": { + "context_name": {"type": "string"} + } + } + }, + # Resource Listing Tools + { + "name": "list_pods", + "description": "List pods in a namespace with filtering options.", + "input_schema": { + "type": "object", + "properties": { + "namespace": {"type": "string", "description": "Namespace to list pods from."}, + "label_selector": {"type": "string", "description": "Label selector to filter pods (e.g. 'app=nginx')."}, + "field_selector": {"type": "string", "description": "Field selector to filter pods (e.g. 'status.phase=Running')."} + }, + "required": [] + } + }, + { + "name": "list_deployments", + "description": "List deployments in a namespace with filtering options.", + "input_schema": { + "type": "object", + "properties": { + "namespace": {"type": "string", "description": "Namespace to list deployments from."}, + "label_selector": {"type": "string", "description": "Label selector to filter deployments (e.g. 'app=nginx')."} + }, + "required": [] + } + }, + { + "name": "list_services", + "description": "List services in a namespace with filtering options.", + "input_schema": { + "type": "object", + "properties": { + "namespace": {"type": "string", "description": "Namespace to list services from."}, + "label_selector": {"type": "string", "description": "Label selector to filter services (e.g. 'app=nginx')."} + }, + "required": [] + } + }, + { + "name": "list_nodes", + "description": "List all nodes in the Kubernetes cluster.", + "input_schema": { + "type": "object", + "properties": { + "label_selector": {"type": "string", "description": "Label selector to filter nodes."} + }, + "required": [] + } + }, + { + "name": "list_namespaces", + "description": "List all namespaces in the Kubernetes cluster.", + "input_schema": { + "type": "object", + "properties": { + "label_selector": {"type": "string", "description": "Label selector to filter namespaces."} + }, + "required": [] + } + }, + # Helm Chart Support + { + "name": "install_helm_chart", + "description": "Install a Helm chart in the Kubernetes cluster.", + "input_schema": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Release name for the chart installation."}, + "chart": {"type": "string", "description": "Chart name or path (e.g., 'stable/nginx' or 'nginx')."}, + "namespace": {"type": "string", "description": "Namespace to install the chart in."}, + "repo": {"type": "string", "description": "Chart repository URL (optional)."}, + "values": {"type": "object", "description": "Values to override in the chart (optional)."} + }, + "required": ["name", "chart", "namespace"] + } + }, + { + "name": "upgrade_helm_chart", + "description": "Upgrade an existing Helm release.", + "input_schema": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Name of the Helm release to upgrade."}, + "chart": {"type": "string", "description": "Chart name or path to upgrade to."}, + "namespace": {"type": "string", "description": "Namespace of the release."}, + "repo": {"type": "string", "description": "Chart repository URL (optional)."}, + "values": {"type": "object", "description": "Values to override in the chart (optional)."} + }, + "required": ["name", "chart", "namespace"] + } + }, + { + "name": 
"uninstall_helm_chart", + "description": "Uninstall a Helm release.", + "input_schema": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Name of the Helm release to uninstall."}, + "namespace": {"type": "string", "description": "Namespace of the release."} + }, + "required": ["name", "namespace"] + } + }, + # Kubectl Utilities + { + "name": "explain_resource", + "description": "Get documentation for a Kubernetes resource from the API server.", + "input_schema": { + "type": "object", + "properties": { + "resource": {"type": "string", "description": "Resource to explain (e.g., 'pods', 'deployments.v1.apps')."}, + "api_version": {"type": "string", "description": "API version of the resource (optional)."}, + "recursive": {"type": "boolean", "description": "Whether to show all fields recursively."} + }, + "required": ["resource"] + } + }, + { + "name": "list_api_resources", + "description": "List API resources available in the Kubernetes cluster.", + "input_schema": { + "type": "object", + "properties": { + "api_group": {"type": "string", "description": "API group to filter by (e.g., 'apps', 'networking.k8s.io')."}, + "namespaced": {"type": "boolean", "description": "Whether to show only namespaced resources."}, + "verbs": {"type": "array", "items": {"type": "string"}, "description": "Filter by verbs (e.g., ['get', 'list', 'watch'])."} + }, + "required": [] + } + }, + { + "name": "describe_pod", + "description": "Get detailed information about a pod.", + "input_schema": { + "type": "object", + "properties": { + "pod_name": {"type": "string", "description": "Name of the pod to describe."}, + "namespace": {"type": "string", "description": "Namespace of the pod."} + }, + "required": ["pod_name"] + } + }, + # Context and Namespace Management + { + "name": "set_namespace", + "description": "Set the default namespace for subsequent commands.", + "input_schema": { + "type": "object", + "properties": { + "namespace": {"type": "string", "description": "The namespace to set as default."} + }, + "required": ["namespace"] + } + } + ] + } + + def handle_tool_call(self, params: Dict[str, Any]) -> Dict[str, Any]: + """Handle tool call request.""" + name = params.get("name", "") + arguments = params.get("arguments", {}) + + try: + # Pod Management + if name == "create_pod": + return self.k8s_ops.create_pod(**arguments) + elif name == "delete_pod": + return self.k8s_ops.delete_pod(**arguments) + elif name == "check_pod_health": + return self.k8s_ops.check_pod_health(**arguments) + elif name == "get_pod_logs": + return self.k8s_ops.get_pod_logs(**arguments) + + # Deployment Management + elif name == "create_deployment": + return self.k8s_ops.create_deployment(**arguments) + elif name == "delete_deployment": + return self.k8s_ops.delete_deployment(**arguments) + elif name == "scale_deployment": + return self.k8s_ops.scale_deployment(**arguments) + + # Service Management + elif name == "create_service": + return self.k8s_ops.create_service(**arguments) + elif name == "delete_service": + return self.k8s_ops.delete_service(**arguments) + + # Resource Listing Tools + elif name == "list_pods": + return self.k8s_ops.list_pods(**arguments) + elif name == "list_deployments": + return self.k8s_ops.list_deployments(**arguments) + elif name == "list_services": + return self.k8s_ops.list_services(**arguments) + elif name == "list_nodes": + return self.k8s_ops.list_nodes(**arguments) + elif name == "list_namespaces": + return self.k8s_ops.list_namespaces(**arguments) + + # Helm Chart 
Support + elif name == "install_helm_chart": + return self.k8s_ops.install_helm_chart(**arguments) + elif name == "upgrade_helm_chart": + return self.k8s_ops.upgrade_helm_chart(**arguments) + elif name == "uninstall_helm_chart": + return self.k8s_ops.uninstall_helm_chart(**arguments) + + # Kubectl Utilities + elif name == "explain_resource": + return self.k8s_ops.explain_resource(**arguments) + elif name == "list_api_resources": + return self.k8s_ops.list_api_resources(**arguments) + elif name == "describe_pod": + return self.k8s_ops.describe_pod(**arguments) + + # Security Operations + elif name == "audit_rbac": + return self.handle_audit_rbac( + arguments.get("namespace", "default"), + arguments.get("type", "all"), + arguments.get("chunk_number", 0), + arguments.get("chunks_total", 5), + arguments.get("minimal_mode", False) + ) + elif name == "create_role": + return self.k8s_ops.create_role(**arguments) + elif name == "create_cluster_role": + return self.k8s_ops.create_cluster_role(**arguments) + elif name == "create_service_account": + return self.k8s_ops.create_service_account(**arguments) + + # Context Operations + elif name == "list_contexts": + return self.k8s_ops.list_contexts() + elif name == "get_current_context": + return self.k8s_ops.get_current_context() + elif name == "switch_context": + return self.k8s_ops.switch_context(**arguments) + + # Miscellaneous Operations + elif name == "get_pod_events": + return self.k8s_ops.get_pod_events(**arguments) + elif name == "analyze_pod_logs": + return self.k8s_ops.analyze_pod_logs(**arguments) + + # Namespace Management + elif name == "set_namespace": + return self.set_current_namespace(arguments.get("namespace", "default")) + + else: + return {"error": f"Unknown tool: {name}"} + except Exception as e: + logging.error(f"Error handling tool call {name}: {str(e)}") + return {"error": f"Error handling tool call {name}: {str(e)}"} + + def handle_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Handle incoming JSON-RPC request.""" + try: + method = request.get("method") + request_id = request.get("id") + params = request.get("params", {}) + + logger.debug(f"Handling request: method={method}, id={request_id}") + + # Skip heartbeat messages in response + if method == "heartbeat": + return None + + result = None + if method == "initialize": + result = self.handle_init(params) + elif method == "tools/list": + result = self.handle_tools_list() + elif method == "tools/call": + result = self.handle_tool_call(params) + else: + return { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": -32601, + "message": f"Method not found: {method}" + } + } + + if result is None: + return { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": -32603, + "message": "Internal error: null result" + } + } + + return { + "jsonrpc": "2.0", + "id": request_id, + "result": result + } + + except Exception as e: + logger.error(f"Error handling request: {str(e)}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32603, + "message": f"Internal error: {str(e)}" + } + } + + def run(self): + """Run the server.""" + logger.info("Starting simplified kubectl MCP server...") + + while self.running: + try: + line = self.input_stream.readline() + if not line: + break + + request = json.loads(line) + response = self.handle_request(request) + + if response: + self.output_stream.write(json.dumps(response) + "\n") + self.output_stream.flush() + + except Exception as e: + logger.error(f"Error processing request: 
{e}") + continue + + self.running = False + logger.info("Server stopped.") + +def main(): + """Main entry point.""" + server = SimpleKubectlMcpServer() + try: + server.run() + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down...") + finally: + server.running = False + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/compatible_servers 2/generic/simple_mcp_server.py b/compatible_servers 2/generic/simple_mcp_server.py new file mode 100755 index 0000000..73a52ca --- /dev/null +++ b/compatible_servers 2/generic/simple_mcp_server.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +""" +Minimal MCP server implementation for debugging. +This server accepts JSON commands over stdin and responds with JSON over stdout. +It doesn't depend on any external packages, just using basic Python libraries. +""" + +import asyncio +import json +import os +import sys +import traceback +from datetime import datetime + +# Create log directory +LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") +os.makedirs(LOG_DIR, exist_ok=True) + +# Open a debug log file +debug_file = open(os.path.join(LOG_DIR, "simple_mcp_debug.log"), "w") + +def log(message): + """Log a message to both the debug file and stdout.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + formatted = f"{timestamp} - {message}" + print(f"DEBUG: {formatted}", file=sys.stderr) + debug_file.write(f"{formatted}\n") + debug_file.flush() + +log(f"Starting simple MCP server") +log(f"Python version: {sys.version}") +log(f"Current directory: {os.getcwd()}") + +# MCP protocol constants +MCP_VERSION = "0.1.0" +SERVER_NAME = "kubectl-mcp-tool-simple" + +async def read_stdin(): + """Read a line from stdin asynchronously.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, sys.stdin.readline) + +async def run_server(): + """Run the MCP server.""" + log("Starting server loop") + + # Send server information on startup to stderr for debugging + print(f"MCP Server {SERVER_NAME} v{MCP_VERSION} started", file=sys.stderr) + + # Respond to initialization + try: + # First message should be initialization + log("Waiting for initialization message") + init_req = await read_stdin() + log(f"Received init: {init_req.strip()}") + + # Parse the request + try: + request = json.loads(init_req) + log(f"Parsed request: {request}") + + # Check if it's an initialization request + if request.get("method") == "initialize": + # Respond with initialization response + response = { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": { + "name": SERVER_NAME, + "version": MCP_VERSION, + "serverInfo": { + "name": SERVER_NAME + }, + "capabilities": { + "tools": [ + { + "name": "kubectl_command", + "description": "Run a kubectl command", + "parameters": { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "The kubectl command to run" + } + }, + "required": ["command"] + } + }, + { + "name": "get_pods", + "description": "Get all pods in the current namespace", + "parameters": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Optional namespace" + } + } + } + } + ] + } + } + } + + log("Sending initialization response") + print(json.dumps(response), flush=True) + else: + log(f"Not an initialization request: {request}") + # Send error response + error_response = { + "jsonrpc": "2.0", + "id": request.get("id", 0), + "error": { + "code": -32600, + "message": "Expected 'initialize' as first 
message" + } + } + print(json.dumps(error_response), flush=True) + except json.JSONDecodeError as e: + log(f"Failed to parse JSON: {e}") + # Send error response + error_response = { + "jsonrpc": "2.0", + "id": 0, + "error": { + "code": -32700, + "message": f"Parse error: {str(e)}" + } + } + print(json.dumps(error_response), flush=True) + + # Main message loop + while True: + log("Waiting for message") + line = await read_stdin() + if not line: + log("Empty line received, exiting") + break + + log(f"Received: {line.strip()}") + + try: + request = json.loads(line) + request_id = request.get("id", 0) + + # Handle method calls + if request.get("method") == "callTool": + params = request.get("params", {}) + tool_name = params.get("name") + tool_params = params.get("parameters", {}) + + log(f"Tool call: {tool_name} with params {tool_params}") + + # Simple response depending on the tool + if tool_name == "kubectl_command": + cmd = tool_params.get("command", "") + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": { + "command": cmd, + "output": f"Simulated output for: {cmd}", + "success": True + } + } + elif tool_name == "get_pods": + namespace = tool_params.get("namespace", "default") + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": { + "namespace": namespace, + "pods": [ + {"name": "pod-1", "status": "Running"}, + {"name": "pod-2", "status": "Running"} + ] + } + } + else: + response = { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": -32601, + "message": f"Unknown tool: {tool_name}" + } + } + # Handle shutdown + elif request.get("method") == "shutdown": + log("Shutdown request received") + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": None + } + print(json.dumps(response), flush=True) + break + else: + log(f"Unknown method: {request.get('method')}") + response = { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": -32601, + "message": f"Method not found: {request.get('method')}" + } + } + + log(f"Sending response: {response}") + print(json.dumps(response), flush=True) + + except json.JSONDecodeError as e: + log(f"Failed to parse JSON: {e}") + error_response = { + "jsonrpc": "2.0", + "id": 0, + "error": { + "code": -32700, + "message": f"Parse error: {str(e)}" + } + } + print(json.dumps(error_response), flush=True) + except Exception as e: + log(f"Error processing request: {e}\n{traceback.format_exc()}") + error_response = { + "jsonrpc": "2.0", + "id": request.get("id", 0) if "request" in locals() else 0, + "error": { + "code": -32603, + "message": f"Internal error: {str(e)}" + } + } + print(json.dumps(error_response), flush=True) + + except Exception as e: + log(f"Fatal error: {e}\n{traceback.format_exc()}") + finally: + log("Server shutting down") + debug_file.close() + +if __name__ == "__main__": + try: + # Set env var for unbuffered output + os.environ["PYTHONUNBUFFERED"] = "1" + + # Run the server + asyncio.run(run_server()) + except KeyboardInterrupt: + log("Server stopped by keyboard interrupt") + except Exception as e: + log(f"Fatal error: {e}\n{traceback.format_exc()}") + finally: + if "debug_file" in globals() and not debug_file.closed: + debug_file.close() \ No newline at end of file diff --git a/compatible_servers 2/minimal/minimal_mcp_server.py b/compatible_servers 2/minimal/minimal_mcp_server.py new file mode 100755 index 0000000..c9e6ef5 --- /dev/null +++ b/compatible_servers 2/minimal/minimal_mcp_server.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +Absolutely minimal MCP server implementation. 
+This implements only the essential parts of the protocol to work with Cursor. +""" + +import asyncio +import json +import os +import sys +import subprocess +from datetime import datetime + +# Create log directory +LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") +os.makedirs(LOG_DIR, exist_ok=True) + +# Set up logging +log_file = open(os.path.join(LOG_DIR, "minimal_mcp.log"), "w") + +def log(message): + """Log a message to both the log file and stderr.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + formatted = f"{timestamp} - {message}" + print(f"LOG: {formatted}", file=sys.stderr) + log_file.write(f"{formatted}\n") + log_file.flush() + +# Basic server info +SERVER_NAME = "kubectl-mcp-minimal" +SERVER_VERSION = "0.1.0" + +# MCP protocol implementation +async def stdio_transport(): + """Create an asyncio transport over stdin/stdout.""" + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + + # Handle stdin + await loop.connect_read_pipe(lambda: protocol, sys.stdin) + + # Handle stdout + w_transport, w_protocol = await loop.connect_write_pipe( + asyncio.streams.FlowControlMixin, sys.stdout + ) + writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop) + + return reader, writer + +async def read_message(reader): + """Read a JSON-RPC message from stdin.""" + line = await reader.readline() + if not line: + return None + + line_str = line.decode('utf-8').strip() + log(f"RECEIVED: {line_str}") + + try: + return json.loads(line_str) + except json.JSONDecodeError as e: + log(f"JSON parse error: {e}") + return None + +def write_message(writer, message): + """Write a JSON-RPC message to stdout.""" + json_str = json.dumps(message) + log(f"SENDING: {json_str}") + writer.write(f"{json_str}\n".encode('utf-8')) + +def run_kubectl(command): + """Run a kubectl command and return the output.""" + log(f"Running kubectl: {command}") + try: + result = subprocess.run( + command, shell=True, capture_output=True, text=True, check=False + ) + success = result.returncode == 0 + output = result.stdout if success else result.stderr + return { + "command": command, + "output": output.strip(), + "success": success + } + except Exception as e: + log(f"Error running command: {e}") + return { + "command": command, + "output": f"Error: {str(e)}", + "success": False + } + +async def run_server(): + """Run the MCP server.""" + log("Starting MCP server") + reader, writer = await stdio_transport() + + # Wait for initialize request + log("Waiting for initialization message") + + # Handle messages + while True: + message = await read_message(reader) + if message is None: + log("Received empty message, shutting down") + break + + method = message.get("method") + message_id = message.get("id") + + if method == "initialize": + # Handle initialization + log("Handling initialization request") + response = { + "jsonrpc": "2.0", + "id": message_id, + "result": { + "name": SERVER_NAME, + "version": SERVER_VERSION, + "capabilities": { + "tools": [ + { + "name": "run_kubectl", + "description": "Run kubectl command", + "parameters": { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "The kubectl command to run" + } + }, + "required": ["command"] + } + }, + { + "name": "get_pods", + "description": "Get pods in the specified namespace", + "parameters": { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to get pods from 
(optional)" + } + } + } + } + ] + } + } + } + write_message(writer, response) + + elif method == "callTool": + # Handle tool calls + log("Handling tool call") + params = message.get("params", {}) + tool_name = params.get("name", "") + tool_params = params.get("parameters", {}) + + log(f"Tool call: {tool_name}, params: {tool_params}") + + if tool_name == "run_kubectl": + cmd = tool_params.get("command", "") + result = run_kubectl(f"kubectl {cmd}") + response = { + "jsonrpc": "2.0", + "id": message_id, + "result": result + } + + elif tool_name == "get_pods": + namespace = tool_params.get("namespace", "") + cmd = "kubectl get pods" + (f" -n {namespace}" if namespace else "") + result = run_kubectl(cmd) + response = { + "jsonrpc": "2.0", + "id": message_id, + "result": result + } + + else: + # Unknown tool + response = { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32601, + "message": f"Tool not found: {tool_name}" + } + } + + write_message(writer, response) + + elif method == "shutdown": + # Handle shutdown + log("Handling shutdown request") + response = { + "jsonrpc": "2.0", + "id": message_id, + "result": None + } + write_message(writer, response) + break + + else: + # Unknown method + log(f"Unknown method: {method}") + response = { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32601, + "message": f"Method not found: {method}" + } + } + write_message(writer, response) + + log("Shutting down server") + await writer.drain() + writer.close() + +if __name__ == "__main__": + try: + # Unbuffered output + os.environ["PYTHONUNBUFFERED"] = "1" + + # Run the server + log(f"Starting server, Python version: {sys.version}") + log(f"Current directory: {os.getcwd()}") + asyncio.run(run_server()) + + except KeyboardInterrupt: + log("Server stopped by keyboard interrupt") + except Exception as e: + log(f"Fatal error: {str(e)}") + finally: + log_file.close() \ No newline at end of file diff --git a/compatible_servers 2/windsurf/windsurf_compatible_mcp_server.py b/compatible_servers 2/windsurf/windsurf_compatible_mcp_server.py new file mode 100644 index 0000000..7c3adaf --- /dev/null +++ b/compatible_servers 2/windsurf/windsurf_compatible_mcp_server.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +""" +Windsurf-compatible MCP server implementation for kubectl-mcp-tool. 
+""" + +import json +import sys +import logging +import asyncio +import argparse +from aiohttp import web +import aiohttp_sse +from typing import Dict, Any, List, Optional, Callable, Awaitable + +# Import from the correct location +from kubectl_mcp_tool.mcp_server import MCPServer +from kubectl_mcp_tool.natural_language import process_query + +# Configure logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + filename="windsurf_mcp_server.log" +) +logger = logging.getLogger("windsurf-mcp-server") + +# Add console handler for important messages +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +console_formatter = logging.Formatter('%(levelname)s: %(message)s') +console_handler.setFormatter(console_formatter) +logger.addHandler(console_handler) + +class WindsurfMCPServer: + """Windsurf-compatible MCP server implementation.""" + + def __init__(self): + """Initialize the MCP server.""" + self.tools = {} + + # Register the natural language tool + self.register_tool( + "process_natural_language", + "Process natural language query for kubectl", + { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The natural language query to process" + } + }, + "required": ["query"] + }, + self._process_natural_language + ) + + # Register the get_pods tool + self.register_tool( + "get_pods", + "Get pods in the specified namespace", + { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to get pods from" + } + } + }, + self._get_pods + ) + + # Register the switch_namespace tool + self.register_tool( + "switch_namespace", + "Switch to the specified namespace", + { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to switch to" + } + }, + "required": ["namespace"] + }, + self._switch_namespace + ) + + # Register the get_current_namespace tool + self.register_tool( + "get_current_namespace", + "Get the current namespace", + { + "type": "object", + "properties": {} + }, + self._get_current_namespace + ) + + # Register the get_deployments tool + self.register_tool( + "get_deployments", + "Get deployments in the specified namespace", + { + "type": "object", + "properties": { + "namespace": { + "type": "string", + "description": "Namespace to get deployments from" + } + } + }, + self._get_deployments + ) + + def register_tool(self, name: str, description: str, parameters: Dict[str, Any], handler: Callable): + """Register a tool with the server.""" + self.tools[name] = { + "description": description, + "parameters": parameters, + "handler": handler + } + + def _process_natural_language(self, input_params: Dict[str, Any]) -> Dict[str, Any]: + """Process a natural language query.""" + from kubectl_mcp_tool.natural_language import process_query + + query = input_params.get("query", "") + if not query: + return {"error": "Missing required parameter: query"} + + # Process the query + result = process_query(query) + + # Return the result + return result + + def _get_pods(self, input_params: Dict[str, Any]) -> Dict[str, Any]: + """Get pods in the specified namespace.""" + import subprocess + + namespace = input_params.get("namespace", "default") + + try: + # Run kubectl command + command = f"kubectl get pods -n {namespace}" + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + check=True + ) + + return { + "command": command, + "result": 
result.stdout.strip(), + "success": True + } + except subprocess.CalledProcessError as e: + logger.warning(f"kubectl command failed: {command} - {e}") + + # Return mock data as fallback + return { + "command": command, + "result": f"Error: {e.stderr}", + "success": False, + "error": e.stderr + } + + def _switch_namespace(self, input_params: Dict[str, Any]) -> Dict[str, Any]: + """Switch to the specified namespace.""" + import subprocess + + namespace = input_params.get("namespace", "") + if not namespace: + return {"error": "Missing required parameter: namespace"} + + try: + # Run kubectl command + command = f"kubectl config set-context --current --namespace={namespace}" + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + check=True + ) + + return { + "command": command, + "result": result.stdout.strip(), + "success": True + } + except subprocess.CalledProcessError as e: + logger.warning(f"kubectl command failed: {command} - {e}") + + return { + "command": command, + "result": f"Error: {e.stderr}", + "success": False, + "error": e.stderr + } + + def _get_current_namespace(self, input_params: Dict[str, Any]) -> Dict[str, Any]: + """Get the current namespace.""" + import subprocess + + try: + # Run kubectl command + command = "kubectl config view --minify --output 'jsonpath={..namespace}'" + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + check=True + ) + + return { + "command": command, + "result": result.stdout.strip() or "default", + "success": True + } + except subprocess.CalledProcessError as e: + logger.warning(f"kubectl command failed: {command} - {e}") + + return { + "command": command, + "result": f"Error: {e.stderr}", + "success": False, + "error": e.stderr + } + + def _get_deployments(self, input_params: Dict[str, Any]) -> Dict[str, Any]: + """Get deployments in the specified namespace.""" + import subprocess + + namespace = input_params.get("namespace", "default") + + try: + # Run kubectl command + command = f"kubectl get deployments -n {namespace}" + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + check=True + ) + + return { + "command": command, + "result": result.stdout.strip(), + "success": True + } + except subprocess.CalledProcessError as e: + logger.warning(f"kubectl command failed: {command} - {e}") + + return { + "command": command, + "result": f"Error: {e.stderr}", + "success": False, + "error": e.stderr + } + + def handle_initialize(self, message_id: str) -> Dict[str, Any]: + """Handle initialize request.""" + logger.info("Handling initialize request") + return { + "jsonrpc": "2.0", + "id": message_id, + "result": { + "capabilities": { + "tools": True + } + } + } + + def handle_tools_list(self, message_id: str) -> Dict[str, Any]: + """Handle tools/list request.""" + logger.info("Handling tools/list request") + tools_list = [] + for name, info in self.tools.items(): + tools_list.append({ + "name": name, + "description": info["description"], + "parameters": info["parameters"] + }) + + return { + "jsonrpc": "2.0", + "id": message_id, + "result": { + "tools": tools_list + } + } + + def handle_tool_call(self, message_id: str, params: Dict[str, Any]) -> Dict[str, Any]: + """Handle tool/call request.""" + logger.info(f"Handling tool/call request: {params}") + + # Get the tool name and input + tool_name = params.get("name", "") + tool_input = params.get("input", {}) + + # Check if the tool exists + if tool_name not in self.tools: + logger.error(f"Tool not 
found: {tool_name}") + return { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32601, + "message": f"Tool not found: {tool_name}" + } + } + + # Call the tool + try: + tool_info = self.tools[tool_name] + + # Check for required parameters + required_params = tool_info["parameters"].get("required", []) + for param_name in required_params: + if param_name not in tool_input: + logger.error(f"Missing required parameter: {param_name}") + return { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32602, + "message": f"Missing required parameter: {param_name}" + } + } + + # Call the handler + result = tool_info["handler"](tool_input) + + # Format the result + if "error" in result and not "success" in result: + return { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32603, + "message": result["error"] + } + } + else: + return { + "jsonrpc": "2.0", + "id": message_id, + "result": result + } + except Exception as e: + logger.error(f"Error calling tool {tool_name}: {e}") + return { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32603, + "message": f"Error calling tool {tool_name}: {str(e)}" + } + } + + def handle_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Handle a message from the client.""" + if "method" not in message: + logger.error(f"Invalid message, missing method: {message}") + return None + + method = message["method"] + message_id = message.get("id", "") + + # Handle initialize + if method == "mcp.initialize": + return self.handle_initialize(message_id) + + # Handle tools/list + elif method == "mcp.tools.list": + return self.handle_tools_list(message_id) + + # Handle tool/call + elif method == "mcp.tool.call": + return self.handle_tool_call(message_id, message.get("params", {})) + + # Handle unknown method + else: + logger.error(f"Unknown method: {method}") + return { + "jsonrpc": "2.0", + "id": message_id, + "error": { + "code": -32601, + "message": f"Method not found: {method}" + } + } + + async def serve_sse(self, host: str, port: int): + """Serve the MCP server over SSE.""" + logger.info(f"Serving MCP server over SSE on {host}:{port}") + + # Create the web app + app = web.Application() + + # Create the SSE endpoint + async def sse_handler(request): + try: + # Get the request data + data = await request.json() + logger.info(f"Received request: {data}") + + # Handle the message + response = self.handle_message(data) + + # Create SSE response + async with aiohttp_sse.sse_response(request) as resp: + if response: + # Send the response as SSE event + await resp.send(json.dumps(response)) + + # Keep the connection open for a while + for _ in range(10): # Keep alive for a short time + await asyncio.sleep(1) + + return resp + except Exception as e: + logger.error(f"Error in SSE handler: {e}") + return web.Response(status=500, text=str(e)) + + # Create a health check endpoint + async def health_handler(request): + return web.Response(text="OK") + + # Add the endpoints + app.router.add_post("/", sse_handler) + app.router.add_get("/health", health_handler) + + # Run the web server + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, host, port) + await site.start() + + logger.info(f"SSE server started on {host}:{port}") + + # Wait forever + while True: + await asyncio.sleep(3600) + +async def main(): + """Run the MCP server.""" + parser = argparse.ArgumentParser(description="Windsurf-compatible MCP server") + parser.add_argument("--host", default="0.0.0.0", help="Host to bind 
to") + parser.add_argument("--port", type=int, default=8080, help="Port to listen on") + parser.add_argument("--debug", action="store_true", help="Enable debug logging") + args = parser.parse_args() + + # Configure logging + if args.debug: + logger.setLevel(logging.DEBUG) + console_handler.setLevel(logging.DEBUG) + + # Create the server + server = WindsurfMCPServer() + + # Serve the server + await server.serve_sse(args.host, args.port) + +if __name__ == "__main__": + asyncio.run(main())