1+ """
2+ Finance chain for handling financial expert knowledge queries.
3+ """
4+
5+ from langchain_openai import ChatOpenAI , OpenAIEmbeddings
6+ from langchain_core .output_parsers import StrOutputParser
7+ from langchain_core .prompts import PromptTemplate
8+ from langchain_core .runnables import RunnablePassthrough , RunnableLambda
9+ from langchain_milvus import Milvus
10+ from typing import Dict , Any , Optional
11+ import os
12+
13+
14+ class FinanceChain :
15+ """
16+ Handles queries about financial expert knowledge using vector database search.
17+ """
18+
19+ def __init__ (self , model : str = "gpt-4o-mini" , search_k : int = 3 ):
20+ """
21+ Initialize the finance chain.
22+
23+ Args:
24+ model: OpenAI model to use for response generation
25+ search_k: Number of search results to retrieve from vector DB
26+ """
27+ self .model = model
28+ self .search_k = search_k
29+ self .use_milvus = False
30+ self .vectorstore = None
31+ self ._setup_milvus ()
32+ self ._setup_chain ()
33+
34+ def _setup_milvus (self ):
35+ """Set up Milvus vector database connection."""
36+ try :
37+ embeddings = OpenAIEmbeddings ()
38+ milvus_uri = os .getenv ("MILVUS_URI" , "http://localhost:19530" )
39+ print (f"Milvus 연결 시도: { milvus_uri } " )
40+
41+ self .vectorstore = Milvus (
42+ embedding_function = embeddings ,
43+ connection_args = {
44+ "uri" : milvus_uri ,
45+ },
46+ collection_name = "coindesk_articles" ,
47+ )
48+ print ("Milvus 연결 성공" )
49+ self .use_milvus = True
50+ except Exception as e :
51+ print (f"Milvus 연결 실패: { e } " )
52+ print ("주의: Milvus 연결 실패로 벡터 검색 기능을 사용할 수 없습니다." )
53+ self .use_milvus = False
54+ self .vectorstore = None
55+
56+ def _setup_chain (self ):
57+ """Set up the finance chain with retriever, prompt and model."""
58+
59+ # Finance prompt template
60+ self .prompt = PromptTemplate (
61+ template = """You are an expert in finance. \
62+ Always answer questions starting with "전문가에 따르면..". \
63+
64+ Previous conversation:
65+ {chat_history}
66+
67+ Respond to the following question based the context, statistical information, and previous conversation when possible:
68+ Context: {context}
69+ Question: {question}
70+ Answer:""" ,
71+ input_variables = ["context" , "question" , "chat_history" ]
72+ )
73+
74+ # Create the chain based on Milvus availability
75+ if self .use_milvus and self .vectorstore :
76+ self .retriever = self .vectorstore .as_retriever (search_kwargs = {"k" : self .search_k })
77+ context_chain = RunnableLambda (lambda x : x ["question" ]) | self .retriever | self ._format_docs
78+
79+ self .chain = (
80+ {
81+ "context" : context_chain ,
82+ "question" : RunnablePassthrough (),
83+ "chat_history" : lambda x : x .get ("chat_history" , "" )
84+ }
85+ | self .prompt
86+ | ChatOpenAI (model = self .model )
87+ | StrOutputParser ()
88+ )
89+ else :
90+ # Fallback chain when Milvus is not available
91+ fallback_prompt = PromptTemplate .from_template (
92+ """전문가에 따르면, 현재 벡터 데이터베이스에 연결할 수 없어 전문 지식을 제공하기 어렵습니다.
93+ 시스템 관리자에게 문의해주세요.
94+
95+ Previous conversation:
96+ {chat_history}
97+
98+ Question: {question}
99+ """
100+ )
101+
102+ self .chain = (
103+ fallback_prompt
104+ | ChatOpenAI (model = self .model )
105+ | StrOutputParser ()
106+ )
107+
108+ def _format_docs (self , docs ):
109+ """Format retrieved documents for use in the prompt."""
110+ return "\n \n " .join (doc .page_content for doc in docs )
111+
112+ def process (self , question : str , chat_history : str = "" ) -> str :
113+ """
114+ Process a finance-related question.
115+
116+ Args:
117+ question: The user's question
118+ chat_history: Previous conversation history
119+
120+ Returns:
121+ Response based on financial expert knowledge
122+ """
123+ return self .chain .invoke ({
124+ "question" : question ,
125+ "chat_history" : chat_history
126+ })
127+
128+ def invoke (self , inputs : Dict [str , Any ]) -> str :
129+ """
130+ Invoke the finance chain with inputs.
131+
132+ Args:
133+ inputs: Dictionary containing 'question' and optional 'chat_history'
134+
135+ Returns:
136+ Finance-based response as string
137+ """
138+ return self .chain .invoke (inputs )
139+
140+ async def ainvoke (self , inputs : Dict [str , Any ]) -> str :
141+ """
142+ Asynchronously invoke the finance chain.
143+
144+ Args:
145+ inputs: Dictionary containing 'question' and optional 'chat_history'
146+
147+ Returns:
148+ Finance-based response as string
149+ """
150+ return await self .chain .ainvoke (inputs )
151+
152+ def get_chain (self ):
153+ """Get the underlying chain object."""
154+ return self .chain
155+
156+ def get_retriever (self ) -> Optional [Any ]:
157+ """Get the vector database retriever if available."""
158+ return self .retriever if self .use_milvus else None
159+
160+ def is_milvus_available (self ) -> bool :
161+ """Check if Milvus is available for use."""
162+ return self .use_milvus
0 commit comments