diff --git a/README.md b/README.md index ed2a2ce115..23d88696e0 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,7 @@ The AgentScope Ecosystem - **[2025-12] `FEAT`:** TTS (Text-to-Speech) support. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/tts) | [Tutorial](https://doc.agentscope.io/tutorial/task_tts.html) - **[2025-11] `INTG`:** Anthropic Agent Skill support. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/agent_skill) | [Tutorial](https://doc.agentscope.io/tutorial/task_agent_skill.html) - **[2025-11] `RELS`:** Alias-Agent for diverse real-world tasks and Data-Juicer Agent for data processing open-sourced. [Alias-Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/alias) | [Data-Juicer Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/data_juicer_agent) -- **[2025-11] `INTG`:** Agentic RL via Trinity-RFT library. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) +- **[2025-11] `INTG`:** Agentic RL via Trinity-RFT library. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) - **[2025-11] `INTG`:** ReMe for enhanced long-term memory. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/long_term_memory/reme) - **[2025-11] `RELS`:** agentscope-samples repository launched and agentscope-runtime upgraded with Docker/K8s deployment and VNC-powered GUI sandboxes. 
[Samples](https://github.com/agentscope-ai/agentscope-samples) | [Runtime](https://github.com/agentscope-ai/agentscope-runtime) @@ -361,7 +361,7 @@ asyncio.run(multi_agent_conversation()) ### Tuner -- [Tune ReAct Agent](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent) +- [Tune ReAct Agent](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning) ## Contributing diff --git a/README_zh.md b/README_zh.md index 89fae8cbb9..952d584ecc 100644 --- a/README_zh.md +++ b/README_zh.md @@ -84,7 +84,7 @@ AgentScope 生态 - **[2025-12] `功能`:** TTS(文本转语音)支持。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/tts) | [教程](https://doc.agentscope.io/zh_CN/tutorial/task_tts.html) - **[2025-11] `集成`:** Anthropic Agent Skill 支持。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/agent_skill) | [教程](https://doc.agentscope.io/zh_CN/tutorial/task_agent_skill.html) - **[2025-11] `发布`:** 面向多样化真实任务的 Alias-Agent 和数据处理的 Data-Juicer Agent 开源。[Alias-Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/alias) | [Data-Juicer Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/data_juicer_agent) -- **[2025-11] `集成`:** 通过 Trinity-RFT 库实现智能体强化学习。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) +- **[2025-11] `集成`:** 通过 Trinity-RFT 库实现智能体强化学习。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) - **[2025-11] `集成`:** ReMe 增强长期记忆。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/long_term_memory/reme) - **[2025-11] `发布`:** agentscope-samples 样例库上线,agentscope-runtime 升级支持 Docker/K8s 部署和 VNC 图形沙盒。[样例库](https://github.com/agentscope-ai/agentscope-samples) | [Runtime](https://github.com/agentscope-ai/agentscope-runtime) @@ -356,7 +356,7 
@@ asyncio.run(multi_agent_conversation()) ### 微调 -- [调优 ReAct 智能体](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent) +- [调优 ReAct 智能体](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning) ## 贡献 diff --git a/docs/NEWS.md b/docs/NEWS.md index 8a8a287d6d..ed90cfd0ed 100644 --- a/docs/NEWS.md +++ b/docs/NEWS.md @@ -9,7 +9,7 @@ - **[2025-12] `FEAT`:** TTS (Text-to-Speech) support. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/tts) | [Tutorial](https://doc.agentscope.io/tutorial/task_tts.html) - **[2025-11] `INTG`:** Anthropic Agent Skill support. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/agent_skill) | [Tutorial](https://doc.agentscope.io/tutorial/task_agent_skill.html) - **[2025-11] `RELS`:** Alias-Agent for diverse real-world tasks and Data-Juicer Agent for data processing open-sourced. [Alias-Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/alias) | [Data-Juicer Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/data_juicer_agent) -- **[2025-11] `INTG`:** Agentic RL via Trinity-RFT library. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) +- **[2025-11] `INTG`:** Agentic RL via Trinity-RFT library. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) - **[2025-11] `INTG`:** ReMe for enhanced long-term memory. [Example](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/long_term_memory/reme) - **[2025-11] `RELS`:** agentscope-samples repository launched and agentscope-runtime upgraded with Docker/K8s deployment and VNC-powered GUI sandboxes. 
[Samples](https://github.com/agentscope-ai/agentscope-samples) | [Runtime](https://github.com/agentscope-ai/agentscope-runtime) - **[2025-11] `DOCS`:** Contributing Guide is online - welcome to contribute! [Guide](./CONTRIBUTING.md) diff --git a/docs/NEWS_zh.md b/docs/NEWS_zh.md index cff895b389..ea54d02309 100644 --- a/docs/NEWS_zh.md +++ b/docs/NEWS_zh.md @@ -9,7 +9,7 @@ - **[2025-12] `功能`:** TTS(文本转语音)支持。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/tts) | [教程](https://doc.agentscope.io/zh_CN/tutorial/task_tts.html) - **[2025-11] `集成`:** Anthropic Agent Skill 支持。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/agent_skill) | [教程](https://doc.agentscope.io/zh_CN/tutorial/task_agent_skill.html) - **[2025-11] `发布`:** 面向多样化真实任务的 Alias-Agent 和数据处理的 Data-Juicer Agent 开源。[Alias-Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/alias) | [Data-Juicer Agent](https://github.com/agentscope-ai/agentscope-samples/tree/main/data_juicer_agent) -- **[2025-11] `集成`:** 通过 Trinity-RFT 库实现智能体强化学习。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) +- **[2025-11] `集成`:** 通过 Trinity-RFT 库实现智能体强化学习。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning) | [Trinity-RFT](https://github.com/agentscope-ai/Trinity-RFT) - **[2025-11] `集成`:** ReMe 增强长期记忆。[样例](https://github.com/agentscope-ai/agentscope/tree/main/examples/functionality/long_term_memory/reme) - **[2025-11] `发布`:** agentscope-samples 样例库上线,agentscope-runtime 升级支持 Docker/K8s 部署和 VNC 图形沙盒。[样例库](https://github.com/agentscope-ai/agentscope-samples) | [Runtime](https://github.com/agentscope-ai/agentscope-runtime) - **[2025-11] `文档`:** 贡献指南上线 - 欢迎参与贡献![指南](./CONTRIBUTING_zh.md) diff --git a/docs/tutorial/en/src/task_tuner.py b/docs/tutorial/en/src/task_tuner.py index 45f742b2c3..d735b07535 100644 --- 
a/docs/tutorial/en/src/task_tuner.py +++ b/docs/tutorial/en/src/task_tuner.py @@ -193,7 +193,7 @@ async def example_judge_function( # Below is an example of configuring and starting the tuning process: # # .. note:: -# This example is for demonstration only. For a complete runnable example, see `Tune ReActAgent <https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent>`_ +# This example is for demonstration only. For a complete runnable example, see `Tune ReActAgent <https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning>`_ # # .. code-block:: python # diff --git a/docs/tutorial/zh_CN/src/task_tuner.py b/docs/tutorial/zh_CN/src/task_tuner.py index 4143c3c2fd..48a50eafc0 100644 --- a/docs/tutorial/zh_CN/src/task_tuner.py +++ b/docs/tutorial/zh_CN/src/task_tuner.py @@ -193,7 +193,7 @@ async def example_judge_function( # 下面是调优流程的配置与启动示例: # # .. note:: -# 此示例仅供演示。完整可运行示例请参考 `Tune ReActAgent <https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/react_agent>`_ +# 此示例仅供演示。完整可运行示例请参考 `Tune ReActAgent <https://github.com/agentscope-ai/agentscope/tree/main/examples/tuner/model_tuning>`_ # # .. code-block:: python # diff --git a/examples/tuner/model_selection/README.md b/examples/tuner/model_selection/README.md new file mode 100644 index 0000000000..14e8df4770 --- /dev/null +++ b/examples/tuner/model_selection/README.md @@ -0,0 +1,294 @@ +# Model Selection Guide + +AgentScope provides a `model_selection` sub-module in the `tuner` module to automatically select the best performing model from a set of candidates based on evaluation metrics. This guide walks you through the steps to evaluate and select the optimal model for your agent workflow. + +## Overview + +Model selection is the process of choosing the best performing model from a set of candidate models based on their performance on a dataset. To use model selection, you need to understand three components: + +1. **Workflow function**: An async function that takes a task and a model, executes the task with the model, and returns a workflow output. +2. **Judge function**: A function that evaluates the workflow's output and returns a reward indicating performance. +3. **Task dataset**: A dataset containing samples for evaluation.
+ +The following diagram illustrates the relationship between these components: + +```mermaid +flowchart TD + CandidateModels[Candidate Models] --> WorkflowFunction[Workflow Function] + Task[Task] --> WorkflowFunction + WorkflowFunction --> JudgeFunction[Judge Function] + Task --> JudgeFunction + JudgeFunction --> Reward[Reward] + Reward --> ModelSelector[Model Selector] + ModelSelector --> BestModel[Best Performing Model] +``` + +## How to implement + +Here we use a translation task scenario as an example to illustrate how to implement the above three components. + +Suppose you have an agent workflow that performs translation using the `ReActAgent`. + +```python +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.message import Msg +from agentscope.model import ChatModelBase + +async def run_translation_agent(text: str, model: ChatModelBase): + agent = ReActAgent( + name="translator", + sys_prompt="You are a helpful translation agent. Translate the given text accurately, and only output the translated text.", + model=model, + formatter=OpenAIChatFormatter(), + ) + + response = await agent.reply( + msg=Msg("user", f"Translate the following text between English and Chinese: {text}", role="user"), + ) + + print(response) +``` + +### Step 1: Prepare task dataset + +To evaluate models for translation tasks, you need a dataset that contains samples of source texts and their corresponding reference translations. + +The dataset should be organized in a format that can be loaded using the `datasets.load_dataset` function (e.g., JSONL, Parquet, CSV), or loaded from a Hugging Face online dataset. For translation tasks, your data file (like `translate_data/test.json`) might contain samples like: + +```json + { + "question": "量子退相干是限制量子计算机可扩展性的主要障碍之一。", + "answer": "Quantum decoherence is one of the primary obstacles limiting the scalability of quantum computers." + } +``` + + +### Step 2: Define a workflow function + +The workflow function takes a task dictionary and a model as input, and returns a `WorkflowOutput`.
The model selector will call this function with different models during evaluation. + +```python +async def translation_workflow( + task: Dict, + model: ChatModelBase, +) -> WorkflowOutput: + """Run the translation workflow on a single task with the given model.""" + ... +``` + +- Inputs: + - `task`: A dictionary representing a single training task from the dataset. + - `model`: The model to be used in the workflow. This will be evaluated by the selector. + +- Returns: + - `WorkflowOutput`: An object containing the agent's response. + +Below is a refactored version of the original `run_translation_agent` function to fit the workflow function pattern. + +**Key changes from the original function**: + +1. Add `model` as a parameter to the workflow function. +2. Use the input `model` to initialize the agent. +3. Use the `question` field from the `task` dictionary as the source text for translation. +4. Return a `WorkflowOutput` object containing the agent's response. + +```python +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.tuner import WorkflowOutput +from agentscope.message import Msg + +async def translation_workflow( + task: Dict, + model: ChatModelBase, +) -> WorkflowOutput: + agent = ReActAgent( + name="translator", + sys_prompt="You are a helpful translation agent. 
Translate the given text accurately, and only output the translated text.", + model=model, + formatter=OpenAIChatFormatter(), + ) + + # Extract source text from task + source_text = task.get("question", "") if isinstance(task, dict) else str(task) + + # Create a message with the translation request + prompt = f"Translate the following text between English and Chinese: {source_text}" + msg = Msg(name="user", content=prompt, role="user") + + # Get response from the agent + response = await agent.reply(msg=msg) + + return WorkflowOutput( + response=response, + ) +``` + +### Step 3: Implement the judge function + +The judge function evaluates the workflow's response and returns a reward. Higher reward values indicate better performance. + +```python +async def judge_function( + task: Dict, + response: Any, +) -> JudgeOutput: + """Calculate reward based on the input task and workflow's response.""" +``` + +- Inputs: + - `task`: A dictionary representing a single training task. + - `response`: A composite dict containing: + - `"response"`: The actual response from the workflow function. + - `"metrics"`: Workflow metrics including execution_time and token usage. + +- Outputs: + - `JudgeOutput`: An object containing: + - `reward`: A scalar float representing the reward (higher is better). + - `metrics`: Optional dictionary of additional metrics. 
+ +Here is an example implementation for translation tasks using the BLEU score (install the `sacrebleu` package first via `pip install sacrebleu`): + +```python +from agentscope.tuner import JudgeOutput + +async def bleu_judge( + task: Dict, + response: Any, +) -> JudgeOutput: + """Calculate BLEU score for translation quality.""" + # Lazy import so sacrebleu is only required when this judge is used + import sacrebleu + + # Extract response text from the composite dict + response_content = response["response"] + response_str = response_content.get_text_content() + + # Extract reference translation + reference_translation = task.get("answer", "") if isinstance(task, dict) else "" + + # Calculate BLEU score + ref = reference_translation.strip() + pred = response_str.strip() + bleu_score = sacrebleu.sentence_bleu(pred, [ref]) + + return JudgeOutput( + reward=bleu_score.score, + metrics={ + "bleu": bleu_score.score / 100, + "brevity_penalty": bleu_score.bp, + "ratio": bleu_score.ratio, + } + ) +``` + +The `agentscope.tuner` module also provides built-in judge functions for common workflow efficiency metrics, such as execution time and token usage, as used in `example_token_usage.py`: + +```python +from agentscope.tuner.model_selection import avg_time_judge, avg_token_consumption_judge + +# For selecting based on fastest execution time +judge_function = avg_time_judge + +# For selecting based on lowest token consumption +judge_function = avg_token_consumption_judge +``` + +### Step 4: Start model selection + +Use the `select_model` interface to find the best performing model. + +```python +from agentscope.tuner import DatasetConfig +from agentscope.tuner.model_selection import select_model +from agentscope.model import DashScopeChatModel +import os + +# your workflow / judge function and candidate models here...
+ +async def main(): + # Define your candidate models + model1 = DashScopeChatModel( + "qwen3-max-2025-09-23", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ) + model2 = DashScopeChatModel( + "deepseek-r1", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ) + + # select_model is a coroutine, so it must be awaited + best_model, metrics = await select_model( + workflow_func=translation_workflow, + judge_func=bleu_judge, + train_dataset=DatasetConfig(path="examples/tuner/model_selection/translate_data", split="test"), + candidate_models=[model1, model2], + ) + + print(f"Best model: {best_model.model_name}") + print(f"Performance metrics: {metrics}") + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) +``` + +--- + +> **Note**: Besides the BLEU score judge shown in this example, you can implement custom judge functions for your specific use case. Alternatively, you can use the built-in judges for workflow-efficiency objectives such as execution time and token usage; see `example_token_usage.py` for reference. + +--- + +## How to run + +After implementing the workflow and judge function, follow these steps to run model selection: + +1. Prerequisites + + - Set up your API key as an environment variable: + + ```bash + export DASHSCOPE_API_KEY="your_api_key_here" + ``` + + - Prepare your dataset in a supported format (JSONL, Parquet, CSV, etc.). + + - Install required dependencies if not already installed: + + ```bash + pip install datasets + ``` + +2. Run the selection script + + ```bash + python example_token_usage.py # or other example files in this directory + ``` + +3. The best performing model will be returned along with performance metrics.
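As the note above mentions, custom judge functions can encode any task-specific scoring rule. For tasks with a single reference answer, the rule can be as simple as a normalized exact match. Below is a framework-free sketch of just the reward computation (wrapping the result into a `JudgeOutput`, as in the BLEU example, is omitted; `normalize` is an illustrative helper, not an AgentScope API):

```python
def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so formatting differences don't count."""
    return " ".join(text.lower().split())


def exact_match_reward(prediction: str, reference: str) -> float:
    """Return 1.0 for a normalized exact match, else 0.0 (higher is better)."""
    return 1.0 if normalize(prediction) == normalize(reference) else 0.0


print(exact_match_reward("  Haste makes  waste. ", "haste makes waste."))
```

A stricter or softer rule (e.g. numeric tolerance for math answers) can be swapped in without changing the rest of the selection pipeline.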
+ +## Output + +``` +Evaluating 3 candidate models: ['qwen3-max', 'deepseek-r1', 'glm-4.7'] + +INFO:agentscope.tuner.model_selection._model_selection:Model evaluation results: +INFO:agentscope.tuner.model_selection._model_selection: qwen3-max: 61.8407 +INFO:agentscope.tuner.model_selection._model_selection: deepseek-r1: 43.5547 +INFO:agentscope.tuner.model_selection._model_selection: glm-4.7: 48.8801 + +Selected best model: qwen3-max-2025-09-23 +Metrics: {'bleu_avg': 0.6184069765855449, 'brevity_penalty_avg': 0.9900344064325004, 'ratio_avg': 1.070816065067906} +``` + +--- + +## Use Cases + +Model selection is particularly useful for: + +| Scenario | Benefit | +|----------|---------| +| **Performance optimization** | Identify the model that achieves the highest accuracy/reward on your specific task | +| **Cost efficiency** | Select models that achieve desired performance with lower computational costs | +| **Latency requirements** | Choose models that meet your speed/latency constraints | +| **Resource constraints** | Find the best model that fits within your hardware limitations | + +> [!TIP] +> Model selection is ideal when you have multiple models available and want to systematically identify which performs best for your specific use case. 
\ No newline at end of file diff --git a/examples/tuner/model_selection/example_bleu.py b/examples/tuner/model_selection/example_bleu.py new file mode 100644 index 0000000000..6e5509c46f --- /dev/null +++ b/examples/tuner/model_selection/example_bleu.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- +"""Example of model selection for translation tasks using agentscope tuner.""" + +import os +import logging +from typing import Dict, Any +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.message import Msg +from agentscope.model import DashScopeChatModel +from agentscope.tuner import JudgeOutput +from agentscope.tuner import WorkflowOutput +from agentscope.tuner import DatasetConfig +from agentscope.tuner.model_selection import select_model + + +# Configure logging to show INFO level messages +logging.basicConfig(level=logging.INFO) + + +# Initialize models for selection +models = [ + DashScopeChatModel( + "qwen3-max-2025-09-23", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ), + DashScopeChatModel( + "Moonshot-Kimi-K2-Instruct", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ), + DashScopeChatModel( + "MiniMax-M2.1", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ), + DashScopeChatModel( + "deepseek-r1", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ), + DashScopeChatModel( + "glm-4.7", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=1024, + ), +] + + +async def translation_workflow( + task: Dict[str, Any], + model: Any, +) -> WorkflowOutput: + """A workflow function using the ReAct agent to perform translation tasks. + + Args: + task (Dict[str, Any]): The translation task + containing source text and target language. + model: The model to use for the agent. + + Returns: + WorkflowOutput: The workflow output containing the translated text. 
+ """ + agent = ReActAgent( + name="translator", + sys_prompt=( + "You are a helpful translation agent." + "Only output the translated text." + ), + model=model, + formatter=OpenAIChatFormatter(), + ) + + # Extract source text and target language from task + source_text = ( + task.get("question", "") if isinstance(task, dict) else str(task) + ) + + # Create a message with the translation request + prompt = ( + f"Translate following text between English and Chinese: {source_text}" + ) + msg = Msg(name="user", content=prompt, role="user") + + # Get response from the agent + response = await agent.reply(msg=msg) + + return WorkflowOutput( + response=response, + ) + + +async def bleu_judge( + task: Dict[str, Any], + response: Any, +) -> JudgeOutput: + """A judge function to calculate BLEU score for translation quality. + + Args: + task (Dict[str, Any]): The task information. + response (Any): A composite dict containing the workflow response + and metrics. + + Returns: + JudgeOutput: The BLEU score and other metrics. 
+ """ + # Lazy import to follow the requirement + import sacrebleu + + response_str = "" + if isinstance(response, dict) and "response" in response: + response_content = response["response"] + if hasattr(response_content, "content"): + # Handle the response structure + if isinstance(response_content.content, list): + for content_item in response_content.content: + if ( + isinstance(content_item, dict) + and "text" in content_item + ): + response_str += content_item["text"] + elif hasattr(content_item, "text"): + response_str += content_item.text + else: + response_str = str(response_content.content) + else: + raise ValueError("Response must be a dict with 'response' key") + + # Extract reference translation + reference_translation = ( + task.get("answer", "") if isinstance(task, dict) else "" + ) + + # Load the BLEU metric + ref = reference_translation.strip() + pred = response_str.strip() + + bleu_score = sacrebleu.sentence_bleu(pred, [ref]) + + # Return the judge output with the BLEU score as reward + return JudgeOutput( + reward=bleu_score.score, + metrics={ + "bleu": bleu_score.score / 100, + "brevity_penalty": bleu_score.bp, + "ratio": bleu_score.ratio, + }, + ) + + +async def main() -> None: + """Main entry point to run model selection example. + + This function selects the best model based on + bleu score, and prints the results. 
+ """ + # Define the translation benchmark dataset using DatasetConfig + dataset_config = DatasetConfig( + path=os.path.join( + os.path.dirname(__file__), + "translate_data", + ), # Path to the local JSON dataset + split="test", + ) + + # Perform model selection using the local translation benchmark dataset + best_model, metrics = await select_model( + workflow_func=translation_workflow, + judge_func=bleu_judge, + train_dataset=dataset_config, + candidate_models=models, + ) + + print(f"Selected best model: {best_model.model_name}") + print(f"Metrics: {metrics}") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/tuner/model_selection/example_token_usage.py b/examples/tuner/model_selection/example_token_usage.py new file mode 100644 index 0000000000..43c5379b64 --- /dev/null +++ b/examples/tuner/model_selection/example_token_usage.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +"""Example of model selection using agentscope tuner.""" + +import os +import logging +from typing import Dict, Any +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.message import Msg +from agentscope.model import DashScopeChatModel +from agentscope.tuner import DatasetConfig +from agentscope.tuner import WorkflowOutput +from agentscope.tuner.model_selection import select_model +from agentscope.tuner.model_selection import avg_token_consumption_judge + + +# Configure logging to show INFO level messages +logging.basicConfig(level=logging.INFO) + + +# Initialize models for selection +models = [ + DashScopeChatModel( + "qwen-turbo", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=512, + ), + DashScopeChatModel( + "qwen-plus", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=512, + ), + DashScopeChatModel( + "qwen-max", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=512, + ), +] + + +async def workflow( + task: Dict[str, Any], + model: Any, +) 
-> WorkflowOutput: + """A workflow function using the ReAct agent to solve tasks. + + Args: + task (Dict[str, Any]): The task to be solved. + model: The model to use for the agent. + + Returns: + WorkflowOutput: The workflow output containing the agent's response. + """ + agent = ReActAgent( + name="math_solver", + sys_prompt="You are a helpful math problem solving agent.", + model=model, + formatter=OpenAIChatFormatter(), + ) + + # Extract question from task + question = ( + task.get("question", "") if isinstance(task, dict) else str(task) + ) + + # Create a message with the question + msg = Msg(name="user", content=question, role="user") + + # Get response from the agent + response = await agent.reply(msg=msg) + + return WorkflowOutput( + response=response, + ) + + +async def main() -> None: + """Main entry point to run model selection example. + + This function selects the best model based on + token consumption, and prints the results. + """ + # Configure the GSM8K dataset + dataset_config = DatasetConfig( + path="openai/gsm8k", + name="main", + split="test", + total_steps=20, # Limit for testing purposes + ) + + # Perform model selection + best_model, metrics = await select_model( + workflow_func=workflow, + judge_func=avg_token_consumption_judge, + train_dataset=dataset_config, + candidate_models=models, + ) + + print(f"Selected best model: {best_model.model_name}") + print(f"Metrics: {metrics}") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/tuner/model_selection/translate_data/test.json b/examples/tuner/model_selection/translate_data/test.json new file mode 100644 index 0000000000..e588bc74c4 --- /dev/null +++ b/examples/tuner/model_selection/translate_data/test.json @@ -0,0 +1,122 @@ +[ + { + "question": "量子退相干是限制量子计算机可扩展性的主要障碍之一。", + "answer": "Quantum decoherence is one of the primary obstacles limiting the scalability of quantum computers." 
+ }, + { + "question": "他因小失大,为省小钱而丢了大合同。", + "answer": "He lost the big contract over a minor saving—penny wise, pound foolish." + }, + { + "question": "尽管双方在仲裁条款的适用范围上存在分歧,但法院裁定该争议仍受协议中排他性管辖条款的约束。", + "answer": "Although the parties disagreed on the scope of the arbitration clause, the court ruled that the dispute remains subject to the exclusive jurisdiction clause in the agreement." + }, + { + "question": "Transformer 架构通过自注意力机制实现了对序列依赖关系的全局建模。", + "answer": "The Transformer architecture enables global modeling of sequential dependencies through self-attention mechanisms." + }, + { + "question": "天行健,君子以自强不息。", + "answer": "As Heaven maintains vigor through movement, a noble person should constantly strive for self-perfection." + }, + { + "question": "鉴于全球供应链的脆弱性在疫情后暴露无遗,各国正重新评估其对外部制造中心的过度依赖所蕴含的系统性风险。", + "answer": "Given the fragility of global supply chains exposed by the pandemic, countries are reassessing the systemic risks inherent in their overreliance on external manufacturing hubs." + }, + { + "question": "CRISPR-Cas9 基因编辑技术可精准靶向特定DNA序列并诱导双链断裂。", + "answer": "CRISPR-Cas9 gene-editing technology can precisely target specific DNA sequences and induce double-strand breaks." + }, + { + "question": "冰冻三尺,非一日之寒。", + "answer": "It takes more than one cold day for the river to freeze three feet deep." + }, + { + "question": "正如海德格尔所言,此在(Dasein)的本质在于其存在方式,而非某种现成的实体属性。", + "answer": "As Heidegger argued, the essence of Dasein lies in its mode of being, not in any ready-made ontic property." + }, + { + "question": "央行通过公开市场操作调节基础货币供应,以实现通胀目标制下的利率调控。", + "answer": "The central bank adjusts base money supply via open market operations to conduct interest rate management under an inflation-targeting regime." + }, + { + "question": "他是个\"老黄牛\",默默耕耘从不抱怨。", + "answer": "He is a 'diligent ox'—working tirelessly without complaint." 
+ }, + { + "question": "尽管生成式人工智能在内容创作领域展现出巨大潜力,但其训练数据中的偏见可能被放大并导致有害输出,这引发了监管机构对算法透明度和问责机制的迫切要求。", + "answer": "Although generative AI demonstrates immense potential in content creation, biases in its training data may be amplified and lead to harmful outputs, prompting regulators to urgently demand algorithmic transparency and accountability mechanisms." + }, + { + "question": "碳捕集与封存(CCS)技术旨在将工业排放的二氧化碳注入深层地质构造中永久封存。", + "answer": "Carbon Capture and Storage (CCS) technology aims to permanently sequester CO₂ emissions from industrial sources into deep geological formations." + }, + { + "question": "此事已成定局,覆水难收。", + "answer": "The matter is settled; what's done cannot be undone." + }, + { + "question": "在多极化世界格局加速演进的背景下,新兴经济体呼吁改革以美国为主导的布雷顿森林体系,以更好地反映21世纪全球经济力量的现实分布。", + "answer": "Against the backdrop of accelerating multipolarity, emerging economies are calling for reform of the U.S.-dominated Bretton Woods system to better reflect the realities of 21st-century global economic power distribution." + }, + { + "question": "零信任架构要求对所有用户和设备进行持续验证,无论其位于网络内部还是外部。", + "answer": "Zero Trust Architecture requires continuous verification of all users and devices, regardless of whether they reside inside or outside the network perimeter." + }, + { + "question": "落霞与孤鹜齐飞,秋水共长天一色。", + "answer": "The evening glow flies with the lone wild goose; autumn waters blend with the vast sky in one hue." + }, + { + "question": "当基因编辑技术能够修改人类胚胎的生殖系基因时,我们必须审慎权衡其治愈遗传病的潜力与引发不可逆社会分化的伦理风险。", + "answer": "When gene-editing technologies can modify germline genes in human embryos, we must carefully weigh their potential to cure hereditary diseases against the ethical risks of triggering irreversible social stratification." + }, + { + "question": "可重复使用运载火箭通过垂直回收技术显著降低了进入太空的成本。", + "answer": "Reusable launch vehicles have significantly reduced the cost of space access through vertical landing technology." + }, + { + "question": "欲速则不达。", + "answer": "Haste makes waste." 
+ }, + { + "question": "根据《伯尔尼公约》第5条,作者在作品起源国以外的成员国享有与该国国民同等的版权保护,且不受作品是否在该国首次发表的影响。", + "answer": "Under Article 5 of the Berne Convention, authors enjoy copyright protection in member countries other than the country of origin equivalent to that granted to nationals of those countries, irrespective of whether the work was first published therein." + }, + { + "question": "石墨烯因其超高电子迁移率和机械强度,被视为下一代半导体器件的理想候选材料。", + "answer": "Graphene is regarded as an ideal candidate material for next-generation semiconductor devices due to its ultra-high electron mobility and mechanical strength." + }, + { + "question": "色即是空,空即是色。", + "answer": "Form is emptiness, emptiness is form." + }, + { + "question": "由于人口老龄化加剧了财政压力,政府不得不在提高退休年龄、增加社保缴费率与削减福利支出之间寻求艰难平衡。", + "answer": "Faced with mounting fiscal pressures from population aging, the government is forced to strike a difficult balance among raising the retirement age, increasing social security contribution rates, and cutting benefit expenditures." + }, + { + "question": "智能合约在满足预设条件时自动执行,无需可信第三方介入。", + "answer": "Smart contracts execute automatically when predefined conditions are met, without requiring a trusted third party." + }, + { + "question": "卧薪尝胆,三千越甲可吞吴。", + "answer": "Sleeping on firewood and tasting gall, three thousand Yue soldiers could conquer Wu." + }, + { + "question": "尽管可再生能源装机容量快速增长,但其间歇性特征对电网稳定性构成挑战,因此亟需发展大规模储能技术和需求侧响应机制。", + "answer": "Despite the rapid growth in renewable energy capacity, its intermittent nature poses challenges to grid stability, necessitating the urgent development of large-scale energy storage and demand-side response mechanisms." + }, + { + "question": "功能性磁共振成像(fMRI)通过检测血氧水平依赖信号来间接反映神经元活动。", + "answer": "Functional MRI (fMRI) indirectly reflects neuronal activity by detecting blood-oxygen-level-dependent (BOLD) signals." + }, + { + "question": "他说话总是闪烁其词,让人捉摸不透。", + "answer": "He always speaks evasively, leaving others unable to fathom his true intent." 
+ }, + { + "question": "若自动化决策系统在招聘、信贷或司法等领域广泛应用却缺乏可解释性,不仅会削弱个体的程序性权利,还可能固化甚至加剧现有社会偏见。", + "answer": "If opaque automated decision systems are widely deployed in hiring, credit, or judicial domains, they may not only undermine individuals' procedural rights but also entrench or even exacerbate existing societal biases." + } +] \ No newline at end of file diff --git a/examples/tuner/react_agent/README.md b/examples/tuner/model_tuning/README.md similarity index 99% rename from examples/tuner/react_agent/README.md rename to examples/tuner/model_tuning/README.md index 9e71879ccf..255d5b6370 100644 --- a/examples/tuner/react_agent/README.md +++ b/examples/tuner/model_tuning/README.md @@ -1,4 +1,4 @@ -# AgentScope Tuner Quick Start Guide +# Model Tuning Guide AgentScope provides a `tuner` sub-module to train agent workflows using reinforcement learning (RL). This guide walks you through the steps to implement and train an agent workflow using RL with AgentScope Tuner. diff --git a/examples/tuner/react_agent/config.yaml b/examples/tuner/model_tuning/config.yaml similarity index 100% rename from examples/tuner/react_agent/config.yaml rename to examples/tuner/model_tuning/config.yaml diff --git a/examples/tuner/react_agent/main.py b/examples/tuner/model_tuning/main.py similarity index 100% rename from examples/tuner/react_agent/main.py rename to examples/tuner/model_tuning/main.py diff --git a/examples/tuner/react_agent/reward_curve.png b/examples/tuner/model_tuning/reward_curve.png similarity index 100% rename from examples/tuner/react_agent/reward_curve.png rename to examples/tuner/model_tuning/reward_curve.png diff --git a/examples/tuner/prompt_tuning/README.md b/examples/tuner/prompt_tuning/README.md new file mode 100644 index 0000000000..d4e0d7808d --- /dev/null +++ b/examples/tuner/prompt_tuning/README.md @@ -0,0 +1,348 @@ +# Prompt Tuning Guide + +AgentScope provides a `prompt_tune` sub-module to automatically optimize system prompts. 
+This guide walks you through the steps to optimize your agent's system prompt without modifying model weights. + +## Overview + +Prompt tuning is a lightweight alternative to model fine-tuning that optimizes the system prompt to improve agent performance. To use prompt tuning, you need to understand three components: + +1. **Workflow function**: An async function that takes a task and a system prompt and returns a workflow output. +2. **Judge function**: A function that evaluates the agent's response and returns a reward. +3. **Task dataset**: A dataset containing training samples for optimization. + +The following diagram illustrates the relationship between these components: + +```mermaid +flowchart TD + InitPrompt[Initial Prompt] --> WorkflowFunction[Workflow Function] + Task[Task] --> WorkflowFunction + WorkflowFunction --> JudgeFunction[Judge Function] + Task --> JudgeFunction + JudgeFunction --> Reward[Reward] + Reward --> Optimizer + Optimizer --> OptimizedPrompt[Optimized Prompt] +``` + +## How to implement + +Here we use a math problem solving scenario as an example to illustrate how to implement the above three components. + +Suppose you have an agent workflow that solves math problems using the `ReActAgent`. + +```python +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.message import Msg + +async def run_react_agent(query: str): + # model = ... # Initialize your ChatModel here + + agent = ReActAgent( + name="react_agent", + sys_prompt="You are a helpful math problem solving agent.", + model=model, + formatter=OpenAIChatFormatter(), + ) + + response = await agent.reply( + msg=Msg("user", query, role="user"), + ) + + print(response) +``` + +### Step 1: Prepare task dataset + +To optimize the prompt for solving math problems, you need a dataset that contains samples of math problems and their corresponding ground truth answers. + +The dataset should be organized in a format that can be loaded using the `datasets.load_dataset` function (e.g., JSONL, Parquet, CSV).
For example: + +``` +my_dataset/ + ├── train.parquet # samples for training/optimization + └── test.parquet # samples for evaluation +``` + +Suppose your `train.parquet` contains samples like: + +```json +{"question": "What is 2 + 2?", "answer": "4"} +{"question": "What is 4 + 4?", "answer": "8"} +``` + +You can preview your dataset using the following code: + +```python +from agentscope.tuner import DatasetConfig + +DatasetConfig(path="train.parquet").preview() + +# Output: +# [ +# { +# "question": "What is 2 + 2?", +# "answer": "4" +# }, +# { +# "question": "What is 4 + 4?", +# "answer": "8" +# } +# ] +``` + +### Step 2: Define a workflow function + +The workflow function takes a task dictionary and system prompt as input, and returns a `WorkflowOutput`. The optimizer will call this function with different prompts during optimization. + +```python +async def workflow( + task: Dict, + system_prompt: str, +) -> WorkflowOutput: + """Run the agent workflow on a single task with the given system prompt.""" + ... +``` + +- Inputs: + - `task`: A dictionary representing a single training task from the dataset. + - `system_prompt`: The system prompt to be used in the workflow. This will be optimized by the tuner. + +- Returns: + - `WorkflowOutput`: An object containing the agent's response. + +Below is a refactored version of the original `run_react_agent` function to fit the workflow function pattern. + +**Key changes from the original function**: + +1. Add `system_prompt` as a parameter to the workflow function. +2. Use the input `system_prompt` to initialize the agent. +3. Use the `question` field from the `task` dictionary as the user query. +4. Return a `WorkflowOutput` object containing the agent's response. 
+ +```python +from typing import Dict + +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.model import DashScopeChatModel +from agentscope.tuner import WorkflowOutput +from agentscope.message import Msg + +# Initialize the model (can be module-level or passed in via closure) +model = DashScopeChatModel("qwen-turbo", api_key="YOUR_API_KEY") + +async def workflow( + task: Dict, + system_prompt: str, +) -> WorkflowOutput: + agent = ReActAgent( + name="react_agent", + sys_prompt=system_prompt, # use the optimizable system prompt + model=model, + formatter=OpenAIChatFormatter(), + ) + + response = await agent.reply( + msg=Msg("user", task["question"], role="user"), + ) + + return WorkflowOutput( + response=response, + ) +``` + +### Step 3: Implement the judge function + +The judge function evaluates the agent's response and returns a reward. It has the same signature as in RL-based tuning. + +```python +async def judge_function( + task: Dict, + response: Any, +) -> JudgeOutput: + """Calculate reward based on the input task and agent's response.""" +``` + +- Inputs: + - `task`: A dictionary representing a single training task. + - `response`: The `response` field of the `WorkflowOutput` object returned by the workflow function. + +- Outputs: + - `JudgeOutput`: An object containing: + - `reward`: A scalar float representing the reward. + - `metrics`: Optional dictionary of additional metrics. + +Here is an example implementation: + +```python +from typing import Any, Dict + +from agentscope.tuner import JudgeOutput + +async def judge_function( + task: Dict, response: Any +) -> JudgeOutput: + """Simple reward: 1.0 if the ground truth appears in the response, else 0.0.""" + ground_truth = task["answer"] + reward = 1.0 if ground_truth in response.get_text_content() else 0.0 + return JudgeOutput(reward=reward) +``` + +### Step 4: Start prompt tuning + +Use the `tune_prompt` interface to optimize your system prompt.
+ +```python +from agentscope.tuner import DatasetConfig +from agentscope.tuner.prompt_tune import tune_prompt, PromptTuneConfig + +# your workflow / judge function here... + +if __name__ == "__main__": + init_prompt = "You are an agent. Please solve the math problem given to you." + + optimized_prompt, metrics = tune_prompt( + workflow=workflow, + init_system_prompt=init_prompt, + judge_func=judge_function, + train_dataset=DatasetConfig(path="train.parquet"), + eval_dataset=DatasetConfig(path="test.parquet"), + config=PromptTuneConfig( + lm_model_name="dashscope/qwen-plus", + optimization_level="light", + ), + ) + + print(f"Optimized prompt: {optimized_prompt}") + print(f"Metrics: {metrics}") +``` + +Here, we use: +- `DatasetConfig` to specify the training and evaluation datasets. +- `PromptTuneConfig` to configure the optimization process. + +#### PromptTuneConfig Options + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `lm_model_name` | `"dashscope/qwen-plus"` | The model name for the prompt proposer (teacher model). | +| `optimization_level` | `"light"` | Optimization intensity: `"light"`, `"medium"`, or `"heavy"`. | +| `eval_display_progress` | `True` | Whether to display progress during evaluation. | +| `eval_display_table` | `5` | Number of table rows to display during evaluation. | +| `eval_num_threads` | `16` | Number of threads for parallel evaluation. | +| `compare_performance` | `True` | Whether to compare baseline vs optimized performance. 
| + +--- + +### Complete example + +```python +import os +from typing import Any, Dict + +from agentscope.tuner import DatasetConfig, WorkflowOutput, JudgeOutput +from agentscope.tuner.prompt_tune import tune_prompt, PromptTuneConfig +from agentscope.agent import ReActAgent +from agentscope.model import DashScopeChatModel +from agentscope.formatter import OpenAIChatFormatter +from agentscope.message import Msg + + +# Initialize the model for the workflow +model = DashScopeChatModel( + "qwen-turbo", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), +) + + +async def workflow( + task: Dict, + system_prompt: str, +) -> WorkflowOutput: + agent = ReActAgent( + name="react_agent", + sys_prompt=system_prompt, + model=model, + formatter=OpenAIChatFormatter(), + ) + + response = await agent.reply( + msg=Msg("user", task["question"], role="user"), + ) + + return WorkflowOutput( + response=response, + ) + + +async def judge_function( + task: Dict, response: Any +) -> JudgeOutput: + """Simple reward: 1.0 if the ground truth appears in the response, else 0.0.""" + ground_truth = task["answer"] + reward = 1.0 if ground_truth in response.get_text_content() else 0.0 + return JudgeOutput(reward=reward) + + +if __name__ == "__main__": + init_prompt = ( + "You are an agent. " + "Please solve the math problem given to you. " + "You should provide your output within \\boxed{{}}." + ) + + optimized_prompt, metrics = tune_prompt( + workflow=workflow, + init_system_prompt=init_prompt, + judge_func=judge_function, + train_dataset=DatasetConfig(path="train.parquet"), + eval_dataset=DatasetConfig(path="test.parquet"), + ) + + print(f"Optimized prompt: {optimized_prompt}") + print(f"Metrics: {metrics}") +``` + +> Note: +> The code above is a simplified example for illustration purposes only. +> For a complete implementation, please refer to [example.py](./example.py), which tunes a ReAct agent to solve math problems on the GSM8K subset dataset.
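
The complete example above assumes `train.parquet` and `test.parquet` already exist on disk. If you do not have parquet files handy, a minimal sketch for producing equivalent task files is to write JSONL (one JSON object per line), which `datasets.load_dataset` also accepts; the file names and toy samples below are illustrative placeholders, not part of the library:

```python
import json

# Toy math tasks; hypothetical stand-ins for a real dataset
# such as a GSM8K-style dump.
train_samples = [
    {"question": "What is 2 + 2?", "answer": "4"},
    {"question": "What is 4 + 4?", "answer": "8"},
    {"question": "What is 10 - 3?", "answer": "7"},
]
test_samples = [
    {"question": "What is 6 + 1?", "answer": "7"},
]

def write_jsonl(path: str, samples: list) -> None:
    """Write one JSON object per line -- a layout `datasets.load_dataset` can read."""
    with open(path, "w", encoding="utf-8") as f:
        for sample in samples:
            f.write(json.dumps(sample, ensure_ascii=False) + "\n")

write_jsonl("train.jsonl", train_samples)
write_jsonl("test.jsonl", test_samples)
```

You would then point `DatasetConfig(path=...)` at these files instead of the parquet paths; whether any extra `DatasetConfig` arguments are needed for JSONL is an assumption worth checking against your installed version.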
+ +--- + +## How to run + +After implementing the workflow and judge function, follow these steps to run prompt tuning: + +1. Prerequisites + + - Set up your API key as an environment variable: + + ```bash + export DASHSCOPE_API_KEY="your_api_key_here" + ``` + + - Prepare your dataset in a supported format (JSONL, Parquet, CSV, etc.). + +2. Run the tuning script + + ```bash + python example.py + ``` + +3. The optimized prompt will be printed to the console and can be used directly in your agent. + +## Output + +``` +Initial prompt: You are an agent. Please solve the math problem given to you with python code. You should provide your output within \boxed{}. + +Optimized prompt: You are a meticulous math tutor who solves elementary-to-middle-school-level word problems step by step. For each problem, first reason through the narrative to identify the key quantities and relationships. Then, write clear, executable Python code that computes the answer using only integer arithmetic. Finally, present your solution in the format \boxed{answer}, ensuring the answer is an integer and matches the logic of your explanation. Always double-check your reasoning and code before finalizing the boxed result. +``` + +--- + +## Comparison with RL-based Tuning + +| Aspect | Prompt Tuning | RL-based Tuning | +|--------|---------------|-----------------| +| **What is optimized** | System prompt text | Model weights | +| **Computational cost** | Low (API calls only) | High (GPU training) | +| **Hardware requirements** | No GPU required | Multiple GPUs required | +| **Use case** | Quick iteration, limited resources | Maximum performance | +
> [!TIP]
> Prompt tuning is ideal for rapid prototyping and scenarios where you want to improve agent performance without the overhead of model training.
diff --git a/examples/tuner/prompt_tuning/example.py b/examples/tuner/prompt_tuning/example.py new file mode 100644 index 0000000000..f8d517ec60 --- /dev/null +++ b/examples/tuner/prompt_tuning/example.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +"""Example of tuning a ReAct agent on GSM8K with Prompt Tuning.""" + +import os +from agentscope.agent import ReActAgent +from agentscope.formatter import OpenAIChatFormatter +from agentscope.message import Msg +from agentscope.model import DashScopeChatModel +from agentscope.tuner import DatasetConfig +from agentscope.tuner import JudgeOutput +from agentscope.tuner import WorkflowOutput +from agentscope.tuner import PromptTuneConfig +from agentscope.tuner import tune_prompt + + +# Initialize the model for the workflow +model = DashScopeChatModel( + "qwen-flash", + api_key=os.environ.get("DASHSCOPE_API_KEY", ""), + max_tokens=512, +) + + +async def workflow( + task: dict, + system_prompt: str, +) -> WorkflowOutput: + """A workflow function using the ReAct agent to solve tasks. + + Args: + task (dict): The task to be solved. + system_prompt (str): The system prompt to use for the agent. + + Returns: + WorkflowOutput: The workflow output containing the agent's response. + """ + from agentscope.tool import ( + Toolkit, + execute_python_code, + ) + + toolkit = Toolkit() + toolkit.register_tool_function(execute_python_code) + agent = ReActAgent( + name="react_agent", + sys_prompt=system_prompt, + model=model, + formatter=OpenAIChatFormatter(), + toolkit=toolkit, + print_hint_msg=False, + ) + agent.set_console_output_enabled(False) + + response = await agent.reply( + msg=Msg("user", task["question"], role="user"), + ) + return WorkflowOutput( + response=response, + ) + + +async def gsm8k_judge( + task: dict, + response: Msg, +) -> JudgeOutput: + """A simple judge function to calculate reward based on agent's response. + + Args: + task (Dict): The task information for the corresponding workflow. 
+ response (Msg): The response generated by the corresponding workflow. + + Returns: + JudgeOutput: The reward value assigned by the judge function. + """ + from trinity.common.rewards.math_reward import MathBoxedRewardFn + + reward_fn = MathBoxedRewardFn() + # parse truth from gsm8k raw text + truth = task["answer"] + if isinstance(truth, str) and "####" in truth: + truth = truth.split("####")[1].strip() + else: + truth = str(truth) + # parse answer from response message + result = response.get_text_content() + reward_dict = reward_fn( + response=result, + truth=truth, + ) + return JudgeOutput( + reward=sum(reward_dict.values()), + metrics=reward_dict, + ) + + +if __name__ == "__main__": + init_prompt = ( + "You are an agent. " + "Please solve the math problem given to you with python code. " + "You should provide your output within \\boxed{{}}." + ) + + optimized_prompt, metrics = tune_prompt( + workflow=workflow, + init_system_prompt=init_prompt, + judge_func=gsm8k_judge, + train_dataset=DatasetConfig( + path="train.parquet", + name="", + split="", + ), + eval_dataset=DatasetConfig( + path="test.parquet", + name="", + split="", + ), + config=PromptTuneConfig( + lm_model_name="dashscope/qwen3-max", + optimization_level="medium", + ), + ) + + print(f"Optimized prompt: {optimized_prompt}") + print(f"Metrics: {metrics}") diff --git a/pyproject.toml b/pyproject.toml index da02691554..e3615dd695 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,6 +139,20 @@ rag = [ # ------------ Evaluation ------------ evaluate = ["ray"] +# ------------ Tuner ------------ +tuner = [ + "dspy>=3.1.0", + "datasets>=4.0.0", + "litellm[proxy]>=1.75.3", +] + +tuner-gpu = [ + "trinity-rft>=0.5.0", + "dspy>=3.1.0", + "datasets>=4.0.0", + "litellm[proxy]>=1.75.3" +] + # ------------ Full ------------ full = [ "agentscope[a2a]", @@ -169,6 +183,8 @@ dev = [ "greenlet", # For openjudge "py-openjudge", + # For tuner + "dspy", ] [project.urls] diff --git a/src/agentscope/tuner/__init__.py b/src/agentscope/tuner/__init__.py index
5836d9ade0..c1d82a294d 100644 --- a/src/agentscope/tuner/__init__.py +++ b/src/agentscope/tuner/__init__.py @@ -8,6 +8,8 @@ from ._algorithm import AlgorithmConfig from ._model import TunerModelConfig, TinkerConfig from ._config import check_judge_function, check_workflow_function +from .prompt_tune import tune_prompt, PromptTuneConfig +from .model_selection._model_selection import select_model __all__ = [ @@ -22,4 +24,7 @@ "TinkerConfig", "check_workflow_function", "check_judge_function", + "tune_prompt", + "PromptTuneConfig", + "select_model", ] diff --git a/src/agentscope/tuner/_judge.py b/src/agentscope/tuner/_judge.py index 5dffe402ae..a901652663 100644 --- a/src/agentscope/tuner/_judge.py +++ b/src/agentscope/tuner/_judge.py @@ -1,9 +1,7 @@ # -*- coding: utf-8 -*- """The judge module for tuner.""" -from typing import Any, Callable, Dict, Awaitable -from logging import Logger +from typing import Callable, Dict, Awaitable from pydantic import BaseModel, Field -from ..model import ChatModelBase class JudgeOutput(BaseModel): @@ -19,10 +17,7 @@ class JudgeOutput(BaseModel): ) -JudgeType = Callable[ - [Dict, Any, Dict[str, ChatModelBase] | None, Logger | None], - Awaitable[JudgeOutput], -] +JudgeType = Callable[..., Awaitable[JudgeOutput]] # A judge function type for tuning. 
# Args: diff --git a/src/agentscope/tuner/_workflow.py b/src/agentscope/tuner/_workflow.py index 506da81795..80940ba64d 100644 --- a/src/agentscope/tuner/_workflow.py +++ b/src/agentscope/tuner/_workflow.py @@ -1,9 +1,7 @@ # -*- coding: utf-8 -*- """The workflow module for tuner.""" -from logging import Logger from typing import Any, Callable, Dict, Awaitable from pydantic import BaseModel, Field -from ..model import ChatModelBase class WorkflowOutput(BaseModel): @@ -30,10 +28,7 @@ class WorkflowOutput(BaseModel): ) -WorkflowType = Callable[ - [Dict, ChatModelBase, Dict[str, ChatModelBase] | None, Logger | None], - Awaitable[WorkflowOutput], -] +WorkflowType = Callable[..., Awaitable[WorkflowOutput]] # An agent workflow function type for tuning. # Args: @@ -42,6 +37,9 @@ class WorkflowOutput(BaseModel): # model (`ChatModelBase`): # The primary chat model used in the workflow, this is the main model # being tuned. +# system_prompt (`str`): +# The system prompt used in the primary agent, will be tuned in +# prompt tuning. # auxiliary_models (`Dict[str, ChatModelBase] | None`, optional): # A dictionary of additional chat models available for LLM-as-a-Judge # usage. 
The keys are model names, and the values are the corresponding diff --git a/src/agentscope/tuner/model_selection/__init__.py b/src/agentscope/tuner/model_selection/__init__.py new file mode 100644 index 0000000000..62af455b8a --- /dev/null +++ b/src/agentscope/tuner/model_selection/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +"""Model selection module for selecting the best performing model from +candidates based on evaluation metrics.""" + +from ._model_selection import select_model +from ._built_in_judges import avg_time_judge, avg_token_consumption_judge + +__all__ = ["select_model", "avg_time_judge", "avg_token_consumption_judge"] diff --git a/src/agentscope/tuner/model_selection/_built_in_judges.py b/src/agentscope/tuner/model_selection/_built_in_judges.py new file mode 100644 index 0000000000..9bf6a9d277 --- /dev/null +++ b/src/agentscope/tuner/model_selection/_built_in_judges.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +"""Built-in judge functions for model selection.""" + +from typing import Dict, Any +from .._judge import JudgeOutput + + +async def avg_time_judge( + _task: Dict[str, Any], + response: Any, +) -> JudgeOutput: + """ + Built-in judge function to calculate average time consumption of a model. + This function returns a negative reward (making + it a bigger-is-better metric), and includes the original metric + in the metrics field. + + Args: + task (`Dict[str, Any]`): + The task information (unused in this judge). + response (`Any`): + A composite dict containing the workflow response + and workflow metrics including execution_time and usage. + + Returns: + `JudgeOutput`: + The negative time taken (making smaller time a bigger reward), + and metrics containing the original time value. 
+ """ + # Extract execution time from the composite response dict + time_taken = 0.0 + + if not isinstance(response, dict): + raise ValueError( + "Response must be a dict with 'response' and 'metrics' keys", + ) + + metrics = response.get("metrics") + if metrics is None: + raise ValueError("Missing 'metrics' field in response") + if "execution_time" not in metrics: + raise ValueError("Missing 'execution_time' field in metrics") + time_taken = metrics["execution_time"] + + # Smaller time = higher reward + reward = -time_taken + + return JudgeOutput( + reward=reward, + metrics={"avg_time_seconds": time_taken}, + ) + + +async def avg_token_consumption_judge( + _task: Dict[str, Any], + response: Any, +) -> JudgeOutput: + """ + Built-in judge function to calculate average token consumption of a model. + This function returns a negative reward (making + it a bigger-is-better metric), and includes the original metric + in the metrics field. + + Args: + task (`Dict[str, Any]`): + The task information (unused in this judge). + response (`Any`): + A composite dict containing the workflow response + and workflow metrics including execution_time and usage. + Must include a 'metrics.usage' field. + + Returns: + `JudgeOutput`: + The negative token consumption, + and metrics containing the original token consumption value. 
+ """ + original_reward = 0.0 + + if not isinstance(response, dict): + raise ValueError( + "Response must be a dict with 'response' and 'metrics' keys", + ) + + metrics = response.get("metrics") + if metrics is None or "usage" not in metrics: + raise ValueError("Missing 'usage' field in response metrics") + + usage = metrics["usage"] + if isinstance(usage, dict): + if "total_tokens" in usage and usage["total_tokens"] is not None: + original_reward = float(usage["total_tokens"]) + elif "output_tokens" in usage and usage["output_tokens"] is not None: + original_reward = float(usage["output_tokens"]) + else: + raise ValueError( + "Neither 'total_tokens' nor 'output_tokens' found", + ) + else: + raise ValueError( + "Usage field in response.metrics is not a dictionary", + ) + + # smaller token usage = higher reward + reward = -original_reward + + return JudgeOutput( + reward=reward, + metrics={ + "token_consumed": original_reward, + }, + ) diff --git a/src/agentscope/tuner/model_selection/_model_selection.py b/src/agentscope/tuner/model_selection/_model_selection.py new file mode 100644 index 0000000000..47dccdf84e --- /dev/null +++ b/src/agentscope/tuner/model_selection/_model_selection.py @@ -0,0 +1,403 @@ +# -*- coding: utf-8 -*- +"""Model selection module for selecting the best performing model from +candidates based on evaluation metrics.""" +import asyncio +import logging +from typing import List, Dict, Tuple, Optional, Callable +from typing import Sequence, Union, Any +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from ...model import ChatModelBase +from .._workflow import WorkflowType, WorkflowOutput +from .._config import _check_function_signature +from .._judge import JudgeType, JudgeOutput +from .._dataset import DatasetConfig +from ...evaluate._evaluator._in_memory_exporter import _InMemoryExporter + + +logger = logging.getLogger(__name__) + + +def 
check_workflow_function( + func: Callable, +) -> None: + """Check if the given function is a valid WorkflowType. + + Args: + func (Callable): The function to check. + """ + essential_params = ["task", "model"] + _check_function_signature( + func, + essential_params, + ) + + +def check_judge_function( + func: Callable, +) -> None: + """Check if the given function is a valid JudgeType. + + Args: + func (Callable): The function to check. + """ + try: + _check_function_signature(func, ["task", "response"]) + except Exception: + _check_function_signature(func, ["_task", "response"]) + + +async def _load_dataset( + train_dataset: DatasetConfig, +) -> Any: + """Load and optionally limit dataset.""" + try: + from datasets import load_dataset + except ImportError as e: + raise ImportError( + "Please install with `pip install datasets`", + ) from e + + dataset = load_dataset( + path=train_dataset.path, + name=train_dataset.name, + split=train_dataset.split, + ) + + if train_dataset.total_steps is not None: + dataset = dataset.select( + range(min(train_dataset.total_steps, len(dataset))), + ) + return dataset + + +async def select_model( + *, + workflow_func: WorkflowType, + judge_func: JudgeType, + train_dataset: DatasetConfig, + candidate_models: Sequence[ChatModelBase], + max_threads: int = 2, +) -> Tuple[ChatModelBase, Dict[str, float]]: + """ + Select the best performing model from candidate models based on evaluation + metrics on a dataset. + + Args: + workflow_func (`WorkflowType`): + The workflow function that executes the task with a given model. + The workflow may contain multiple nodes that use different models. + Models to be selected should be defined with + "model" as the main parameter in the workflow_func. + judge_func (`JudgeType`): + The judge function that evaluates the output of the workflow. This + function is user-defined and needs to parse the corresponding + WorkflowOutput.
The function should return reward values where + higher values indicate better performance by default. + train_dataset (`DatasetConfig`): + Configuration of the dataset used for model evaluation. + candidate_models (`Sequence[ChatModelBase]`): + A sequence of candidate models to evaluate. + max_threads (`int`, optional): + Maximum number of concurrent evaluations. Defaults to 2. + + Returns: + `Tuple[ChatModelBase, dict[str, float]]`: A tuple containing: + - The model that achieved the best performance across the dataset + (with the highest average reward) + - Dictionary of aggregated metrics collected during evaluation + """ + check_workflow_function(workflow_func) + check_judge_function(judge_func) + if len(candidate_models) < 2: + raise ValueError("At least two candidate models must be provided.") + + logger.info( + "Evaluating %d candidate models: %s", + len(candidate_models), + [model.model_name for model in candidate_models], + ) + + # Setup OpenTelemetry tracing with the in-memory exporter once globally + exporter = _InMemoryExporter() + span_processor = SimpleSpanProcessor(exporter) + + # Create and configure tracer provider for the entire evaluation + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(span_processor) + + # Set our custom tracer provider for this evaluation + trace.set_tracer_provider(tracer_provider) + + best_avg_reward = float("-inf") # Look for largest reward + + dataset = await _load_dataset(train_dataset) + + best_model = candidate_models[0] if candidate_models else None + model_scores = {} # Track scores for each model to provide visibility + model_detailed_metrics = {} # Track detailed metrics for each model + all_metrics = {} # Collect metrics from the best model evaluation + + for model in candidate_models: + logger.info("Evaluating model: %s", model.model_name) + + total_reward = 0.0 + num_samples = 0 + model_metrics: Dict[str, float] = {} + # Store accumulated metrics for this model + + # Process dataset 
samples with async function calls
+        semaphore = asyncio.Semaphore(max_threads)
+
+        async def evaluate_with_semaphore(
+            idx: int,
+            sample: dict,
+            model: ChatModelBase = model,
+            exporter: _InMemoryExporter = exporter,
+            sem: asyncio.Semaphore = semaphore,
+        ) -> Optional[JudgeOutput]:
+            async with sem:
+                try:
+                    # Process this sample using the async helper below
+                    judge_output = await _evaluate_single_sample(
+                        sample=sample,
+                        model=model,
+                        workflow_func=workflow_func,
+                        judge_func=judge_func,
+                        exporter=exporter,
+                    )
+                    return judge_output
+                except Exception as e:
+                    logger.warning(
+                        "Skipping sample %d for model %s due to error: %s",
+                        idx,
+                        model.model_name,
+                        str(e),
+                    )
+                    return None
+
+        # Create tasks for all samples
+        tasks = [
+            evaluate_with_semaphore(idx, sample)
+            for idx, sample in enumerate(dataset)
+            if (
+                train_dataset.total_steps is None
+                or idx < train_dataset.total_steps
+            )
+        ]
+
+        # Execute all tasks concurrently
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+
+        # Process results
+        (
+            total_reward,
+            num_samples,
+            averaged_model_metrics,
+        ) = _process_evaluation_results(
+            results,
+            model_metrics,
+            total_reward,
+            num_samples,
+        )
+
+        avg_reward = total_reward / num_samples if num_samples > 0 else 0.0
+        model_scores[model.model_name] = avg_reward
+
+        # Save detailed metrics for this model
+        model_detailed_metrics[model.model_name] = averaged_model_metrics
+
+        logger.info(
+            "Model '%s' completed evaluation with average performance: %.4f",
+            model.model_name,
+            avg_reward,
+        )
+
+        # Update best model if current model performs better
+        if avg_reward > best_avg_reward:
+            best_avg_reward = avg_reward
+            best_model = model
+            all_metrics = averaged_model_metrics  # Metrics of best model
+
+    # Report final scores and detailed metrics for all models
+    logger.info("Model evaluation results:")
+    for model_name, avg_score in model_scores.items():
+        logger.info("  %s: %.4f", model_name, avg_score)
+
+    logger.info("Detailed metrics for all models:")
+    for model_name, metrics in model_detailed_metrics.items():
+        logger.info("Metrics for %s:", model_name)
+        for metric_name, metric_value in metrics.items():
+            logger.info("  %s: %s", metric_name, metric_value)
+
+    # Show the selected best model
+    if best_model is not None:
+        logger.info("Selected best model: %s", best_model.model_name)
+        return best_model, all_metrics
+    else:
+        raise RuntimeError("No best model selected. This should not happen.")
+
+
+def _process_evaluation_results(
+    results: List[Union[JudgeOutput, BaseException, None]],
+    model_metrics: Dict[str, float],
+    total_reward_init: float,
+    num_samples_init: int,
+) -> Tuple[float, int, Dict[str, float]]:
+    """
+    Process evaluation results to calculate total reward and aggregate
+    metrics.
+
+    Args:
+        results: The list of evaluation results
+        model_metrics: Dictionary to accumulate metrics
+        total_reward_init: Initial value for total reward
+        num_samples_init: Initial value for number of samples
+
+    Returns:
+        Tuple of (total_reward, num_samples, averaged_model_metrics)
+    """
+    import numbers
+
+    total_reward = total_reward_init
+    num_samples = num_samples_init
+
+    for result in results:
+        if result is None or isinstance(result, Exception):
+            if isinstance(result, Exception):
+                logger.warning(
+                    "Sample evaluation failed: %s",
+                    str(result),
+                )
+            continue  # Skip failed evaluations; don't count in num_samples
+
+        # Only count results that are JudgeOutput toward num_samples;
+        # ensure it's actually a JudgeOutput before accessing attributes
+        if not isinstance(result, JudgeOutput):
+            continue
+
+        total_reward += result.reward
+        num_samples += 1
+
+        # Aggregate metrics from this sample
+        if result.metrics:
+            for key, value in result.metrics.items():
+                if key in model_metrics:
+                    model_metrics[key] += value
+                else:
+                    model_metrics[key] = value
+
+    # Average the metrics per sample for this model
+    averaged_model_metrics = {}
+    for key, value in model_metrics.items():
+        if isinstance(value, numbers.Number):
+            num_val = value.real if hasattr(value, "real") else 0.0
+            averaged_model_metrics[f"{key}_avg"] = (
+                float(num_val) / num_samples if num_samples > 0 else 0.0
+            )
+
+    return total_reward, num_samples, averaged_model_metrics
+
+
+async def _evaluate_single_sample(
+    sample: dict,
+    model: ChatModelBase,
+    workflow_func: WorkflowType,
+    judge_func: JudgeType,
+    exporter: _InMemoryExporter,
+) -> JudgeOutput:
+    """
+    Evaluate a single sample with the given model and workflow/judge
+    functions.
+
+    Args:
+        sample (dict): The sample to evaluate
+        model (ChatModelBase): The model to use for evaluation
+        workflow_func (WorkflowType): The workflow function to execute
+        judge_func (JudgeType): The judge function to evaluate the result
+        exporter (_InMemoryExporter): The exporter to collect traces
+
+    Returns:
+        JudgeOutput: The output from the judge function
+    """
+    # Create a unique task ID for this sample evaluation
+    import uuid
+    from opentelemetry import baggage
+    from opentelemetry.context import attach, detach
+
+    task_id = f"eval_task_{uuid.uuid4().hex[:8]}"
+    repeat_id = "0"
+
+    # Execute workflow with current model and measure execution time
+    start_time = asyncio.get_event_loop().time()
+
+    # Set up the tracer with baggage so the exporter can track this task
+    tracer = trace.get_tracer(__name__)
+
+    # Set baggage (critical for the exporter to associate spans with tasks)
+    ctx = baggage.set_baggage("task_id", task_id)
+    ctx = baggage.set_baggage("repeat_id", repeat_id, context=ctx)
+
+    # Activate the context
+    token = attach(ctx)
+
+    try:
+        with tracer.start_as_current_span(
+            name=f"Solution_{task_id}_{repeat_id}",
+        ):
+            # Access _config for trace enablement
+            from ... import _config
+
+            _config.trace_enabled = True
+
+            # Execute workflow with current model
+            workflow_output: WorkflowOutput = await workflow_func(
+                task=sample,
+                model=model,
+            )
+    finally:
+        detach(token)
+
+    end_time = asyncio.get_event_loop().time()
+
+    # Add timing information to metrics
+    execution_time = end_time - start_time
+    if workflow_output.metrics is None:
+        workflow_output.metrics = {}
+    workflow_output.metrics["execution_time"] = execution_time
+
+    # Extract token usage information from the exporter
+    total_input_tokens = 0
+    total_output_tokens = 0
+    total_tokens = 0
+
+    # Get the chat usage data from the exporter
+    if task_id in exporter.cnt and repeat_id in exporter.cnt[task_id]:
+        chat_usage_data = exporter.cnt[task_id][repeat_id].get(
+            "chat_usage",
+            {},
+        )
+        # Sum up token usage across all models used in this task
+        for usage in chat_usage_data.values():
+            total_input_tokens += int(usage.get("input_tokens", 0))
+            total_output_tokens += int(usage.get("output_tokens", 0))
+
+    total_tokens = total_input_tokens + total_output_tokens
+
+    # Add usage information to workflow_output metrics
+    workflow_output.metrics["input_tokens"] = float(total_input_tokens)
+    workflow_output.metrics["output_tokens"] = float(total_output_tokens)
+    workflow_output.metrics["total_tokens"] = float(total_tokens)
+
+    # Evaluate the workflow output using the judge function.
+    # Pass a composite dict containing both the response and workflow metrics
+    judge_output: JudgeOutput = await judge_func(
+        task=sample,
+        response={
+            "response": workflow_output.response,
+            "metrics": workflow_output.metrics,
+        },
+    )
+
+    return judge_output
diff --git a/src/agentscope/tuner/prompt_tune/__init__.py b/src/agentscope/tuner/prompt_tune/__init__.py
new file mode 100644
index 0000000000..8010c123eb
--- /dev/null
+++ b/src/agentscope/tuner/prompt_tune/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+"""Prompt tuning module for AgentScope.
+
+This module provides functionality for automatic prompt optimization.
+"""
+
+from ._config import PromptTuneConfig
+from ._tune_prompt import tune_prompt
+
+__all__ = [
+    "PromptTuneConfig",
+    "tune_prompt",
+]
diff --git a/src/agentscope/tuner/prompt_tune/_config.py b/src/agentscope/tuner/prompt_tune/_config.py
new file mode 100644
index 0000000000..69211b0ef2
--- /dev/null
+++ b/src/agentscope/tuner/prompt_tune/_config.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+"""Configuration module for prompt tuning."""
+
+from typing import Optional, Literal
+from pydantic import BaseModel, Field
+
+
+class PromptTuneConfig(BaseModel):
+    """Configuration for prompt tuning.
+
+    Attributes:
+        lm_model_name: The model name for the prompt proposer.
+        optimization_level: Optimization level, can be 'light', 'medium',
+            or 'heavy'.
+        eval_display_progress: Whether to display progress during evaluation.
+        eval_display_table: Number of table rows to display during evaluation.
+        eval_num_threads: Number of threads for evaluation.
+        compare_performance: Whether to compare performance before and after
+            tuning.
+    """
+
+    lm_model_name: str = Field(
+        default="dashscope/qwen-plus",
+        description="The model name for prompt proposer.",
+    )
+
+    optimization_level: Optional[Literal["light", "medium", "heavy"]] = Field(
+        default="light",
+        description="Optimization level, can be light, medium, or heavy.",
+    )
+
+    eval_display_progress: bool = Field(
+        default=True,
+        description="Whether to display progress during evaluation",
+    )
+    eval_display_table: int = Field(
+        default=5,
+        description="Number of table rows to display during evaluation",
+    )
+    eval_num_threads: int = Field(
+        default=16,
+        description="Number of threads for evaluation",
+    )
+
+    compare_performance: bool = Field(
+        default=True,
+        description="Whether to compare performance before and after tuning",
+    )
diff --git a/src/agentscope/tuner/prompt_tune/_tune_prompt.py b/src/agentscope/tuner/prompt_tune/_tune_prompt.py
new file mode 100644
index 0000000000..55295cec70
--- /dev/null
+++ b/src/agentscope/tuner/prompt_tune/_tune_prompt.py
@@ -0,0 +1,253 @@
+# -*- coding: utf-8 -*-
+"""Prompt tuning functionality using DSPy's MIPROv2 optimizer."""
+
+import os
+import asyncio
+from pathlib import Path
+from typing import Any, Callable, Optional, cast
+
+from agentscope.tuner import (
+    DatasetConfig,
+)
+from agentscope import logger
+from agentscope.tuner._config import _check_function_signature
+from agentscope.tuner._workflow import WorkflowType
+from agentscope.tuner._judge import JudgeType
+from agentscope.tuner.prompt_tune._config import PromptTuneConfig
+from agentscope.tuner.prompt_tune._wrapper import _WorkflowWrapperModule
+
+
+def _wrap_judge_fn(judge_fn: JudgeType) -> Callable[..., float]:
+    """Wrap an async judge function into a synchronous callable.
+
+    Args:
+        judge_fn: The async judge function to wrap.
+
+    Returns:
+        A synchronous wrapper function that returns only the reward value.
+    """
+
+    async def inner(
+        task: dict,
+        response: Any,
+    ) -> float:
+        output = await judge_fn(task=task, response=response)
+        return output.reward
+
+    def _sync_wrapper(
+        task: dict,
+        response: Any,
+    ) -> float:
+        return asyncio.run(inner(task=task, response=response))
+
+    return _sync_wrapper
+
+
+def _guess_by_ext(p: str) -> Optional[str]:
+    """Guess the dataset format by file extension.
+
+    Args:
+        p (`str`): The file path.
+
+    Returns:
+        `Optional[str]`: The format string (e.g., 'json', 'csv') or None if
+            the extension is not recognized.
+    """
+    pp = Path(p)
+    ext = pp.suffix.lower()
+    if ext in {".jsonl", ".jl"}:
+        return "json"
+    if ext == ".json":
+        return "json"
+    if ext in {".csv", ".tsv"}:
+        return "csv"
+    if ext in {".parquet"}:
+        return "parquet"
+    if ext in {".txt"}:
+        return "text"
+    return None
+
+
+def check_workflow_function(
+    func: Callable,
+) -> None:
+    """Check if the given function is a valid WorkflowType.
+
+    Args:
+        func (Callable): The function to check.
+    """
+    essential_params = ["task", "system_prompt"]
+    _check_function_signature(
+        func,
+        essential_params,
+    )
+
+
+def check_judge_function(
+    func: Callable,
+) -> None:
+    """Check if the given function is a valid JudgeType.
+
+    Args:
+        func (Callable): The function to check.
+    """
+    essential_params = ["task", "response"]
+    _check_function_signature(
+        func,
+        essential_params,
+    )
+
+
+def tune_prompt(
+    *,
+    workflow: WorkflowType,
+    init_system_prompt: str,
+    judge_func: JudgeType,
+    train_dataset: DatasetConfig,
+    eval_dataset: DatasetConfig | None = None,
+    config: PromptTuneConfig | None = None,
+) -> tuple[str, dict[str, float]]:
+    """Tune a system prompt using DSPy's MIPROv2 optimizer.
+
+    This function optimizes the system prompt by leveraging DSPy's
+    automatic prompt optimization capabilities.
+
+    Args:
+        workflow: An async workflow function that takes a task dict and
+            system prompt string, returns a WorkflowOutput.
+        init_system_prompt: The initial system prompt to be optimized.
+        judge_func: An async function that evaluates the agent's response
+            and returns a JudgeOutput.
+        train_dataset: The dataset used for training/optimization.
+        eval_dataset: Optional dataset for evaluation after optimization.
+        config: Configuration for prompt tuning. Defaults to
+            PromptTuneConfig().
+
+    Returns:
+        A tuple containing:
+            - The optimized system prompt string.
+            - A dict of metrics. May include "valset_improvement"
+              (percentage) if eval_dataset is provided and
+              config.compare_performance is True.
+    """
+    import dspy
+    from datasets import load_dataset
+
+    config = config or PromptTuneConfig()
+    check_workflow_function(workflow)
+    check_judge_function(judge_func)
+
+    if os.path.exists(train_dataset.path) and _guess_by_ext(
+        train_dataset.path,
+    ):
+        logger.info("loading dataset from file: %s", train_dataset.path)
+        trainset = load_dataset(
+            cast(str, _guess_by_ext(train_dataset.path)),
+            data_files=train_dataset.path,
+        )["train"]
+    else:
+        logger.info("loading training dataset from remote...")
+        trainset = load_dataset(
+            path=train_dataset.path,
+            name=train_dataset.name,
+            split=train_dataset.split,
+        )
+
+    dspy_trainset = [dspy.Example(inp=x).with_inputs("inp") for x in trainset]
+
+    module = _WorkflowWrapperModule(workflow, init_system_prompt)
+
+    # teacher lm
+    lm = dspy.LM(config.lm_model_name)
+
+    optimizer = dspy.MIPROv2(
+        metric=(
+            lambda data, output, trace=None: _wrap_judge_fn(judge_func)(
+                data.inp,
+                output,
+            )
+        ),
+        auto=config.optimization_level,
+        teacher_settings={
+            "lm": lm,
+        },
+        prompt_model=lm,
+        task_model=lm,
+    )
+
+    # optimize
+    logger.info("optimizing workflow...")
+    result = optimizer.compile(module, trainset=dspy_trainset)
+    logger.info("workflow optimized")
+
+    # evaluate if eval_dataset is provided
+    valset_improvement: float | None = None
+    if eval_dataset is not None:
+        if os.path.exists(eval_dataset.path) and _guess_by_ext(
+            eval_dataset.path,
+        ):
+            logger.info(
+                "loading evaluation dataset from file: %s",
+                eval_dataset.path,
+            )
+            evalset = load_dataset(
+                cast(str, _guess_by_ext(eval_dataset.path)),
+                data_files=eval_dataset.path,
+            )["train"]
+        else:
+            logger.info("loading evaluation dataset from remote...")
+            evalset = load_dataset(
+                path=eval_dataset.path,
+                name=eval_dataset.name,
+                split=eval_dataset.split,
+            )
+        logger.info("evaluation dataset loaded")
+
+        dspy_evalset = [
+            dspy.Example(inp=x).with_inputs("inp") for x in evalset
+        ]
+
+        evaluate = dspy.Evaluate(
+            devset=dspy_evalset,
+            metric=lambda data, output, trace=None: _wrap_judge_fn(
+                judge_func,
+            )(data.inp, output),
+            display_progress=config.eval_display_progress,
+            display_table=config.eval_display_table,
+            num_threads=config.eval_num_threads,
+        )
+
+        baseline_score = None
+        if config.compare_performance:
+            logger.info("evaluating baseline performance...")
+            baseline_res = evaluate(module)
+            baseline_score = baseline_res.score
+            logger.info("baseline score: %s", baseline_score)
+
+        logger.info("evaluating optimized results...")
+        eval_res = evaluate(result)
+        score = eval_res.score
+        logger.info("optimized score: %s", score)
+
+        if baseline_score is not None:
+            valset_improvement = (
+                (score - baseline_score) / baseline_score * 100
+                if baseline_score != 0
+                else 0.0
+            )
+            logger.info("improvement: %.2f%%", valset_improvement)
+
+    optimized_prompt = result.predictor.get_current_prompt()
+    assert isinstance(
+        optimized_prompt,
+        str,
+    ), f"Optimized prompt must be a string but {type(optimized_prompt)}."
+    logger.info("---------- Optimized Prompt ----------")
+    logger.info(optimized_prompt)
+
+    metrics: dict[str, float] = {}
+    if valset_improvement is not None:
+        metrics["valset_improvement"] = valset_improvement
+
+    return optimized_prompt, metrics
diff --git a/src/agentscope/tuner/prompt_tune/_wrapper.py b/src/agentscope/tuner/prompt_tune/_wrapper.py
new file mode 100644
index 0000000000..88d21d6e87
--- /dev/null
+++ b/src/agentscope/tuner/prompt_tune/_wrapper.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+"""Wrapper modules for integrating AgentScope agents."""
+
+import asyncio
+from typing import Any
+
+import dspy
+from dspy import Module, Prediction
+from dspy.predict.predict import Predict
+
+from agentscope import logger
+from agentscope.tuner._workflow import WorkflowOutput, WorkflowType
+
+
+class _OptimizablePrompt(Predict):
+    """A DSPy Predict wrapper that makes a system prompt optimizable.
+
+    This class bridges AgentScope's ReActAgent with DSPy's optimization
+    framework by exposing the system prompt as a DSPy signature.
+
+    Attributes:
+        _sys_prompt: The current system prompt being optimized.
+    """
+
+    def __init__(self, init_prompt: str):
+        """Initialize the _OptimizablePrompt.
+
+        Args:
+            init_prompt: The initial system prompt to optimize.
+        """
+        super().__init__("input -> output")
+        self.signature = dspy.make_signature("input -> output")
+        self.instructions = self.signature.instructions
+        self.demos = []
+
+        self._sys_prompt = init_prompt
+        self.instructions = self._sys_prompt
+        self.signature.instructions = self.instructions
+
+    def forward(self, **kwargs: Any) -> Prediction:
+        """Forward pass is not implemented.
+
+        Raises:
+            NotImplementedError: Always raised as this is a wrapper class.
+        """
+        raise NotImplementedError(
+            "_OptimizablePrompt is a wrapper, not callable",
+        )
+
+    def sync_instruction(self) -> None:
+        """Sync instruction from DSPy signature to internal state."""
+        self.instructions = self.signature.instructions
+        self._sys_prompt = self.instructions
+
+    def get_current_prompt(self) -> str:
+        """Get the current optimized system prompt."""
+        return self._sys_prompt
+
+
+class _WorkflowWrapperModule(Module):
+    """A DSPy Module that wraps an AgentScope workflow for optimization.
+
+    This module enables DSPy to optimize the system prompt by wrapping
+    the workflow execution in a DSPy-compatible interface.
+
+    Attributes:
+        _workflow: The workflow function that takes task and system prompt.
+        predictor: The _OptimizablePrompt wrapping the system prompt.
+    """
+
+    def __init__(
+        self,
+        workflow: WorkflowType,
+        init_prompt: str,
+    ):
+        """Initialize the _WorkflowWrapperModule.
+
+        Args:
+            workflow: An async workflow function that takes a task dict and
+                system prompt string, returns a WorkflowOutput.
+            init_prompt: The initial system prompt to be optimized.
+        """
+        super().__init__()
+        self._workflow = workflow
+        self._init_prompt = init_prompt
+
+        self.predictor = _OptimizablePrompt(self._init_prompt)
+
+    def forward(self, inp: Any) -> Any:
+        """Execute the workflow with the given input.
+
+        Args:
+            inp: The input data from DSPy.
+
+        Returns:
+            The response message from the workflow execution.
+        """
+        self.predictor.sync_instruction()
+        current_prompt = self.predictor.get_current_prompt()
+
+        async def _run_workflow() -> WorkflowOutput:
+            return await self._workflow(task=inp, system_prompt=current_prompt)
+
+        result = asyncio.run(_run_workflow())
+
+        if result.reward:
+            logger.warning(
+                "reward in workflow output will be ignored; "
+                "use a separate judge function",
+            )
+
+        return result.response
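
The bounded-concurrency evaluation pattern in the model-tuning diff above (a semaphore around `asyncio.gather`, with failed samples skipped rather than counted) can be sketched in isolation. This is a minimal stand-alone illustration; `evaluate_sample` and `evaluate_all` are hypothetical stand-ins, not AgentScope APIs:

```python
import asyncio
from typing import Optional

async def evaluate_sample(sample: dict) -> float:
    # Stand-in for the real workflow + judge call; one sample fails on purpose
    if sample.get("bad"):
        raise ValueError("judge failed")
    await asyncio.sleep(0)  # simulate awaiting model/judge I/O
    return float(sample["score"])

async def evaluate_all(samples: list, max_threads: int = 4) -> float:
    # Bound concurrency with a semaphore, as the evaluation loop does
    sem = asyncio.Semaphore(max_threads)

    async def with_sem(sample: dict) -> Optional[float]:
        async with sem:
            try:
                return await evaluate_sample(sample)
            except Exception:
                return None  # failed samples are skipped, not counted

    results = await asyncio.gather(
        *(with_sem(s) for s in samples),
        return_exceptions=True,
    )
    # Average only the successful (float) results
    scores = [r for r in results if isinstance(r, float)]
    return sum(scores) / len(scores) if scores else 0.0

avg = asyncio.run(
    evaluate_all([{"score": 1.0}, {"score": 0.0}, {"bad": True}])
)
print(avg)  # 0.5: the failing sample is excluded from the average
```

The same structure gives the tuner's per-model average reward: errors surface as `None` (or exceptions under `return_exceptions=True`) and are filtered out before the division, so one bad sample cannot poison the score.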