Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/api/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from app.api import (
annotation_api,
api_key_api,
chat_messages_api,
chat_tab_api,
driver_api,
query_api,
test_api,
user_db_api,
chat_messages_api,
)

api_router = APIRouter()
Expand Down
4 changes: 2 additions & 2 deletions app/api/user_db_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ def find_all_schema_info(

@router.get(
"/find/hierarchical-schema/{profile_id}",
response_model=ResponseMessage[DBDetail],
response_model=ResponseMessage[list[DBDetail]],
summary="특정 DB의 전체 스키마의 계층적 상세 정보 조회",
description="스키마, 테이블, 컬럼, 제약조건, 인덱스를 포함한 모든 스키마 정보를 계층 구조로 반환합니다.",
)
def find_hierarchical_schema_info(
profile_id: str, service: UserDbService = user_db_service_dependency
) -> ResponseMessage[DBDetail]:
) -> ResponseMessage[list[DBDetail]]:
db_info = service.find_profile(profile_id)
hierarchical_schema_info = service.get_hierarchical_schema_info(db_info)

Expand Down
12 changes: 9 additions & 3 deletions app/core/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ class CommonCode(Enum):
SUCCESS_DRIVER_INFO = (status.HTTP_200_OK, "2100", "드라이버 정보 조회를 성공하였습니다.")
SUCCESS_USER_DB_CONNECT_TEST = (status.HTTP_200_OK, "2101", "테스트 연결을 성공하였습니다.")
SUCCESS_FIND_PROFILE = (status.HTTP_200_OK, "2102", "디비 정보 조회를 성공하였습니다.")
SUCCESS_FIND_SCHEMAS = (status.HTTP_200_OK, "2103", "디비 스키마 정보 조회를 성공하였습니다.")
SUCCESS_FIND_TABLES = (status.HTTP_200_OK, "2104", "디비 테이블 정보 조회를 성공하였습니다.")
SUCCESS_FIND_COLUMNS = (status.HTTP_200_OK, "2105", "디비 컬럼 정보 조회를 성공하였습니다.")
SUCCESS_FIND_DATABASES = (status.HTTP_200_OK, "2103", "데이터베이스 정보 조회를 성공하였습니다.")
SUCCESS_FIND_SCHEMAS = (status.HTTP_200_OK, "2104", "디비 스키마 정보 조회를 성공하였습니다.")
SUCCESS_FIND_TABLES = (status.HTTP_200_OK, "2105", "디비 테이블 정보 조회를 성공하였습니다.")
SUCCESS_FIND_COLUMNS = (status.HTTP_200_OK, "2106", "디비 컬럼 정보 조회를 성공하였습니다.")
SUCCESS_SAVE_PROFILE = (status.HTTP_200_OK, "2130", "디비 연결 정보를 저장하였습니다.")
SUCCESS_UPDATE_PROFILE = (status.HTTP_200_OK, "2150", "디비 연결 정보를 업데이트 하였습니다.")
SUCCESS_DELETE_PROFILE = (status.HTTP_200_OK, "2170", "디비 연결 정보를 삭제 하였습니다.")
Expand Down Expand Up @@ -136,6 +137,11 @@ class CommonCode(Enum):
)
FAIL_FIND_INDEXES = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5108", "디비 인덱스 정보 조회 중 에러가 발생했습니다.")
FAIL_FIND_SAMPLE_ROWS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5106", "샘플 데이터 조회 중 에러가 발생했습니다.")
FAIL_FIND_DATABASES = (
status.HTTP_500_INTERNAL_SERVER_ERROR,
"5109",
"데이터베이스 정보 조회 중 에러가 발생했습니다.",
)
FAIL_SAVE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5130", "디비 정보 저장 중 에러가 발생했습니다.")
FAIL_UPDATE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5150", "디비 정보 업데이트 중 에러가 발생했습니다.")
FAIL_DELETE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5170", "디비 정보 삭제 중 에러가 발생했습니다.")
Expand Down
40 changes: 40 additions & 0 deletions app/repository/user_db_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ColumnInfo,
ColumnListResult,
ConstraintInfo,
DatabaseListResult,
DBProfile,
IndexInfo,
SchemaListResult,
Expand Down Expand Up @@ -155,6 +156,45 @@ def find_profile(self, sql: str, data: tuple) -> AllDBProfileInfo:
if connection:
connection.close()

