Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions app/api/annotation_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from app.core.response import ResponseMessage
from app.core.status import CommonCode
from app.schemas.annotation.hierarchical_response_model import HierarchicalDBMSAnnotation
from app.schemas.annotation.request_model import AnnotationCreateRequest
from app.schemas.annotation.response_model import AnnotationDeleteResponse, FullAnnotationResponse
from app.services.annotation_service import AnnotationService, annotation_service
Expand Down Expand Up @@ -59,6 +60,22 @@ def get_annotation_by_db_profile_id(
return ResponseMessage.success(value=annotation, code=CommonCode.SUCCESS_FIND_ANNOTATION)


@router.get(
"/find/hierarchical/{db_profile_id}",
response_model=ResponseMessage[HierarchicalDBMSAnnotation],
summary="DB 프로필 ID로 계층적 어노테이션 조회",
)
def get_hierarchical_annotation_by_db_profile_id(
db_profile_id: str,
service: AnnotationService = annotation_service_dependency,
) -> ResponseMessage[HierarchicalDBMSAnnotation]:
"""
`db_profile_id`에 연결된 어노테이션을 계층 구조(DBMS > DB > 스키마 > 테이블 > 컬럼)로 조회합니다.
"""
annotation = service.get_hierarchical_annotation_by_db_profile_id(db_profile_id)
return ResponseMessage.success(value=annotation, code=CommonCode.SUCCESS_FIND_ANNOTATION)


@router.delete(
"/remove/{annotation_id}",
response_model=ResponseMessage[AnnotationDeleteResponse],
Expand Down
145 changes: 144 additions & 1 deletion app/repository/annotation_repository.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging
import sqlite3

from app.core.exceptions import APIException
from app.core.status import CommonCode
from app.core.utils import get_db_path
from app.schemas.annotation.db_model import (
ColumnAnnotationInDB,
Expand All @@ -10,6 +13,13 @@
TableAnnotationInDB,
TableConstraintInDB,
)
from app.schemas.annotation.hierarchical_response_model import (
HierarchicalColumnAnnotation,
HierarchicalDBAnnotation,
HierarchicalDBMSAnnotation,
HierarchicalRelationshipAnnotation,
HierarchicalTableAnnotation,
)
from app.schemas.annotation.response_model import (
ColumnAnnotationDetail,
ConstraintDetail,
Expand Down Expand Up @@ -99,7 +109,7 @@ def create_full_annotation(
(
c.id,
c.table_annotation_id,
c.constraint_type,
c.constraint_type.value,
c.name,
c.description,
c.expression,
Expand Down Expand Up @@ -279,6 +289,139 @@ def find_full_annotation_by_id(self, annotation_id: str) -> FullAnnotationRespon
if conn:
conn.close()

def find_hierarchical_annotation_by_profile_id(self, db_profile_id: str) -> HierarchicalDBMSAnnotation | None:
"""
db_profile_id로 계층적 어노테이션 정보를 조회합니다.
- DBMS > DB > 테이블 > 컬럼 구조로 데이터를 조립하여 반환합니다.
"""
db_path = get_db_path()
conn = None
try:
conn = sqlite3.connect(str(db_path), timeout=10)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

# 1. 기본 정보 조회 (db_profile, database_annotation)
cursor.execute(
"""
SELECT
dp.type as dbms_type,
da.id as annotation_id,
da.db_profile_id,
da.database_name,
da.description as db_description,
da.created_at,
da.updated_at
FROM db_profile dp
JOIN database_annotation da ON dp.annotation_id = da.id
WHERE dp.id = ?
""",
(db_profile_id,),
)
base_info = cursor.fetchone()
if not base_info:
return None

# 2. 테이블 및 컬럼 정보 한번에 조회
cursor.execute(
"""
SELECT
ta.id as table_id,
ta.table_name,
ta.description as table_description,
ca.column_name,
ca.description as column_description,
ca.data_type
FROM table_annotation ta
JOIN column_annotation ca ON ta.id = ca.table_annotation_id
WHERE ta.database_annotation_id = ?
ORDER BY ta.table_name, ca.ordinal_position
""",
(base_info["annotation_id"],),
)
rows = cursor.fetchall()

# 3. 데이터 계층 구조로 조립 (테이블, 컬럼)
tables_map = {}
for row in rows:
table_name = row["table_name"]
if table_name not in tables_map:
tables_map[table_name] = HierarchicalTableAnnotation(
table_name=table_name,
description=row["table_description"],
columns=[],
)
tables_map[table_name].columns.append(
HierarchicalColumnAnnotation(
column_name=row["column_name"],
description=row["column_description"],
data_type=row["data_type"],
)
)

# 4. 관계 정보 조회
cursor.execute(
"""
SELECT
ta_from.table_name as from_table,
ca_from.column_name as from_column,
tc.ref_table as to_table,
cc.referenced_column_name as to_column,
tc.name as constraint_name,
tc.description as relationship_description
FROM table_constraint tc
JOIN table_annotation ta_from ON tc.table_annotation_id = ta_from.id
JOIN constraint_column cc ON tc.id = cc.constraint_id
JOIN column_annotation ca_from ON cc.column_annotation_id = ca_from.id
WHERE ta_from.database_annotation_id = ? AND tc.constraint_type = 'FOREIGN KEY'
ORDER BY tc.name, cc.position
""",
(base_info["annotation_id"],),
)
relationship_rows = cursor.fetchall()
logging.info(f"Raw relationship rows from DB: {[dict(row) for row in relationship_rows]}")

relationships_map = {}
for row in relationship_rows:
constraint_name = row["constraint_name"]
if constraint_name not in relationships_map:
relationships_map[constraint_name] = {
"from_table": row["from_table"],
"to_table": row["to_table"],
"description": row["relationship_description"],
"from_columns": [],
"to_columns": [],
}
relationships_map[constraint_name]["from_columns"].append(row["from_column"])
relationships_map[constraint_name]["to_columns"].append(row["to_column"])

logging.info(f"Processed relationships map: {relationships_map}")
relationships = [HierarchicalRelationshipAnnotation(**data) for data in relationships_map.values()]
logging.info(f"Final relationships list: {relationships}")

# 5. 최종 데이터 조립
db = HierarchicalDBAnnotation(
db_name=base_info["database_name"],
description=base_info["db_description"],
tables=list(tables_map.values()),
relationships=relationships,
)

return HierarchicalDBMSAnnotation(
dbms_type=base_info["dbms_type"],
databases=[db],
annotation_id=base_info["annotation_id"],
db_profile_id=base_info["db_profile_id"],
created_at=base_info["created_at"],
updated_at=base_info["updated_at"],
)

except sqlite3.Error as e:
raise APIException(CommonCode.FAIL_FIND_ANNOTATION) from e
finally:
if conn:
conn.close()

def delete_annotation_by_id(self, annotation_id: str) -> bool:
"""
annotationId로 특정 어노테이션을 삭제합니다.
Expand Down
39 changes: 39 additions & 0 deletions app/schemas/annotation/hierarchical_response_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from datetime import datetime

from pydantic import BaseModel, Field


class HierarchicalColumnAnnotation(BaseModel):
column_name: str = Field(..., description="컬럼 이름")
description: str | None = Field(None, description="AI가 생성한 컬럼 어노테이션")
data_type: str | None = Field(None, description="컬럼 데이터 타입")


class HierarchicalTableAnnotation(BaseModel):
table_name: str = Field(..., description="테이블 이름")
description: str | None = Field(None, description="AI가 생성한 테이블 어노테이션")
columns: list[HierarchicalColumnAnnotation] = Field(..., description="테이블에 속한 컬럼 목록")


class HierarchicalRelationshipAnnotation(BaseModel):
from_table: str = Field(..., description="외래 키 제약조건이 시작되는 테이블")
from_columns: list[str] = Field(..., description="외래 키에 포함된 컬럼들")
to_table: str = Field(..., description="외래 키가 참조하는 테이블")
to_columns: list[str] = Field(..., description="참조되는 테이블의 컬럼들")
description: str | None = Field(None, description="AI가 생성한 관계 어노테이션")


class HierarchicalDBAnnotation(BaseModel):
db_name: str = Field(..., description="데이터베이스 이름")
description: str | None = Field(None, description="AI가 생성한 데이터베이스 어노테이션")
tables: list[HierarchicalTableAnnotation] = Field(..., description="데이터베이스에 속한 테이블 목록")
relationships: list[HierarchicalRelationshipAnnotation] = Field(..., description="테이블 간의 관계 목록")


class HierarchicalDBMSAnnotation(BaseModel):
dbms_type: str = Field(..., description="DBMS 종류 (e.g., postgresql, oracle)")
databases: list[HierarchicalDBAnnotation] = Field(..., description="DBMS에 속한 데이터베이스 목록")
annotation_id: str = Field(..., description="최상위 어노테이션 ID")
db_profile_id: str = Field(..., description="DB 프로필 ID")
created_at: datetime = Field(..., description="생성 일시")
updated_at: datetime = Field(..., description="수정 일시")
29 changes: 23 additions & 6 deletions app/services/annotation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
TableAnnotationInDB,
TableConstraintInDB,
)
from app.schemas.annotation.hierarchical_response_model import HierarchicalDBMSAnnotation
from app.schemas.annotation.request_model import AnnotationCreateRequest
from app.schemas.annotation.response_model import AnnotationDeleteResponse, FullAnnotationResponse
from app.schemas.user_db.db_profile_model import AllDBProfileInfo
Expand Down Expand Up @@ -134,6 +135,15 @@ def get_annotation_by_db_profile_id(self, db_profile_id: str) -> FullAnnotationR

return self.get_full_annotation(db_profile.annotation_id)

def get_hierarchical_annotation_by_db_profile_id(self, db_profile_id: str) -> HierarchicalDBMSAnnotation:
"""
db_profile_id를 기반으로 계층적인 어노테이션 정보를 조회합니다.
"""
annotation = self.repository.find_hierarchical_annotation_by_profile_id(db_profile_id)
if not annotation:
raise APIException(CommonCode.NO_ANNOTATION_FOR_PROFILE)
return annotation

def _prepare_ai_request_body(
self,
db_profile: AllDBProfileInfo,
Expand Down Expand Up @@ -194,7 +204,11 @@ def _prepare_ai_request_body(
database_name=db_profile.name or db_profile.username, tables=ai_tables, relationships=ai_relationships
)

return AIAnnotationRequest(dbms_type=db_profile.type, databases=[ai_database])
request_body = AIAnnotationRequest(dbms_type=db_profile.type, databases=[ai_database])

logging.info(request_body.model_dump_json(indent=2))

return request_body

def _transform_ai_response_to_db_models(
self,
Expand Down Expand Up @@ -335,20 +349,23 @@ def _process_constraints(
) -> tuple[list[TableConstraintInDB], list[ConstraintColumnInDB]]:
"""
테이블의 제약조건 및 제약조건 컬럼 어노테이션 모델 리스트를 생성합니다.
AI 응답이 아닌 원본 스키마의 모든 제약조건을 기준으로 처리합니다.
"""
constraint_annos, constraint_col_annos = [], []
for const_data in tbl_data.get("constraints", []):
original_constraint = next((c for c in original_table.constraints if c.name == const_data["name"]), None)
if not original_constraint:
continue
ai_constraints_lookup = {c["name"]: c for c in tbl_data.get("constraints", [])}

for original_constraint in original_table.constraints:
const_data = ai_constraints_lookup.get(original_constraint.name)
annotation = const_data.get("annotation") if const_data else None

const_id = generate_prefixed_uuid(DBSaveIdEnum.table_constraint.value)
constraint_annos.append(
TableConstraintInDB(
id=const_id,
table_annotation_id=table_id,
name=original_constraint.name,
constraint_type=ConstraintTypeEnum(original_constraint.type),
description=const_data.get("annotation"),
description=annotation,
ref_table=original_constraint.referenced_table,
expression=original_constraint.check_expression,
on_update_action=original_constraint.on_update,
Expand Down