diff --git a/.env.template b/.env.template index d8a2d384a..45e57f64c 100644 --- a/.env.template +++ b/.env.template @@ -333,4 +333,9 @@ DBGPT_LOG_LEVEL=INFO # FIN_REPORT_MODEL=/app/models/bge-large-zh ## Turn off notebook display Python flow , which is enabled by default -NOTE_BOOK_ENABLE=False \ No newline at end of file +NOTE_BOOK_ENABLE=False + +## The agent historical message retention configuration defaults to the last two rounds. +# MESSAGES_KEEP_START_ROUNDS=0 +# MESSAGES_KEEP_END_ROUNDS=2 + diff --git a/dbgpt/_private/config.py b/dbgpt/_private/config.py index a2ee9fe0f..d430a870d 100644 --- a/dbgpt/_private/config.py +++ b/dbgpt/_private/config.py @@ -361,6 +361,13 @@ def __init__(self) -> None: ) self.NOTE_BOOK_ROOT: str = os.getenv("NOTE_BOOK_ROOT", os.path.expanduser("~")) + self.MESSAGES_KEEP_START_ROUNDS: int = int( + os.getenv("MESSAGES_KEEP_START_ROUNDS", 0) + ) + self.MESSAGES_KEEP_END_ROUNDS: int = int( + os.getenv("MESSAGES_KEEP_END_ROUNDS", 2) + ) + @property def local_db_manager(self) -> "ConnectorManager": from dbgpt.datasource.manages import ConnectorManager diff --git a/dbgpt/agent/core/agent.py b/dbgpt/agent/core/agent.py index b28079eb2..cca65b2bb 100644 --- a/dbgpt/agent/core/agent.py +++ b/dbgpt/agent/core/agent.py @@ -27,6 +27,8 @@ async def send( silent: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, + rely_messages: Optional[List[AgentMessage]] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, ) -> None: """Send a message to recipient agent. @@ -52,6 +54,8 @@ async def receive( is_recovery: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, + rely_messages: Optional[List[AgentMessage]] = None, ) -> None: """Receive a message from another agent. 
@@ -74,6 +78,7 @@ async def generate_reply( sender: Agent, reviewer: Optional[Agent] = None, rely_messages: Optional[List[AgentMessage]] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, **kwargs, diff --git a/dbgpt/agent/core/base_agent.py b/dbgpt/agent/core/base_agent.py index a1362461f..a5fef6361 100644 --- a/dbgpt/agent/core/base_agent.py +++ b/dbgpt/agent/core/base_agent.py @@ -209,6 +209,8 @@ async def send( silent: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, + rely_messages: Optional[List[AgentMessage]] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, ) -> None: """Send a message to recipient agent.""" with root_tracer.start_span( @@ -232,6 +234,8 @@ async def send( silent=silent, is_retry_chat=is_retry_chat, last_speaker_name=last_speaker_name, + historical_dialogues=historical_dialogues, + rely_messages=rely_messages, ) async def receive( @@ -244,6 +248,8 @@ async def receive( is_recovery: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, + rely_messages: Optional[List[AgentMessage]] = None, ) -> None: """Receive a message from another agent.""" with root_tracer.start_span( @@ -272,6 +278,8 @@ async def receive( reviewer=reviewer, is_retry_chat=is_retry_chat, last_speaker_name=last_speaker_name, + historical_dialogues=historical_dialogues, + rely_messages=rely_messages, ) else: reply = await self.generate_reply( @@ -279,6 +287,8 @@ async def receive( sender=sender, reviewer=reviewer, is_retry_chat=is_retry_chat, + historical_dialogues=historical_dialogues, + rely_messages=rely_messages, ) if reply is not None: @@ -289,6 +299,7 @@ def prepare_act_param( received_message: Optional[AgentMessage], sender: Agent, rely_messages: Optional[List[AgentMessage]] = None, + **kwargs, ) -> Dict[str, Any]: 
"""Prepare the parameters for the act method.""" return {} @@ -300,6 +311,7 @@ async def generate_reply( sender: Agent, reviewer: Optional[Agent] = None, rely_messages: Optional[List[AgentMessage]] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, **kwargs, @@ -361,9 +373,10 @@ async def generate_reply( f"Depends on the number of historical messages:{len(rely_messages) if rely_messages else 0}!" # noqa ) thinking_messages, resource_info = await self._load_thinking_messages( - received_message, - sender, - rely_messages, + received_message=received_message, + sender=sender, + rely_messages=rely_messages, + historical_dialogues=historical_dialogues, context=reply_message.get_dict_context(), is_retry_chat=is_retry_chat, ) @@ -400,7 +413,10 @@ async def generate_reply( span.metadata["comments"] = comments act_extent_param = self.prepare_act_param( - received_message, sender, rely_messages + received_message=received_message, + sender=sender, + rely_messages=rely_messages, + historical_dialogues=historical_dialogues, ) with root_tracer.start_span( "agent.generate_reply.act", @@ -620,6 +636,8 @@ async def initiate_chat( is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, message_rounds: int = 0, + historical_dialogues: Optional[List[AgentMessage]] = None, + rely_messages: Optional[List[AgentMessage]] = None, **context, ): """Initiate a chat with another agent. 
@@ -652,6 +670,8 @@ async def initiate_chat( agent_message, recipient, reviewer, + historical_dialogues=historical_dialogues, + rely_messages=rely_messages, request_reply=request_reply, is_retry_chat=is_retry_chat, last_speaker_name=last_speaker_name, @@ -825,6 +845,38 @@ def _excluded_models( return can_uses + def convert_to_agent_message( + self, + gpts_messages: List[GptsMessage], + is_retry_chat: bool = False, + ) -> Optional[List[AgentMessage]]: + """Convert a list of GptsMessage to a list of AgentMessage.""" + oai_messages: List[AgentMessage] = [] + # Based on the current agent, all messages received are user, and all messages + # sent are assistant. + if not gpts_messages: + return None + for item in gpts_messages: + # Message conversion, priority is given to converting execution results, + # and only model output results will be used if not. + content = item.content + oai_messages.append( + AgentMessage( + content=content, + context=( + json.loads(item.context) if item.context is not None else None + ), + action_report=ActionOutput.from_dict(json.loads(item.action_report)) + if item.action_report + else None, + name=item.sender, + rounds=item.rounds, + model_name=item.model_name, + success=item.is_success, + ) + ) + return oai_messages + async def _a_select_llm_model( self, excluded_models: Optional[List[str]] = None ) -> str: @@ -959,6 +1011,7 @@ async def _load_thinking_messages( received_message: AgentMessage, sender: Agent, rely_messages: Optional[List[AgentMessage]] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, context: Optional[Dict[str, Any]] = None, is_retry_chat: bool = False, ) -> Tuple[List[AgentMessage], Optional[Dict]]: @@ -1020,13 +1073,27 @@ async def _load_thinking_messages( role=ModelMessageRoleType.SYSTEM, ) ) - if user_prompt: - agent_messages.append( - AgentMessage( - content=user_prompt, - role=ModelMessageRoleType.HUMAN, - ) + # Historical dialogue messages associated as conversation context + if historical_dialogues: + for i in range(len(historical_dialogues)): + if i % 2 == 0: + # 
Starting from an even index: even entries are user messages + message = historical_dialogues[i] + message.role = ModelMessageRoleType.HUMAN + agent_messages.append(message) + else: + # Odd entries are AI messages + message = historical_dialogues[i] + message.role = ModelMessageRoleType.AI + agent_messages.append(message) + + # The current user input message + agent_messages.append( + AgentMessage( + content=user_prompt, + role=ModelMessageRoleType.HUMAN, ) + ) return agent_messages, resource_references diff --git a/dbgpt/agent/core/base_team.py b/dbgpt/agent/core/base_team.py index 022662426..a4f50ea7a 100644 --- a/dbgpt/agent/core/base_team.py +++ b/dbgpt/agent/core/base_team.py @@ -161,8 +161,9 @@ async def _load_thinking_messages( received_message: AgentMessage, sender: Agent, rely_messages: Optional[List[AgentMessage]] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, context: Optional[Dict[str, Any]] = None, - is_retry_chat: Optional[bool] = False, + is_retry_chat: bool = False, ) -> Tuple[List[AgentMessage], Optional[Dict]]: """Load messages for thinking.""" return [AgentMessage(content=received_message.content)], None diff --git a/dbgpt/agent/core/memory/gpts/base.py b/dbgpt/agent/core/memory/gpts/base.py index 70e7ba7f2..512537174 100644 --- a/dbgpt/agent/core/memory/gpts/base.py +++ b/dbgpt/agent/core/memory/gpts/base.py @@ -58,7 +58,7 @@ class GptsMessage: receiver: str role: str content: str - rounds: Optional[int] + rounds: int = 0 is_success: bool = True app_code: Optional[str] = None app_name: Optional[str] = None diff --git a/dbgpt/agent/core/memory/gpts/gpts_memory.py b/dbgpt/agent/core/memory/gpts/gpts_memory.py index 1468cf5b6..a8c07666f 100644 --- a/dbgpt/agent/core/memory/gpts/gpts_memory.py +++ b/dbgpt/agent/core/memory/gpts/gpts_memory.py @@ -127,8 +127,11 @@ async def append_message(self, conv_id: str, message: GptsMessage): await self.push_message(conv_id) async def get_messages(self, conv_id: str) -> List[GptsMessage]: - """Get conversation message.""" - return self.messages_cache[conv_id] + """Get message 
by conv_id.""" + messages = self.messages_cache[conv_id] + if not messages: + messages = self.message_memory.get_by_conv_id(conv_id) + return messages async def get_agent_messages( self, conv_id: str, agent_role: str diff --git a/dbgpt/agent/core/plan/awel/team_awel_layout.py b/dbgpt/agent/core/plan/awel/team_awel_layout.py index 49821a02b..4c7b94825 100644 --- a/dbgpt/agent/core/plan/awel/team_awel_layout.py +++ b/dbgpt/agent/core/plan/awel/team_awel_layout.py @@ -2,7 +2,7 @@ import logging from abc import ABC, abstractmethod -from typing import Optional, cast +from typing import List, Optional, cast from dbgpt._private.config import Config from dbgpt._private.pydantic import ( @@ -114,6 +114,8 @@ async def receive( is_recovery: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, + rely_messages: Optional[List[AgentMessage]] = None, ) -> None: """Recive message by base team.""" if request_reply is False or request_reply is None: diff --git a/dbgpt/agent/core/plan/planner_agent.py b/dbgpt/agent/core/plan/planner_agent.py index 755a398b0..98a7572c7 100644 --- a/dbgpt/agent/core/plan/planner_agent.py +++ b/dbgpt/agent/core/plan/planner_agent.py @@ -169,6 +169,7 @@ def prepare_act_param( received_message: Optional[AgentMessage], sender: Agent, rely_messages: Optional[List[AgentMessage]] = None, + **kwargs, ) -> Dict[str, Any]: """Prepare the parameters for the act method.""" return { diff --git a/dbgpt/agent/core/user_proxy_agent.py b/dbgpt/agent/core/user_proxy_agent.py index a15ab010f..daeac11f0 100644 --- a/dbgpt/agent/core/user_proxy_agent.py +++ b/dbgpt/agent/core/user_proxy_agent.py @@ -1,5 +1,5 @@ """A proxy agent for the user.""" -from typing import Optional +from typing import List, Optional from .. 
import ActionOutput, Agent, AgentMessage from .base_agent import ConversableAgent @@ -39,6 +39,8 @@ async def receive( is_recovery: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: Optional[str] = None, + historical_dialogues: Optional[List[AgentMessage]] = None, + rely_messages: Optional[List[AgentMessage]] = None, ) -> None: """Receive a message from another agent.""" if not silent: diff --git a/dbgpt/serve/agent/agents/controller.py b/dbgpt/serve/agent/agents/controller.py index 9e7f598cf..8f857f907 100644 --- a/dbgpt/serve/agent/agents/controller.py +++ b/dbgpt/serve/agent/agents/controller.py @@ -206,11 +206,44 @@ async def agent_chat_v2( if not gpt_app: raise ValueError(f"Not found app {gpts_name}!") + historical_dialogues: List[GptsMessage] = [] if not is_retry_chat: - # 新建gpts对话记录 + # Create a new gpts conversation record gpt_app: GptsApp = self.gpts_app.app_detail(gpts_name) if not gpt_app: raise ValueError(f"Not found app {gpts_name}!") + + ## When creating a new gpts conversation record, determine whether to include the history of previous topics according to the application definition. 
+ ## TODO BEGIN + # Temporarily use system configuration management, and subsequently use application configuration management + if CFG.MESSAGES_KEEP_START_ROUNDS and CFG.MESSAGES_KEEP_START_ROUNDS > 0: + gpt_app.keep_start_rounds = CFG.MESSAGES_KEEP_START_ROUNDS + if CFG.MESSAGES_KEEP_END_ROUNDS and CFG.MESSAGES_KEEP_END_ROUNDS > 0: + gpt_app.keep_end_rounds = CFG.MESSAGES_KEEP_END_ROUNDS + ## TODO END + + if gpt_app.keep_start_rounds > 0 or gpt_app.keep_end_rounds > 0: + if gpts_conversations and len(gpts_conversations) > 0: + rely_conversations = [] + if gpt_app.keep_start_rounds + gpt_app.keep_end_rounds < len( + gpts_conversations + ): + if gpt_app.keep_start_rounds > 0: + front = gpts_conversations[: gpt_app.keep_start_rounds] + rely_conversations.extend(front) + if gpt_app.keep_end_rounds > 0: + back = gpts_conversations[-gpt_app.keep_end_rounds :] + rely_conversations.extend(back) + else: + rely_conversations = gpts_conversations + for gpts_conversation in rely_conversations: + temps: List[GptsMessage] = await self.memory.get_messages( + gpts_conversation.conv_id + ) + if temps and len(temps) > 1: + historical_dialogues.append(temps[0]) + historical_dialogues.append(temps[-1]) + self.gpts_conversations.add( GptsConversationsEntity( conv_id=agent_conv_id, @@ -277,6 +310,8 @@ async def agent_chat_v2( is_retry_chat, last_speaker_name=last_speaker_name, init_message_rounds=message_round, + enable_verbose=enable_verbose, + historical_dialogues=historical_dialogues, **ext_info, ) ) @@ -418,6 +453,8 @@ async def agent_team_chat_new( link_sender: ConversableAgent = None, app_link_start: bool = False, enable_verbose: bool = True, + historical_dialogues: Optional[List[GptsMessage]] = None, + rely_messages: Optional[List[GptsMessage]] = None, **ext_info, ): gpts_status = Status.COMPLETE.value @@ -529,6 +566,10 @@ async def agent_team_chat_new( is_retry_chat=is_retry_chat, last_speaker_name=last_speaker_name, message_rounds=init_message_rounds, + 
historical_dialogues=user_proxy.convert_to_agent_message( + historical_dialogues + ), + rely_messages=rely_messages, **ext_info, ) diff --git a/dbgpt/serve/agent/agents/expand/app_start_assisant_agent.py b/dbgpt/serve/agent/agents/expand/app_start_assisant_agent.py index 69acfbe19..fc0cf2755 100644 --- a/dbgpt/serve/agent/agents/expand/app_start_assisant_agent.py +++ b/dbgpt/serve/agent/agents/expand/app_start_assisant_agent.py @@ -93,6 +93,8 @@ async def receive( is_recovery: Optional[bool] = False, is_retry_chat: bool = False, last_speaker_name: str = None, + historical_dialogues: Optional[List[AgentMessage]] = None, + rely_messages: Optional[List[AgentMessage]] = None, ) -> None: await self._a_process_received_message(message, sender) if request_reply is False or request_reply is None: diff --git a/dbgpt/serve/agent/db/gpts_app.py b/dbgpt/serve/agent/db/gpts_app.py index c232ae2e1..88cdc16c8 100644 --- a/dbgpt/serve/agent/db/gpts_app.py +++ b/dbgpt/serve/agent/db/gpts_app.py @@ -135,6 +135,10 @@ class GptsApp(BaseModel): recommend_questions: Optional[List[RecommendQuestion]] = [] admins: List[str] = Field(default_factory=list) + # By default, keep the last two rounds of conversation records as the context + keep_start_rounds: int = 0 + keep_end_rounds: int = 0 + def to_dict(self): return {k: self._serialize(v) for k, v in self.__dict__.items()} @@ -170,6 +174,8 @@ def from_dict(cls, d: Dict[str, Any]): owner_avatar_url=d.get("owner_avatar_url", None), recommend_questions=d.get("recommend_questions", []), admins=d.get("admins", []), + keep_start_rounds=d.get("keep_start_rounds", 0), + keep_end_rounds=d.get("keep_end_rounds", 2), ) @model_validator(mode="before") @@ -547,6 +553,8 @@ def list_all(self): "published": app_info.published, "details": [], "admins": [], + # "keep_start_rounds": app_info.keep_start_rounds, + # "keep_end_rounds": app_info.keep_end_rounds, } ) for app_info in app_entities @@ -918,6 +926,8 @@ def edit(self, gpts_app: GptsApp): 
app_entity.icon = gpts_app.icon app_entity.team_context = _parse_team_context(gpts_app.team_context) app_entity.param_need = json.dumps(gpts_app.param_need) + app_entity.keep_start_rounds = gpts_app.keep_start_rounds + app_entity.keep_end_rounds = gpts_app.keep_end_rounds session.merge(app_entity) old_details = session.query(GptsAppDetailEntity).filter(