diff --git a/app/api/user_db_api.py b/app/api/user_db_api.py index f8fec19..81f6dbd 100644 --- a/app/api/user_db_api.py +++ b/app/api/user_db_api.py @@ -1,11 +1,13 @@ # app/api/user_db_api.py from fastapi import APIRouter, Depends +from typing import List from app.core.exceptions import APIException from app.core.response import ResponseMessage -from app.schemas.user_db.db_profile_model import DBProfileCreate +from app.schemas.user_db.db_profile_model import DBProfileInfo, UpdateOrCreateDBProfile from app.services.user_db_service import UserDbService, user_db_service +from app.schemas.user_db.result_model import DBProfile, ColumnInfo user_db_service_dependency = Depends(lambda: user_db_service) @@ -18,13 +20,132 @@ summary="DB 연결 테스트", ) def connection_test( - db_info: DBProfileCreate, + db_info: DBProfileInfo, service: UserDbService = user_db_service_dependency, ) -> ResponseMessage[bool]: - """DB 연결 정보를 받아 연결 가능 여부를 테스트합니다.""" - db_info.validate_required_fields() + db_info.validate_required_fields() result = service.connection_test(db_info) + if not result.is_successful: raise APIException(result.code) return ResponseMessage.success(value=result.is_successful, code=result.code) + +@router.post( + "/create/profile", + response_model=ResponseMessage[str], + summary="DB 프로필 저장", +) +def create_profile( + create_db_info: UpdateOrCreateDBProfile, + service: UserDbService = user_db_service_dependency, +) -> ResponseMessage[str]: + + create_db_info.validate_required_fields() + result = service.create_profile(create_db_info) + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.view_name, code=result.code) + +@router.put( + "/modify/profile", + response_model=ResponseMessage[str], + summary="DB 프로필 업데이트", +) +def update_profile( + update_db_info: UpdateOrCreateDBProfile, + service: UserDbService = user_db_service_dependency, +) -> ResponseMessage[str]: + + update_db_info.validate_required_fields() + result = service.update_profile(update_db_info) + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.view_name, code=result.code) + +@router.delete( + "/remove/{profile_id}", + response_model=ResponseMessage[str], + summary="DB 프로필 삭제", +) +def delete_profile( + profile_id: str, + service: UserDbService = user_db_service_dependency, +) -> ResponseMessage[str]: + + result = service.delete_profile(profile_id) + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.view_name, code=result.code) + +@router.get( + "/find/all", + response_model=ResponseMessage[List[DBProfile]], + summary="DB 프로필 전체 조회", +) +def find_all_profile( + service: UserDbService = user_db_service_dependency, +) -> ResponseMessage[List[DBProfile]]: + + result = service.find_all_profile() + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.profiles, code=result.code) + +@router.get( + "/find/schemas/{profile_id}", + response_model=ResponseMessage[List[str]], + summary="특정 DB의 전체 스키마 조회", +) +def find_schemas( + profile_id: str, + service: UserDbService = user_db_service_dependency +) -> ResponseMessage[List[str]]: + + db_info = service.find_profile(profile_id) + result = service.find_schemas(db_info) + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.schemas, code=result.code) + +@router.get( + "/find/tables/{profile_id}/{schema_name}", + response_model=ResponseMessage[List[str]], + summary="특정 스키마의 전체 테이블 조회", +) +def find_tables( + profile_id: str, + schema_name: str, + service: UserDbService = user_db_service_dependency +) -> ResponseMessage[List[str]]: + + db_info = service.find_profile(profile_id) + result = service.find_tables(db_info, schema_name) + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.tables, code=result.code) + +@router.get( + "/find/columns/{profile_id}/{schema_name}/{table_name}", + response_model=ResponseMessage[List[ColumnInfo]], + summary="특정 테이블의 전체 컬럼 조회", +) +def find_columns( + profile_id: str, + schema_name: str, + table_name: str, + service: UserDbService = user_db_service_dependency +) -> ResponseMessage[List[ColumnInfo]]: + + db_info = service.find_profile(profile_id) + result = service.find_columns(db_info, schema_name, table_name) + + if not result.is_successful: + raise APIException(result.code) + return ResponseMessage.success(value=result.columns, code=result.code) diff --git a/app/core/all_logging.py b/app/core/all_logging.py new file mode 100644 index 0000000..c31d4eb --- /dev/null +++ b/app/core/all_logging.py @@ -0,0 +1,33 @@ +# app/core/all_logging.py + +import logging +from fastapi import Request + +# 로깅 기본 설정 (애플리케이션 시작 시 한 번만 구성) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", # [수정] 로그 레벨(INFO, ERROR)을 포함 + datefmt="%Y-%m-%d %H:%M:%S", +) + + +async def log_requests_middleware(request: Request, call_next): + """ + 모든 API 요청과 에러에 대한 로그를 남기는 미들웨어입니다. + """ + endpoint = f"{request.method} {request.url.path}" + + # 일반 요청 로그를 남깁니다. + logging.info(f"엔드포인트: {endpoint}") + + try: + # 다음 미들웨어 또는 실제 엔드포인트를 호출합니다. + response = await call_next(request) + return response + except Exception as e: + # [수정] 에러 발생 시, exc_info=True를 추가하여 전체 트레이스백을 함께 기록합니다. + # 메시지 형식도 "ERROR 엔드포인트:"로 변경합니다. + logging.error(f"ERROR 엔드포인트: {endpoint}", exc_info=True) + # 예외를 다시 발생시켜 FastAPI의 전역 예외 처리기가 최종 응답을 만들도록 합니다. + raise e + diff --git a/app/core/enum/db_key_prefix_name.py b/app/core/enum/db_key_prefix_name.py new file mode 100644 index 0000000..5eea582 --- /dev/null +++ b/app/core/enum/db_key_prefix_name.py @@ -0,0 +1,7 @@ +# app/core/enum/db_key_prefix_name.py +from enum import Enum + +class DBSaveIdEnum(Enum): + """저장할 디비 ID 앞에 들어갈 이름""" + user_db = "USER-DB" + driver = "DRIVER" \ No newline at end of file diff --git a/app/core/status.py b/app/core/status.py index d20e742..1dc25c5 100644 --- a/app/core/status.py +++ b/app/core/status.py @@ -20,6 +20,14 @@ class CommonCode(Enum): """ DRIVER, DB 성공 코드 - 21xx """ SUCCESS_DRIVER_INFO = (status.HTTP_200_OK, "2100", "드라이버 정보 조회를 성공하였습니다.") SUCCESS_USER_DB_CONNECT_TEST = (status.HTTP_200_OK, "2101", "테스트 연결을 성공하였습니다.") + SUCCESS_FIND_PROFILE = (status.HTTP_200_OK, "2102", "디비 정보 조회를 성공하였습니다.") + SUCCESS_FIND_SCHEMAS = (status.HTTP_200_OK, "2103", "디비 스키마 정보 조회를 성공하였습니다.") + SUCCESS_FIND_TABLES = (status.HTTP_200_OK, "2104", "디비 테이블 정보 조회를 성공하였습니다.") + SUCCESS_FIND_COLUMNS = (status.HTTP_200_OK, "2105", "디비 컬럼 정보 조회를 성공하였습니다.") + SUCCESS_SAVE_PROFILE = (status.HTTP_200_OK, "2130", "디비 연결 정보를 저장하였습니다.") + SUCCESS_UPDATE_PROFILE = (status.HTTP_200_OK, "2150", "디비 연결 정보를 업데이트 하였습니다.") + SUCCESS_DELETE_PROFILE = (status.HTTP_200_OK, "2170", "디비 연결 정보를 삭제 하였습니다.") + """ KEY 성공 코드 - 22xx """ @@ -31,19 +39,19 @@ class CommonCode(Enum): """ SQL 성공 코드 - 25xx """ # ======================================= - # 클라이언트 오류 (Client Error) - 4xxx + # 클라이언트 에러 (Client Error) - 4xxx # ======================================= - """ 기본 클라이언트 오류 코드 - 40xx """ + """ 기본 클라이언트 에러 코드 - 40xx """ NO_VALUE = (status.HTTP_400_BAD_REQUEST, "4000", "필수 값이 존재하지 않습니다.") DUPLICATION = (status.HTTP_409_CONFLICT, "4001", "이미 존재하는 데이터입니다.") NO_SEARCH_DATA = (status.HTTP_404_NOT_FOUND, "4002", "요청한 데이터를 찾을 수 없습니다.") INVALID_PARAMETER = (status.HTTP_422_UNPROCESSABLE_ENTITY, "4003", "필수 값이 누락되었습니다.") - """ DRIVER, DB 클라이언트 오류 코드 - 41xx """ + """ DRIVER, DB 클라이언트 에러 코드 - 41xx """ INVALID_DB_DRIVER = (status.HTTP_409_CONFLICT, "4100", "지원하지 않는 데이터베이스입니다.") NO_DB_DRIVER = (status.HTTP_400_BAD_REQUEST, "4101", "데이터베이스는 필수 값입니다.") - """ KEY 클라이언트 오류 코드 - 42xx """ + """ KEY 클라이언트 에러 코드 - 42xx """ INVALID_API_KEY_FORMAT = (status.HTTP_400_BAD_REQUEST, "4200", "API 키의 형식이 올바르지 않습니다.") INVALID_API_KEY_PREFIX = ( status.HTTP_400_BAD_REQUEST, @@ -51,7 +59,7 @@ class CommonCode(Enum): "API 키가 선택한 서비스의 올바른 형식이 아닙니다. (예: OpenAI는 sk-로 시작)", ) - """ AI CHAT TAB 클라이언트 오류 코드 - 43xx """ + """ AI CHAT, DB 클라이언트 에러 코드 - 43xx """ INVALID_CHAT_TAB_NAME_FORMAT = (status.HTTP_400_BAD_REQUEST, "4300", "채팅 탭 이름의 형식이 올바르지 않습니다.") INVALID_CHAT_TAB_NAME_LENGTH = ( status.HTTP_400_BAD_REQUEST, @@ -65,15 +73,15 @@ class CommonCode(Enum): "허용되지 않는 특수 문자: 큰따옴표(\"), 작은따옴표('), 세미콜론(;), 꺾쇠괄호(<, >)", ) - """ ANNOTATION 클라이언트 오류 코드 - 44xx """ + """ ANNOTATION 클라이언트 에러 코드 - 44xx """ - """ SQL 클라이언트 오류 코드 - 45xx """ + """ SQL 클라이언트 에러 코드 - 45xx """ # ================================== - # 서버 오류 (Server Error) - 5xx + # 서버 에러 (Server Error) - 5xx # ================================== - """ 기본 서버 오류 코드 - 50xx """ - FAIL = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5000", "서버 처리 중 오류가 발생했습니다.") + """ 기본 서버 에러 코드 - 50xx """ + FAIL = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5000", "서버 처리 중 에러가 발생했습니다.") DB_BUSY = ( status.HTTP_503_SERVICE_UNAVAILABLE, "5001", @@ -82,19 +90,26 @@ class CommonCode(Enum): FAIL_TO_VERIFY_CREATION = ( status.HTTP_500_INTERNAL_SERVER_ERROR, "5002", - "데이터 생성 후 검증 과정에서 오류가 발생했습니다.", + "데이터 생성 후 검증 과정에서 에러가 발생했습니다.", ) - """ DRIVER, DB 서버 오류 코드 - 51xx """ - FAIL_CONNECT_DB = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5100", "디비 연결 중 오류가 발생했습니다.") + """ DRIVER, DB 서버 에러 코드 - 51xx """ + FAIL_CONNECT_DB = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5100", "디비 연결 중 에러가 발생했습니다.") + FAIL_FIND_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5101", "디비 정보 조회 중 에러가 발생했습니다.") + FAIL_FIND_SCHEMAS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5102", "디비 스키마 정보 조회 중 에러가 발생했습니다.") + FAIL_FIND_TABLES = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5103", "디비 테이블 정보 조회 중 에러가 발생했습니다.") + FAIL_FIND_COLUMNS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5104", "디비 컬럼 정보 조회 중 에러가 발생했습니다.") + FAIL_SAVE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5130", "디비 정보 저장 중 에러가 발생했습니다.") + FAIL_UPDATE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5150", "디비 정보 업데이트 중 에러가 발생했습니다.") + FAIL_DELETE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5170", "디비 정보 삭제 중 에러가 발생했습니다.") - """ KEY 서버 오류 코드 - 52xx """ + """ KEY 서버 에러 코드 - 52xx """ - """ AI CHAT, DB 서버 오류 코드 - 53xx """ + """ AI CHAT, DB 서버 에러 코드 - 53xx """ - """ ANNOTATION 서버 오류 코드 - 54xx """ + """ ANNOTATION 서버 에러 코드 - 54xx """ - """ SQL 서버 오류 코드 - 55xx """ + """ SQL 서버 에러 코드 - 55xx """ def __init__(self, http_status: int, code: str, message: str): """Enum 멤버가 생성될 때 각 값을 속성으로 할당합니다.""" diff --git a/app/db/init_db.py b/app/db/init_db.py index 34a8c95..0989e5c 100644 --- a/app/db/init_db.py +++ b/app/db/init_db.py @@ -1,152 +1,171 @@ # db/init_db.py import sqlite3 +import logging from app.core.utils import get_db_path -""" -데이터베이스에 연결하고, 애플리케이션에 필요한 테이블이 없으면 생성합니다. -""" +def _synchronize_table(cursor, table_name: str, target_columns: dict): + """ + 테이블 스키마를 확인하고, 코드와 다를 경우 테이블을 재생성하여 동기화합니다. + """ + try: + cursor.execute(f"PRAGMA table_info({table_name})") + current_schema_rows = cursor.fetchall() + current_columns = {row[1]: row[2].upper() for row in current_schema_rows} + target_schema_simple = {name: definition.split()[0].upper() for name, definition in target_columns.items()} -def initialize_database(): + if current_columns == target_schema_simple: + return + + logging.warning(f"'{table_name}' 테이블의 스키마 변경을 감지했습니다. 마이그레이션을 시작합니다. (데이터 손실 위험)") + + temp_table_name = f"{table_name}_temp_old" + cursor.execute(f"ALTER TABLE {table_name} RENAME TO {temp_table_name}") + + columns_with_definitions = ", ".join([f"{name} {definition}" for name, definition in target_columns.items()]) + cursor.execute(f"CREATE TABLE {table_name} ({columns_with_definitions})") + + cursor.execute(f"PRAGMA table_info({temp_table_name})") + temp_columns = {row[1] for row in cursor.fetchall()} + common_columns = ", ".join(target_columns.keys() & temp_columns) + + if common_columns: + cursor.execute(f"INSERT INTO {table_name} ({common_columns}) SELECT {common_columns} FROM {temp_table_name}") + logging.info(f"'{temp_table_name}'에서 '{table_name}'으로 데이터를 복사했습니다.") + + cursor.execute(f"DROP TABLE {temp_table_name}") + logging.info(f"임시 테이블 '{temp_table_name}'을(를) 삭제했습니다.") + + except sqlite3.Error as e: + logging.error(f"'{table_name}' 테이블 마이그레이션 중 오류 발생: {e}") + raise e + +def initialize_database(): + """ + 데이터베이스에 연결하고, 테이블 스키마를 최신 상태로 동기화합니다. + """ db_path = get_db_path() conn = None try: conn = sqlite3.connect(db_path) + conn.execute("BEGIN") cursor = conn.cursor() - # db_profile 테이블 생성 - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS db_profile ( - id VARCHAR(64) PRIMARY KEY NOT NULL, - type VARCHAR(32) NOT NULL, - host VARCHAR(255) NOT NULL, - port INTEGER NOT NULL, - name VARCHAR(64), - username VARCHAR(128) NOT NULL, - password VARCHAR(128) NOT NULL, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP - ); - """ - ) - # db_profile 테이블의 updated_at을 자동으로 업데이트하는 트리거 + + # --- db_profile 테이블 처리 --- + db_profile_cols = { + "id": "VARCHAR(64) PRIMARY KEY NOT NULL", + "type": "VARCHAR(32) NOT NULL", + "host": "VARCHAR(255)", + "port": "INTEGER", + "name": "VARCHAR(64)", + "username": "VARCHAR(128)", + "password": "VARCHAR(128)", + "view_name": "VARCHAR(64)", + "created_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "updated_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP" + } + cursor.execute(f"CREATE TABLE IF NOT EXISTS db_profile ({', '.join([f'{k} {v}' for k, v in db_profile_cols.items()])})") + _synchronize_table(cursor, "db_profile", db_profile_cols) + cursor.execute( """ CREATE TRIGGER IF NOT EXISTS update_db_profile_updated_at - BEFORE UPDATE ON db_profile - FOR EACH ROW - BEGIN - UPDATE db_profile SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; - END; - """ - ) - - # ai_credential 테이블 생성 - cursor.execute( + BEFORE UPDATE ON db_profile FOR EACH ROW + BEGIN UPDATE db_profile SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; END; """ - CREATE TABLE IF NOT EXISTS ai_credential ( - id VARCHAR(64) PRIMARY KEY NOT NULL, - service_name VARCHAR(32) NOT NULL UNIQUE, - api_key VARCHAR(256) NOT NULL, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP - ); - """ ) - # ai_credential 테이블의 updated_at을 자동으로 업데이트하는 트리거 + + # --- ai_credential 테이블 처리 --- + ai_credential_cols = { + "id": "VARCHAR(64) PRIMARY KEY NOT NULL", + "service_name": "VARCHAR(32) NOT NULL UNIQUE", + "api_key": "VARCHAR(256) NOT NULL", + "created_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "updated_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP" + } + cursor.execute(f"CREATE TABLE IF NOT EXISTS ai_credential ({', '.join([f'{k} {v}' for k, v in ai_credential_cols.items()])})") + _synchronize_table(cursor, "ai_credential", ai_credential_cols) + cursor.execute( """ CREATE TRIGGER IF NOT EXISTS update_ai_credential_updated_at - BEFORE UPDATE ON ai_credential - FOR EACH ROW - BEGIN - UPDATE ai_credential SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; - END; - """ - ) - - # chat_tab 테이블 생성 - cursor.execute( + BEFORE UPDATE ON ai_credential FOR EACH ROW + BEGIN UPDATE ai_credential SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; END; """ - CREATE TABLE IF NOT EXISTS chat_tab ( - id VARCHAR(64) PRIMARY KEY NOT NULL, - name VARCHAR(128), - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP - ); - """ ) - # chat_tab 테이블의 updated_at을 자동으로 업데이트하는 트리거 + + # --- chat_tab 테이블 처리 --- + chat_tab_cols = { + "id": "VARCHAR(64) PRIMARY KEY NOT NULL", + "name": "VARCHAR(128)", + "created_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "updated_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP" + } + cursor.execute(f"CREATE TABLE IF NOT EXISTS chat_tab ({', '.join([f'{k} {v}' for k, v in chat_tab_cols.items()])})") + _synchronize_table(cursor, "chat_tab", chat_tab_cols) cursor.execute( """ CREATE TRIGGER IF NOT EXISTS update_chat_tab_updated_at - BEFORE UPDATE ON chat_tab - FOR EACH ROW - BEGIN - UPDATE chat_tab SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; - END; - """ - ) - - # chat_message 테이블 생성 - cursor.execute( + BEFORE UPDATE ON chat_tab FOR EACH ROW + BEGIN UPDATE chat_tab SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; END; """ - CREATE TABLE IF NOT EXISTS chat_message ( - id VARCHAR(64) PRIMARY KEY NOT NULL, - chat_tab_id VARCHAR(64) NOT NULL, - sender VARCHAR(1) NOT NULL, - message TEXT NOT NULL, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (chat_tab_id) REFERENCES chat_tab(id) - ); - """ ) - # chat_message 테이블의 updated_at을 자동으로 업데이트하는 트리거 + + # --- chat_message 테이블 처리 --- + chat_message_cols = { + "id": "VARCHAR(64) PRIMARY KEY NOT NULL", + "chat_tab_id": "VARCHAR(64) NOT NULL", + "sender": "VARCHAR(1) NOT NULL", + "message": "TEXT NOT NULL", + "created_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "updated_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "FOREIGN KEY (chat_tab_id)": "REFERENCES chat_tab(id)" + } + create_chat_message_sql = ", ".join([f"{k} {v}" for k, v in chat_message_cols.items() if not k.startswith("FOREIGN KEY")]) + create_chat_message_sql += f", FOREIGN KEY (chat_tab_id) REFERENCES chat_tab(id)" + cursor.execute(f"CREATE TABLE IF NOT EXISTS chat_message ({create_chat_message_sql})") + _synchronize_table(cursor, "chat_message", {k: v for k, v in chat_message_cols.items() if not k.startswith("FOREIGN KEY")}) + cursor.execute( """ CREATE TRIGGER IF NOT EXISTS update_chat_message_updated_at - BEFORE UPDATE ON chat_message - FOR EACH ROW - BEGIN - UPDATE chat_message SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; - END; - """ - ) - - # query_history 테이블 생성 - cursor.execute( + BEFORE UPDATE ON chat_message FOR EACH ROW + BEGIN UPDATE chat_message SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; END; """ - CREATE TABLE IF NOT EXISTS query_history ( - id VARCHAR(64) PRIMARY KEY NOT NULL, - chat_message_id VARCHAR(64) NOT NULL, - query_text TEXT NOT NULL, - is_success VARCHAR(1) NOT NULL, - error_message TEXT NOT NULL, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (chat_message_id) REFERENCES chat_message(id) - ); - """ ) - # query_history 테이블의 updated_at을 자동으로 업데이트하는 트리거 + + # --- query_history 테이블 처리 --- + query_history_cols = { + "id": "VARCHAR(64) PRIMARY KEY NOT NULL", + "chat_message_id": "VARCHAR(64) NOT NULL", + "query_text": "TEXT NOT NULL", + "is_success": "VARCHAR(1) NOT NULL", + "error_message": "TEXT NOT NULL", + "created_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "updated_at": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP", + "FOREIGN KEY (chat_message_id)": "REFERENCES chat_message(id)" + } + create_query_history_sql = ", ".join([f"{k} {v}" for k, v in query_history_cols.items() if not k.startswith("FOREIGN KEY")]) + create_query_history_sql += f", FOREIGN KEY (chat_message_id) REFERENCES chat_message(id)" + cursor.execute(f"CREATE TABLE IF NOT EXISTS query_history ({create_query_history_sql})") + _synchronize_table(cursor, "query_history", {k: v for k, v in query_history_cols.items() if not k.startswith("FOREIGN KEY")}) + cursor.execute( """ CREATE TRIGGER IF NOT EXISTS update_query_history_updated_at - BEFORE UPDATE ON query_history - FOR EACH ROW - BEGIN - UPDATE query_history SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; - END; - """ + BEFORE UPDATE ON query_history FOR EACH ROW + BEGIN UPDATE query_history SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; END; + """ ) conn.commit() except sqlite3.Error as e: - print(f"데이터베이스 초기화 중 오류 발생: {e}") + logging.error(f"데이터베이스 초기화 중 오류 발생: {e}. 변경 사항을 롤백합니다.") + if conn: + conn.rollback() finally: if conn: conn.close() diff --git a/app/main.py b/app/main.py index 198e5fd..01e9da2 100644 --- a/app/main.py +++ b/app/main.py @@ -14,8 +14,14 @@ ) from app.db.init_db import initialize_database +from starlette.middleware.base import BaseHTTPMiddleware +from app.core.all_logging import log_requests_middleware + app = FastAPI() +# 전체 로그 찍는 부분 +app.add_middleware(BaseHTTPMiddleware, dispatch=log_requests_middleware) + # 전역 예외 처리기 등록 app.add_exception_handler(Exception, generic_exception_handler) app.add_exception_handler(APIException, api_exception_handler) diff --git a/app/repository/user_db_repository.py b/app/repository/user_db_repository.py index d77d789..cd7a0e0 100644 --- a/app/repository/user_db_repository.py +++ b/app/repository/user_db_repository.py @@ -1,39 +1,278 @@ +import sqlite3 from typing import Any import oracledb +from app.core.exceptions import APIException from app.core.status import CommonCode -from app.schemas.user_db.connect_test_result_model import TestConnectionResult +from app.core.utils import get_db_path +from app.schemas.user_db.db_profile_model import AllDBProfileInfo, UpdateOrCreateDBProfile +from app.schemas.user_db.result_model import ( + AllDBProfileResult, + BasicResult, + ChangeProfileResult, + ColumnInfo, + ColumnListResult, + DBProfile, + SchemaListResult, + TableListResult, +) class UserDbRepository: - def test_db_connection(self, driver_module: Any, **kwargs: Any) -> TestConnectionResult: + def connection_test(self, driver_module: Any, **kwargs: Any) -> BasicResult: """ DB 드라이버와 연결에 필요한 매개변수들을 받아 연결을 테스트합니다. """ connection = None try: - if driver_module is oracledb: - if kwargs.get("user").lower() == "sys": - kwargs["mode"] = oracledb.AUTH_MODE_SYSDBA - connection = driver_module.connect(**kwargs) - # MSSQL과 같이 전체 연결 문자열이 제공된 경우 - elif "connection_string" in kwargs: - connection = driver_module.connect(kwargs["connection_string"]) - # SQLite와 같이 파일 이름만 필요한 경우 - elif "db_name" in kwargs: - connection = driver_module.connect(kwargs["db_name"]) - # 그 외 (MySQL, PostgreSQL, Oracle 등) 일반적인 키워드 인자 방식 연결 + connection = self._connect(driver_module, **kwargs) + return BasicResult(is_successful=True, code=CommonCode.SUCCESS_USER_DB_CONNECT_TEST) + except (AttributeError, driver_module.OperationalError, driver_module.DatabaseError): + return BasicResult(is_successful=False, code=CommonCode.FAIL_CONNECT_DB) + except Exception: + return BasicResult(is_successful=False, code=CommonCode.FAIL) + finally: + if connection: + connection.close() + + def create_profile(self, sql: str, data: tuple, create_db_info: UpdateOrCreateDBProfile) -> ChangeProfileResult: + """ + DB 드라이버와 연결에 필요한 매개변수들을 받아 저장합니다. + """ + db_path = get_db_path() + connection = None + try: + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + cursor.execute(sql, data) + connection.commit() + name = create_db_info.view_name if create_db_info.view_name else create_db_info.type + return ChangeProfileResult(is_successful=True, code=CommonCode.SUCCESS_SAVE_PROFILE, view_name=name) + except sqlite3.Error: + return ChangeProfileResult(is_successful=False, code=CommonCode.FAIL_SAVE_PROFILE) + except Exception: + return ChangeProfileResult(is_successful=False, code=CommonCode.FAIL_SAVE_PROFILE) + finally: + if connection: + connection.close() + + def update_profile(self, sql: str, data: tuple, update_db_info: UpdateOrCreateDBProfile) -> ChangeProfileResult: + """ + DB 드라이버와 연결에 필요한 매개변수들을 받아 업데이트합니다. + """ + db_path = get_db_path() + connection = None + try: + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + cursor.execute(sql, data) + connection.commit() + name = update_db_info.view_name if update_db_info.view_name else update_db_info.type + return ChangeProfileResult(is_successful=True, code=CommonCode.SUCCESS_UPDATE_PROFILE, view_name=name) + except sqlite3.Error: + return ChangeProfileResult(is_successful=False, code=CommonCode.FAIL_UPDATE_PROFILE) + except Exception: + return ChangeProfileResult(is_successful=False, code=CommonCode.FAIL_UPDATE_PROFILE) + finally: + if connection: + connection.close() + + def delete_profile( + self, + sql: str, + data: tuple, + profile_id: str, + ) -> ChangeProfileResult: + """ + DB 드라이버와 연결에 필요한 매개변수들을 받아 삭제합니다. + """ + db_path = get_db_path() + connection = None + try: + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + cursor.execute(sql, data) + connection.commit() + return ChangeProfileResult(is_successful=True, code=CommonCode.SUCCESS_DELETE_PROFILE, view_name=profile_id) + except sqlite3.Error: + return ChangeProfileResult(is_successful=False, code=CommonCode.FAIL_DELETE_PROFILE) + except Exception: + return ChangeProfileResult(is_successful=False, code=CommonCode.FAIL_DELETE_PROFILE) + finally: + if connection: + connection.close() + + def find_all_profile(self, sql: str) -> AllDBProfileResult: + """ + 전달받은 쿼리를 실행하여 모든 DB 연결 정보를 조회합니다. + """ + db_path = get_db_path() + connection = None + try: + connection = sqlite3.connect(db_path) + connection.row_factory = sqlite3.Row + cursor = connection.cursor() + cursor.execute(sql) + rows = cursor.fetchall() + profiles = [DBProfile(**row) for row in rows] + return AllDBProfileResult(is_successful=True, code=CommonCode.SUCCESS_FIND_PROFILE, profiles=profiles) + except sqlite3.Error: + return AllDBProfileResult(is_successful=False, code=CommonCode.FAIL_FIND_PROFILE) + except Exception: + return AllDBProfileResult(is_successful=False, code=CommonCode.FAIL_FIND_PROFILE) + finally: + if connection: + connection.close() + + def find_profile(self, sql: str, data: tuple) -> AllDBProfileInfo: + """ + 전달받은 쿼리를 실행하여 특정 DB 연결 정보를 조회합니다. + """ + db_path = get_db_path() + connection = None + try: + connection = sqlite3.connect(db_path) + connection.row_factory = sqlite3.Row + cursor = connection.cursor() + cursor.execute(sql, data) + row = cursor.fetchone() + + if not row: + raise APIException(CommonCode.NO_SEARCH_DATA) + return AllDBProfileInfo(**dict(row)) + except sqlite3.Error as e: + raise APIException(CommonCode.FAIL_FIND_PROFILE) from e + except Exception as e: + raise APIException(CommonCode.FAIL) from e + finally: + if connection: + connection.close() + + # ───────────────────────────── + # 스키마 조회 + # ───────────────────────────── + def find_schemas(self, driver_module: Any, schema_query: str, **kwargs: Any) -> SchemaListResult: + connection = None + try: + connection = self._connect(driver_module, **kwargs) + cursor = connection.cursor() + + if not schema_query: + return SchemaListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_SCHEMAS, schemas=["main"]) + + cursor.execute(schema_query) + schemas = [row[0] for row in cursor.fetchall()] + + return SchemaListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_SCHEMAS, schemas=schemas) + except Exception: + return SchemaListResult(is_successful=False, code=CommonCode.FAIL_FIND_SCHEMAS, schemas=[]) + finally: + if connection: + connection.close() + + # ───────────────────────────── + # 테이블 조회 + # ───────────────────────────── + def find_tables(self, driver_module: Any, table_query: str, schema_name: str, **kwargs: Any) -> TableListResult: + connection = None + try: + connection = self._connect(driver_module, **kwargs) + cursor = connection.cursor() + + if "%s" in table_query or "?" in table_query: + cursor.execute(table_query, (schema_name,)) + elif ":owner" in table_query: + cursor.execute(table_query, {"owner": schema_name}) + else: + cursor.execute(table_query) + + tables = [row[0] for row in cursor.fetchall()] + + return TableListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_TABLES, tables=tables) + except Exception: + return TableListResult(is_successful=False, code=CommonCode.FAIL_FIND_TABLES, tables=[]) + finally: + if connection: + connection.close() + + # ───────────────────────────── + # 컬럼 조회 + # ───────────────────────────── + def find_columns( + self, driver_module: Any, column_query: str, schema_name: str, db_type, table_name: str, **kwargs: Any + ) -> ColumnListResult: + connection = None + try: + connection = self._connect(driver_module, **kwargs) + cursor = connection.cursor() + + if db_type == "sqlite": + # SQLite는 PRAGMA를 직접 실행 + pragma_sql = f"PRAGMA table_info('{table_name}')" + cursor.execute(pragma_sql) + columns_raw = cursor.fetchall() + columns = [ + ColumnInfo( + name=c[1], + type=c[2], + nullable=(c[3] == 0), # notnull == 0 means nullable + default=c[4], + comment=None, + is_pk=(c[5] == 1), + ) + for c in columns_raw + ] else: - connection = driver_module.connect(**kwargs) + if "%s" in column_query or "?" in column_query: + cursor.execute(column_query, (schema_name, table_name)) + elif ":owner" in column_query and ":table" in column_query: + owner_bind = schema_name.upper() if schema_name else schema_name + table_bind = table_name.upper() if table_name else table_name + try: + cursor.execute(column_query, {"owner": owner_bind, "table": table_bind}) + except Exception: + try: + pos_query = column_query.replace(":owner", ":1").replace(":table", ":2") + cursor.execute(pos_query, [owner_bind, table_bind]) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + else: + cursor.execute(column_query) - return TestConnectionResult(is_successful=True, code=CommonCode.SUCCESS_USER_DB_CONNECT_TEST) + columns = [ + ColumnInfo( + name=c[0], + type=c[1], + nullable=(c[2] in ["YES", "Y", True]), + default=c[3], + comment=c[4] if len(c) > 4 else None, + is_pk=(c[5] in ["PRI", True] if len(c) > 5 else False), + ) + for c in cursor.fetchall() + ] + return ColumnListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_COLUMNS, columns=columns) except Exception: - return TestConnectionResult(is_successful=False, code=CommonCode.FAIL_CONNECT_DB) + return ColumnListResult(is_successful=False, code=CommonCode.FAIL_FIND_COLUMNS, columns=[]) finally: if connection: connection.close() + # ───────────────────────────── + # DB 연결 메서드 + # ───────────────────────────── + def _connect(self, driver_module: Any, **kwargs): + if driver_module is oracledb: + if kwargs.get("user", "").lower() == "sys": + kwargs["mode"] = oracledb.AUTH_MODE_SYSDBA + return driver_module.connect(**kwargs) + elif "connection_string" in kwargs: + return driver_module.connect(kwargs["connection_string"]) + elif "db_name" in kwargs: + return driver_module.connect(kwargs["db_name"]) + else: + return driver_module.connect(**kwargs) + user_db_repository = UserDbRepository() diff --git a/app/schemas/user_db/connect_test_result_model.py b/app/schemas/user_db/connect_test_result_model.py deleted file mode 100644 index 5f68f38..0000000 --- a/app/schemas/user_db/connect_test_result_model.py +++ /dev/null @@ -1,10 +0,0 @@ -# app/schemas/user_db/connect_test_result_model.py - -from pydantic import BaseModel, Field - -from app.core.status import CommonCode - - -class TestConnectionResult(BaseModel): - is_successful: bool = Field(..., description="성공 여부") - code: CommonCode = Field(None, description="결과 코드") diff --git a/app/schemas/user_db/db_profile_model.py b/app/schemas/user_db/db_profile_model.py index 616156c..f6d601f 100644 --- a/app/schemas/user_db/db_profile_model.py +++ b/app/schemas/user_db/db_profile_model.py @@ -10,15 +10,13 @@ # 사용자가 직접 입력해야 하는 정보만 포함합니다. -class DBProfileCreate(BaseModel): +class DBProfileInfo(BaseModel): type: str = Field(..., description="DB 종류") host: str | None = Field(None, description="호스트 주소") port: int | None = Field(None, description="포트 번호") + name: str | None = Field(None, description="연결할 데이터베이스명") username: str | None = Field(None, description="사용자 이름") password: str | None = Field(None, description="비밀번호") - name: str | None = Field(None, description="데이터베이스 이름") - driver: str | None = Field(None, description="드라이버 이름") - def validate_required_fields(self) -> None: """DB 종류별 필수 필드 유효성 검사""" required_fields_by_type = { @@ -54,17 +52,12 @@ def _is_empty(value: Any | None) -> bool: return True return False +class UpdateOrCreateDBProfile(DBProfileInfo): + id: str | None = Field(None, description="DB Key 값") + view_name: str | None = Field(None, description="DB 노출명") -# DB에서 조회되는 모든 정보를 담는 클래스입니다. -class DBProfile(BaseModel): - id: str - type: str - host: str - port: int - name: str | None - username: str - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True +class AllDBProfileInfo(DBProfileInfo): + id: str | None = Field(..., description="DB Key 값") + view_name: str | None = Field(None, description="DB 노출명") + created_at: datetime = Field(..., description="profile 저장일") + updated_at: datetime = Field(..., description="profile 수정일") diff --git a/app/schemas/user_db/result_model.py b/app/schemas/user_db/result_model.py new file mode 100644 index 0000000..28c375e --- /dev/null +++ b/app/schemas/user_db/result_model.py @@ -0,0 +1,65 @@ +# app/schemas/user_db/result_model.py + +from pydantic import BaseModel, Field +from datetime import datetime +from typing import List, Any + +from app.core.status import CommonCode + +# 기본 반환 모델 +class BasicResult(BaseModel): + is_successful: bool = Field(..., description="성공 여부") + code: CommonCode = Field(None, description="결과 코드") + +# 디비 정보 후 반환되는 저장 모델 +class ChangeProfileResult(BasicResult): + """DB 조회 결과를 위한 확장 모델""" + view_name: str = Field(..., description="저장된 디비명") + +# DB Profile 조회되는 정보를 담는 모델입니다. +class DBProfile(BaseModel): + id: str + type: str + host: str | None + port: int | None + name: str | None + username: str | None + view_name: str | None + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + +# DB Profile 전체 조회 결과를 담는 새로운 모델 +class AllDBProfileResult(BasicResult): + """DB 프로필 전체 조회 결과를 위한 확장 모델""" + profiles: List[DBProfile] = Field([], description="DB 프로필 목록") + +class ColumnInfo(BaseModel): + """단일 컬럼의 상세 정보를 담는 모델""" + name: str = Field(..., description="컬럼 이름") + type: str = Field(..., description="데이터 타입") + nullable: bool = Field(..., description="NULL 허용 여부") + default: Any | None = Field(None, description="기본값") + comment: str | None = Field(None, description="코멘트") + is_pk: bool = Field(False, description="기본 키(Primary Key) 여부") + +class TableInfo(BaseModel): + """단일 테이블의 이름과 컬럼 목록을 담는 모델""" + name: str = Field(..., description="테이블 이름") + columns: List[ColumnInfo] = Field([], description="컬럼 목록") + comment: str | None = Field(None, description="테이블 코멘트") + +class SchemaInfoResult(BasicResult): + """DB 스키마 상세 정보 조회 결과를 위한 확장 모델""" + schema: List[TableInfo] = Field([], description="테이블 및 컬럼 정보 목록") + +class SchemaListResult(BasicResult): + schemas: List[str] = Field([], description="스키마 이름 목록") + +class TableListResult(BasicResult): + tables: List[str] = Field([], description="테이블 이름 목록") + +class ColumnListResult(BasicResult): + columns: List[ColumnInfo] = Field([], description="컬럼 정보 목록") diff --git a/app/services/user_db_service.py b/app/services/user_db_service.py index 50b5b9f..9463d85 100644 --- a/app/services/user_db_service.py +++ b/app/services/user_db_service.py @@ -7,27 +7,146 @@ from fastapi import Depends from app.core.enum.db_driver import DBTypesEnum +from app.core.enum.db_key_prefix_name import DBSaveIdEnum from app.core.exceptions import APIException from app.core.status import CommonCode +from app.core.utils import generate_prefixed_uuid from app.repository.user_db_repository import UserDbRepository, user_db_repository -from app.schemas.user_db.connect_test_result_model import TestConnectionResult -from app.schemas.user_db.db_profile_model import DBProfileCreate +from app.schemas.user_db.db_profile_model import AllDBProfileInfo, DBProfileInfo, UpdateOrCreateDBProfile +from app.schemas.user_db.result_model import ( + AllDBProfileResult, + BasicResult, + ChangeProfileResult, + ColumnListResult, + SchemaInfoResult, + TableListResult, +) user_db_repository_dependency = Depends(lambda: user_db_repository) class UserDbService: - def connection_test( - self, db_info: DBProfileCreate, repository: UserDbRepository = user_db_repository - ) -> TestConnectionResult: + def connection_test(self, db_info: DBProfileInfo, repository: UserDbRepository = user_db_repository) -> BasicResult: """ - DB 연결 정보를 받아 연결 테스트를 수행하고 결과를 객체로 반환합니다. + DB 연결 정보를 받아 연결 테스트를 수행 후 결과를 반환합니다. """ try: driver_module = self._get_driver_module(db_info.type) connect_kwargs = self._prepare_connection_args(db_info) - return repository.test_db_connection(driver_module, **connect_kwargs) - except (ValueError, ImportError) as e: + return repository.connection_test(driver_module, **connect_kwargs) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def create_profile( + self, create_db_info: UpdateOrCreateDBProfile, repository: UserDbRepository = user_db_repository + ) -> ChangeProfileResult: + """ + DB 연결 정보를 저장 후 결과를 반환합니다. + """ + create_db_info.id = generate_prefixed_uuid(DBSaveIdEnum.user_db.value) + try: + # [수정] 쿼리와 데이터를 서비스에서 생성하여 레포지토리로 전달합니다. + sql, data = self._get_create_query_and_data(create_db_info) + return repository.create_profile(sql, data, create_db_info) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def update_profile( + self, update_db_info: UpdateOrCreateDBProfile, repository: UserDbRepository = user_db_repository + ) -> ChangeProfileResult: + """ + DB 연결 정보를 업데이트 후 결과를 반환합니다. + """ + try: + # [수정] 쿼리와 데이터를 서비스에서 생성하여 레포지토리로 전달합니다. + sql, data = self._get_update_query_and_data(update_db_info) + return repository.update_profile(sql, data, update_db_info) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def delete_profile(self, profile_id: str, repository: UserDbRepository = user_db_repository) -> ChangeProfileResult: + """ + DB 연결 정보를 삭제 후 결과를 반환합니다. + """ + try: + # [수정] 쿼리와 데이터를 서비스에서 생성하여 레포지토리로 전달합니다. + sql, data = self._get_delete_query_and_data(profile_id) + return repository.delete_profile(sql, data, profile_id) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def find_all_profile(self, repository: UserDbRepository = user_db_repository) -> AllDBProfileResult: + """ + 모든 DB 연결 정보를 반환합니다. + """ + try: + # [수정] 쿼리를 서비스에서 생성하여 레포지토리로 전달합니다. + sql = self._get_find_all_query() + return repository.find_all_profile(sql) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def find_profile(self, profile_id, repository: UserDbRepository = user_db_repository) -> AllDBProfileInfo: + """ + 특정 DB 연결 정보를 반환합니다. + """ + try: + # [수정] 쿼리와 데이터를 서비스에서 생성하여 레포지토리로 전달합니다. + sql, data = self._get_find_one_query_and_data(profile_id) + return repository.find_profile(sql, data) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def find_schemas( + self, db_info: AllDBProfileInfo, repository: UserDbRepository = user_db_repository + ) -> SchemaInfoResult: + """ + DB 스키마 정보를 조회를 수행합니다. + """ + try: + driver_module = self._get_driver_module(db_info.type) + connect_kwargs = self._prepare_connection_args(db_info) + schema_query = self._get_schema_query(db_info.type) + + return repository.find_schemas(driver_module, schema_query, **connect_kwargs) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def find_tables( + self, db_info: AllDBProfileInfo, schema_name: str, repository: UserDbRepository = user_db_repository + ) -> TableListResult: + """ + 특정 스키마 내의 테이블 정보를 조회합니다. + """ + try: + driver_module = self._get_driver_module(db_info.type) + connect_kwargs = self._prepare_connection_args(db_info) + table_query = self._get_table_query(db_info.type, for_all_schemas=False) + + return repository.find_tables(driver_module, table_query, schema_name, **connect_kwargs) + except Exception as e: + raise APIException(CommonCode.FAIL) from e + + def find_columns( + self, + db_info: AllDBProfileInfo, + schema_name: str, + table_name: str, + repository: UserDbRepository = user_db_repository, + ) -> ColumnListResult: + """ + 특정 컬럼 정보를 조회합니다. + """ + try: + driver_module = self._get_driver_module(db_info.type) + connect_kwargs = self._prepare_connection_args(db_info) + column_query = self._get_column_query(db_info.type) + db_type = db_info.type + + return repository.find_columns( + driver_module, column_query, schema_name, db_type, table_name, **connect_kwargs + ) + except Exception as e: raise APIException(CommonCode.FAIL) from e def _get_driver_module(self, db_type: str): @@ -39,7 +158,7 @@ def _get_driver_module(self, db_type: str): return sqlite3 return importlib.import_module(driver_name) - def _prepare_connection_args(self, db_info: DBProfileCreate) -> dict[str, Any]: + def _prepare_connection_args(self, db_info: DBProfileInfo) -> dict[str, Any]: """ DB 타입에 따라 연결에 필요한 매개변수를 딕셔너리로 구성합니다. """ @@ -76,5 +195,109 @@ def _prepare_connection_args(self, db_info: DBProfileCreate) -> dict[str, Any]: return kwargs + def _get_schema_query(self, db_type: str) -> str | None: + db_type = db_type.lower() + if db_type == "postgresql": + return """ + SELECT schema_name FROM information_schema.schemata + WHERE schema_name NOT IN ('pg_catalog', 'information_schema') + """ + elif db_type in ["mysql", "mariadb"]: + return "SELECT schema_name FROM information_schema.schemata" + elif db_type == "oracle": + return "SELECT username FROM all_users" + elif db_type == "sqlite": + return None + return None + + def _get_table_query(self, db_type: str, for_all_schemas: bool = False) -> str | None: # 수정됨 + db_type = db_type.lower() + if db_type == "postgresql": + if for_all_schemas: + return """ + SELECT table_name, table_schema FROM information_schema.tables + WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('pg_catalog', 'information_schema') + """ + else: + return """ + SELECT table_name, table_schema FROM information_schema.tables + WHERE table_type = 'BASE TABLE' AND table_schema = %s + """ + elif db_type in ["mysql", "mariadb"]: + if for_all_schemas: + return """ + SELECT table_name, table_schema FROM information_schema.tables + WHERE table_type = 'BASE TABLE' + """ + else: + return """ + SELECT table_name, table_schema FROM information_schema.tables + WHERE table_type = 'BASE TABLE' AND table_schema = %s + """ + elif db_type == "oracle": + return "SELECT table_name FROM all_tables WHERE owner = :owner" + elif db_type == "sqlite": + return "SELECT name FROM sqlite_master WHERE type='table'" + return None + + def _get_column_query(self, db_type: str) -> str | None: + db_type = db_type.lower() + if db_type == "postgresql": + return """ + SELECT column_name, data_type, is_nullable, column_default, table_name, table_schema + FROM information_schema.columns + WHERE table_schema NOT IN ('pg_catalog', 'information_schema') + AND table_schema = %s + AND table_name = %s + """ + elif db_type in ["mysql", "mariadb"]: + return """ + SELECT column_name, data_type, is_nullable, column_default, table_name, table_schema + FROM information_schema.columns + WHERE table_schema = %s AND table_name = %s + """ + elif db_type == "oracle": + return """ + SELECT column_name, data_type, nullable, data_default, table_name + FROM all_tab_columns + WHERE owner = :owner AND table_name = :table + """ + elif db_type == "sqlite": + return None + return None + + # ───────────────────────────── + # 프로필 CRUD 쿼리 생성 메서드 + # ───────────────────────────── + def _get_create_query_and_data(self, db_info: UpdateOrCreateDBProfile) -> tuple[str, tuple]: + profile_dict = db_info.model_dump() + columns_to_insert = {k: v for k, v in profile_dict.items() if v is not None} + columns = ", ".join(columns_to_insert.keys()) + placeholders = ", ".join(["?"] * len(columns_to_insert)) + sql = f"INSERT INTO db_profile ({columns}) VALUES ({placeholders})" + data = tuple(columns_to_insert.values()) + return sql, data + + def _get_update_query_and_data(self, db_info: UpdateOrCreateDBProfile) -> tuple[str, tuple]: + profile_dict = db_info.model_dump() + columns_to_update = {k: v for k, v in profile_dict.items() if v is not None and k != "id"} + set_clause = ", ".join([f"{key} = ?" for key in columns_to_update.keys()]) + sql = f"UPDATE db_profile SET {set_clause} WHERE id = ?" + data = tuple(columns_to_update.values()) + (db_info.id,) + return sql, data + + def _get_delete_query_and_data(self, profile_id: str) -> tuple[str, tuple]: + sql = "DELETE FROM db_profile WHERE id = ?" + data = (profile_id,) + return sql, data + + def _get_find_all_query(self) -> str: + return "SELECT id, type, host, port, name, username, view_name, created_at, updated_at FROM db_profile" + + def _get_find_one_query_and_data(self, profile_id: str) -> tuple[str, tuple]: + sql = "SELECT * FROM db_profile WHERE id = ?" + data = (profile_id,) + return sql, data + user_db_service = UserDbService()