diff --git a/contributing/samples/fastapi_modular_server/.env.example b/contributing/samples/fastapi_modular_server/.env.example new file mode 100644 index 0000000000..f4dd8b3c90 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/.env.example @@ -0,0 +1,16 @@ +# Application Configuration +DEBUG=true +PORT=8881 +HOST=localhost +LOG_LEVEL=INFO +LOGGING=true + +# ADK Configuration +SERVE_WEB_INTERFACE=true +RELOAD_AGENTS=true + +# Google Gemini Configuration +GOOGLE_API_KEY=YOUR_GOOGLE_API_KEY_HERE + +# Model Configuration +MODEL_PROVIDER=google diff --git a/contributing/samples/fastapi_modular_server/.gitignore b/contributing/samples/fastapi_modular_server/.gitignore new file mode 100644 index 0000000000..e7febef8a4 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/.gitignore @@ -0,0 +1,20 @@ +# Logs +*.log + +# Environment variables +.env + +# Python bytecode +*.pyc +__pycache__/ + +# Test artifacts +.pytest_cache/ + +# Virtual environments +.venv/ +venv/ + +# IDE configuration +.vscode/ +.idea/ \ No newline at end of file diff --git a/contributing/samples/fastapi_modular_server/README.md b/contributing/samples/fastapi_modular_server/README.md new file mode 100644 index 0000000000..863d1ae6f3 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/README.md @@ -0,0 +1,260 @@ +# 🚀 Google ADK FastAPI Modular Server + +A **production-ready template** for extending Google's Agent Development Kit (ADK) with custom FastAPI endpoints, optimized SSE streaming, and modular architecture patterns. + +## 🎯 Purpose & Value + +The **FastAPI Modular Server** serves as a **template and reference implementation** for teams who want to: + +- **Extend ADK's built-in server** without modifying core behavior +- **Accelerate production deployment** with battle-tested patterns +- **Add custom business logic** through modular router systems +- **Enable hot-reload capabilities** for faster development cycles + +## ✨ Key Features + +### 🔧 **Modular Router Architecture** +- Clean separation of concerns with dedicated router classes +- Easy to add new endpoints without touching core server code + +### ⚡ **Optimized SSE Streaming** +- **3 optimization levels** for different use cases: + - `MINIMAL`: Essential content only (author + text) + - `BALANCED`: Core data with invocation tracking + - `FULL_COMPAT`: Complete ADK event compatibility +- Reduced payload sizes for improved performance +- Custom event filtering and mapping + +### 🔄 **Hot-Reload Development** +- Automatic agent reloading on file changes +- File system monitoring with `watchdog` +- Development-friendly with production stability + + +## 📁 Project Structure + +``` +fastapi_modular_server/ +├── .env.example # Environment variables template +├── README.md # Project documentation +├── __init__.py # Package initialization +├── app/ # Main application directory +│ ├── __init__.py # App package initialization +│ ├── agents/ # Agent definitions +│ │ └── greetings_agent/ # Greetings agent module +│ │ ├── __init__.py # Agent package init +│ │ └── greetings_agent.py # Greetings agent implementation +│ ├── api/ # API layer +│ │ ├── __init__.py # API package init +│ │ ├── routers/ # API route definitions +│ │ │ ├── __init__.py # Routers package init +│ │ │ └── agent_router.py # Agent-related API routes +│ │ └── custom_adk_server.py # FastAPI server configuration +│ ├── config/ # Configuration management +│ │ └── settings.py # Application settings +│ ├── core/ # Core application components +│ │ ├── __init__.py # Core package init +│ │ ├── dependencies.py # Dependency injection +│ │ ├── logging.py # Logging configuration +│ │ └── mapping/ # Data mapping utilities +│ │ ├── __init__.py # Mapping package init +│ │ └── sse_event_mapper.py # Server-Sent Events mapper +│ └── models/ # Data models +│ ├── __init__.py # Models package init +│ └── streaming_request.py # Streaming data models +└── main.py # Application entry point +``` + +## 🚀 Quick Start + +### 1. **Configuration** +```bash +# Copy environment template +cp .env.example .env + +# Edit .env with your settings +vim .env + +# Set the API KEY + +``` + +### 2. **Run the Server** +```bash +# Development mode with hot-reload +python main.py + +# Production mode +uvicorn main:app --host 0.0.0.0 --port 8881 +``` + + +## 🔧 Customization Guide + +### **Adding New Routers** + +Create a new router following the established pattern: + +```python +# app/api/routers/my_custom_router.py +from fastapi import APIRouter, Depends +from app.core.dependencies import ADKServices, get_adk_services + +class MyCustomRouter: + def __init__(self, web_server_instance): + self.web_server = web_server_instance + self.router = APIRouter(prefix="/custom", tags=["Custom"]) + self._setup_routes() + + def _setup_routes(self): + @self.router.get("/endpoint") + async def my_endpoint( + ): + # Access any ADK service + sessions = await self.web_server.session_service.list_sessions() + return {"data": "custom response", "session_count": len(sessions)} + + def get_router(self) -> APIRouter: + return self.router +``` + +Register it in the custom server: + +```python +# In app/api/custom_adk_server.py - CustomAdkWebServer class +def _initialize_routers(self): + try: + self.agent_router = AgentRouter(self) + self.my_custom_router = MyCustomRouter(self) # Add this + logger.info("All routers initialized successfully.") + except Exception as e: + logger.error(f"Failed to initialize routers: {e}", exc_info=True) + +def _register_modular_routers(self, app: FastAPI): + # ... existing code ... + + if self.my_custom_router: + app.include_router(self.my_custom_router.get_router()) + logger.info("Registered MyCustomRouter.") +``` + +### **Overriding ADK Endpoints** + +#### **Method 1: Route Removal (Current Approach)** + +```python +def _register_modular_routers(self, app: FastAPI): + # Remove specific ADK routes + routes_to_remove = [] + for route in app.routes: + if route.path in [ + "/run_sse", + # You could add additional ADK routes here if you want to override them, + # e.g., "/apps/{app_name}/users/{user_id}/sessions" + ] and hasattr(route, 'methods') and 'POST' in route.methods: + routes_to_remove.append(route) + + # Remove the routes + for route in routes_to_remove: + app.routes.remove(route) +``` + +#### **Method 2: Middleware Interception** + +For more complex overrides, use middleware: + +```python +from fastapi import Request, Response +from starlette.middleware.base import BaseHTTPMiddleware + +class RouteOverrideMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next): + # Intercept specific routes + if request.url.path == "/run_sse" and request.method == "POST": + # Handle with custom logic + return await self.handle_custom_sse(request) + + return await call_next(request) +``` + +### **Accessing ADK Services and Runners** + +#### **From Router Classes** +```python +class AgentRouter: + def __init__(self, web_server_instance): + self.web_server = web_server_instance + + async def my_endpoint(self, adk_services: ADKServices = Depends(get_adk_services)): + # Access services + agents = self.web_server.agent_loader.list_agents() + session = await self.web_server.session_service.list_sessions() + + # Access runners through web server + runner = await self.web_server.get_runner_async("your_app_name") + + # Access other web server properties + runners_cache = self.web_server.runners_to_clean +``` + +### **Optimizing SSE Streaming** + +#### **Custom Event Filtering** + +Extend the SSE mapper for more sophisticated filtering: + +```python +# app/models/streaming_request.py +class OptimizationLevel(str, Enum): + """Enumeration for the available SSE optimization levels.""" + + MINIMAL = "minimal" + BALANCED = "balanced" + FULL_COMPAT = "full_compat" + ULTRA_MINIMAL = "ultra_minimal" + +# app/core/mapping/sse_mapper.py +class AdvancedSSEEventMapper(SSEEventMapper): + def map_event_to_sse_message(self, event: Event, optimization_level: OptimizationLevel) -> Optional[str]: + # Custom filtering logic + if self._should_skip_event(event): + return None + + # Custom payload creation + payload = self._create_custom_payload(event, optimization_level) + + # Custom serialization + return self._serialize_payload(payload) + + def _should_skip_event(self, event: Event) -> bool: + # Skip system events, debug events, empty events, etc. + if event.author in ["system", "debug"]: + return True + if not event.content or not event.content.parts: + return True + return False + + def _create_custom_payload(self, event: Event, level: OptimizationLevel) -> Dict: + if level == OptimizationLevel.ULTRA_MINIMAL: + # Even more minimal than minimal + return {"t": self._extract_text_only(event)} + + return super()._create_minimal_payload(event) +``` + +## 🤝 Contributing + +This template is designed to be extended and customized for your specific needs. Key extension points: + +1. **Router Classes**: Add domain-specific endpoints +2. **SSE Mappers**: Custom event processing and optimization +3. **Middleware**: Cross-cutting concerns +4. **Services**: Additional business logic services +5. **Configuration**: Environment-specific settings + +## 📚 Further Resources + +- **Google ADK Documentation**: https://google.github.io/adk-docs/ +- **FastAPI Documentation**: https://fastapi.tiangolo.com/ +- **Pydantic Settings**: https://docs.pydantic.dev/latest/concepts/pydantic_settings/ +- **Server-Sent Events**: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events diff --git a/contributing/samples/fastapi_modular_server/__init__.py b/contributing/samples/fastapi_modular_server/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py new file mode 100644 index 0000000000..bd486a4e89 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py @@ -0,0 +1 @@ +from app.agents.greetings_agent.greetings_agent import root_agent diff --git a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greetings_agent.py b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greetings_agent.py new file mode 100644 index 0000000000..e5ab697686 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greetings_agent.py @@ -0,0 +1,9 @@ +from google.adk.agents import LlmAgent + +root_agent = LlmAgent( + model="gemini-2.5-flash", + name="greetings_agent", + description="A friendly Google Gemini-powered agent", + instruction="You are a helpful AI assistant powered by Google Gemini.", + tools=[], +) diff --git a/contributing/samples/fastapi_modular_server/app/api/__init__.py b/contributing/samples/fastapi_modular_server/app/api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py new file mode 100644 index 0000000000..9c51b90ce0 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py @@ -0,0 +1,302 @@ +from contextlib import asynccontextmanager +from importlib.resources import files +import logging +from pathlib import Path +import threading +import time +from typing import Any + +# Import the new modular routers +from app.api.routers.agent_router import AgentRouter +from app.config.settings import Settings +from app.core.dependencies import ADKServices +from fastapi import FastAPI +from google.adk.cli.adk_web_server import AdkWebServer +from google.adk.cli.utils.agent_change_handler import AgentChangeEventHandler +# Import agent refresh capabilities +from watchdog.observers import Observer + +# Configure logging +logger = logging.getLogger(__name__) + + +class CustomAdkWebServer(AdkWebServer): + """ + Enhanced ADK Web Server with modular router support and agent refresh capabilities. + Maintains backward compatibility while adding robust agent reloading functionality. + """ + + def __init__( + self, settings: Settings, adk_services: ADKServices, agents_dir: str + ): + """ + Initialize the custom ADK web server. + + Args: + settings: Application settings + adk_services: Container with all ADK services + agents_dir: Directory containing agents + """ + self.settings = settings + self.adk_services = adk_services + self.reload_agents = settings.reload_agents + self.agents_root = Path(agents_dir) + + # Extract services from container for parent class + # pass individual services as keyword arguments to the super init + super().__init__( + agent_loader=self.adk_services.agent_loader, + session_service=self.adk_services.session_service, + memory_service=self.adk_services.memory_service, + artifact_service=self.adk_services.artifact_service, + credential_service=self.adk_services.credential_service, + eval_sets_manager=self.adk_services.eval_sets_manager, + eval_set_results_manager=self.adk_services.eval_set_results_manager, + agents_dir=str(self.agents_root), + ) + + # Modular routers + self.agent_router: AgentRouter | None = None + + # Agent refresh components + self.observer = None + self.agent_change_handler = None + self._setup_agent_refresh() + + def _setup_agent_refresh(self): + """Initialize agent refresh capabilities if enabled.""" + if not self.reload_agents: + logger.info("Agent refresh disabled.") + return + + try: + self.observer = Observer() + self.agent_change_handler = AgentChangeEventHandler( + agent_loader=self.adk_services.agent_loader, + runners_to_clean=self.runners_to_clean, + current_app_name_ref=self.current_app_name_ref, + ) + self._start_observer() + logger.info(f"Agent refresh enabled for root: {self.agents_root}") + except Exception as e: + logger.error(f"Failed to setup agent refresh: {e}", exc_info=True) + self.reload_agents = False + + def _start_observer(self): + """Start the file system observer for agent changes.""" + if not self.observer or not self.agent_change_handler: + return + + try: + if self.agents_root.exists(): + self.observer.schedule( + self.agent_change_handler, str(self.agents_root), recursive=True + ) + observer_thread = threading.Thread( + target=self.observer.start, daemon=True + ) + observer_thread.start() + logger.info(f"Started file system observer for: {self.agents_root}") + except Exception as e: + logger.error(f"Failed to start observer: {e}", exc_info=True) + + def _stop_observer(self): + """Stop the file system observer.""" + if self.observer and self.observer.is_alive(): + try: + self.observer.stop() + self.observer.join() + logger.info("Stopped file system observer") + except Exception as e: + logger.error(f"Error stopping observer: {e}", exc_info=True) + + def _initialize_routers(self): + """Initialize the modular routers.""" + try: + # Pass the web server instance, which now has proper dependency injection + self.agent_router = AgentRouter(self) + logger.info("AgentRouter initialized successfully.") + + except Exception as e: + logger.error(f"Failed to initialize modular routers: {e}", exc_info=True) + + def get_enhanced_fast_api_app(self) -> FastAPI: + """Assemble and return the enhanced FastAPI application.""" + web_assets_dir = None + if self.settings.serve_web_interface: + try: + web_assets_dir = str(files("google.adk.cli.browser").joinpath("")) + except ModuleNotFoundError: + logger.warning( + "Could not locate ADK web assets. UI will not be served." + ) + + # Get the FastAPI app from ADK with custom lifespan + app = self.get_fast_api_app( + lifespan=self._setup_custom_lifespan(), + allow_origins=self.settings.allow_origins, + web_assets_dir=web_assets_dir, + ) + + # Add custom routes defined directly on this server + self.add_custom_routes(app) + + # Initialize and register our modular routers + self._initialize_routers() + self._register_modular_routers(app) + + return app + + def add_custom_routes(self, app: FastAPI): + """Add server-specific, non-modular routes to the app.""" + + @app.get("/diagnostic", tags=["Diagnostics"]) + async def diagnostic(): + """Provides diagnostic information about the server setup.""" + agent_loader_status = ( + "initialized" if self.adk_services.agent_loader else "not_initialized" + ) + + return { + "status": "success", + "message": "Server components are active.", + "agent_loader_status": agent_loader_status, + "settings": { + "agent_dir": str(self.settings.agent_parent_dir), + "reload_agents": self.settings.reload_agents, + "debug": self.settings.debug, + "app_name": self.settings.app_name, + }, + "services": { + "session_service": type( + self.adk_services.session_service + ).__name__, + "memory_service": type(self.adk_services.memory_service).__name__, + "artifact_service": type( + self.adk_services.artifact_service + ).__name__, + }, + } + + @app.get("/health", tags=["Health"]) + async def health_check(): + """Health check endpoint.""" + try: + # Perform basic health checks + checks = { + "agent_loader": self.adk_services.agent_loader is not None, + "session_service": self.adk_services.session_service is not None, + "memory_service": self.adk_services.memory_service is not None, + } + + all_healthy = all(checks.values()) + + return { + "status": "healthy" if all_healthy else "degraded", + "checks": checks, + "timestamp": time.time(), + } + except Exception as e: + logger.error(f"Health check failed: {e}") + return { + "status": "unhealthy", + "error": str(e), + "timestamp": time.time(), + } + + @app.get("/", include_in_schema=False) + async def root(): + """Root endpoint with API information.""" + return { + "message": "ADK Custom FastAPI server is running", + "app_name": self.settings.app_name, + "docs_url": "/docs", + "diagnostic_url": "/diagnostic", + "health_url": "/health", + } + + def _register_modular_routers(self, app: FastAPI): + """Register the main modular routers, overriding ADK defaults.""" + try: + routes_to_remove = [] + for route in app.routes: + # Identify original ADK routes by path and methods + if route.path in [ + "/run_sse", + ] and hasattr(route, "methods"): + # Check if it's a POST method route + if "POST" in route.methods: + routes_to_remove.append(route) + + for route in routes_to_remove: + app.routes.remove(route) + + if routes_to_remove: + logger.info( + f"Successfully removed {len(routes_to_remove)} original ADK routes" + " for override." + ) + + if self.agent_router: + app.include_router( + self.agent_router.get_router(), + ) + logger.info( + "Registered AgentRouter, overriding default agent endpoints." + ) + + except Exception as e: + logger.error(f"Failed to register modular routers: {e}", exc_info=True) + + def _setup_custom_lifespan(self): + """Setup custom lifespan events for startup and shutdown.""" + + @asynccontextmanager + async def custom_lifespan(app: FastAPI): + logger.info("Server startup sequence initiated...") + + # Startup logic + try: + # Validate services are properly initialized + if not self.adk_services.agent_loader: + logger.warning("Agent loader not properly initialized") + + # Log configuration + logger.info( + f"Server configured with settings: {self.settings.app_name}" + ) + logger.info(f"Agent directory: {self.agents_root}") + logger.info(f"Debug mode: {self.settings.debug}") + + except Exception as e: + logger.error(f"Error during startup: {e}", exc_info=True) + + yield + + # Shutdown logic + logger.info("Server shutdown sequence initiated...") + try: + self._stop_observer() + + # Additional cleanup can go here + logger.info("Server shutdown completed successfully") + + except Exception as e: + logger.error(f"Error during shutdown: {e}", exc_info=True) + + return custom_lifespan + + def get_service(self, service_name: str) -> Any: + """ + Get a specific service from the ADK services container. + + Args: + service_name: Name of the service to retrieve + + Returns: + The requested service + + Raises: + AttributeError: If the service doesn't exist + """ + return getattr(self.adk_services, service_name) diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/__init__.py b/contributing/samples/fastapi_modular_server/app/api/routers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py new file mode 100644 index 0000000000..46ccaa1070 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.api.custom_adk_server import CustomAdkWebServer + +import json +import logging +import time +from typing import AsyncGenerator + +from app.core.dependencies import get_sse_event_mapper +from app.core.mapping.sse_event_mapper import SSEEventMapper +from app.models.streaming_request import RunAgentRequestOptimized +from fastapi import APIRouter +from fastapi import Depends +from fastapi import HTTPException +from google.adk.agents.run_config import RunConfig +from google.adk.agents.run_config import StreamingMode +from google.adk.utils.context_utils import Aclosing +from starlette.responses import StreamingResponse + +logger = logging.getLogger(__name__) + + +class AgentRouter: + """Agent-related endpoints router.""" + + def __init__(self, web_server_instance: "CustomAdkWebServer"): + self.web_server = web_server_instance + self.router = APIRouter() + self._setup_routes() + + def _setup_routes(self): + """Setup all agent-related routes.""" + + @self.router.post("/run_sse") + async def custom_run_agent_sse( + req: RunAgentRequestOptimized, + sse_event_mapper: SSEEventMapper = Depends(get_sse_event_mapper), + ) -> StreamingResponse: + """ + Custom implementation of the run_agent_sse endpoint with enhanced logging. + """ + session = await self.web_server.session_service.get_session( + app_name=req.app_name, user_id=req.user_id, session_id=req.session_id + ) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + logger.info( + f"Starting CUSTOM SSE for app: {req.app_name} and user: {req.user_id}" + ) + + return StreamingResponse( + self._generate_events(req, sse_event_mapper), + media_type="text/event-stream", + ) + + async def _generate_events( + self, + req: RunAgentRequestOptimized, + sse_event_mapper: SSEEventMapper, + ) -> AsyncGenerator[str, None]: + """Generate SSE events for the agent run.""" + try: + yield ( + "data:" + f" {json.dumps({'status': 'Starting custom SSE process.', 'timestamp': time.time()})}\n\n" + ) + + stream_mode = StreamingMode.SSE if req.streaming else StreamingMode.NONE + + runner = await self.web_server.get_runner_async(req.app_name) + + async with Aclosing( + runner.run_async( + user_id=req.user_id, + session_id=req.session_id, + new_message=req.new_message, + state_delta=req.state_delta, + run_config=RunConfig(streaming_mode=stream_mode), + ) + ) as agen: + async for event in agen: + logger.debug(f"Received event: {event}") + sse_message = sse_event_mapper.map_event_to_sse_message( + event, req.optimization_level + ) + if sse_message: + yield sse_message + + except Exception as e: + logger.error(f"Error in SSE handler: {str(e)}", exc_info=True) + yield f'data: {json.dumps({"error": f"An error occurred: {str(e)}"})}\n\n' + + def get_router(self) -> APIRouter: + """Returns the configured FastAPI router.""" + return self.router diff --git a/contributing/samples/fastapi_modular_server/app/config/settings.py b/contributing/samples/fastapi_modular_server/app/config/settings.py new file mode 100644 index 0000000000..716dc52ad3 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/config/settings.py @@ -0,0 +1,58 @@ +from pathlib import Path +from typing import Optional + +from pydantic import Field +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings using Pydantic for validation and environment variable support.""" + + # Application + app_name: str = "ADK Agent FastAPI Server" + debug: bool = Field(default=False, description="Enable debug mode") + port: int = Field(default=8881, description="Server port") + host: str = Field(default="localhost", description="Server host") + + # Paths + current_dir: Path = Field( + default_factory=lambda: Path(__file__).parent.parent.absolute() + ) + agent_parent_dir: Optional[Path] = None + artifact_root_path: Optional[Path] = None + + # Database + session_db_url: Optional[str] = None + + # ADK Configuration + serve_web_interface: bool = Field( + default=True, description="Serve web interface" + ) + reload_agents: bool = Field( + default=True, description="Enable agent hot-reload" + ) + + # CORS + allow_origins: list[str] = Field( + default=["*"], description="CORS allowed origins" + ) + + # Logging + log_level: str = Field(default="INFO", description="Logging level") + + model_config = { + "env_file": ".env", + "env_file_encoding": "utf-8", + "case_sensitive": False, + "extra": "allow", # Allows extra fields and makes them accessible + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + # Set computed defaults after initialization + if self.agent_parent_dir is None: + self.agent_parent_dir = self.current_dir / "agents" + + +# Global settings instance +settings = Settings() diff --git a/contributing/samples/fastapi_modular_server/app/core/__init__.py b/contributing/samples/fastapi_modular_server/app/core/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/core/dependencies.py b/contributing/samples/fastapi_modular_server/app/core/dependencies.py new file mode 100644 index 0000000000..cbdc8d2a47 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/core/dependencies.py @@ -0,0 +1,109 @@ +# dependencies.py - Improved version +from functools import lru_cache +import logging + +from app.config.settings import Settings +from app.core.mapping.sse_event_mapper import SSEEventMapper +from fastapi import Depends +from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService +from google.adk.auth.credential_service.in_memory_credential_service import InMemoryCredentialService +from google.adk.cli.utils.agent_loader import AgentLoader +from google.adk.evaluation.local_eval_set_results_manager import LocalEvalSetResultsManager +from google.adk.evaluation.local_eval_sets_manager import LocalEvalSetsManager +from google.adk.memory import InMemoryMemoryService +from google.adk.sessions import BaseSessionService +from google.adk.sessions import DatabaseSessionService +from google.adk.sessions import InMemorySessionService + +logger = logging.getLogger(__name__) + + +# Dependency to get settings +def get_settings() -> Settings: + """Get application settings.""" + from app.config.settings import settings # Import here to avoid circular imports + + return settings + + +class ADKServices: + """Container for ADK services to avoid long parameter lists.""" + + def __init__( + self, + agent_loader: AgentLoader, + session_service: BaseSessionService, + memory_service: InMemoryMemoryService, + artifact_service: InMemoryArtifactService, + credential_service: InMemoryCredentialService, + eval_sets_manager: LocalEvalSetsManager, + eval_set_results_manager: LocalEvalSetResultsManager, + ): + self.agent_loader = agent_loader + self.session_service = session_service + self.memory_service = memory_service + self.artifact_service = artifact_service + self.credential_service = credential_service + self.eval_sets_manager = eval_sets_manager + self.eval_set_results_manager = eval_set_results_manager + + +def _create_adk_services_impl(settings: Settings) -> ADKServices: + """Internal function to create ADK services.""" + try: + # Ensure directories exist + settings.agent_parent_dir.mkdir(parents=True, exist_ok=True) + + logger.info(f'Looking for agents in: {settings.agent_parent_dir}') + + # Create services + agent_loader = AgentLoader(agents_dir=str(settings.agent_parent_dir)) + session_service = ( + DatabaseSessionService(db_url=settings.session_db_url) + if settings.session_db_url + else InMemorySessionService() + ) + memory_service = InMemoryMemoryService() + artifact_service = InMemoryArtifactService() + credential_service = InMemoryCredentialService() + eval_sets_manager = LocalEvalSetsManager( + agents_dir=str(settings.agent_parent_dir) + ) + eval_set_results_manager = LocalEvalSetResultsManager( + agents_dir=str(settings.agent_parent_dir) + ) + + logger.info('All ADK services created successfully') + + return ADKServices( + agent_loader=agent_loader, + session_service=session_service, + memory_service=memory_service, + artifact_service=artifact_service, + credential_service=credential_service, + eval_sets_manager=eval_sets_manager, + eval_set_results_manager=eval_set_results_manager, + ) + + except Exception as e: + logger.error(f'Failed to create ADK services: {e}') + raise + + +# Cache based on settings identity to ensure singleton behavior +_adk_services_cache: dict[int, ADKServices] = {} + + +def get_adk_services(settings: Settings = Depends(get_settings)) -> ADKServices: + """Dependency provider for ADK services as a singleton.""" + settings_id = id(settings) + if settings_id not in _adk_services_cache: + _adk_services_cache[settings_id] = _create_adk_services_impl(settings) + return _adk_services_cache[settings_id] + + +# Use lru_cache for stateless singletons +@lru_cache() +def get_sse_event_mapper() -> SSEEventMapper: + """Dependency provider for the SSEEventMapper as a singleton.""" + return SSEEventMapper() diff --git a/contributing/samples/fastapi_modular_server/app/core/logging.py b/contributing/samples/fastapi_modular_server/app/core/logging.py new file mode 100644 index 0000000000..1a198d79e9 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/core/logging.py @@ -0,0 +1,25 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.config.settings import Settings + +import logging +import sys + + +def setup_logging(settings: Settings) -> None: + """Configure application logging.""" + logging.basicConfig( + level=getattr(logging, settings.log_level.upper()), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler("app.log") + if not settings.debug + else logging.NullHandler(), + ], + ) + + # Configure specific loggers + logging.getLogger("uvicorn.access").setLevel(logging.WARNING) + logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING) diff --git a/contributing/samples/fastapi_modular_server/app/core/mapping/__init__.py b/contributing/samples/fastapi_modular_server/app/core/mapping/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py new file mode 100644 index 0000000000..c6fa3f17c0 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py @@ -0,0 +1,66 @@ +import json +import logging +from typing import Any +from typing import Dict +from typing import Optional + +from app.models.streaming_request import OptimizationLevel +from google.adk.events.event import Event + +logger = logging.getLogger(__name__) + + +class SSEEventMapper: + """ + Maps and optimizes ADK Events into Server-Sent Event (SSE) messages. + """ + + def map_event_to_sse_message( + self, event: Event, optimization_level: OptimizationLevel + ) -> Optional[str]: + try: + if optimization_level == OptimizationLevel.MINIMAL: + payload = self._create_minimal_payload(event) + elif optimization_level == OptimizationLevel.BALANCED: + payload = self._create_balanced_payload(event) + else: + payload = self._create_full_compat_payload(event) + + if payload is None: + return None + + sse_json = json.dumps(payload, separators=(",", ":")) + return f"data: {sse_json}\n\n" + + except (TypeError, AttributeError) as e: + logger.error(f"Failed to map event: {e}", exc_info=True) + return f"data: {event.model_dump_json(exclude_none=True)}\n\n" + + def _create_minimal_payload(self, event: Event) -> Optional[Dict[str, Any]]: + # Skip debug or system events based on author + if event.author in ["system", "debug"]: + return None + + payload = {"author": event.author} + + # Extract text content if available + if event.content and event.content.parts: + text_parts = [ + part.text + for part in event.content.parts + if hasattr(part, "text") and part.text + ] + if text_parts: + payload["text"] = " ".join(text_parts) + + return payload + + def _create_balanced_payload(self, event: Event) -> Optional[Dict[str, Any]]: + payload = self._create_minimal_payload(event) + if payload is None: + return None + payload["invocation_id"] = event.invocation_id + return payload + + def _create_full_compat_payload(self, event: Event) -> Dict[str, Any]: + return event.model_dump(exclude_none=True) diff --git a/contributing/samples/fastapi_modular_server/app/models/__init__.py b/contributing/samples/fastapi_modular_server/app/models/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/models/streaming_request.py b/contributing/samples/fastapi_modular_server/app/models/streaming_request.py new file mode 100644 index 0000000000..787fbf8318 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/models/streaming_request.py @@ -0,0 +1,20 @@ +from enum import Enum + +from google.adk.cli.adk_web_server import RunAgentRequest + + +class OptimizationLevel(str, Enum): + """Enumeration for the available SSE optimization levels.""" + + MINIMAL = "minimal" + BALANCED = "balanced" + FULL_COMPAT = "full_compat" + + +class RunAgentRequestOptimized(RunAgentRequest): + """ + Request model for the enhanced SSE endpoint. + This can extend the ADK's `RunAgentRequest` if available, or be standalone. + """ + + optimization_level: OptimizationLevel = OptimizationLevel.FULL_COMPAT diff --git a/contributing/samples/fastapi_modular_server/main.py b/contributing/samples/fastapi_modular_server/main.py new file mode 100644 index 0000000000..887083b391 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/main.py @@ -0,0 +1,80 @@ +# main.py - Improved version +import logging + +from app.api.custom_adk_server import CustomAdkWebServer +from app.config.settings import settings +from app.core.dependencies import get_adk_services +from app.core.logging import setup_logging +from fastapi import FastAPI +import uvicorn + +logger = logging.getLogger(__name__) + + +def create_app() -> FastAPI: + """ + Application Factory. + + Creates and configures the main FastAPI application instance by orchestrating + the setup of logging, services, and the custom web server. + + Returns: + The fully configured FastAPI application instance. + """ + # 1. Setup logging as the very first step. + setup_logging(settings) + + logger.info( + f"Starting application factory for '{settings.app_name}' " + f"in {'DEBUG' if settings.debug else 'PRODUCTION'} mode." + ) + + try: + # 2. Create the foundational ADK services. + logger.debug("Initializing core ADK services...") + adk_services = get_adk_services(settings) + + # 3. Create an instance of your custom web server. + logger.debug("Creating CustomAdkWebServer instance...") + custom_server = CustomAdkWebServer( + settings=settings, + adk_services=adk_services, # Pass the services container + agents_dir=str(settings.agent_parent_dir), + ) + + # 4. Get the final, fully configured FastAPI app + logger.debug( + "Assembling the enhanced FastAPI app from the custom server..." + ) + app = custom_server.get_enhanced_fast_api_app() + + logger.info("FastAPI application created and configured successfully.") + return app + + except Exception as e: + logger.critical( + f"FATAL: Failed to create FastAPI application: {e}", exc_info=True + ) + raise + + +# Create the global 'app' instance by calling the factory. +app = create_app() + + +if __name__ == "__main__": + print("--- Starting ADK FastAPI Server for Development ---") + print(f"Host: http://{settings.host}:{settings.port}") + print(f"API Docs: http://{settings.host}:{settings.port}/docs") + if settings.serve_web_interface: + print(f"Web UI: http://{settings.host}:{settings.port}/ui") + print(f"Reload on changes: {settings.debug}") + print("----------------------------------------------------") + + uvicorn.run( + "main:app", + host=settings.host, + port=settings.port, + reload=settings.debug, + log_level=settings.log_level.lower(), + )