Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/api/v1/endpoints/annotator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

from fastapi import APIRouter, Depends
from core.llm_provider import llm_instance
from services.annotation_service import AnnotationService
from api.v1.schemas.annotator_schemas import AnnotationRequest, AnnotationResponse

router = APIRouter()

# AnnotationService 인스턴스를 싱글턴으로 관리
annotation_service_instance = AnnotationService(llm=llm_instance)

def get_annotation_service():
"""의존성 주입을 통해 AnnotationService 인스턴스를 제공합니다."""
return annotation_service_instance

@router.post("/annotator", response_model=AnnotationResponse)
async def create_annotations(
request: AnnotationRequest,
service: AnnotationService = Depends(get_annotation_service)
):
"""
DB 스키마 정보를 받아 각 요소에 대한 설명을 비동기적으로 생성하여 반환합니다.
"""
return await service.generate_for_schema(request)
2 changes: 1 addition & 1 deletion src/api/v1/endpoints/chat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# src/api/v1/endpoints/chat.py

from fastapi import APIRouter, Depends
from api.v1.schemas import ChatRequest, ChatResponse
from api.v1.schemas.chatbot_schemas import ChatRequest, ChatResponse
from services.chatbot_service import ChatbotService

router = APIRouter()
Expand Down
47 changes: 47 additions & 0 deletions src/api/v1/schemas/annotator_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# src/api/v1/schemas/annotator_schemas.py

from pydantic import BaseModel, Field
from typing import List, Dict, Any

class Column(BaseModel):
column_name: str
data_type: str

class Table(BaseModel):
table_name: str
columns: List[Column]
sample_rows: List[Dict[str, Any]]

class Relationship(BaseModel):
from_table: str
from_columns: List[str]
to_table: str
to_columns: List[str]

class Database(BaseModel):
database_name: str
tables: List[Table]
relationships: List[Relationship]

class AnnotationRequest(BaseModel):
dbms_type: str
databases: List[Database]

class AnnotatedColumn(Column):
description: str = Field(..., description="AI가 생성한 컬럼 설명")

class AnnotatedTable(Table):
description: str = Field(..., description="AI가 생성한 테이블 설명")
columns: List[AnnotatedColumn]

class AnnotatedRelationship(Relationship):
description: str = Field(..., description="AI가 생성한 관계 설명")

class AnnotatedDatabase(Database):
description: str = Field(..., description="AI가 생성한 데이터베이스 설명")
tables: List[AnnotatedTable]
relationships: List[AnnotatedRelationship]

class AnnotationResponse(BaseModel):
dbms_type: str
databases: List[AnnotatedDatabase]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# src/api/v1/schemas.py
# src/api/v1/schemas/chatbot_schemas.py

from pydantic import BaseModel

Expand Down
9 changes: 8 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from contextlib import closing
import uvicorn
from fastapi import FastAPI
from api.v1.endpoints import chat
from api.v1.endpoints import chat, annotator

def find_free_port():
"""사용 가능한 비어있는 포트를 찾는 함수"""
Expand All @@ -27,6 +27,13 @@ def find_free_port():
tags=["Chatbot"]
)

# '/api/v1' 경로에 annotator 라우터 포함
app.include_router(
annotator.router,
prefix="/api/v1",
tags=["Annotator"]
)

@app.get("/")
def health_check():
"""헬스체크 엔드포인트, 서버 상태가 정상이면 'ok' 반환합니다."""
Expand Down
100 changes: 100 additions & 0 deletions src/services/annotation_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# src/services/annotation_service.py

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from api.v1.schemas.annotator_schemas import (
AnnotationRequest, AnnotationResponse,
Database, Table, Column, Relationship,
AnnotatedDatabase, AnnotatedTable, AnnotatedColumn, AnnotatedRelationship
)

class AnnotationService():
"""
어노테이션 생성과 관련된 모든 비즈니스 로직을 담당하는 서비스 클래스.
LLM 호출을 비동기적으로 처리하여 성능을 최적화합니다.
"""
def __init__(self, llm: ChatOpenAI):
self.llm = llm

async def _generate_description(self, template: str, **kwargs) -> str:
"""LLM을 비동기적으로 호출하여 설명을 생성하는 헬퍼 함수"""
prompt = ChatPromptTemplate.from_template(template)
chain = prompt | self.llm | StrOutputParser()
return await chain.ainvoke(kwargs)

async def _annotate_column(self, table_name: str, sample_rows: str, column: Column) -> AnnotatedColumn:
"""단일 컬럼을 비동기적으로 어노테이트합니다."""
column_desc = await self._generate_description(
"""
테이블 '{table_name}'의 컬럼 '{column_name}'(타입: {data_type})의 역할을 한국어로 간결하게 설명해줘.
샘플 데이터: {sample_rows}
""",
table_name=table_name,
column_name=column.column_name,
data_type=column.data_type,
sample_rows=sample_rows
)
return AnnotatedColumn(**column.model_dump(), description=column_desc.strip())

async def _annotate_table(self, db_name: str, table: Table) -> AnnotatedTable:
"""단일 테이블과 그 컬럼들을 비동기적으로 어노테이트합니다."""
sample_rows_str = str(table.sample_rows[:3])

# 테이블 설명 생성과 모든 컬럼 설명을 동시에 병렬로 처리
table_desc_task = self._generate_description(
"데이터베이스 '{db_name}'에 속한 테이블 '{table_name}'의 역할을 한국어로 간결하게 설명해줘.",
db_name=db_name, table_name=table.table_name
)
column_tasks = [self._annotate_column(table.table_name, sample_rows_str, col) for col in table.columns]

results = await asyncio.gather(table_desc_task, *column_tasks)

table_desc = results[0].strip()
annotated_columns = results[1:]

return AnnotatedTable(**table.model_dump(exclude={'columns'}), description=table_desc, columns=annotated_columns)

async def _annotate_relationship(self, relationship: Relationship) -> AnnotatedRelationship:
"""단일 관계를 비동기적으로 어노테이트합니다."""
rel_desc = await self._generate_description(
"""
테이블 '{from_table}'이(가) 테이블 '{to_table}'을(를) 참조하고 있습니다.
이 관계를 한국어 문장으로 설명해줘.
""",
from_table=relationship.from_table, to_table=relationship.to_table
)
return AnnotatedRelationship(**relationship.model_dump(), description=rel_desc.strip())

async def generate_for_schema(self, request: AnnotationRequest) -> AnnotationResponse:
"""
입력된 스키마 전체에 대한 어노테이션을 비동기적으로 생성합니다.
"""
annotated_databases = []
for db in request.databases:
# DB 설명, 모든 테이블, 모든 관계 설명을 동시에 병렬로 처리
db_desc_task = self._generate_description(
"데이터베이스 '{db_name}'의 역할을 한국어로 간결하게 설명해줘.",
db_name=db.database_name
)

table_tasks = [self._annotate_table(db.database_name, table) for table in db.tables]
relationship_tasks = [self._annotate_relationship(rel) for rel in db.relationships]

db_desc_result, *other_results = await asyncio.gather(db_desc_task, *table_tasks, *relationship_tasks)

num_tables = len(table_tasks)
annotated_tables = other_results[:num_tables]
annotated_relationships = other_results[num_tables:]

annotated_databases.append(
AnnotatedDatabase(
database_name=db.database_name,
description=db_desc_result.strip(),
tables=annotated_tables,
relationships=annotated_relationships
)
)

return AnnotationResponse(dbms_type=request.dbms_type, databases=annotated_databases)