-
Notifications
You must be signed in to change notification settings - Fork 592
/
Copy pathsingleton.py
81 lines (64 loc) · 2.21 KB
/
singleton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""API server module.
This module initializes and configures the FastAPI application,
including routers, middleware, and startup/shutdown events.
The API server provides endpoints for agent execution and management.
"""
import logging
from contextlib import asynccontextmanager
import sentry_sdk
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.admin.api import admin_router, admin_router_readonly
from app.admin.health import health_router
from app.config.config import config
from app.services.twitter.oauth2 import router as twitter_oauth2_router
from app.services.twitter.oauth2_callback import router as twitter_callback_router
from models.db import init_db
from models.redis import init_redis
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Sentry is opt-in: it is only initialised when a DSN is present in the config.
if config.sentry_dsn:
    _sentry_options = {
        "dsn": config.sentry_dsn,
        "sample_rate": config.sentry_sample_rate,
        "traces_sample_rate": config.sentry_traces_sample_rate,
        "profiles_sample_rate": config.sentry_profiles_sample_rate,
        "environment": config.env,
        "release": config.release,
        # Fixed server name reported for this entry point.
        "server_name": "intent-singleton",
    }
    sentry_sdk.init(**_sentry_options)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Async context manager wrapping the application's lifetime.

    On entry it awaits ``init_db`` with the keyword arguments stored in
    ``config.db`` and, when ``config.redis_host`` is truthy, awaits
    ``init_redis`` with the configured host and port. It then logs a start
    message and yields. When the context manager is resumed after the
    yield, it logs a shutdown message.

    Args:
        app: FastAPI application instance (not read by the body).
    """
    # Database first: unpack the configured connection settings.
    db_settings = config.db
    await init_db(**db_settings)

    # Redis is optional; it is skipped entirely when no host is configured.
    redis_host = config.redis_host
    if redis_host:
        await init_redis(host=redis_host, port=config.redis_port)

    logger.info("API server start")

    yield

    # Everything below runs after the API server has shut down.
    logger.info("Cleaning up and shutdown...")
# Application object; startup and shutdown are delegated to ``lifespan``.
app = FastAPI(lifespan=lifespan)

# CORS is fully permissive: any origin, any method, any header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# Register the routers; the tuple keeps the original registration order.
for _router in (
    admin_router,
    admin_router_readonly,
    twitter_callback_router,
    twitter_oauth2_router,
    health_router,
):
    app.include_router(_router)