diff --git a/src/api/v1/endpoints/annotator.py b/src/api/v1/endpoints/annotator.py new file mode 100644 index 0000000..508fb94 --- /dev/null +++ b/src/api/v1/endpoints/annotator.py @@ -0,0 +1,24 @@ + +from fastapi import APIRouter, Depends +from core.llm_provider import llm_instance +from services.annotation_service import AnnotationService +from api.v1.schemas.annotator_schemas import AnnotationRequest, AnnotationResponse + +router = APIRouter() + +# Keep a single module-level AnnotationService instance (singleton) +annotation_service_instance = AnnotationService(llm=llm_instance) + +def get_annotation_service(): + """Return the module-level AnnotationService instance for dependency injection.""" + return annotation_service_instance + +@router.post("/annotator", response_model=AnnotationResponse) +async def create_annotations( + request: AnnotationRequest, + service: AnnotationService = Depends(get_annotation_service) +): + """ + Receive DB schema information, asynchronously generate a description for each element, and return the result. + """ + return await service.generate_for_schema(request) diff --git a/src/api/v1/endpoints/chat.py b/src/api/v1/endpoints/chat.py index e3ca8dc..09aa2d3 100644 --- a/src/api/v1/endpoints/chat.py +++ b/src/api/v1/endpoints/chat.py @@ -1,7 +1,7 @@ # src/api/v1/endpoints/chat.py from fastapi import APIRouter, Depends -from api.v1.schemas import ChatRequest, ChatResponse +from api.v1.schemas.chatbot_schemas import ChatRequest, ChatResponse from services.chatbot_service import ChatbotService router = APIRouter() diff --git a/src/api/v1/schemas/annotator_schemas.py b/src/api/v1/schemas/annotator_schemas.py new file mode 100644 index 0000000..7db3174 --- /dev/null +++ b/src/api/v1/schemas/annotator_schemas.py @@ -0,0 +1,47 @@ +# src/api/v1/schemas/annotator_schemas.py + +from pydantic import BaseModel, Field +from typing import List, Dict, Any + +class Column(BaseModel): + column_name: str + data_type: str + +class Table(BaseModel): + table_name: str + columns: List[Column] + sample_rows: List[Dict[str, Any]] + +class Relationship(BaseModel): + from_table: str +
from_columns: List[str] + to_table: str + to_columns: List[str] + +class Database(BaseModel): + database_name: str + tables: List[Table] + relationships: List[Relationship] + +class AnnotationRequest(BaseModel): + dbms_type: str + databases: List[Database] + +class AnnotatedColumn(Column): + description: str = Field(..., description="AI가 생성한 컬럼 설명") + +class AnnotatedTable(Table): + description: str = Field(..., description="AI가 생성한 테이블 설명") + columns: List[AnnotatedColumn] + +class AnnotatedRelationship(Relationship): + description: str = Field(..., description="AI가 생성한 관계 설명") + +class AnnotatedDatabase(Database): + description: str = Field(..., description="AI가 생성한 데이터베이스 설명") + tables: List[AnnotatedTable] + relationships: List[AnnotatedRelationship] + +class AnnotationResponse(BaseModel): + dbms_type: str + databases: List[AnnotatedDatabase] diff --git a/src/api/v1/schemas.py b/src/api/v1/schemas/chatbot_schemas.py similarity index 76% rename from src/api/v1/schemas.py rename to src/api/v1/schemas/chatbot_schemas.py index 0d57177..0db3a79 100644 --- a/src/api/v1/schemas.py +++ b/src/api/v1/schemas/chatbot_schemas.py @@ -1,4 +1,4 @@ -# src/api/v1/schemas.py +# src/api/v1/schemas/chatbot_schemas.py from pydantic import BaseModel diff --git a/src/main.py b/src/main.py index 98c0984..b25548d 100644 --- a/src/main.py +++ b/src/main.py @@ -4,7 +4,7 @@ from contextlib import closing import uvicorn from fastapi import FastAPI -from api.v1.endpoints import chat +from api.v1.endpoints import chat, annotator def find_free_port(): """사용 가능한 비어있는 포트를 찾는 함수""" @@ -27,6 +27,13 @@ def find_free_port(): tags=["Chatbot"] ) +# Include the annotator router under the '/api/v1' path +app.include_router( + annotator.router, + prefix="/api/v1", + tags=["Annotator"] +) + @app.get("/") def health_check(): """헬스체크 엔드포인트, 서버 상태가 정상이면 'ok' 반환합니다.""" diff --git a/src/services/annotation_service.py b/src/services/annotation_service.py new file mode 100644 index 0000000..4e0e823 --- /dev/null +++
# src/services/annotation_service.py

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from api.v1.schemas.annotator_schemas import (
    AnnotationRequest, AnnotationResponse,
    Database, Table, Column, Relationship,
    AnnotatedDatabase, AnnotatedTable, AnnotatedColumn, AnnotatedRelationship
)


