diff --git a/README.md b/README.md index 75f36c7..a259496 100644 --- a/README.md +++ b/README.md @@ -10,54 +10,55 @@ 이 프로젝트는 가상 환경 사용을 권장합니다. -1. **저장소 복제** - ```bash - git clone https://github.com/Queryus/QGenie_ai.git - ``` - -2. **가상 환경 생성 및 활성화** - ```bash - # 가상 환경 생성 (최초 한 번) - python3 -m venv .venv - - # 가상 환경 활성화 (macOS/Linux) - source .venv/bin/activate - - # 가상 환경 활성화 (Windows) - .venv\Scripts\activate - ``` - -3. **라이브러리 설치** - 프로젝트에 필요한 모든 라이브러리는 `requirements.txt` 파일에 명시되어 있습니다. - ```bash - pip install -r requirements.txt - ``` - -4. **실행파일 생성 후 확인** - ```bash - # 이전 빌드 결과물 삭제 - rm -rf build dist - - # 실행 파일 빌드 - pyinstaller --clean --onefile --add-data "src/prompts:prompts" --name ai src/main.py - ``` +1. **저장소 복제** + + ```bash + git clone https://github.com/Queryus/QGenie_ai.git + ``` +2. **가상 환경 생성 및 활성화** + + ```bash + # 가상 환경 생성 (최초 한 번) + python3 -m venv .venv + + # 가상 환경 활성화 (macOS/Linux) + source .venv/bin/activate + + # 가상 환경 활성화 (Windows) + .venv\Scripts\activate + ``` +3. **라이브러리 설치** + 프로젝트에 필요한 모든 라이브러리는 `requirements.txt` 파일에 명시되어 있습니다. + + ```bash + pip install -r requirements.txt + ``` +4. **실행파일 생성 후 확인** + + ```bash + # 이전 빌드 결과물 삭제 + rm -rf build dist + + # 실행 파일 빌드 + pyinstaller --clean --onefile --add-data "src/prompts:prompts" --name ai src/main.py + ``` --- ## 📦 배포 방법 -~~~markdown +```markdown 태그 예시) v1.2.3 1 : 큰 버전(기존에 없던 새로운 도메인 추가) 2 : 중간 버전(기존 도메인에 새로운 기능 추가) 3 : 패치 버전(버그 수정, 코드 개선 등) -~~~ +``` 이 프로젝트는 GitHub Actions를 통해 실행 파일 빌드 및 배포가 자동화되어 있습니다. GitHub에서 새로운 태그를 발행하면 파이프라인이 자동으로 실행됩니다. -~~~markdown +```markdown 1. 모든 기능 개발과 테스트가 완료된 코드를 main 브랜치에 병합(Merge)합니다. 2. 레포지토리에서 Releases 탭으로 이동하여 Create a new release 버튼을 클릭합니다. 3. Choose a tag 항목을 클릭한후 Find or create a new tag 부분에 버전(v1.0.0)과 같이 새로운 버전 태그를 입력하고 아래 Create new tag를 클릭하여 태그를 생성합니다. @@ -68,8 +69,7 @@ GitHub에서 새로운 태그를 발행하면 파이프라인이 자동으로 잘못된 릴리즈는 서비스에 직접적인 영향을 줄 수 있으니, 반드시 팀의 승인을 받고 신중하게 진행해 주십시오. 7. Actions 탭에 들어가 파이프라인을 확인 후 정상 배포되었다면 App 레포에 develop 브랜치에서 실행 파일을 확인합니다. 8. 만약 실패하였다면 인프라 담당자에게 말해주세요. -~~~ - +``` --- @@ -82,5 +82,5 @@ GitHub에서 새로운 태그를 발행하면 파이프라인이 자동으로 ./dist/ai # 다른 터미널에서 헬스체크 요청 -curl http://localhost:<할당된 포트>/api/v1/health +curl http://localhost:35816/api/v1/health ``` diff --git a/src/agents/sql_agent/nodes.py b/src/agents/sql_agent/nodes.py index dd2e916..b8fce6a 100644 --- a/src/agents/sql_agent/nodes.py +++ b/src/agents/sql_agent/nodes.py @@ -62,7 +62,9 @@ def _load_prompts(self): async def intent_classifier_node(self, state: SqlAgentState) -> SqlAgentState: """사용자 질문의 의도를 분류하는 노드""" - print("--- 0. 의도 분류 중 ---") + print("=" * 60) + print("🔍 [INTENT_CLASSIFIER] 의도 분류 시작") + print("=" * 60) try: llm = await self.llm_provider.get_llm() @@ -73,41 +75,62 @@ async def intent_classifier_node(self, state: SqlAgentState) -> SqlAgentState: "chat_history": state.get('chat_history', []) } + print(f"📝 입력 질문: {input_data['question']}") + print(f"💬 채팅 히스토리: {len(input_data['chat_history'])}개 항목") + if input_data['chat_history']: + for i, chat in enumerate(input_data['chat_history'][-3:]): # 최근 3개만 출력 + print(f" [{i}] {chat}") + chain = self.intent_classifier_prompt | llm | StrOutputParser() intent = await chain.ainvoke(input_data) state['intent'] = intent.strip() - print(f"의도 분류 결과: {state['intent']}") + print(f"✅ 의도 분류 결과: '{state['intent']}'") + print(f"📊 분류된 노드 경로: {'SQL 처리' if state['intent'] == 'SQL' else '일반 응답'}") + print("=" * 60) return state except Exception as e: - print(f"의도 분류 실패: {e}") + print(f"❌ 의도 분류 실패: {e}") + print(f"🔄 기본값 SQL로 설정") # 기본값으로 SQL 처리 state['intent'] = "SQL" + print("=" * 60) return state async def unsupported_question_node(self, state: SqlAgentState) -> SqlAgentState: """SQL과 관련 없는 질문을 처리하는 노드""" - print("--- SQL 관련 없는 질문 ---") + print("=" * 60) + print("🚫 [UNSUPPORTED_QUESTION] SQL 관련 없는 질문 처리") + print("=" * 60) + + print(f"📝 처리된 질문: {state['question']}") + print(f"🔄 의도 분류 결과: {state.get('intent', 'UNKNOWN')}") state['final_response'] = """죄송합니다, 해당 질문에는 답변할 수 없습니다. 저는 데이터베이스 관련 질문만 처리할 수 있습니다. SQL 쿼리나 데이터 분석과 관련된 질문을 해주세요.""" + print(f"✅ 최종 응답 설정 완료") + print("=" * 60) return state async def db_classifier_node(self, state: SqlAgentState) -> SqlAgentState: """데이터베이스를 분류하고 스키마를 가져오는 노드""" - print("--- 0.5. DB 분류 중 ---") + print("=" * 60) + print("🗄️ [DB_CLASSIFIER] 데이터베이스 분류 시작") + print("=" * 60) try: + print(f"📝 분석할 질문: {state['question']}") + # DBMS 프로필과 어노테이션을 함께 조회 available_dbs_with_annotations = await self.database_service.get_databases_with_annotations() if not available_dbs_with_annotations: raise DatabaseConnectionException("사용 가능한 DBMS가 없습니다.") - print(f"--- {len(available_dbs_with_annotations)}개의 DBMS 발견 ---") + print(f"🔍 발견된 DBMS: {len(available_dbs_with_annotations)}개") # 어노테이션 정보를 포함한 DBMS 옵션 생성 db_options = "\n".join([ @@ -115,6 +138,18 @@ async def db_classifier_node(self, state: SqlAgentState) -> SqlAgentState: for db in available_dbs_with_annotations ]) + print("📋 사용 가능한 DBMS 목록:") + for i, db in enumerate(available_dbs_with_annotations): + print(f" [{i+1}] {db['display_name']}") + print(f" 설명: {db['description']}") + print(f" 타입: {db['profile']['type']}") + print(f" 호스트: {db['profile']['host']}:{db['profile']['port']}") + annotation_status = "있음" if (db['annotations'] and db['annotations'].code != "4401") else "없음" + print(f" 어노테이션: {annotation_status}") + + print(f"\n🤖 LLM에게 전달할 DBMS 옵션:") + print(db_options) + # LLM을 사용하여 적절한 DBMS 선택 llm = await self.llm_provider.get_llm() chain = self.db_classifier_prompt | llm | StrOutputParser() @@ -125,37 +160,49 @@ async def db_classifier_node(self, state: SqlAgentState) -> SqlAgentState: }) selected_db_display_name = selected_db_display_name.strip() + print(f"🎯 LLM이 선택한 DBMS: '{selected_db_display_name}'") # 선택된 display_name으로 실제 DBMS 정보 찾기 selected_db_info = None for db in available_dbs_with_annotations: if db['display_name'] == selected_db_display_name: selected_db_info = db + print(f"✅ 정확히 매칭됨: {db['display_name']}") break if not selected_db_info: + print(f"⚠️ 정확한 매칭 실패, 부분 매칭 시도...") # 부분 매칭 시도 for db in available_dbs_with_annotations: if selected_db_display_name in db['display_name'] or db['display_name'] in selected_db_display_name: selected_db_info = db + print(f"✅ 부분 매칭됨: {db['display_name']}") break if not selected_db_info: - print(f"--- 선택된 DBMS를 찾을 수 없음: {selected_db_display_name}, 첫 번째 DBMS 사용 ---") + print(f"❌ 매칭 실패: '{selected_db_display_name}'") + print(f"🔄 첫 번째 DBMS 사용: {available_dbs_with_annotations[0]['display_name']}") selected_db_info = available_dbs_with_annotations[0] state['selected_db'] = selected_db_info['display_name'] state['selected_db_profile'] = selected_db_info['profile'] state['selected_db_annotations'] = selected_db_info['annotations'] - print(f'--- 선택된 DBMS: {selected_db_info["display_name"]} ---') - print(f'--- DBMS 프로필 ID: {selected_db_info["profile"]["id"]} ---') + print(f"📊 최종 선택된 DBMS:") + print(f" 이름: {selected_db_info['display_name']}") + print(f" 프로필 ID: {selected_db_info['profile']['id']}") + print(f" 타입: {selected_db_info['profile']['type']}") + print(f" 연결: {selected_db_info['profile']['host']}:{selected_db_info['profile']['port']}") # 어노테이션 정보를 스키마로 사용 - if selected_db_info['annotations'] and 'data' in selected_db_info['annotations']: - schema_info = self._convert_annotations_to_schema(selected_db_info['annotations']) + annotations = selected_db_info['annotations'] + if annotations and annotations.code != "4401" and annotations.data.databases: + schema_info = self._convert_annotations_to_schema(annotations) state['db_schema'] = schema_info - print(f"--- 어노테이션 기반 스키마 사용 ---") + print(f"✅ 어노테이션 기반 스키마 사용 ({len(annotations.data.databases)}개 DB)") + print(f"📄 스키마 요약:") + for db in annotations.data.databases: + print(f" - {db.db_name}: {len(db.tables)}개 테이블, {len(db.relationships)}개 관계") else: # 어노테이션이 없는 경우 기본 정보로 대체 schema_info = f"DBMS 유형: {selected_db_info['profile']['type']}\n" @@ -163,42 +210,68 @@ async def db_classifier_node(self, state: SqlAgentState) -> SqlAgentState: schema_info += f"포트: {selected_db_info['profile']['port']}\n" schema_info += "상세 스키마 정보가 없습니다. 기본 SQL 구문을 사용하세요." state['db_schema'] = schema_info - print(f"--- 기본 DBMS 정보 사용 ---") + print(f"⚠️ 어노테이션 없음, 기본 DBMS 정보 사용") + print("=" * 60) return state except Exception as e: - print(f"데이터베이스 분류 실패: {e}") - print(f"에러 타입: {type(e).__name__}") - print(f"에러 상세: {str(e)}") + print(f"❌ 데이터베이스 분류 실패: {e}") + print(f"🔍 에러 타입: {type(e).__name__}") + print(f"📝 에러 상세: {str(e)}") + print("=" * 60) # 폴백 없이 에러를 다시 발생시킴 raise e - def _convert_annotations_to_schema(self, annotations: dict) -> str: + def _convert_annotations_to_schema(self, annotations) -> str: """어노테이션 데이터를 스키마 문자열로 변환합니다.""" try: - if not annotations or 'data' not in annotations: - return "어노테이션 스키마 정보가 없습니다." - - # 어노테이션 구조에 따라 스키마 정보 추출 - # 실제 어노테이션 응답 구조를 확인 후 구현 필요 - schema_parts = [] - schema_parts.append("=== 어노테이션 기반 스키마 정보 ===") + # AnnotationResponse 객체인지 확인 + if hasattr(annotations, 'data') and hasattr(annotations, 'code'): + if annotations.code == "4401" or not annotations.data.databases: + return "어노테이션 스키마 정보가 없습니다." + + schema_parts = [] + schema_parts.append(f"=== {annotations.data.dbms_type.upper()} 어노테이션 기반 스키마 정보 ===") + + # 각 데이터베이스별 정보 추출 + for db in annotations.data.databases: + schema_parts.append(f"\n[데이터베이스: {db.db_name}]") + schema_parts.append(f"설명: {db.description}") + + # 테이블 정보 + schema_parts.append(f"\n테이블 ({len(db.tables)}개):") + for table in db.tables: + schema_parts.append(f"\n • {table.table_name}") + schema_parts.append(f" 설명: {table.description}") + schema_parts.append(f" 컬럼 ({len(table.columns)}개):") + + for col in table.columns: + schema_parts.append(f" - {col.column_name} ({col.data_type}): {col.description}") + + # 관계 정보 + if db.relationships: + schema_parts.append(f"\n관계 ({len(db.relationships)}개):") + for rel in db.relationships: + rel_desc = rel.description or "관계 설명 없음" + schema_parts.append(f" • {rel.from_table}({', '.join(rel.from_columns)}) → {rel.to_table}({', '.join(rel.to_columns)})") + schema_parts.append(f" 설명: {rel_desc}") + + return "\n".join(schema_parts) - annotation_data = annotations.get('data', {}) + # 기존 dict 형태 처리 (호환성) + elif isinstance(annotations, dict): + if not annotations or annotations.get('code') == "4401": + return "어노테이션 스키마 정보가 없습니다." + + schema_parts = [] + schema_parts.append("=== 어노테이션 기반 스키마 정보 ===") + schema_parts.append(f"어노테이션 데이터: {str(annotations)[:500]}...") + return "\n".join(schema_parts) - # 어노테이션 데이터가 데이터베이스 정보를 포함하는 경우 - if isinstance(annotation_data, dict): - for key, value in annotation_data.items(): - schema_parts.append(f"{key}: {str(value)[:200]}...") - elif isinstance(annotation_data, list): - for i, item in enumerate(annotation_data): - schema_parts.append(f"항목 {i+1}: {str(item)[:200]}...") else: - schema_parts.append(f"어노테이션 데이터: {str(annotation_data)[:500]}...") - - return "\n".join(schema_parts) + return "어노테이션 스키마 정보가 없습니다." except Exception as e: print(f"어노테이션 변환 중 오류: {e}") @@ -206,14 +279,29 @@ def _convert_annotations_to_schema(self, annotations: dict) -> str: async def sql_generator_node(self, state: SqlAgentState) -> SqlAgentState: """SQL 쿼리를 생성하는 노드""" - print("--- 1. SQL 생성 중 ---") + print("=" * 60) + print("🔧 [SQL_GENERATOR] SQL 쿼리 생성 시작") + print("=" * 60) try: parser = PydanticOutputParser(pydantic_object=SqlQuery) + print(f"📝 분석할 질문: {state['question']}") + print(f"🗄️ 선택된 DB: {state.get('selected_db', 'UNKNOWN')}") + # 에러 피드백 컨텍스트 생성 error_feedback = self._build_error_feedback(state) + if error_feedback: + print(f"⚠️ 이전 에러 피드백:") + print(f" {error_feedback.strip()}") + else: + print(f"✅ 첫 번째 SQL 생성 시도") + + print(f"\n📄 사용할 스키마 정보:") + schema_preview = state['db_schema'][:500] + "..." if len(state['db_schema']) > 500 else state['db_schema'] + print(f" {schema_preview}") + prompt = self.sql_generator_prompt.format( format_instructions=parser.get_format_instructions(), db_schema=state['db_schema'], @@ -222,17 +310,34 @@ async def sql_generator_node(self, state: SqlAgentState) -> SqlAgentState: error_feedback=error_feedback ) + print(f"\n🤖 LLM에게 SQL 생성 요청 중...") llm = await self.llm_provider.get_llm() response = await llm.ainvoke(prompt) + + print(f"📨 LLM 응답 길이: {len(response.content)}자") + print(f"📨 LLM 원본 응답:") + print(f" {response.content[:300]}...") + parsed_query = parser.invoke(response.content) state['sql_query'] = parsed_query.query state['validation_error'] = None state['execution_result'] = None + print(f"\n✅ SQL 쿼리 생성 완료:") + print(f" {parsed_query.query}") + print(f"📊 상태 업데이트:") + print(f" - sql_query: 설정됨") + print(f" - validation_error: 초기화됨") + print(f" - execution_result: 초기화됨") + + print("=" * 60) return state except Exception as e: + print(f"❌ SQL 생성 실패: {e}") + print(f"🔍 에러 타입: {type(e).__name__}") + print("=" * 60) raise ExecutionException(f"SQL 생성 실패: {e}") def _build_error_feedback(self, state: SqlAgentState) -> str: @@ -260,55 +365,99 @@ def _build_error_feedback(self, state: SqlAgentState) -> str: async def sql_validator_node(self, state: SqlAgentState) -> SqlAgentState: """SQL 쿼리의 안전성을 검증하는 노드""" - print("--- 2. SQL 검증 중 ---") + print("=" * 60) + print("🔒 [SQL_VALIDATOR] SQL 안전성 검증 시작") + print("=" * 60) try: - query_words = state['sql_query'].lower().split() + sql_query = state['sql_query'] + print(f"🔍 검증할 SQL 쿼리:") + print(f" {sql_query}") + + query_words = sql_query.lower().split() dangerous_keywords = [ "drop", "delete", "update", "insert", "truncate", "alter", "create", "grant", "revoke" ] + + print(f"🚫 검사할 위험 키워드: {dangerous_keywords}") + found_keywords = [keyword for keyword in dangerous_keywords if keyword in query_words] + current_retry_count = state.get('validation_error_count', 0) + print(f"🔄 현재 검증 재시도 횟수: {current_retry_count}/{MAX_ERROR_COUNT}") + if found_keywords: keyword_str = ', '.join(f"'{k}'" for k in found_keywords) error_msg = f'위험한 키워드 {keyword_str}가 포함되어 있습니다.' state['validation_error'] = error_msg - state['validation_error_count'] = state.get('validation_error_count', 0) + 1 + state['validation_error_count'] = current_retry_count + 1 + + print(f"❌ 검증 실패:") + print(f" 발견된 위험 키워드: {found_keywords}") + print(f" 에러 메시지: {error_msg}") + print(f" 실패 횟수: {state['validation_error_count']}/{MAX_ERROR_COUNT}") if state['validation_error_count'] >= MAX_ERROR_COUNT: + print(f"🚨 최대 재시도 횟수 초과!") raise MaxRetryExceededException( f"SQL 검증 실패가 {MAX_ERROR_COUNT}회 반복됨", MAX_ERROR_COUNT ) + else: + print(f"🔄 SQL 재생성으로 이동") else: state['validation_error'] = None state['validation_error_count'] = 0 + print(f"✅ 검증 성공: 위험한 키워드 없음") + print(f"📊 상태 업데이트:") + print(f" - validation_error: 초기화됨") + print(f" - validation_error_count: 0으로 리셋") + print("=" * 60) return state except MaxRetryExceededException: + print("=" * 60) raise except Exception as e: + print(f"❌ SQL 검증 중 오류 발생: {e}") + print(f"🔍 에러 타입: {type(e).__name__}") + print("=" * 60) raise ValidationException(f"SQL 검증 중 오류 발생: {e}") async def sql_executor_node(self, state: SqlAgentState) -> SqlAgentState: """SQL 쿼리를 실행하는 노드""" - print("--- 3. SQL 실행 중 ---") + print("=" * 60) + print("⚡ [SQL_EXECUTOR] SQL 쿼리 실행 시작") + print("=" * 60) try: + sql_query = state['sql_query'] selected_db = state.get('selected_db', 'default') + print(f"🔍 실행할 SQL 쿼리:") + print(f" {sql_query}") + print(f"🗄️ 대상 데이터베이스: {selected_db}") + # 선택된 DB 프로필에서 실제 DB ID 가져오기 db_profile = state.get('selected_db_profile') if db_profile and 'id' in db_profile: user_db_id = db_profile['id'] - print(f"--- 실행용 DB 프로필 ID: {user_db_id} ---") + print(f"📋 사용할 DB 프로필:") + print(f" - ID: {user_db_id}") + print(f" - 타입: {db_profile.get('type', 'UNKNOWN')}") + print(f" - 호스트: {db_profile.get('host', 'UNKNOWN')}") + print(f" - 포트: {db_profile.get('port', 'UNKNOWN')}") else: user_db_id = 'TEST-USER-DB-12345' # 폴백 - print(f"--- DB 프로필이 없어 테스트 ID 사용: {user_db_id} ---") - + print(f"⚠️ DB 프로필 없음, 테스트 ID 사용: {user_db_id}") + + current_retry_count = state.get('execution_error_count', 0) + print(f"🔄 현재 실행 재시도 횟수: {current_retry_count}/{MAX_ERROR_COUNT}") + + print(f"\n🚀 SQL 실행 중...") result = await self.database_service.execute_query( - state['sql_query'], + sql_query, database_name=selected_db, user_db_id=user_db_id ) @@ -317,6 +466,20 @@ async def sql_executor_node(self, state: SqlAgentState) -> SqlAgentState: state['validation_error_count'] = 0 state['execution_error_count'] = 0 + print(f"✅ SQL 실행 성공!") + print(f"📊 실행 결과:") + if isinstance(result, str) and len(result) > 500: + print(f" {result[:500]}...") + print(f" (총 {len(result)}자, 잘림)") + else: + print(f" {result}") + + print(f"📈 상태 업데이트:") + print(f" - execution_result: 설정됨") + print(f" - validation_error_count: 0으로 리셋") + print(f" - execution_error_count: 0으로 리셋") + + print("=" * 60) return state except Exception as e: @@ -325,27 +488,64 @@ async def sql_executor_node(self, state: SqlAgentState) -> SqlAgentState: state['validation_error_count'] = 0 state['execution_error_count'] = state.get('execution_error_count', 0) + 1 - print(f"⚠️ SQL 실행 실패 ({state['execution_error_count']}/{MAX_ERROR_COUNT}): {error_msg}") + print(f"❌ SQL 실행 실패:") + print(f" 에러 메시지: {error_msg}") + print(f" 실패 횟수: {state['execution_error_count']}/{MAX_ERROR_COUNT}") + print(f" 에러 타입: {type(e).__name__}") + + if state['execution_error_count'] >= MAX_ERROR_COUNT: + print(f"🚨 최대 재시도 횟수 도달!") + else: + print(f"🔄 SQL 재생성으로 이동") + + print(f"📈 상태 업데이트:") + print(f" - execution_result: 에러 메시지 설정") + print(f" - validation_error_count: 0으로 리셋") + print(f" - execution_error_count: {state['execution_error_count']}로 증가") + print("=" * 60) # 실행 실패 시에도 상태를 반환하여 엣지에서 판단하도록 함 return state async def response_synthesizer_node(self, state: SqlAgentState) -> SqlAgentState: """최종 답변을 생성하는 노드""" - print("--- 4. 최종 답변 생성 중 ---") + print("=" * 60) + print("📝 [RESPONSE_SYNTHESIZER] 최종 답변 생성 시작") + print("=" * 60) try: + print(f"📝 원본 질문: {state['question']}") + is_failure = (state.get('validation_error_count', 0) >= MAX_ERROR_COUNT or state.get('execution_error_count', 0) >= MAX_ERROR_COUNT) + print(f"📊 처리 상태 분석:") + print(f" - validation_error_count: {state.get('validation_error_count', 0)}") + print(f" - execution_error_count: {state.get('execution_error_count', 0)}") + print(f" - 최대 재시도 횟수: {MAX_ERROR_COUNT}") + print(f" - 실패 상태: {is_failure}") + if is_failure: context_message = self._build_failure_context(state) + print(f"❌ 실패 컨텍스트 사용:") + print(f" {context_message.strip()}") else: context_message = f""" Successfully executed the SQL query to answer the user's question. SQL Query: {state['sql_query']} SQL Result: {state['execution_result']} + + IMPORTANT: Include the SQL query in your response using markdown code block format: + ```sql + {state['sql_query']} + ``` """ + print(f"✅ 성공 컨텍스트 사용:") + print(f" SQL: {state['sql_query']}") + result_preview = str(state['execution_result']) + if len(result_preview) > 200: + result_preview = result_preview[:200] + "..." + print(f" 결과: {result_preview}") prompt = self.response_synthesizer_prompt.format( question=state['question'], @@ -353,15 +553,27 @@ async def response_synthesizer_node(self, state: SqlAgentState) -> SqlAgentState context_message=context_message ) + print(f"\n🤖 LLM에게 답변 생성 요청 중...") llm = await self.llm_provider.get_llm() response = await llm.ainvoke(prompt) state['final_response'] = response.content + print(f"✅ 최종 답변 생성 완료!") + print(f"📄 생성된 답변 (미리보기):") + response_preview = response.content[:300] + "..." if len(response.content) > 300 else response.content + print(f" {response_preview}") + print(f"📊 답변 길이: {len(response.content)}자") + + print("=" * 60) return state except Exception as e: + print(f"❌ 답변 생성 실패: {e}") + print(f"🔍 에러 타입: {type(e).__name__}") # 최종 답변 생성 실패 시 기본 메시지 제공 state['final_response'] = f"죄송합니다. 답변 생성 중 오류가 발생했습니다: {e}" + print(f"🔄 기본 에러 메시지 설정") + print("=" * 60) return state def _build_failure_context(self, state: SqlAgentState) -> str: diff --git a/src/api/v1/routers/health.py b/src/api/v1/routers/health.py index 1ea2ce0..4c3ba7d 100644 --- a/src/api/v1/routers/health.py +++ b/src/api/v1/routers/health.py @@ -6,6 +6,7 @@ from services.chat.chatbot_service import ChatbotService, get_chatbot_service from services.annotation.annotation_service import AnnotationService, get_annotation_service from services.database.database_service import DatabaseService, get_database_service +from core.providers.llm_provider import get_llm_provider import logging logger = logging.getLogger(__name__) @@ -13,18 +14,48 @@ router = APIRouter() @router.get("/health") -async def root_health_check() -> Dict[str, str]: +async def root_health_check( + database_service: DatabaseService = Depends(get_database_service) +) -> Dict[str, Any]: """ - 루트 헬스체크 엔드포인트, 서버 상태가 정상이면 'ok' 반환합니다. + 루트 헬스체크 엔드포인트, 서버와 백엔드 연결 상태를 확인합니다. Returns: - Dict: 기본 상태 정보 + Dict: 기본 상태 정보와 백엔드 연결 상태 """ - return { - "status": "ok", - "message": "Welcome to the QGenie Chatbot AI!", - "version": "2.0.0" - } + try: + # 백엔드 연결 상태 확인 + backend_healthy = await database_service.health_check() + + # 전체 상태 결정 + overall_status = "healthy" if backend_healthy else "degraded" + + response = { + "status": overall_status, + "message": "Welcome to the QGenie Chatbot AI!", + "version": "2.0.0", + "backend_connection": "connected" if backend_healthy else "disconnected", + "timestamp": __import__("datetime").datetime.now().isoformat() + } + + if not backend_healthy: + response["warning"] = "백엔드 서버 연결이 불안정합니다. 일부 기능이 제한될 수 있습니다." + logger.warning("⚠️ 기본 헬스체크에서 백엔드 연결 실패 감지") + else: + logger.debug("✅ 기본 헬스체크: 백엔드 연결 정상") + + return response + + except Exception as e: + logger.error(f"Basic health check failed: {e}") + return { + "status": "unhealthy", + "message": "QGenie Chatbot AI", + "version": "2.0.0", + "backend_connection": "error", + "error": "헬스체크 중 오류가 발생했습니다", + "timestamp": __import__("datetime").datetime.now().isoformat() + } @router.get("/health/detailed") async def detailed_health_check( @@ -62,9 +93,18 @@ async def detailed_health_check( for service in services_status.values() ) + # ConnectionMonitor 상태 정보 추가 + try: + from core.monitoring.connection_monitor import get_connection_monitor + connection_monitor = get_connection_monitor() + monitor_status = connection_monitor.get_status() + except Exception: + monitor_status = {"error": "ConnectionMonitor 상태를 가져올 수 없습니다"} + return { "status": "healthy" if all_healthy else "partial", "services": services_status, + "connection_monitor": monitor_status, "timestamp": __import__("datetime").datetime.now().isoformat() } @@ -75,3 +115,30 @@ async def detailed_health_check( "error": str(e), "timestamp": __import__("datetime").datetime.now().isoformat() } + +@router.post("/refresh-api-key") +async def refresh_api_key() -> Dict[str, str]: + """ + API 키 캐시를 무효화하여 다음 요청에서 최신 키를 조회하도록 합니다. + + Returns: + Dict: 새로고침 결과 + """ + try: + llm_provider = await get_llm_provider() + await llm_provider.refresh_api_key() + + logger.info("🔄 API 키 캐시가 무효화되었습니다") + return { + "status": "success", + "message": "API 키 캐시가 무효화되었습니다. 다음 요청부터 최신 키를 조회합니다.", + "timestamp": __import__("datetime").datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"API 키 새로고침 실패: {e}") + return { + "status": "error", + "message": f"API 키 새로고침 중 오류가 발생했습니다: {str(e)}", + "timestamp": __import__("datetime").datetime.now().isoformat() + } diff --git a/src/core/clients/api_client.py b/src/core/clients/api_client.py index 48b0f3b..eebfe03 100644 --- a/src/core/clients/api_client.py +++ b/src/core/clients/api_client.py @@ -6,6 +6,8 @@ from pydantic import BaseModel import logging +# ConnectionMonitor import는 순환 import를 피하기 위해 함수 내에서 처리 + # 로깅 설정 logger = logging.getLogger(__name__) @@ -33,6 +35,48 @@ class DBProfileResponse(BaseModel): message: str data: List[DBProfileInfo] +class AnnotationColumn(BaseModel): + """어노테이션 컬럼 정보 모델""" + column_name: str + description: str + data_type: str + +class AnnotationTable(BaseModel): + """어노테이션 테이블 정보 모델""" + table_name: str + description: str + columns: List[AnnotationColumn] + +class AnnotationRelationship(BaseModel): + """어노테이션 관계 정보 모델""" + from_table: str + from_columns: List[str] + to_table: str + to_columns: List[str] + description: Optional[str] = None + +class AnnotationDatabase(BaseModel): + """어노테이션 데이터베이스 정보 모델""" + db_name: str + description: str + tables: List[AnnotationTable] + relationships: List[AnnotationRelationship] + +class AnnotationData(BaseModel): + """어노테이션 데이터 모델""" + dbms_type: str + databases: List[AnnotationDatabase] + annotation_id: str + db_profile_id: str + created_at: str + updated_at: str + +class AnnotationResponse(BaseModel): + """어노테이션 조회 응답 모델""" + code: str + message: str + data: AnnotationData + class QueryExecutionRequest(BaseModel): """쿼리 실행 요청 모델""" user_db_id: str @@ -60,6 +104,7 @@ def __init__(self, base_url: str = "http://localhost:39722"): "Content-Type": "application/json" } self._client: Optional[httpx.AsyncClient] = None + self._connection_monitor = None # 지연 초기화 async def _get_client(self) -> httpx.AsyncClient: """재사용 가능한 HTTP 클라이언트를 반환합니다.""" @@ -67,6 +112,13 @@ async def _get_client(self) -> httpx.AsyncClient: self._client = httpx.AsyncClient(timeout=self.timeout) return self._client + def _get_connection_monitor(self): + """ConnectionMonitor 인스턴스를 지연 로딩으로 가져옵니다.""" + if self._connection_monitor is None: + from core.monitoring.connection_monitor import get_connection_monitor + self._connection_monitor = get_connection_monitor() + return self._connection_monitor + async def close(self): """HTTP 클라이언트 연결을 닫습니다.""" if self._client and not self._client.is_closed: @@ -89,37 +141,64 @@ async def get_db_profiles(self) -> List[DBProfileInfo]: profiles = [DBProfileInfo(**profile) for profile in data.get("data", [])] logger.info(f"Successfully fetched {len(profiles)} DB profiles") + + # 연결 복구 확인 + monitor = self._get_connection_monitor() + await monitor.check_api_call_recovery("DB 프로필 조회") + return profiles except httpx.HTTPStatusError as e: logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}") + monitor = self._get_connection_monitor() + monitor.mark_api_call_failure("DB 프로필 조회") raise except httpx.RequestError as e: logger.error(f"Request error occurred: {e}") + monitor = self._get_connection_monitor() + monitor.mark_api_call_failure("DB 프로필 조회") raise except Exception as e: logger.error(f"Unexpected error: {e}") + monitor = self._get_connection_monitor() + monitor.mark_api_call_failure("DB 프로필 조회") raise - async def get_db_annotations(self, db_profile_id: str) -> Dict[str, Any]: + async def get_db_annotations(self, db_profile_id: str) -> AnnotationResponse: """특정 DBMS의 어노테이션을 조회합니다.""" try: client = await self._get_client() response = await client.get( - f"{self.base_url}/api/annotations/find/db/{db_profile_id}", + f"{self.base_url}/api/annotations/find/hierarchical/{db_profile_id}", headers=self.headers ) response.raise_for_status() data = response.json() + + # 응답을 AnnotationResponse 모델로 파싱 + annotation_response = AnnotationResponse(**data) logger.info(f"Successfully fetched annotations for DB profile: {db_profile_id}") - return data + return annotation_response except httpx.HTTPStatusError as e: if e.response.status_code == 404: # 404는 어노테이션이 없는 정상적인 상황 logger.info(f"No annotations found for DB profile {db_profile_id}: {e.response.text}") - return {"code": "4401", "message": "어노테이션이 없습니다", "data": []} + # 빈 어노테이션 응답 생성 + empty_annotation = AnnotationResponse( + code="4401", + message="어노테이션이 없습니다", + data=AnnotationData( + dbms_type="unknown", + databases=[], + annotation_id="", + db_profile_id=db_profile_id, + created_at="", + updated_at="" + ) + ) + return empty_annotation else: logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}") raise @@ -256,9 +335,21 @@ async def health_check(self) -> bool: f"{self.base_url}/health", timeout=httpx.Timeout(5.0) ) - return response.status_code == 200 + is_healthy = response.status_code == 200 + + monitor = self._get_connection_monitor() + if is_healthy: + # 헬스체크 성공 시 연결 복구 확인 + await monitor.check_api_call_recovery("헬스체크") + else: + # 헬스체크 실패 시 상태 업데이트 + monitor.mark_api_call_failure("헬스체크") + + return is_healthy except Exception as e: logger.error(f"Health check failed: {e}") + monitor = self._get_connection_monitor() + monitor.mark_api_call_failure("헬스체크") return False async def get_openai_api_key(self) -> str: diff --git a/src/core/monitoring/__init__.py b/src/core/monitoring/__init__.py new file mode 100644 index 0000000..53c656b --- /dev/null +++ b/src/core/monitoring/__init__.py @@ -0,0 +1 @@ +# src/core/monitoring/__init__.py diff --git a/src/core/monitoring/connection_monitor.py b/src/core/monitoring/connection_monitor.py new file mode 100644 index 0000000..7425408 --- /dev/null +++ b/src/core/monitoring/connection_monitor.py @@ -0,0 +1,199 @@ +# src/core/monitoring/connection_monitor.py + +import asyncio +import logging +from typing import Optional +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + +class ConnectionMonitor: + """백엔드 연결 상태를 모니터링하는 클래스""" + + def __init__(self): + self._last_connection_status: Optional[bool] = None # 이전 연결 상태 + self._initial_connection_failed: bool = False # 초기 연결 실패 여부 + self._connection_recovered: bool = False # 연결 복구 완료 여부 + self._monitoring_enabled: bool = False # 모니터링 활성화 여부 + self._monitoring_task: Optional[asyncio.Task] = None # 모니터링 태스크 + self._monitoring_interval: int = 60 # 모니터링 간격 (초) + self._last_success_time: Optional[datetime] = None # 마지막 성공 시간 + + def mark_initial_failure(self): + """초기 연결 실패를 표시""" + self._initial_connection_failed = True + self._last_connection_status = False + logger.info("🔄 ConnectionMonitor: 초기 연결 실패 상태로 설정됨") + + def mark_initial_success(self): + """초기 연결 성공을 표시""" + self._initial_connection_failed = False + self._last_connection_status = True + self._last_success_time = datetime.now() + logger.info("✅ ConnectionMonitor: 초기 연결 성공 상태로 설정됨") + + async def check_api_call_recovery(self, operation_name: str = "API 호출") -> bool: + """ + API 호출 성공 시 연결 복구 여부를 확인하고 로그를 출력합니다. + + Args: + operation_name: 수행된 작업 이름 + + Returns: + bool: 연결이 복구되었는지 여부 + """ + current_time = datetime.now() + + # 이전에 연결이 실패했고, 아직 복구 로그를 출력하지 않은 경우 + if (self._initial_connection_failed or self._last_connection_status is False) and not self._connection_recovered: + self._connection_recovered = True + self._last_connection_status = True + self._last_success_time = current_time + + # 복구 시간 계산 + if hasattr(self, '_failure_start_time'): + downtime = current_time - self._failure_start_time + logger.info(f"🎉 백엔드 서버 연결이 복구되었습니다! (다운타임: {downtime}) - {operation_name} 성공") + else: + logger.info(f"🎉 백엔드 서버 연결이 복구되었습니다! - {operation_name} 성공") + + # 연결이 복구되면 모니터링 중지 + if self._monitoring_enabled: + logger.info("✅ 연결이 복구되어 백그라운드 모니터링을 중지합니다.") + await self.stop_monitoring() + + return True + + # 정상 상태에서의 성공적인 호출 + self._last_success_time = current_time + self._last_connection_status = True + return False + + def mark_api_call_failure(self, operation_name: str = "API 호출"): + """ + API 호출 실패 시 연결 상태를 업데이트합니다. + + Args: + operation_name: 실패한 작업 이름 + """ + current_time = datetime.now() + + # 이전에 연결이 성공했던 경우에만 실패 로그 출력 + if self._last_connection_status is True: + logger.warning(f"⚠️ 백엔드 서버 연결 실패 감지 - {operation_name} 실패") + self._failure_start_time = current_time + + self._last_connection_status = False + self._connection_recovered = False + + async def start_monitoring(self, api_client, interval: int = 60): + """ + 주기적인 연결 상태 모니터링을 시작합니다. + + Args: + api_client: APIClient 인스턴스 + interval: 모니터링 간격 (초) + """ + if self._monitoring_enabled: + logger.warning("ConnectionMonitor: 모니터링이 이미 실행 중입니다") + return + + self._monitoring_enabled = True + self._monitoring_interval = interval + + logger.info(f"🔍 ConnectionMonitor: 백엔드 연결 모니터링 시작 (간격: {interval}초)") + + self._monitoring_task = asyncio.create_task( + self._monitoring_loop(api_client) + ) + + async def stop_monitoring(self): + """연결 상태 모니터링을 중지합니다.""" + if not self._monitoring_enabled: + return + + self._monitoring_enabled = False + + if self._monitoring_task and not self._monitoring_task.done(): + self._monitoring_task.cancel() + try: + await self._monitoring_task + except asyncio.CancelledError: + pass + + logger.info("🛑 ConnectionMonitor: 백엔드 연결 모니터링 중지") + + async def _monitoring_loop(self, api_client): + """모니터링 루프 실행""" + consecutive_failures = 0 + + while self._monitoring_enabled: + try: + # 헬스체크 수행 + current_status = await api_client.health_check() + current_time = datetime.now() + + # 상태 변화 감지 + if self._last_connection_status is not None: + if not self._last_connection_status and current_status: + # 연결 실패 → 성공으로 변화 + if hasattr(self, '_failure_start_time'): + downtime = current_time - self._failure_start_time + logger.info(f"🎉 백엔드 서버 연결이 복구되었습니다! (다운타임: {downtime}) - 헬스체크 성공") + else: + logger.info("🎉 백엔드 서버 연결이 복구되었습니다! - 헬스체크 성공") + + self._connection_recovered = True + consecutive_failures = 0 + + # 연결이 복구되면 모니터링 중지 + logger.info("✅ 연결이 복구되어 백그라운드 모니터링을 중지합니다.") + self._monitoring_enabled = False + break + + elif self._last_connection_status and not current_status: + # 연결 성공 → 실패로 변화 + logger.warning("⚠️ 백엔드 서버 연결이 끊어졌습니다! - 헬스체크 실패") + self._failure_start_time = current_time + self._connection_recovered = False + consecutive_failures = 1 + + # 연결 상태 업데이트 + if current_status: + self._last_success_time = current_time + consecutive_failures = 0 + else: + consecutive_failures += 1 + + self._last_connection_status = current_status + + # 연속 실패 시 경고 + if consecutive_failures >= 3: + logger.error(f"🚨 백엔드 서버 연결이 {consecutive_failures * self._monitoring_interval}초 동안 실패 중입니다") + + # 모니터링 간격만큼 대기 + await asyncio.sleep(self._monitoring_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"ConnectionMonitor: 모니터링 중 오류 발생: {e}") + await asyncio.sleep(self._monitoring_interval) + + def get_status(self) -> dict: + """현재 연결 상태 정보를 반환합니다.""" + return { + "last_connection_status": self._last_connection_status, + "initial_connection_failed": self._initial_connection_failed, + "connection_recovered": self._connection_recovered, + "monitoring_enabled": self._monitoring_enabled, + "last_success_time": self._last_success_time.isoformat() if self._last_success_time else None, + "monitoring_interval": self._monitoring_interval + } + +# 싱글톤 인스턴스 +_connection_monitor = ConnectionMonitor() + +def get_connection_monitor() -> ConnectionMonitor: + """ConnectionMonitor 싱글톤 인스턴스를 반환합니다.""" + return _connection_monitor diff --git a/src/core/providers/llm_provider.py b/src/core/providers/llm_provider.py index 0980a34..ea5d85e 100644 --- a/src/core/providers/llm_provider.py +++ b/src/core/providers/llm_provider.py @@ -3,6 +3,7 @@ import os import asyncio import logging +import time from typing import Optional from langchain_openai import ChatOpenAI from core.clients.api_client import get_api_client @@ -18,57 +19,80 @@ class LLMProvider: def __init__(self, model_name: str = "gpt-4o-mini", temperature: float = 0): self.model_name = model_name self.temperature = temperature - self._llm: Optional[ChatOpenAI] = None - self._api_key: Optional[str] = None self._api_client = None self._initialization_attempted: bool = False self._initialization_failed: bool = False + # 짧은 캐싱을 위한 변수들 (성능 최적화) + self._cached_api_key: Optional[str] = None + self._api_key_cache_time: float = 0 + self._api_key_cache_duration: float = 30.0 # 30초 캐싱 async def _load_api_key(self) -> str: - """백엔드에서 OpenAI API 키를 로드합니다.""" + """백엔드에서 OpenAI API 키를 로드합니다. 짧은 시간 캐싱으로 성능 최적화.""" try: - if self._api_key is None: - if self._api_client is None: - self._api_client = await get_api_client() - - self._api_key = await self._api_client.get_openai_api_key() - return self._api_key + current_time = time.time() + + # 캐시된 키가 있고 아직 유효한 경우 + if (self._cached_api_key and + current_time - self._api_key_cache_time < self._api_key_cache_duration): + logger.debug("📋 캐시된 API 키를 사용합니다") + return self._cached_api_key + + # 캐시가 만료되었거나 없는 경우 새로 조회 + if self._api_client is None: + self._api_client = await get_api_client() + + api_key = await self._api_client.get_openai_api_key() + + # 캐시 업데이트 + self._cached_api_key = api_key + self._api_key_cache_time = current_time + + logger.debug("🔄 백엔드에서 최신 OpenAI API 키를 조회하고 캐시했습니다") + return api_key except Exception as e: + # 실패 시 캐시 무효화 + self._cached_api_key = None + self._api_key_cache_time = 0 logger.error(f"Failed to fetch API key from backend: {e}") raise ValueError("백엔드에서 OpenAI API 키를 가져올 수 없습니다. 백엔드 서버를 확인해주세요.") async def get_llm(self) -> ChatOpenAI: """ ChatOpenAI 인스턴스를 반환합니다. - 지연 초기화를 통해 BE 서버 연결이 실패해도 재시도합니다. + 매번 최신 API 키로 새로운 인스턴스를 생성합니다. """ - if self._llm is None: - # 이전에 초기화를 시도했고 실패했다면 재시도 + # 이전에 초기화를 시도했고 실패했다면 재시도 + if self._initialization_failed: + logger.info("🔄 LLM 초기화 재시도 중...") + self._initialization_failed = False + self._initialization_attempted = False + + try: + self._initialization_attempted = True + llm = await self._create_llm() + + # 연결 복구 감지 if self._initialization_failed: - logger.info("🔄 LLM 초기화 재시도 중...") - self._initialization_failed = False - self._initialization_attempted = False + logger.info("🎉 LLMProvider: 백엔드 연결이 복구되어 LLM 초기화가 성공했습니다!") - try: - self._initialization_attempted = True - self._llm = await self._create_llm() - self._initialization_failed = False - logger.info("✅ LLM 초기화 성공") - - except Exception as e: - self._initialization_failed = True - logger.error(f"❌ LLM 초기화 실패: {e}") - raise RuntimeError(f"LLM을 초기화할 수 없습니다. 백엔드 서버가 실행 중인지 확인해주세요: {e}") - - return self._llm + self._initialization_failed = False + logger.debug("✅ LLM 인스턴스 생성 성공 (최신 API 키 사용)") + + return llm + + except Exception as e: + self._initialization_failed = True + logger.error(f"❌ LLM 초기화 실패: {e}") + raise RuntimeError(f"LLM을 초기화할 수 없습니다. 백엔드 서버가 실행 중인지 확인해주세요: {e}") async def _create_llm(self) -> ChatOpenAI: - """ChatOpenAI 인스턴스를 생성합니다.""" + """ChatOpenAI 인스턴스를 생성합니다. 매번 최신 API 키를 사용합니다.""" try: - # API 키를 비동기적으로 로드 + # API 키를 비동기적으로 로드 (매번 최신 키 조회) api_key = await self._load_api_key() - logger.info("✅ 백엔드에서 OpenAI API 키를 성공적으로 가져왔습니다") + logger.debug("✅ 백엔드에서 최신 OpenAI API 키를 성공적으로 가져왔습니다") llm = ChatOpenAI( model=self.model_name, @@ -81,19 +105,19 @@ async def _create_llm(self) -> ChatOpenAI: raise RuntimeError(f"LLM 인스턴스 생성 실패: {e}") def update_model(self, model_name: str, temperature: float = None): - """모델 설정을 업데이트하고 인스턴스를 재생성합니다.""" + """모델 설정을 업데이트합니다. 다음 get_llm() 호출시 새 설정이 적용됩니다.""" self.model_name = model_name if temperature is not None: self.temperature = temperature - self._llm = None # 다음 호출 시 재생성되도록 함 + logger.info(f"LLM 모델 설정 변경: {model_name}, temperature: {temperature}") async def refresh_api_key(self): - """API 키를 새로고침합니다.""" - self._api_key = None - self._llm = None # LLM 인스턴스도 재생성 + """API 키 캐시를 무효화하여 다음 요청에서 최신 키를 조회하도록 합니다.""" + self._cached_api_key = None + self._api_key_cache_time = 0 self._initialization_attempted = False self._initialization_failed = False - logger.info("API key refreshed") + logger.info("🔄 API 키 캐시 무효화 완료 (다음 요청부터 최신 키 조회)") async def test_connection(self) -> bool: """LLM 연결을 테스트합니다.""" diff --git a/src/main.py b/src/main.py index 672f01d..923f248 100644 --- a/src/main.py +++ b/src/main.py @@ -18,6 +18,10 @@ async def lifespan(app: FastAPI): """애플리케이션 라이프사이클 관리""" logger.info("QGenie AI Chatbot 시작 중...") + # ConnectionMonitor 초기화 + from core.monitoring.connection_monitor import get_connection_monitor + connection_monitor = get_connection_monitor() + # 시작 시 BE 서버 연결 체크 try: from core.clients.api_client import get_api_client @@ -26,20 +30,40 @@ async def lifespan(app: FastAPI): # BE 서버 상태 확인 if await api_client.health_check(): logger.info("✅ 백엔드 서버 연결 성공") + connection_monitor.mark_initial_success() else: logger.warning("⚠️ 백엔드 서버에 연결할 수 없습니다. 첫 요청 시 연결을 재시도합니다.") + connection_monitor.mark_initial_failure() except Exception as e: logger.warning(f"⚠️ 백엔드 서버 초기 연결 실패: {e}") logger.info("🔄 서비스는 지연 초기화 모드로 시작됩니다.") + connection_monitor.mark_initial_failure() try: + # 선택적으로 백그라운드 모니터링 시작 (환경변수로 제어 가능) + import os + if os.getenv("ENABLE_CONNECTION_MONITORING", "false").lower() == "true": + # 초기 연결이 실패한 경우에만 모니터링 시작 + if connection_monitor._initial_connection_failed: + monitoring_interval = int(os.getenv("MONITORING_INTERVAL", "10")) + logger.info(f"🔍 백엔드 연결 모니터링 활성화 (간격: {monitoring_interval}초)") + await connection_monitor.start_monitoring(api_client, monitoring_interval) + else: + logger.info("✅ 초기 연결이 성공했으므로 모니터링을 시작하지 않습니다.") + logger.info("애플리케이션 초기화 완료") yield finally: # 종료 시 정리 작업 logger.info("애플리케이션 종료 중...") + # 모니터링 중지 + try: + await connection_monitor.stop_monitoring() + except Exception as e: + logger.error(f"연결 모니터링 중지 실패: {e}") + # API 클라이언트 정리 try: from core.clients.api_client import get_api_client @@ -81,18 +105,52 @@ async def lifespan(app: FastAPI): # 루트 엔드포인트 @app.get("/") async def root(): - """루트 엔드포인트 - 기본 상태 확인""" - return { - "status": "ok", - "message": "Welcome to the QGenie Chatbot AI! (Refactored)", - "version": "2.0.0", - "endpoints": { - "chat": "/api/v1/chat", - "annotator": "/api/v1/annotator", - "health": "/api/v1/health", - "detailed_health": "/api/v1/health/detailed" + """루트 엔드포인트 - 기본 상태 확인 (백엔드 연결 확인 포함)""" + try: + # 백엔드 연결 상태 확인 + from services.database.database_service import get_database_service + database_service = await get_database_service() + backend_healthy = await database_service.health_check() + + overall_status = "healthy" if backend_healthy else "degraded" + + response = { + "status": overall_status, + "message": "Welcome to the QGenie Chatbot AI! (Refactored)", + "version": "2.0.0", + "backend_connection": "connected" if backend_healthy else "disconnected", + "endpoints": { + "chat": "/api/v1/chat", + "annotator": "/api/v1/annotator", + "health": "/api/v1/health", + "detailed_health": "/api/v1/health/detailed", + "refresh_api_key": "/api/v1/health/refresh-api-key" + }, + "timestamp": __import__("datetime").datetime.now().isoformat() + } + + if not backend_healthy: + response["warning"] = "백엔드 서버 연결이 불안정합니다. 일부 기능이 제한될 수 있습니다." + + return response + + except Exception as e: + logger.error(f"Root endpoint health check failed: {e}") + return { + "status": "degraded", + "message": "Welcome to the QGenie Chatbot AI! (Refactored)", + "version": "2.0.0", + "backend_connection": "error", + "endpoints": { + "chat": "/api/v1/chat", + "annotator": "/api/v1/annotator", + "health": "/api/v1/health", + "detailed_health": "/api/v1/health/detailed", + "refresh_api_key": "/api/v1/health/refresh-api-key" + }, + "warning": "백엔드 연결 상태를 확인할 수 없습니다.", + "timestamp": __import__("datetime").datetime.now().isoformat() } - } if __name__ == "__main__": import uvicorn diff --git a/src/prompts/v1/sql_agent/response_synthesizer.yaml b/src/prompts/v1/sql_agent/response_synthesizer.yaml index 2fd970c..d5984a0 100644 --- a/src/prompts/v1/sql_agent/response_synthesizer.yaml +++ b/src/prompts/v1/sql_agent/response_synthesizer.yaml @@ -21,6 +21,11 @@ template: | - Do not just show the raw data from the SQL result. - Explain what the data means in relation to the user's question. - Present the answer in a natural, conversational, and polite Korean. + - Include the SQL query used to answer the question in a code block format like this: + ```sql + SELECT * FROM table; + ``` + - Format your response as: [Korean explanation] + [SQL query in code block] - If the process failed: - Apologize for the inconvenience. - Explain the reason for the failure in simple, non-technical terms. diff --git a/src/services/database/database_service.py b/src/services/database/database_service.py index 752f8e3..04861e4 100644 --- a/src/services/database/database_service.py +++ b/src/services/database/database_service.py @@ -2,7 +2,7 @@ import asyncio from typing import List, Optional, Dict, Any -from core.clients.api_client import APIClient, DatabaseInfo, DBProfileInfo, get_api_client +from core.clients.api_client import APIClient, DatabaseInfo, DBProfileInfo, AnnotationResponse, get_api_client import logging logger = logging.getLogger(__name__) @@ -16,7 +16,7 @@ class DatabaseService: def __init__(self, api_client: APIClient = None): self.api_client = api_client self._cached_db_profiles: Optional[List[DBProfileInfo]] = None - self._cached_annotations: Dict[str, Dict[str, Any]] = {} + self._cached_annotations: Dict[str, AnnotationResponse] = {} # 호환성을 위해 유지하지만 더 이상 사용하지 않음 self._cached_databases: Optional[List[DatabaseInfo]] = None self._cached_schemas: Dict[str, str] = {} @@ -34,23 +34,14 @@ async def get_available_databases(self) -> List[DatabaseInfo]: """ [DEPRECATED] 사용 가능한 데이터베이스 목록을 가져옵니다. 대신 get_databases_with_annotations()를 사용하세요. + + APIClient의 동일한 메서드로 위임합니다. """ logger.warning("get_available_databases()는 deprecated입니다. get_databases_with_annotations()를 사용하세요.") - # DBMS 프로필 기반으로 DatabaseInfo 형태로 변환하여 호환성 유지 try: - profiles = await self.get_db_profiles() - databases = [] - - for profile in profiles: - db_info = DatabaseInfo( - connection_name=f"{profile.type}_{profile.host}_{profile.port}", - database_name=profile.view_name or f"{profile.type}_db", - description=f"{profile.type} 데이터베이스 ({profile.host}:{profile.port})" - ) - databases.append(db_info) - - return databases + api_client = await self._get_api_client() + return await api_client.get_available_databases() except Exception as e: logger.error(f"Failed to fetch databases: {e}") @@ -93,10 +84,48 @@ async def execute_query(self, sql_query: str, database_name: str = None, user_db # 응답 데이터 형태에 따라 다른 메시지 반환 if hasattr(response.data, 'columns') and hasattr(response.data, 'data'): - # 쿼리 결과 데이터가 있는 경우 - row_count = len(response.data.data) - col_count = len(response.data.columns) - return f"쿼리가 성공적으로 실행되었습니다. {row_count}개 행, {col_count}개 컬럼의 결과를 반환했습니다." + # 쿼리 결과 데이터가 있는 경우 - 실제 데이터를 포함하여 반환 + columns = response.data.columns + data_rows = response.data.data + + # 디버깅: 응답 데이터 구조 확인 + logger.info(f"🔍 DB 응답 구조 - 컬럼: {columns}") + logger.info(f"🔍 DB 응답 구조 - {len(data_rows)}개 행, 첫 번째 행 타입: {type(data_rows[0]) if data_rows else 'N/A'}") + + # 테이블 형태로 결과 포매팅 + result_text = f"쿼리 실행 결과 ({len(data_rows)}개 행, {len(columns)}개 컬럼):\n\n" + + # 컬럼 헤더 추가 (각 컬럼을 15자로 고정폭 정렬) + col_width = 15 + header = " | ".join(col.ljust(col_width)[:col_width] for col in columns) + result_text += header + "\n" + result_text += "-" * len(header) + "\n" + + # 데이터 행 추가 (최대 100행까지만 표시) + max_rows = min(100, len(data_rows)) + for i in range(max_rows): + row = data_rows[i] + # 디버깅: 첫 번째 행만 로그 출력 + if i == 0: + logger.info(f" 첫 번째 행 상세: {row}") + + # 행이 딕셔너리 형태인 경우 (백엔드에서 Dict[str, Any] 형태로 반환) + if isinstance(row, dict): + # 컬럼 순서대로 값을 추출하고 고정폭으로 정렬 + row_values = [str(row.get(col, "NULL")) if row.get(col) is not None else "NULL" for col in columns] + row_text = " | ".join(val.ljust(col_width)[:col_width] for val in row_values) + else: + # 행이 리스트 형태인 경우 (기존 로직) + row_values = [str(cell) if cell is not None else "NULL" for cell in row] + row_text = " | ".join(val.ljust(col_width)[:col_width] for val in row_values) + + result_text += row_text + "\n" + + # 행이 잘렸다면 표시 + if len(data_rows) > max_rows: + result_text += f"\n... ({len(data_rows) - max_rows}개 행 더 있음)" + + return result_text else: # 일반적인 성공 메시지 return "쿼리가 성공적으로 실행되었습니다." @@ -135,6 +164,10 @@ async def get_db_profiles(self) -> List[DBProfileInfo]: self._connection_failed = False logger.info(f"✅ DB 프로필 조회 성공: {len(self._cached_db_profiles)}개") + # 연결 복구 감지 (이미 APIClient에서 처리되지만 추가 로그) + if self._connection_failed: + logger.info("🎉 DatabaseService: 백엔드 연결이 복구되어 DB 프로필 조회가 성공했습니다!") + except Exception as e: self._connection_failed = True logger.error(f"❌ DB 프로필 조회 실패: {e}") @@ -142,7 +175,7 @@ async def get_db_profiles(self) -> List[DBProfileInfo]: return self._cached_db_profiles - async def get_db_annotations(self, db_profile_id: str) -> Dict[str, Any]: + async def get_db_annotations(self, db_profile_id: str) -> AnnotationResponse: """특정 DBMS의 어노테이션을 조회합니다.""" try: if db_profile_id not in self._cached_annotations: @@ -150,7 +183,7 @@ async def get_db_annotations(self, db_profile_id: str) -> Dict[str, Any]: annotations = await api_client.get_db_annotations(db_profile_id) self._cached_annotations[db_profile_id] = annotations - if annotations.get("code") == "4401": + if annotations.code == "4401": logger.info(f"No annotations available for DB profile: {db_profile_id}") else: logger.info(f"Cached annotations for DB profile: {db_profile_id}") @@ -160,7 +193,20 @@ async def get_db_annotations(self, db_profile_id: str) -> Dict[str, Any]: except Exception as e: logger.error(f"Failed to fetch annotations for {db_profile_id}: {e}") # 어노테이션이 없어도 기본 정보는 반환하도록 변경 - return {"code": "4401", "message": "어노테이션이 없습니다", "data": []} + from core.clients.api_client import AnnotationResponse, AnnotationData + empty_annotation = AnnotationResponse( + code="4401", + message="어노테이션이 없습니다", + data=AnnotationData( + dbms_type="unknown", + databases=[], + annotation_id="", + db_profile_id=db_profile_id, + created_at="", + updated_at="" + ) + ) + return empty_annotation async def get_databases_with_annotations(self) -> List[Dict[str, Any]]: """DB 프로필과 어노테이션을 함께 조회합니다.""" @@ -184,7 +230,7 @@ async def get_databases_with_annotations(self) -> List[Dict[str, Any]]: logger.error(f"Failed to get databases with annotations: {e}") raise RuntimeError(f"어노테이션이 포함된 데이터베이스 목록을 가져올 수 없습니다: {e}") - def _generate_db_description(self, profile: DBProfileInfo, annotations: Dict[str, Any]) -> str: + def _generate_db_description(self, profile: DBProfileInfo, annotations: AnnotationResponse) -> str: """DB 프로필과 어노테이션을 기반으로 설명을 생성합니다.""" try: # 기본 설명 @@ -196,9 +242,11 @@ def _generate_db_description(self, profile: DBProfileInfo, annotations: Dict[str base_desc += f" ({profile.host}:{profile.port})" # 어노테이션 정보 확인 - if annotations and annotations.get("code") != "4401" and "data" in annotations: + if annotations and annotations.code != "4401" and annotations.data.databases: # 실제 어노테이션이 있는 경우 - base_desc += " - 어노테이션 정보 포함" + db_count = len(annotations.data.databases) + total_tables = sum(len(db.tables) for db in annotations.data.databases) + base_desc += f" - {db_count}개 DB, {total_tables}개 테이블 어노테이션 포함" return base_desc diff --git a/test_services.py b/test_services.py index 2fb2b4a..bed9199 100644 --- a/test_services.py +++ b/test_services.py @@ -88,7 +88,12 @@ async def test_db_annotation_api(): print(f"📝 첫 번째 DB 정보:") print(f" - Display Name: {first_db['display_name']}") print(f" - Description: {first_db['description']}") - print(f" - Has Annotations: {'data' in first_db['annotations']}") + annotations = first_db['annotations'] + has_annotations = annotations and annotations.code != "4401" and annotations.data.databases + print(f" - Has Annotations: {has_annotations}") + if has_annotations: + print(f" - DB Count: {len(annotations.data.databases)}") + print(f" - DBMS Type: {annotations.data.dbms_type}") except Exception as e: print(f"⚠️ 통합 조회 실패: {e}")