diff --git a/app/api/api_router.py b/app/api/api_router.py index 45e22f4..e068f61 100644 --- a/app/api/api_router.py +++ b/app/api/api_router.py @@ -5,12 +5,12 @@ from app.api import ( annotation_api, api_key_api, + chat_messages_api, chat_tab_api, driver_api, query_api, test_api, user_db_api, - chat_messages_api, ) api_router = APIRouter() diff --git a/app/api/user_db_api.py b/app/api/user_db_api.py index cd1dbf2..f354ab1 100644 --- a/app/api/user_db_api.py +++ b/app/api/user_db_api.py @@ -160,13 +160,13 @@ def find_all_schema_info( @router.get( "/find/hierarchical-schema/{profile_id}", - response_model=ResponseMessage[DBDetail], + response_model=ResponseMessage[list[DBDetail]], summary="특정 DB의 전체 스키마의 계층적 상세 정보 조회", description="스키마, 테이블, 컬럼, 제약조건, 인덱스를 포함한 모든 스키마 정보를 계층 구조로 반환합니다.", ) def find_hierarchical_schema_info( profile_id: str, service: UserDbService = user_db_service_dependency -) -> ResponseMessage[DBDetail]: +) -> ResponseMessage[list[DBDetail]]: db_info = service.find_profile(profile_id) hierarchical_schema_info = service.get_hierarchical_schema_info(db_info) diff --git a/app/core/status.py b/app/core/status.py index 2acf92c..6b88bc4 100644 --- a/app/core/status.py +++ b/app/core/status.py @@ -21,9 +21,10 @@ class CommonCode(Enum): SUCCESS_DRIVER_INFO = (status.HTTP_200_OK, "2100", "드라이버 정보 조회를 성공하였습니다.") SUCCESS_USER_DB_CONNECT_TEST = (status.HTTP_200_OK, "2101", "테스트 연결을 성공하였습니다.") SUCCESS_FIND_PROFILE = (status.HTTP_200_OK, "2102", "디비 정보 조회를 성공하였습니다.") - SUCCESS_FIND_SCHEMAS = (status.HTTP_200_OK, "2103", "디비 스키마 정보 조회를 성공하였습니다.") - SUCCESS_FIND_TABLES = (status.HTTP_200_OK, "2104", "디비 테이블 정보 조회를 성공하였습니다.") - SUCCESS_FIND_COLUMNS = (status.HTTP_200_OK, "2105", "디비 컬럼 정보 조회를 성공하였습니다.") + SUCCESS_FIND_DATABASES = (status.HTTP_200_OK, "2103", "데이터베이스 정보 조회를 성공하였습니다.") + SUCCESS_FIND_SCHEMAS = (status.HTTP_200_OK, "2104", "디비 스키마 정보 조회를 성공하였습니다.") + SUCCESS_FIND_TABLES = (status.HTTP_200_OK, "2105", "디비 테이블 정보 조회를 성공하였습니다.") + SUCCESS_FIND_COLUMNS = (status.HTTP_200_OK, "2106", "디비 컬럼 정보 조회를 성공하였습니다.") SUCCESS_SAVE_PROFILE = (status.HTTP_200_OK, "2130", "디비 연결 정보를 저장하였습니다.") SUCCESS_UPDATE_PROFILE = (status.HTTP_200_OK, "2150", "디비 연결 정보를 업데이트 하였습니다.") SUCCESS_DELETE_PROFILE = (status.HTTP_200_OK, "2170", "디비 연결 정보를 삭제 하였습니다.") @@ -136,6 +137,11 @@ class CommonCode(Enum): ) FAIL_FIND_INDEXES = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5108", "디비 인덱스 정보 조회 중 에러가 발생했습니다.") FAIL_FIND_SAMPLE_ROWS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5106", "샘플 데이터 조회 중 에러가 발생했습니다.") + FAIL_FIND_DATABASES = ( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "5109", + "데이터베이스 정보 조회 중 에러가 발생했습니다.", + ) FAIL_SAVE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5130", "디비 정보 저장 중 에러가 발생했습니다.") FAIL_UPDATE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5150", "디비 정보 업데이트 중 에러가 발생했습니다.") FAIL_DELETE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5170", "디비 정보 삭제 중 에러가 발생했습니다.") diff --git a/app/repository/user_db_repository.py b/app/repository/user_db_repository.py index f886eb8..c162cba 100644 --- a/app/repository/user_db_repository.py +++ b/app/repository/user_db_repository.py @@ -16,6 +16,7 @@ ColumnInfo, ColumnListResult, ConstraintInfo, + DatabaseListResult, DBProfile, IndexInfo, SchemaListResult, @@ -155,6 +156,45 @@ def find_profile(self, sql: str, data: tuple) -> AllDBProfileInfo: if connection: connection.close() + # ───────────────────────────── + # 데이터베이스 조회 + # ───────────────────────────── + def find_databases( + self, driver_module: Any, db_type: str, database_query: str | None, **kwargs: Any + ) -> DatabaseListResult: + connection = None + logging.info(f"Attempting to find databases for db_type: '{db_type}' with connection args: {kwargs}") + try: + connection = self._connect(driver_module, **kwargs) + cursor = connection.cursor() + + if not database_query: + if db_type == DBTypesEnum.sqlite.name: + cursor.execute("PRAGMA database_list;") + rows = cursor.fetchall() + logging.info(f"SQLite PRAGMA database_list result: {rows}") + db_name = next((row[1] for row in rows if row[2] is not None), "main") + logging.info(f"Found SQLite database: {db_name}") + return DatabaseListResult( + is_successful=True, code=CommonCode.SUCCESS_FIND_DATABASES, databases=[db_name] + ) + else: + logging.warning(f"No database query provided for db_type: '{db_type}'. Returning empty list.") + return DatabaseListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_DATABASES, databases=[]) + + logging.info(f"Executing database query for {db_type}: {database_query}") + cursor.execute(database_query) + rows = cursor.fetchall() + logging.info(f"Raw databases found for {db_type}: {rows}") + databases = [row[0] for row in rows] + return DatabaseListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_DATABASES, databases=databases) + except Exception as e: + logging.error(f"Failed to find databases for {db_type}. Error: {e}", exc_info=True) + return DatabaseListResult(is_successful=False, code=CommonCode.FAIL_FIND_DATABASES, databases=[]) + finally: + if connection: + connection.close() + # ───────────────────────────── # 스키마 조회 # ───────────────────────────── diff --git a/app/schemas/user_db/result_model.py b/app/schemas/user_db/result_model.py index 9af5550..a223ef8 100644 --- a/app/schemas/user_db/result_model.py +++ b/app/schemas/user_db/result_model.py @@ -108,6 +108,10 @@ class ColumnListResult(BasicResult): columns: list[ColumnInfo] = Field([], description="컬럼 정보 목록") +class DatabaseListResult(BasicResult): + databases: list[str] = Field([], description="데이터베이스 이름 목록") + + # ───────────────────────────── # 계층적 스키마 조회를 위한 모델 # ───────────────────────────── diff --git a/app/services/user_db_service.py b/app/services/user_db_service.py index 3a6aef9..4aabb73 100644 --- a/app/services/user_db_service.py +++ b/app/services/user_db_service.py @@ -221,64 +221,93 @@ def get_full_schema_info( def get_hierarchical_schema_info( self, db_info: AllDBProfileInfo, repository: UserDbRepository = user_db_repository - ) -> DBDetail: + ) -> list[DBDetail]: """ - DB 프로필 정보를 받아 해당 데이터베이스의 전체 스키마 정보를 - 계층적인 구조 (스키마 -> 테이블 -> 컬럼 등)로 조회하여 반환합니다. + DB 프로필 정보를 받아 해당 DBMS의 전체 데이터베이스 및 스키마 정보를 + 계층적인 구조 (DB -> 스키마 -> 테이블 -> 컬럼 등)로 조회하여 반환합니다. """ logging.info(f"Starting hierarchical schema scan for db_profile: {db_info.id}") try: driver_module = self._get_driver_module(db_info.type) - connect_kwargs = self._prepare_connection_args(db_info) + db_type = db_info.type.lower() - schemas_result = repository.find_schemas( - driver_module, self._get_schema_query(db_info.type), **connect_kwargs - ) + initial_connect_kwargs = self._prepare_connection_args(db_info, ignore_db_name=(db_type == "postgresql")) + db_query = self._get_database_query(db_type) + databases_result = repository.find_databases(driver_module, db_type, db_query, **initial_connect_kwargs) - if not schemas_result.is_successful: - raise APIException(CommonCode.FAIL_FIND_SCHEMAS) + if not databases_result.is_successful: + raise APIException(CommonCode.FAIL_FIND_DATABASES) - schemas_to_scan = schemas_result.schemas + all_db_details = [] + for db_name in sorted(databases_result.databases): + db_detail = self._get_db_schema_details(db_name, db_info, driver_module, repository) + if db_detail: + all_db_details.append(db_detail) - # For sqlite, schemas might be empty, default to 'main' - if db_info.type.lower() == "sqlite" and not schemas_to_scan: - schemas_to_scan = ["main"] + logging.info(f"Finished hierarchical schema scan. Total databases found: {len(all_db_details)}.") + return all_db_details + except APIException: + raise + except Exception as e: + logging.error("An unexpected error occurred in get_hierarchical_schema_info", exc_info=True) + raise APIException(CommonCode.FAIL) from e - schema_details = [] - for schema_name in sorted(schemas_to_scan): - # For Oracle, schema names are uppercase. - effective_schema_name = schema_name - if db_info.type.lower() == "oracle": - effective_schema_name = schema_name.upper() + def _get_db_schema_details( + self, + db_name: str, + db_info: AllDBProfileInfo, + driver_module: Any, + repository: UserDbRepository, + ) -> DBDetail | None: + """특정 데이터베이스의 모든 스키마와 테이블 정보를 조회하여 DBDetail 모델을 반환합니다.""" + db_type = db_info.type.lower() + if db_type == "sqlite": + current_db_info = db_info + connect_kwargs = self._prepare_connection_args(db_info) + else: + current_db_info = db_info.model_copy(update={"name": db_name}) + connect_kwargs = self._prepare_connection_args(current_db_info) - tables_result = repository.find_tables( - driver_module, self._get_table_query(db_info.type), effective_schema_name, **connect_kwargs - ) - logging.info( - f"Found {len(tables_result.tables)} tables in schema '{effective_schema_name}': {tables_result.tables}" - ) + schema_query = self._get_schema_query(db_type, db_name) + schemas_result = repository.find_schemas(driver_module, schema_query, **connect_kwargs) - if not tables_result.is_successful: - logging.warning(f"Failed to find tables for schema '{effective_schema_name}'. Skipping.") - continue + if not schemas_result.is_successful: + logging.warning(f"Failed to find schemas for database '{db_name}'. Skipping.") + return None - table_details = [] - for table_name in tables_result.tables: - table_info = self._get_table_details( - driver_module, db_info, effective_schema_name, table_name, connect_kwargs, repository - ) - table_details.append(table_info) + schemas_to_scan = schemas_result.schemas + if db_type == "sqlite" and not schemas_to_scan: + schemas_to_scan = ["main"] + + schema_details = [] + for schema_name in sorted(schemas_to_scan): + effective_schema_name = schema_name.upper() if db_type == "oracle" else schema_name + table_query = self._get_table_query(db_type) + tables_result = repository.find_tables(driver_module, table_query, effective_schema_name, **connect_kwargs) + + if not tables_result.is_successful: + logging.warning(f"Failed to find tables for schema '{effective_schema_name}'. Skipping.") + continue + + table_details = [ + self._get_table_details( + driver_module, + current_db_info, + effective_schema_name, + table_name, + connect_kwargs, + repository, + ) + for table_name in tables_result.tables + ] - if table_details: - schema_details.append(SchemaDetail(schema_name=schema_name, tables=table_details)) + if table_details: + schema_details.append(SchemaDetail(schema_name=schema_name, tables=table_details)) - logging.info(f"Finished hierarchical schema scan. Total schemas found: {len(schema_details)}.") - return DBDetail(db_name=db_info.name, db_type=db_info.type, schemas=schema_details) - except APIException: - raise - except Exception as e: - logging.error("An unexpected error occurred in get_hierarchical_schema_info", exc_info=True) - raise APIException(CommonCode.FAIL) from e + if schema_details: + return DBDetail(db_name=db_name, db_type=db_info.type, schemas=schema_details) + + return None def _get_schemas_to_scan( self, @@ -385,45 +414,50 @@ def _get_driver_module(self, db_type: str): return sqlite3 return importlib.import_module(driver_name) - def _prepare_connection_args(self, db_info: DBProfileInfo) -> dict[str, Any]: + def _prepare_connection_args(self, db_info: DBProfileInfo, ignore_db_name: bool = False) -> dict[str, Any]: """ DB 타입에 따라 연결에 필요한 매개변수를 딕셔너리로 구성합니다. + `ignore_db_name` 플래그 추가: 초기 연결 시 특정 DB에 종속되지 않기 위함. """ - # SQLite는 별도 처리 - if db_info.type == "sqlite": - return {"db_name": db_info.name} + db_type = db_info.type.lower() + + if db_type == "sqlite": + return {"database": db_info.name} - # MSSQL은 연결 문자열을 별도로 구성 - if db_info.type == "mssql": + if db_type == "mssql": connection_string = ( f"DRIVER={{ODBC Driver 17 for SQL Server}};" f"SERVER={db_info.host},{db_info.port};" f"UID={db_info.username};" f"PWD={db_info.password};" ) - if db_info.name: + if db_info.name and not ignore_db_name: connection_string += f"DATABASE={db_info.name};" return {"connection_string": connection_string} - # 그 외 DB들은 공통 파라미터로 시작 kwargs = {"host": db_info.host, "port": db_info.port, "user": db_info.username, "password": db_info.password} - # DB 이름이 없을 경우, 기본 파라미터만 반환 - if not db_info.name: - return kwargs - - # DB 이름이 있다면, 타입에 따라 적절한 파라미터를 추가합니다. - if db_info.type == "postgresql": - kwargs["dbname"] = db_info.name - elif db_info.type in ["mysql", "mariadb"]: - kwargs["database"] = db_info.name - elif db_info.type.lower() == "oracle": - # dsn을 직접 사용하는 대신 service_name을 명시적으로 전달 - kwargs["service_name"] = db_info.name + if db_info.name and not ignore_db_name: + if db_type == "postgresql": + kwargs["dbname"] = db_info.name + elif db_type in ["mysql", "mariadb"]: + kwargs["database"] = db_info.name + elif db_type == "oracle": + kwargs["service_name"] = db_info.name return kwargs - def _get_schema_query(self, db_type: str) -> str | None: + def _get_database_query(self, db_type: str) -> str | None: + db_type = db_type.lower() + if db_type == "postgresql": + return "SELECT datname FROM pg_database WHERE datistemplate = false;" + elif db_type in ["mysql", "mariadb"]: + return "SHOW DATABASES;" + elif db_type == "oracle": + return "SELECT global_name FROM global_name" + return None + + def _get_schema_query(self, db_type: str, db_name: str | None = None) -> str | None: db_type = db_type.lower() if db_type == "postgresql": return """ @@ -431,11 +465,13 @@ def _get_schema_query(self, db_type: str) -> str | None: WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast') """ elif db_type in ["mysql", "mariadb"]: - return "SELECT schema_name FROM information_schema.schemata" + # MySQL/MariaDB에서는 스키마가 데이터베이스와 동일하므로, 현재 데이터베이스의 이름을 스키마로 간주합니다. + # `information_schema.schemata`를 쿼리하여 명시적으로 확인하는 것이 더 정확합니다. + return f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{db_name}'" elif db_type == "oracle": return "SELECT username FROM all_users WHERE ORACLE_MAINTAINED = 'N'" elif db_type == "sqlite": - return None + return None # SQLite는 단일 파일 데이터베이스로, 스키마 개념이 다릅니다. 'main'을 사용합니다. return None def _get_table_query(self, db_type: str, for_all_schemas: bool = False) -> str | None: # 수정됨