class AnnotationService:
    """Generate LLM-written descriptions for every element of a DB schema.

    Each description is produced by one asynchronous LLM call. Calls that do
    not depend on each other are awaited together with ``asyncio.gather``.
    No concurrency cap is applied: every call needed for a request is started
    at once, and the first failing call propagates its exception to the caller.
    """

    # Number of leading sample rows quoted in each column prompt.
    _SAMPLE_ROW_LIMIT = 3

    def __init__(self, llm: ChatOpenAI):
        """Store the chat model used for every description request."""
        self.llm = llm

    async def _generate_description(self, template: str, **kwargs) -> str:
        """Render ``template`` with ``kwargs``, invoke the LLM, return the text.

        Args:
            template: Prompt template whose ``{placeholders}`` are filled from
                ``kwargs``.
            **kwargs: Values for the template placeholders.

        Returns:
            The model output with leading/trailing whitespace removed.
        """
        prompt = ChatPromptTemplate.from_template(template)
        chain = prompt | self.llm | StrOutputParser()
        text = await chain.ainvoke(kwargs)
        # Every caller wants the trimmed text, so trim once here.
        return text.strip()

    async def _annotate_column(self, table_name: str, sample_rows: str, column: Column) -> AnnotatedColumn:
        """Return ``column`` extended with an LLM-generated description.

        Args:
            table_name: Name of the table that owns the column.
            sample_rows: Pre-rendered string of sample rows shown to the model.
            column: The column to describe.
        """
        description = await self._generate_description(
            """
            테이블 '{table_name}'의 컬럼 '{column_name}'(타입: {data_type})의 역할을 한국어로 간결하게 설명해줘.
            샘플 데이터: {sample_rows}
            """,
            table_name=table_name,
            column_name=column.column_name,
            data_type=column.data_type,
            sample_rows=sample_rows,
        )
        return AnnotatedColumn(**column.model_dump(), description=description)

    async def _annotate_table(self, db_name: str, table: Table) -> AnnotatedTable:
        """Return ``table`` with descriptions for itself and all its columns.

        The table description and every column description are requested
        concurrently.
        """
        sample_rows_str = str(table.sample_rows[:self._SAMPLE_ROW_LIMIT])

        # The nested gather keeps the column results grouped in their own
        # list (in column order), so no positional slicing is needed.
        table_desc, annotated_columns = await asyncio.gather(
            self._generate_description(
                "데이터베이스 '{db_name}'에 속한 테이블 '{table_name}'의 역할을 한국어로 간결하게 설명해줘.",
                db_name=db_name,
                table_name=table.table_name,
            ),
            asyncio.gather(
                *(
                    self._annotate_column(table.table_name, sample_rows_str, col)
                    for col in table.columns
                )
            ),
        )

        return AnnotatedTable(
            **table.model_dump(exclude={'columns'}),
            description=table_desc,
            columns=annotated_columns,
        )

    async def _annotate_relationship(self, relationship: Relationship) -> AnnotatedRelationship:
        """Return ``relationship`` extended with an LLM-generated description.

        The prompt names the referencing and referenced columns as well as
        the tables, so two different relationships between the same pair of
        tables do not produce the same prompt.
        """
        description = await self._generate_description(
            """
            테이블 '{from_table}'의 컬럼({from_columns})이 테이블 '{to_table}'의 컬럼({to_columns})을 참조하고 있습니다.
            이 관계를 한국어 문장으로 설명해줘.
            """,
            from_table=relationship.from_table,
            from_columns=", ".join(relationship.from_columns),
            to_table=relationship.to_table,
            to_columns=", ".join(relationship.to_columns),
        )
        return AnnotatedRelationship(**relationship.model_dump(), description=description)

    async def _annotate_database(self, db: Database) -> AnnotatedDatabase:
        """Annotate one database: its own description, tables and relationships.

        All three groups of LLM calls are started together; table and
        relationship results come back as separate lists in input order.
        """
        db_desc, annotated_tables, annotated_relationships = await asyncio.gather(
            self._generate_description(
                "데이터베이스 '{db_name}'의 역할을 한국어로 간결하게 설명해줘.",
                db_name=db.database_name,
            ),
            asyncio.gather(
                *(self._annotate_table(db.database_name, table) for table in db.tables)
            ),
            asyncio.gather(
                *(self._annotate_relationship(rel) for rel in db.relationships)
            ),
        )

        return AnnotatedDatabase(
            database_name=db.database_name,
            description=db_desc,
            tables=annotated_tables,
            relationships=annotated_relationships,
        )

    async def generate_for_schema(self, request: AnnotationRequest) -> AnnotationResponse:
        """Annotate every database in ``request`` and return the full response.

        Databases are annotated concurrently (not one after another); the
        output list preserves the order of ``request.databases``.

        Args:
            request: Schema payload holding the DBMS type and its databases.

        Returns:
            An ``AnnotationResponse`` carrying the same ``dbms_type`` and one
            annotated database per input database.
        """
        annotated_databases = await asyncio.gather(
            *(self._annotate_database(db) for db in request.databases)
        )
        return AnnotationResponse(dbms_type=request.dbms_type, databases=annotated_databases)