diff --git a/app/api/api_key_api.py b/app/api/api_key_api.py new file mode 100644 index 0000000..22b80cb --- /dev/null +++ b/app/api/api_key_api.py @@ -0,0 +1,113 @@ +from fastapi import APIRouter, Depends + +from app.core.enum.llm_service_info import LLMServiceEnum +from app.core.response import ResponseMessage +from app.core.status import CommonCode +from app.schemas.api_key.create_model import APIKeyCreate +from app.schemas.api_key.response_model import APIKeyResponse +from app.schemas.api_key.update_model import APIKeyUpdate +from app.services.api_key_service import APIKeyService, api_key_service + +api_key_service_dependency = Depends(lambda: api_key_service) + +router = APIRouter() + + +@router.post( + "/actions", + response_model=ResponseMessage[APIKeyResponse], + summary="API KEY 저장 (처음 한 번)", + description="외부 AI 서비스의 API Key를 암호화하여 로컬 데이터베이스에 저장합니다.", +) +def store_api_key( + credential: APIKeyCreate, service: APIKeyService = api_key_service_dependency +) -> ResponseMessage[APIKeyResponse]: + """ + - **service_name**: API Key가 사용될 외부 서비스 이름 (예: "OpenAI") + - **api_key**: 암호화하여 저장할 실제 API Key (예: "sk-***..") + """ + created_credential = service.store_api_key(credential) + + response_data = APIKeyResponse( + id=created_credential.id, + service_name=created_credential.service_name.value, + api_key_encrypted=created_credential.api_key, + created_at=created_credential.created_at, + updated_at=created_credential.updated_at, + ) + + return ResponseMessage.success(value=response_data, code=CommonCode.CREATED) + + +@router.get( + "/result", + response_model=ResponseMessage[list[APIKeyResponse]], + summary="저장된 모든 API KEY 정보 조회", + description=""" + ai_credential 테이블에 저장된 모든 서비스 이름을 확인합니다. + 이를 통해 프론트엔드에서는 비워둘 필드, 임의의 마스킹된 값을 채워둘 필드를 구분합니다. + """, +) +def get_all_api_keys( + service: APIKeyService = api_key_service_dependency, +) -> ResponseMessage[list[APIKeyResponse]]: + """저장된 모든 API Key의 메타데이터를 조회하여 등록 여부를 확인합니다.""" + db_credentials = service.get_all_api_keys() + + response_data = [ + APIKeyResponse( + id=cred.id, + service_name=cred.service_name, + created_at=cred.created_at, + updated_at=cred.updated_at, + ) + for cred in db_credentials + ] + return ResponseMessage.success(value=response_data) + + +@router.get( + "/result/{serviceName}", + response_model=ResponseMessage[APIKeyResponse], + summary="특정 서비스의 API KEY 정보 조회", +) +def get_api_key_by_service_name( + serviceName: LLMServiceEnum, service: APIKeyService = api_key_service_dependency +) -> ResponseMessage[APIKeyResponse]: + """서비스 이름을 기준으로 특정 API Key의 메타데이터를 조회합니다.""" + db_credential = service.get_api_key_by_service_name(serviceName) + + response_data = APIKeyResponse( + id=db_credential.id, + service_name=db_credential.service_name, + created_at=db_credential.created_at, + updated_at=db_credential.updated_at, + ) + return ResponseMessage.success(value=response_data) + + +@router.put( + "/modify/{serviceName}", + response_model=ResponseMessage[APIKeyResponse], + summary="특정 서비스의 API KEY 수정", +) +def update_api_key( + serviceName: LLMServiceEnum, + key_data: APIKeyUpdate, + service: APIKeyService = api_key_service_dependency, +) -> ResponseMessage[APIKeyResponse]: + """ + 서비스 이름을 기준으로 특정 API Key를 새로운 값으로 수정합니다. + - **service_name**: 수정할 서비스의 이름 + - **api_key**: 새로운 API Key + """ + updated_credential = service.update_api_key(serviceName.value, key_data) + + response_data = APIKeyResponse( + id=updated_credential.id, + service_name=updated_credential.service_name, + created_at=updated_credential.created_at, + updated_at=updated_credential.updated_at, + ) + + return ResponseMessage.success(value=response_data) diff --git a/app/api/api_router.py b/app/api/api_router.py index c7238e1..b1c2d39 100644 --- a/app/api/api_router.py +++ b/app/api/api_router.py @@ -2,7 +2,7 @@ from fastapi import APIRouter -from app.api import driver_api, test_api, user_db_api +from app.api import api_key_api, driver_api, test_api, user_db_api api_router = APIRouter() @@ -12,3 +12,4 @@ # 라우터 api_router.include_router(driver_api.router, prefix="/driver", tags=["Driver"]) api_router.include_router(user_db_api.router, prefix="/user/db", tags=["UserDb"]) +api_router.include_router(api_key_api.router, prefix="/keys", tags=["API Key"]) diff --git a/app/core/enum/llm_service_info.py b/app/core/enum/llm_service_info.py new file mode 100644 index 0000000..c568e8d --- /dev/null +++ b/app/core/enum/llm_service_info.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class LLMServiceEnum(str, Enum): + """지원하는 외부 LLM 서비스 목록""" + + OPENAI = "OpenAI" + ANTHROPIC = "Anthropic" + GEMINI = "Gemini" + # TODO: 다른 지원 서비스를 여기에 추가 diff --git a/app/core/status.py b/app/core/status.py index 7fe3fb1..6d5e538 100644 --- a/app/core/status.py +++ b/app/core/status.py @@ -43,6 +43,12 @@ class CommonCode(Enum): NO_DB_DRIVER = (status.HTTP_400_BAD_REQUEST, "4101", "데이터베이스는 필수 값입니다.") """ KEY 클라이언트 오류 코드 - 42xx """ + INVALID_API_KEY_FORMAT = (status.HTTP_400_BAD_REQUEST, "4200", "API 키의 형식이 올바르지 않습니다.") + INVALID_API_KEY_PREFIX = ( + status.HTTP_400_BAD_REQUEST, + "4201", + "API 키가 선택한 서비스의 올바른 형식이 아닙니다. (예: OpenAI는 sk-로 시작)", + ) """ AI CHAT, DB 클라이언트 오류 코드 - 43xx """ @@ -55,6 +61,16 @@ class CommonCode(Enum): # ================================== """ 기본 서버 오류 코드 - 50xx """ FAIL = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5000", "서버 처리 중 오류가 발생했습니다.") + DB_BUSY = ( + status.HTTP_503_SERVICE_UNAVAILABLE, + "5001", + "데이터베이스가 현재 사용 중입니다. 잠시 후 다시 시도해주세요.", + ) + FAIL_TO_VERIFY_CREATION = ( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "5002", + "데이터 생성 후 검증 과정에서 오류가 발생했습니다.", + ) """ DRIVER, DB 서버 오류 코드 - 51xx """ FAIL_CONNECT_DB = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5100", "디비 연결 중 오류가 발생했습니다.") diff --git a/app/db/init_db.py b/app/db/init_db.py index 0b8fa58..79fa774 100644 --- a/app/db/init_db.py +++ b/app/db/init_db.py @@ -48,7 +48,7 @@ def initialize_database(): """ CREATE TABLE IF NOT EXISTS ai_credential ( id VARCHAR(64) PRIMARY KEY NOT NULL, - service_name VARCHAR(32) NOT NULL, + service_name VARCHAR(32) NOT NULL UNIQUE, api_key VARCHAR(256) NOT NULL, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP diff --git a/app/repository/api_key_repository.py b/app/repository/api_key_repository.py new file mode 100644 index 0000000..caaae96 --- /dev/null +++ b/app/repository/api_key_repository.py @@ -0,0 +1,112 @@ +import sqlite3 + +from app.core.utils import get_db_path +from app.schemas.api_key.db_model import APIKeyInDB + + +class APIKeyRepository: + def create_api_key(self, new_id: str, service_name: str, encrypted_key: str) -> APIKeyInDB: + """ + 암호화된 API Key 정보를 받아 데이터베이스에 저장하고, + 저장된 객체를 반환합니다. + """ + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute( + """ + INSERT INTO ai_credential (id, service_name, api_key) + VALUES (?, ?, ?) + """, + (new_id, service_name, encrypted_key), + ) + conn.commit() + + cursor.execute("SELECT * FROM ai_credential WHERE id = ?", (new_id,)) + created_row = cursor.fetchone() + + if not created_row: + return None + + return APIKeyInDB.model_validate(dict(created_row)) + + finally: + if conn: + conn.close() + + def get_all_api_keys(self) -> list[APIKeyInDB]: + """데이터베이스에 저장된 모든 API Key를 조회합니다.""" + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute("SELECT * FROM ai_credential") + rows = cursor.fetchall() + + return [APIKeyInDB.model_validate(dict(row)) for row in rows] + finally: + if conn: + conn.close() + + def get_api_key_by_service_name(self, service_name: str) -> APIKeyInDB | None: + """서비스 이름으로 특정 API Key를 조회합니다.""" + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute("SELECT * FROM ai_credential WHERE service_name = ?", (service_name,)) + row = cursor.fetchone() + + if not row: + return None + + return APIKeyInDB.model_validate(dict(row)) + finally: + if conn: + conn.close() + + def update_api_key(self, service_name: str, encrypted_key: str) -> APIKeyInDB | None: + """서비스 이름에 해당하는 API Key를 수정하고, 수정된 객체를 반환합니다.""" + db_path = get_db_path() + conn = None + try: + conn = sqlite3.connect(str(db_path), timeout=10) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + # 먼저 해당 서비스의 데이터가 존재하는지 확인 + cursor.execute("SELECT id FROM ai_credential WHERE service_name = ?", (service_name,)) + if not cursor.fetchone(): + return None + + # 데이터 업데이트 + cursor.execute( + "UPDATE ai_credential SET api_key = ?, updated_at = datetime('now', 'localtime') WHERE service_name = ?", + (encrypted_key, service_name), + ) + conn.commit() + + # rowcount가 0이면 업데이트된 행이 없음 (정상적인 경우 발생하기 어려움) + if cursor.rowcount == 0: + return None + + cursor.execute("SELECT * FROM ai_credential WHERE service_name = ?", (service_name,)) + updated_row = cursor.fetchone() + + return APIKeyInDB.model_validate(dict(updated_row)) + finally: + if conn: + conn.close() + + +api_key_repository = APIKeyRepository() diff --git a/app/schemas/api_key/base_model.py b/app/schemas/api_key/base_model.py new file mode 100644 index 0000000..bb2c3f0 --- /dev/null +++ b/app/schemas/api_key/base_model.py @@ -0,0 +1,9 @@ +from pydantic import BaseModel, Field + +from app.core.enum.llm_service_info import LLMServiceEnum + + +class APIKeyBase(BaseModel): + """API Key 도메인의 모든 스키마가 상속하는 기본 모델""" + + service_name: LLMServiceEnum = Field(..., description="외부 서비스 이름") diff --git a/app/schemas/api_key/create_model.py b/app/schemas/api_key/create_model.py new file mode 100644 index 0000000..c53e3de --- /dev/null +++ b/app/schemas/api_key/create_model.py @@ -0,0 +1,27 @@ +from pydantic import Field + +from app.core.enum.llm_service_info import LLMServiceEnum +from app.core.exceptions import APIException +from app.core.status import CommonCode +from app.schemas.api_key.base_model import APIKeyBase + + +class APIKeyCreate(APIKeyBase): + """API Key 생성을 위한 스키마""" + + api_key: str = Field(..., description="암호화하여 저장할 실제 API Key") + + def validate_with_service(self) -> None: + """서비스 종류에 따라 API Key의 유효성을 검증합니다.""" + # 1. 기본 형식 검증 (공백 또는 빈 문자열) + if not self.api_key or self.api_key.isspace(): + raise APIException(CommonCode.INVALID_API_KEY_FORMAT) + + # 2. 서비스별 접두사 검증 + key_prefix_map = { + LLMServiceEnum.OPENAI: "sk-", + } + required_prefix = key_prefix_map.get(self.service_name) + + if required_prefix and not self.api_key.startswith(required_prefix): + raise APIException(CommonCode.INVALID_API_KEY_PREFIX) diff --git a/app/schemas/api_key/db_model.py b/app/schemas/api_key/db_model.py new file mode 100644 index 0000000..251c95c --- /dev/null +++ b/app/schemas/api_key/db_model.py @@ -0,0 +1,15 @@ +from datetime import datetime + +from app.schemas.api_key.base_model import APIKeyBase + + +class APIKeyInDB(APIKeyBase): + """데이터베이스에 저장된 형태의 스키마 (내부용)""" + + id: str + api_key: str # DB 모델에서는 암호화된 키를 의미 + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True diff --git a/app/schemas/api_key/response_model.py b/app/schemas/api_key/response_model.py new file mode 100644 index 0000000..236c3b1 --- /dev/null +++ b/app/schemas/api_key/response_model.py @@ -0,0 +1,11 @@ +from datetime import datetime + +from app.schemas.api_key.base_model import APIKeyBase + + +class APIKeyResponse(APIKeyBase): + """API 응답용 스키마""" + + id: str + created_at: datetime + updated_at: datetime diff --git a/app/schemas/api_key/update_model.py b/app/schemas/api_key/update_model.py new file mode 100644 index 0000000..89f2b16 --- /dev/null +++ b/app/schemas/api_key/update_model.py @@ -0,0 +1,16 @@ +from pydantic import BaseModel, Field + +from app.core.exceptions import APIException +from app.core.status import CommonCode + + +class APIKeyUpdate(BaseModel): + """API Key 수정을 위한 스키마""" + + api_key: str = Field(..., description="새로운 API Key") + + def validate_with_api_key(self) -> None: + """API Key의 유효성을 검증합니다.""" + # 기본 형식 검증 (공백 또는 빈 문자열) + if not self.api_key or self.api_key.isspace(): + raise APIException(CommonCode.INVALID_API_KEY_FORMAT) diff --git a/app/services/api_key_service.py b/app/services/api_key_service.py new file mode 100644 index 0000000..f227562 --- /dev/null +++ b/app/services/api_key_service.py @@ -0,0 +1,80 @@ +import sqlite3 + +from fastapi import Depends + +from app.core.exceptions import APIException +from app.core.security import AES256 +from app.core.status import CommonCode +from app.core.utils import generate_prefixed_uuid +from app.repository.api_key_repository import APIKeyRepository, api_key_repository +from app.schemas.api_key.create_model import APIKeyCreate +from app.schemas.api_key.db_model import APIKeyInDB +from app.schemas.api_key.update_model import APIKeyUpdate + +api_key_repository_dependency = Depends(lambda: api_key_repository) + + +class APIKeyService: + def __init__(self, repository: APIKeyRepository = api_key_repository): + self.repository = repository + + def store_api_key(self, credential_data: APIKeyCreate) -> APIKeyInDB: + """API_KEY를 암호화하고 repository를 통해 데이터베이스에 저장합니다.""" + credential_data.validate_with_service() + try: + encrypted_key = AES256.encrypt(credential_data.api_key) + new_id = generate_prefixed_uuid("QGENIE") + + created_row = self.repository.create_api_key( + new_id=new_id, + service_name=credential_data.service_name.value, + encrypted_key=encrypted_key, + ) + + if not created_row: + raise APIException(CommonCode.FAIL_TO_VERIFY_CREATION) + + return created_row + + except sqlite3.IntegrityError as e: + raise APIException(CommonCode.DUPLICATION) from e + except sqlite3.Error as e: + if "database is locked" in str(e): + raise APIException(CommonCode.DB_BUSY) from e + raise APIException(CommonCode.FAIL) from e + + def get_all_api_keys(self) -> list[APIKeyInDB]: + """데이터베이스에 저장된 모든 API Key를 조회합니다.""" + try: + return self.repository.get_all_api_keys() + except sqlite3.Error as e: + raise APIException(CommonCode.FAIL) from e + + def get_api_key_by_service_name(self, service_name: str) -> APIKeyInDB: + """서비스 이름으로 특정 API Key를 조회합니다.""" + try: + api_key = self.repository.get_api_key_by_service_name(service_name) + if not api_key: + raise APIException(CommonCode.NO_SEARCH_DATA) + return api_key + except sqlite3.Error as e: + raise APIException(CommonCode.FAIL) from e + + def update_api_key(self, service_name: str, key_data: APIKeyUpdate) -> APIKeyInDB: + """서비스 이름에 해당하는 API Key를 수정합니다.""" + key_data.validate_with_api_key() + try: + encrypted_key = AES256.encrypt(key_data.api_key) + updated_api_key = self.repository.update_api_key(service_name, encrypted_key) + + if not updated_api_key: + raise APIException(CommonCode.NO_SEARCH_DATA) + + return updated_api_key + except sqlite3.Error as e: + if "database is locked" in str(e): + raise APIException(CommonCode.DB_BUSY) from e + raise APIException(CommonCode.FAIL) from e + + +api_key_service = APIKeyService()