# ─────────────────────────────
# 데이터베이스 조회
# ─────────────────────────────
def find_databases(
self, driver_module: Any, db_type: str, database_query: str | None, **kwargs: Any
) -> DatabaseListResult:
connection = None
logging.info(f"Attempting to find databases for db_type: '{db_type}' with connection args: {kwargs}")
try:
connection = self._connect(driver_module, **kwargs)
cursor = connection.cursor()

if not database_query:
if db_type == DBTypesEnum.sqlite.name:
cursor.execute("PRAGMA database_list;")
rows = cursor.fetchall()
logging.info(f"SQLite PRAGMA database_list result: {rows}")
db_name = next((row[1] for row in rows if row[2] is not None), "main")
logging.info(f"Found SQLite database: {db_name}")
return DatabaseListResult(
is_successful=True, code=CommonCode.SUCCESS_FIND_DATABASES, databases=[db_name]
)
else:
logging.warning(f"No database query provided for db_type: '{db_type}'. Returning empty list.")
return DatabaseListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_DATABASES, databases=[])

logging.info(f"Executing database query for {db_type}: {database_query}")
cursor.execute(database_query)
rows = cursor.fetchall()
logging.info(f"Raw databases found for {db_type}: {rows}")
databases = [row[0] for row in rows]
return DatabaseListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_DATABASES, databases=databases)
except Exception as e:
logging.error(f"Failed to find databases for {db_type}. Error: {e}", exc_info=True)
return DatabaseListResult(is_successful=False, code=CommonCode.FAIL_FIND_DATABASES, databases=[])
finally:
if connection:
connection.close()

# ─────────────────────────────
# 스키마 조회
# ─────────────────────────────
Expand Down
4 changes: 4 additions & 0 deletions app/schemas/user_db/result_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class ColumnListResult(BasicResult):
columns: list[ColumnInfo] = Field([], description="컬럼 정보 목록")


class DatabaseListResult(BasicResult):
databases: list[str] = Field([], description="데이터베이스 이름 목록")


# ─────────────────────────────
# 계층적 스키마 조회를 위한 모델
# ─────────────────────────────
Expand Down
168 changes: 102 additions & 66 deletions app/services/user_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,64 +221,93 @@ def get_full_schema_info(

def get_hierarchical_schema_info(
self, db_info: AllDBProfileInfo, repository: UserDbRepository = user_db_repository
) -> DBDetail:
) -> list[DBDetail]:
"""
DB 프로필 정보를 받아 해당 데이터베이스의 전체 스키마 정보를
계층적인 구조 (스키마 -> 테이블 -> 컬럼 등)로 조회하여 반환합니다.
DB 프로필 정보를 받아 해당 DBMS의 전체 데이터베이스 및 스키마 정보를
계층적인 구조 (DB -> 스키마 -> 테이블 -> 컬럼 등)로 조회하여 반환합니다.
"""
logging.info(f"Starting hierarchical schema scan for db_profile: {db_info.id}")
try:
driver_module = self._get_driver_module(db_info.type)
connect_kwargs = self._prepare_connection_args(db_info)
db_type = db_info.type.lower()

schemas_result = repository.find_schemas(
driver_module, self._get_schema_query(db_info.type), **connect_kwargs
)
initial_connect_kwargs = self._prepare_connection_args(db_info, ignore_db_name=(db_type == "postgresql"))
db_query = self._get_database_query(db_type)
databases_result = repository.find_databases(driver_module, db_type, db_query, **initial_connect_kwargs)

if not schemas_result.is_successful:
raise APIException(CommonCode.FAIL_FIND_SCHEMAS)
if not databases_result.is_successful:
raise APIException(CommonCode.FAIL_FIND_DATABASES)

schemas_to_scan = schemas_result.schemas
all_db_details = []
for db_name in sorted(databases_result.databases):
db_detail = self._get_db_schema_details(db_name, db_info, driver_module, repository)
if db_detail:
all_db_details.append(db_detail)

# For sqlite, schemas might be empty, default to 'main'
if db_info.type.lower() == "sqlite" and not schemas_to_scan:
schemas_to_scan = ["main"]
logging.info(f"Finished hierarchical schema scan. Total databases found: {len(all_db_details)}.")
return all_db_details
except APIException:
raise
except Exception as e:
logging.error("An unexpected error occurred in get_hierarchical_schema_info", exc_info=True)
raise APIException(CommonCode.FAIL) from e

schema_details = []
for schema_name in sorted(schemas_to_scan):
# For Oracle, schema names are uppercase.
effective_schema_name = schema_name
if db_info.type.lower() == "oracle":
effective_schema_name = schema_name.upper()
def _get_db_schema_details(
self,
db_name: str,
db_info: AllDBProfileInfo,
driver_module: Any,
repository: UserDbRepository,
) -> DBDetail | None:
"""특정 데이터베이스의 모든 스키마와 테이블 정보를 조회하여 DBDetail 모델을 반환합니다."""
db_type = db_info.type.lower()
if db_type == "sqlite":
current_db_info = db_info
connect_kwargs = self._prepare_connection_args(db_info)
else:
current_db_info = db_info.model_copy(update={"name": db_name})
connect_kwargs = self._prepare_connection_args(current_db_info)

tables_result = repository.find_tables(
driver_module, self._get_table_query(db_info.type), effective_schema_name, **connect_kwargs
)
logging.info(
f"Found {len(tables_result.tables)} tables in schema '{effective_schema_name}': {tables_result.tables}"
)
schema_query = self._get_schema_query(db_type, db_name)
schemas_result = repository.find_schemas(driver_module, schema_query, **connect_kwargs)

if not tables_result.is_successful:
logging.warning(f"Failed to find tables for schema '{effective_schema_name}'. Skipping.")
continue
if not schemas_result.is_successful:
logging.warning(f"Failed to find schemas for database '{db_name}'. Skipping.")
return None

table_details = []
for table_name in tables_result.tables:
table_info = self._get_table_details(
driver_module, db_info, effective_schema_name, table_name, connect_kwargs, repository
)
table_details.append(table_info)
schemas_to_scan = schemas_result.schemas
if db_type == "sqlite" and not schemas_to_scan:
schemas_to_scan = ["main"]

schema_details = []
for schema_name in sorted(schemas_to_scan):
effective_schema_name = schema_name.upper() if db_type == "oracle" else schema_name
table_query = self._get_table_query(db_type)
tables_result = repository.find_tables(driver_module, table_query, effective_schema_name, **connect_kwargs)

if not tables_result.is_successful:
logging.warning(f"Failed to find tables for schema '{effective_schema_name}'. Skipping.")
continue

table_details = [
self._get_table_details(
driver_module,
current_db_info,
effective_schema_name,
table_name,
connect_kwargs,
repository,
)
for table_name in tables_result.tables
]

if table_details:
schema_details.append(SchemaDetail(schema_name=schema_name, tables=table_details))
if table_details:
schema_details.append(SchemaDetail(schema_name=schema_name, tables=table_details))

logging.info(f"Finished hierarchical schema scan. Total schemas found: {len(schema_details)}.")
return DBDetail(db_name=db_info.name, db_type=db_info.type, schemas=schema_details)
except APIException:
raise
except Exception as e:
logging.error("An unexpected error occurred in get_hierarchical_schema_info", exc_info=True)
raise APIException(CommonCode.FAIL) from e
if schema_details:
return DBDetail(db_name=db_name, db_type=db_info.type, schemas=schema_details)

return None

def _get_schemas_to_scan(
self,
Expand Down Expand Up @@ -385,57 +414,64 @@ def _get_driver_module(self, db_type: str):
return sqlite3
return importlib.import_module(driver_name)

def _prepare_connection_args(self, db_info: DBProfileInfo) -> dict[str, Any]:
def _prepare_connection_args(self, db_info: DBProfileInfo, ignore_db_name: bool = False) -> dict[str, Any]:
"""
DB 타입에 따라 연결에 필요한 매개변수를 딕셔너리로 구성합니다.
`ignore_db_name` 플래그 추가: 초기 연결 시 특정 DB에 종속되지 않기 위함.
"""
# SQLite는 별도 처리
if db_info.type == "sqlite":
return {"db_name": db_info.name}
db_type = db_info.type.lower()

if db_type == "sqlite":
return {"database": db_info.name}

# MSSQL은 연결 문자열을 별도로 구성
if db_info.type == "mssql":
if db_type == "mssql":
connection_string = (
f"DRIVER={{ODBC Driver 17 for SQL Server}};"
f"SERVER={db_info.host},{db_info.port};"
f"UID={db_info.username};"
f"PWD={db_info.password};"
)
if db_info.name:
if db_info.name and not ignore_db_name:
connection_string += f"DATABASE={db_info.name};"
return {"connection_string": connection_string}

# 그 외 DB들은 공통 파라미터로 시작
kwargs = {"host": db_info.host, "port": db_info.port, "user": db_info.username, "password": db_info.password}

# DB 이름이 없을 경우, 기본 파라미터만 반환
if not db_info.name:
return kwargs

# DB 이름이 있다면, 타입에 따라 적절한 파라미터를 추가합니다.
if db_info.type == "postgresql":
kwargs["dbname"] = db_info.name
elif db_info.type in ["mysql", "mariadb"]:
kwargs["database"] = db_info.name
elif db_info.type.lower() == "oracle":
# dsn을 직접 사용하는 대신 service_name을 명시적으로 전달
kwargs["service_name"] = db_info.name
if db_info.name and not ignore_db_name:
if db_type == "postgresql":
kwargs["dbname"] = db_info.name
elif db_type in ["mysql", "mariadb"]:
kwargs["database"] = db_info.name
elif db_type == "oracle":
kwargs["service_name"] = db_info.name

return kwargs

def _get_schema_query(self, db_type: str) -> str | None:
def _get_database_query(self, db_type: str) -> str | None:
db_type = db_type.lower()
if db_type == "postgresql":
return "SELECT datname FROM pg_database WHERE datistemplate = false;"
elif db_type in ["mysql", "mariadb"]:
return "SHOW DATABASES;"
elif db_type == "oracle":
return "SELECT global_name FROM global_name"
return None

def _get_schema_query(self, db_type: str, db_name: str | None = None) -> str | None:
db_type = db_type.lower()
if db_type == "postgresql":
return """
SELECT schema_name FROM information_schema.schemata
WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
"""
elif db_type in ["mysql", "mariadb"]:
return "SELECT schema_name FROM information_schema.schemata"
# MySQL/MariaDB에서는 스키마가 데이터베이스와 동일하므로, 현재 데이터베이스의 이름을 스키마로 간주합니다.
# `information_schema.schemata`를 쿼리하여 명시적으로 확인하는 것이 더 정확합니다.
return f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{db_name}'"
elif db_type == "oracle":
return "SELECT username FROM all_users WHERE ORACLE_MAINTAINED = 'N'"
elif db_type == "sqlite":
return None
return None # SQLite는 단일 파일 데이터베이스로, 스키마 개념이 다릅니다. 'main'을 사용합니다.
return None

def _get_table_query(self, db_type: str, for_all_schemas: bool = False) -> str | None: # 수정됨
Expand Down