Commit f5ccb77

committed Feb 17, 2025
feat: Implement LLM Architecture Patterns - Building blocks (Augmented LLM), Workflows (Prompt Chaining, Routing, Parallelization, Orchestrator-Workers, Evaluator-Optimizer), and Agents (Autonomous, Domain-Specific) with examples and docs
1 parent d9b2261 commit f5ccb77

File tree: 36 files changed, +4259 −57 lines changed

‎.cursorlogs

+62
@@ -0,0 +1,62 @@
# Cursor Development Logs

[2024-02-15 Initial Setup]
PROBLEM: Need to track development progress - ACTION: Created .cursorlogs file - RESULT: Successfully created tracking file
PROBLEM: Repository structure analysis - ACTION: Listed current files and directories - RESULT: Found basic structure with building-block/, workflows/, and agents/ directories
PROBLEM: Project dependencies - ACTION: Created requirements.txt - RESULT: Added core dependencies for LLM implementations
PROBLEM: Building-block analysis - ACTION: Inspected building-block/augmented-llm - RESULT: Found basic implementation with augmented.py and supporting files
PROBLEM: Workflows analysis - ACTION: Inspected workflows directory - RESULT: Found empty directories for all workflow patterns
PROBLEM: Agents analysis - ACTION: Inspected agents directory - RESULT: Found empty directory, needs complete implementation

[2024-02-15 Implementation Plan]

1. Complete building-block/augmented-llm implementation
2. Implement workflow patterns:
   - prompt-chaining
   - routing
   - parallelization
   - orchestrator-workers
   - evaluator-optimizer
3. Implement agent patterns:
   - autonomous-agent
   - domain-specific-agents

[2024-02-15 Building Block Implementation]
PROBLEM: Augmented LLM tools needed - ACTION: Created basic_tools.py with WebSearch, Calculator, and Weather tools - RESULT: Successfully implemented tool classes
PROBLEM: Example implementation needed - ACTION: Created basic_usage.py with complete example - RESULT: Successfully demonstrated tool and memory usage
PROBLEM: Documentation needed - ACTION: Updated augmented-llm README.md - RESULT: Comprehensive documentation of implementation and usage

[2024-02-15 Workflow Implementation - Prompt Chaining]
PROBLEM: Core prompt chaining implementation needed - ACTION: Created chain.py with ChainStep, PromptChain, and ChainBuilder classes - RESULT: Successfully implemented base functionality
PROBLEM: Example implementation needed - ACTION: Created text_analysis.py demonstrating multi-step text analysis - RESULT: Successfully implemented three-step chain example
PROBLEM: Documentation needed - ACTION: Created prompt-chaining README.md - RESULT: Comprehensive documentation of pattern and usage

[2024-02-15 Workflow Implementation - Routing]
PROBLEM: Core routing implementation needed - ACTION: Created router.py with Route, Router, and RouterBuilder classes - RESULT: Successfully implemented base functionality
PROBLEM: Example implementation needed - ACTION: Created support_routing.py demonstrating support ticket routing - RESULT: Successfully implemented support routing system
PROBLEM: Documentation needed - ACTION: Created routing README.md - RESULT: Comprehensive documentation of pattern and usage

[2024-02-15 Workflow Implementation - Parallelization]
PROBLEM: Core parallelization implementation needed - ACTION: Created parallel.py with ParallelTask, Parallelizer, and ParallelizerBuilder classes - RESULT: Successfully implemented base functionality
PROBLEM: Example implementation needed - ACTION: Created content_moderation.py demonstrating parallel content moderation - RESULT: Successfully implemented moderation system with multiple checks
PROBLEM: Documentation needed - ACTION: Created parallelization README.md - RESULT: Comprehensive documentation of pattern and usage

[2024-02-15 Workflow Implementation - Orchestrator-Workers]
PROBLEM: Core orchestrator implementation needed - ACTION: Created orchestrator.py with Task, Worker, Orchestrator, and OrchestratorBuilder classes - RESULT: Successfully implemented base functionality
PROBLEM: Example implementation needed - ACTION: Created document_processing.py demonstrating document analysis system - RESULT: Successfully implemented document processing with multiple workers
PROBLEM: Documentation needed - ACTION: Created orchestrator-workers README.md - RESULT: Comprehensive documentation of pattern and usage

[2024-02-15 Workflow Implementation - Evaluator-Optimizer]
PROBLEM: Core optimizer implementation needed - ACTION: Created optimizer.py with Candidate, Optimizer, and OptimizerBuilder classes - RESULT: Successfully implemented base functionality with multiple strategies
PROBLEM: Example implementation needed - ACTION: Created code_optimization.py demonstrating code improvement system - RESULT: Successfully implemented code optimization with multiple strategies
PROBLEM: Documentation needed - ACTION: Created evaluator-optimizer README.md - RESULT: Comprehensive documentation of pattern and usage

[2024-02-15 Agent Implementation - Autonomous Agent]
PROBLEM: Core agent implementation needed - ACTION: Created agent.py with Action, Plan, Tool, Memory, and AutonomousAgent classes - RESULT: Successfully implemented base functionality with planning and execution capabilities
PROBLEM: Example implementation needed - ACTION: Created research_assistant.py demonstrating autonomous research system - RESULT: Successfully implemented research assistant with multiple tools and adaptive planning
PROBLEM: Documentation needed - ACTION: Created autonomous-agent README.md - RESULT: Comprehensive documentation of pattern and usage

[2024-02-15 Agent Implementation - Domain-Specific Agent]
PROBLEM: Core domain-specific agent implementation needed - ACTION: Created domain-specific/agent.py with DomainConstraint, DomainKnowledge, DomainBehavior, and DomainSpecificAgent classes - RESULT: Successfully implemented base functionality with domain-specific capabilities
PROBLEM: Example implementation needed - ACTION: Created medical_assistant.py demonstrating medical diagnosis system - RESULT: Successfully implemented medical assistant with domain knowledge, constraints, and behaviors
PROBLEM: Documentation needed - ACTION: Created domain-specific README.md - RESULT: Comprehensive documentation of pattern and usage

‎agents/autonomous-agent/README.md

+220
@@ -0,0 +1,220 @@
# Autonomous Agent Pattern

This pattern demonstrates how to implement self-directed agents that can plan, execute, and adapt their behavior to achieve goals.

## Overview

The autonomous agent pattern is useful when:

- Tasks require complex planning and execution
- Goals can be broken down into discrete actions
- Adaptation to intermediate results is needed
- Long-running tasks need monitoring

## Components

### Core Classes

1. **Action**

   - Represents a discrete task
   - Tracks execution status
   - Stores results
   - Manages parameters

2. **Plan**

   - Sequences of actions
   - Goal definition
   - Context information
   - Execution status

3. **Tool**

   - Specific capabilities
   - Parameter definitions
   - Execution handlers
   - Error handling

4. **Memory**

   - Action history
   - Plan history
   - Context storage
   - State management

5. **AutonomousAgent**
   - Plan creation
   - Action execution
   - Progress evaluation
   - Plan adjustment

## Example Implementation

The `examples/research_assistant.py` example demonstrates an autonomous research assistant:

1. Research Tools:

   - Web Search
   - Webpage Reading
   - Text Summarization
   - Fact Extraction
   - Note Taking

2. Features:
   - Goal decomposition
   - Adaptive planning
   - Progress tracking
   - Result aggregation

## Usage

1. Set up the environment:

   ```bash
   pip install -r requirements.txt
   ```

2. Configure API keys:

   ```bash
   export ANTHROPIC_API_KEY=your_key_here
   # or
   export OPENAI_API_KEY=your_key_here
   ```

3. Run the example:

   ```bash
   python -m examples.research_assistant
   ```

## Implementation Details

### Tool Definition

```python
Tool(
    name="tool_name",
    description="tool description",
    parameters={"param": "type"},
    handler=async_handler_function
)
```

### Agent Building

```python
agent = (
    AgentBuilder()
    .add_tool(...)
    .add_tool(...)
    .with_memory(memory)
    .build(llm_caller)
)
```

### Goal Execution

```python
actions = await agent.execute(
    goal="goal description",
    context={}
)
```
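
Putting these pieces together, the sketch below shows one way the builder, a tool, and the executor might be wired end to end. It is a minimal, hypothetical example: the `fetch_page` tool, its handler, and the `call_llm` stub are illustrative stand-ins rather than code from this repository, and the stub only returns the JSON shapes the agent's prompts ask for.

```python
# Minimal end-to-end sketch; names below are illustrative, not from this repo.
import asyncio
from agent import AgentBuilder, Memory  # adjust the import path to where agent.py lives

async def fetch_page(params: dict) -> dict:
    # Hypothetical tool handler; a real tool would perform network I/O here.
    return {"url": params["url"], "content": "stub page content"}

async def call_llm(prompt: str) -> str:
    # Stand-in for a provider call. The agent sends planning and evaluation
    # prompts; answer each with the JSON format it expects.
    if "goal_achieved" in prompt:
        return '{"goal_achieved": true, "reasoning": "the page was fetched"}'
    return (
        '{"actions": [{"id": "action-1", "name": "fetch_page", '
        '"description": "fetch one example page", '
        '"parameters": {"url": "https://example.com"}}]}'
    )

async def main():
    agent = (
        AgentBuilder()
        .add_tool(
            name="fetch_page",
            description="Fetch the contents of a URL",
            parameters={"url": "page URL"},
            handler=fetch_page,
        )
        .with_memory(Memory())
        .build(llm_caller=call_llm)
    )
    actions = await agent.execute(goal="Collect one example page", context={})
    for action in actions:
        print(action.name, action.status, action.result)

asyncio.run(main())
```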

## Agent Capabilities

1. **Planning**

   - Goal analysis
   - Task decomposition
   - Action sequencing
   - Dependency management

2. **Execution**

   - Tool selection
   - Parameter preparation
   - Result handling
   - Error recovery

3. **Monitoring**

   - Progress tracking
   - Goal evaluation
   - Plan adjustment
   - Performance analysis

4. **Learning**
   - Action history
   - Success patterns
   - Failure analysis
   - Strategy adaptation

## Best Practices

1. **Goal Design**

   - Clear objectives
   - Measurable outcomes
   - Reasonable scope
   - Success criteria

2. **Tool Implementation**

   - Focused functionality
   - Clear interfaces
   - Robust error handling
   - Performance optimization

3. **Memory Management**

   - Relevant history
   - Context preservation
   - State cleanup
   - Storage efficiency

4. **Error Handling**
   - Graceful degradation
   - Recovery strategies
   - Feedback loops
   - Logging and monitoring

## Extensions

Consider extending this pattern with:

1. **Advanced Planning**

   - Multi-goal handling
   - Priority management
   - Resource allocation
   - Constraint satisfaction

2. **Learning Capabilities**

   - Strategy optimization
   - Pattern recognition
   - Performance tuning
   - Knowledge base

3. **Collaboration**

   - Agent communication
   - Task delegation
   - Resource sharing
   - Conflict resolution

4. **Safety**

   - Action validation
   - Resource limits
   - Security checks
   - Ethical constraints

5. **Integration**
   - External services
   - Data sources
   - Monitoring systems
   - Reporting tools

‎agents/autonomous-agent/agent.py

+294
@@ -0,0 +1,294 @@
from typing import List, Dict, Any, Callable, Optional, Union, Tuple
from abc import ABC, abstractmethod
from pydantic import BaseModel
import asyncio
import json
from enum import Enum

class ActionStatus(str, Enum):
    """Status of an action execution"""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

class Action(BaseModel):
    """An action that can be executed by the agent"""
    id: str
    name: str
    description: str
    parameters: Dict[str, Any]
    status: ActionStatus = ActionStatus.PENDING
    result: Optional[Dict[str, Any]] = None

class Plan(BaseModel):
    """A plan consisting of actions to achieve a goal"""
    goal: str
    actions: List[Action]
    context: Dict[str, Any] = {}
    status: ActionStatus = ActionStatus.PENDING

class Tool(BaseModel):
    """A tool that can be used by the agent"""
    name: str
    description: str
    parameters: Dict[str, str]
    handler: Callable[[Dict[str, Any]], Any]

class Memory(BaseModel):
    """Agent's memory of past actions and results"""
    plans: List[Plan] = []
    action_history: List[Action] = []
    context: Dict[str, Any] = {}

class AutonomousAgent:
    """An agent that can plan and execute tasks autonomously"""

    def __init__(
        self,
        tools: List[Tool],
        llm_caller: Callable[[str], str],
        memory: Optional[Memory] = None
    ):
        self.tools = tools
        self.llm_caller = llm_caller
        self.memory = memory or Memory()

    async def _create_plan(
        self,
        goal: str,
        context: Dict[str, Any]
    ) -> Plan:
        """Create a plan to achieve a goal"""
        tools_desc = "\n".join([
            f"- {tool.name}: {tool.description}\n  Parameters: {tool.parameters}"
            for tool in self.tools
        ])

        prompt = (
            "Create a plan to achieve the following goal. Use available tools "
            "and break down the goal into specific actions.\n\n"
            f"Goal: {goal}\n\n"
            f"Context: {json.dumps(context, indent=2)}\n\n"
            f"Available Tools:\n{tools_desc}\n\n"
            "Previous Actions:\n" +
            "\n".join([
                f"- {action.name}: {action.status.value}"
                for action in self.memory.action_history[-5:]  # Last 5 actions
            ]) + "\n\n"
            "Respond with a plan in JSON format:\n"
            "{\n"
            '  "actions": [\n'
            "    {\n"
            '      "id": "action-1",\n'
            '      "name": "tool_name",\n'
            '      "description": "what this action will do",\n'
            '      "parameters": {}\n'
            "    }\n"
            "  ]\n"
            "}"
        )

        try:
            response = await self.llm_caller(prompt)
            plan_data = json.loads(response)
            return Plan(
                goal=goal,
                actions=[Action(**action) for action in plan_data["actions"]],
                context=context
            )
        except Exception as e:
            raise Exception(f"Error creating plan: {str(e)}")

    def _get_tool(self, name: str) -> Optional[Tool]:
        """Get a tool by name"""
        return next((t for t in self.tools if t.name == name), None)

    async def _execute_action(self, action: Action) -> Action:
        """Execute a single action"""
        try:
            # Get the tool
            tool = self._get_tool(action.name)
            if not tool:
                raise ValueError(f"Tool not found: {action.name}")

            # Update status
            action.status = ActionStatus.IN_PROGRESS

            # Execute tool
            result = await tool.handler(action.parameters)

            # Update action with result
            action.status = ActionStatus.COMPLETED
            action.result = result

            # Add to history
            self.memory.action_history.append(action)

            return action

        except Exception as e:
            action.status = ActionStatus.FAILED
            action.result = {"error": str(e)}
            self.memory.action_history.append(action)
            return action

    async def _evaluate_progress(
        self,
        plan: Plan,
        completed_actions: List[Action]
    ) -> Tuple[bool, str]:
        """Evaluate progress and decide if plan needs adjustment"""
        prompt = (
            "Evaluate the progress towards the goal and decide if the plan "
            "needs adjustment.\n\n"
            f"Goal: {plan.goal}\n\n"
            "Completed Actions:\n" +
            "\n".join([
                f"- {action.name}: {json.dumps(action.result, indent=2)}"
                for action in completed_actions
            ]) + "\n\n"
            "Respond in JSON format:\n"
            "{\n"
            '  "goal_achieved": true/false,\n'
            '  "reasoning": "explanation"\n'
            "}"
        )

        try:
            response = await self.llm_caller(prompt)
            eval_data = json.loads(response)
            return eval_data["goal_achieved"], eval_data["reasoning"]
        except Exception as e:
            raise Exception(f"Error evaluating progress: {str(e)}")

    async def _adjust_plan(
        self,
        plan: Plan,
        completed_actions: List[Action],
        evaluation_reason: str
    ) -> Plan:
        """Adjust the plan based on progress and results"""
        prompt = (
            "Adjust the plan based on completed actions and evaluation. "
            "Create new actions to achieve the goal.\n\n"
            f"Goal: {plan.goal}\n\n"
            "Completed Actions:\n" +
            "\n".join([
                f"- {action.name}: {json.dumps(action.result, indent=2)}"
                for action in completed_actions
            ]) + "\n\n"
            f"Evaluation: {evaluation_reason}\n\n"
            "Available Tools:\n" +
            "\n".join([
                f"- {tool.name}: {tool.description}"
                for tool in self.tools
            ]) + "\n\n"
            "Respond with adjusted plan in JSON format:\n"
            "{\n"
            '  "actions": [\n'
            "    {\n"
            '      "id": "action-1",\n'
            '      "name": "tool_name",\n'
            '      "description": "what this action will do",\n'
            '      "parameters": {}\n'
            "    }\n"
            "  ]\n"
            "}"
        )

        try:
            response = await self.llm_caller(prompt)
            plan_data = json.loads(response)
            return Plan(
                goal=plan.goal,
                actions=[Action(**action) for action in plan_data["actions"]],
                context=plan.context
            )
        except Exception as e:
            raise Exception(f"Error adjusting plan: {str(e)}")

    async def execute(
        self,
        goal: str,
        context: Optional[Dict[str, Any]] = None
    ) -> List[Action]:
        """Execute a goal autonomously"""
        context = context or {}
        completed_actions = []

        try:
            # Create initial plan
            plan = await self._create_plan(goal, context)
            self.memory.plans.append(plan)

            while True:
                # Execute all actions in current plan
                for action in plan.actions:
                    if action.status == ActionStatus.PENDING:
                        action = await self._execute_action(action)
                        if action.status == ActionStatus.COMPLETED:
                            completed_actions.append(action)

                # Evaluate progress
                goal_achieved, reason = await self._evaluate_progress(
                    plan,
                    completed_actions
                )

                if goal_achieved:
                    break

                # Adjust plan if needed
                plan = await self._adjust_plan(
                    plan,
                    completed_actions,
                    reason
                )
                self.memory.plans.append(plan)

            return completed_actions

        except Exception as e:
            raise Exception(f"Error executing goal: {str(e)}")

class AgentBuilder:
    """Helper class to build autonomous agents"""

    def __init__(self):
        self.tools: List[Tool] = []
        self.memory: Optional[Memory] = None

    def add_tool(
        self,
        name: str,
        description: str,
        parameters: Dict[str, str],
        handler: Callable[[Dict[str, Any]], Any]
    ) -> 'AgentBuilder':
        """Add a tool to the agent"""
        self.tools.append(
            Tool(
                name=name,
                description=description,
                parameters=parameters,
                handler=handler
            )
        )
        return self

    def with_memory(self, memory: Memory) -> 'AgentBuilder':
        """Set agent's memory"""
        self.memory = memory
        return self

    def build(
        self,
        llm_caller: Callable[[str], str]
    ) -> AutonomousAgent:
        """Build the agent"""
        return AutonomousAgent(
            tools=self.tools,
            llm_caller=llm_caller,
            memory=self.memory
        )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
import asyncio
import os
from typing import Dict, Any
from dotenv import load_dotenv
import anthropic
from ..agent import AgentBuilder, Memory

async def llm_call(prompt: str) -> str:
    """Call Anthropic's Claude API"""
    # Use the async client so the call below can be awaited
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.7,
        system="You are a helpful research assistant.",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

# Tool handlers for research tasks
async def handle_web_search(params: Dict[str, Any]) -> Dict[str, Any]:
    """Search the web for information (mock implementation)"""
    query = params["query"]
    return {
        "results": [
            {
                "title": f"Search result 1 for: {query}",
                "snippet": "This is a mock search result...",
                "url": "https://example.com/1"
            },
            {
                "title": f"Search result 2 for: {query}",
                "snippet": "Another mock search result...",
                "url": "https://example.com/2"
            }
        ]
    }

async def handle_read_webpage(params: Dict[str, Any]) -> Dict[str, Any]:
    """Extract content from a webpage (mock implementation)"""
    url = params["url"]
    return {
        "title": "Mock webpage title",
        "content": f"Mock content extracted from {url}...",
        "metadata": {
            "author": "John Doe",
            "date": "2024-02-15"
        }
    }

async def handle_summarize_text(params: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize text content"""
    text = params["text"]
    return {
        "summary": f"Summary of: {text}",
        "key_points": [
            "Key point 1",
            "Key point 2",
            "Key point 3"
        ]
    }

async def handle_extract_facts(params: Dict[str, Any]) -> Dict[str, Any]:
    """Extract factual information from text"""
    text = params["text"]
    return {
        "facts": [
            "Fact 1 extracted from text",
            "Fact 2 extracted from text",
            "Fact 3 extracted from text"
        ],
        "confidence_scores": [0.9, 0.8, 0.7]
    }

async def handle_save_notes(params: Dict[str, Any]) -> Dict[str, Any]:
    """Save research notes (mock implementation)"""
    notes = params["notes"]
    return {
        "saved": True,
        "location": "research_notes.md",
        "timestamp": "2024-02-15T12:00:00Z"
    }

async def main():
    # Load environment variables
    load_dotenv()

    # Initialize memory
    memory = Memory(
        context={
            "research_topic": "artificial intelligence",
            "focus_areas": [
                "machine learning",
                "neural networks",
                "deep learning"
            ]
        }
    )

    # Build the research assistant
    assistant = (
        AgentBuilder()
        .add_tool(
            name="web_search",
            description="Search the web for information",
            parameters={
                "query": "search query string"
            },
            handler=handle_web_search
        )
        .add_tool(
            name="read_webpage",
            description="Extract content from a webpage",
            parameters={
                "url": "webpage URL"
            },
            handler=handle_read_webpage
        )
        .add_tool(
            name="summarize_text",
            description="Generate a summary of text content",
            parameters={
                "text": "text to summarize"
            },
            handler=handle_summarize_text
        )
        .add_tool(
            name="extract_facts",
            description="Extract factual information from text",
            parameters={
                "text": "text to analyze"
            },
            handler=handle_extract_facts
        )
        .add_tool(
            name="save_notes",
            description="Save research notes",
            parameters={
                "notes": "notes to save"
            },
            handler=handle_save_notes
        )
        .with_memory(memory)
        .build(llm_caller=llm_call)
    )

    # Research goals to accomplish
    goals = [
        "Research recent advancements in neural networks and summarize key findings",
        "Investigate applications of deep learning in healthcare and compile examples",
        "Analyze trends in machine learning research and identify emerging areas"
    ]

    # Execute research tasks
    for goal in goals:
        print(f"\n{'-' * 50}")
        print(f"Executing Goal: {goal}")
        print(f"{'-' * 50}")

        try:
            # Execute goal
            actions = await assistant.execute(goal)

            # Print results
            print("\nCompleted Actions:")
            for action in actions:
                print(f"\n{action.description}:")
                print(f"Tool: {action.name}")
                print(f"Result: {action.result}")

        except Exception as e:
            print(f"Error executing goal: {str(e)}")

if __name__ == "__main__":
    asyncio.run(main())

‎agents/domain-specific/README.md

+282
@@ -0,0 +1,282 @@
# Domain-Specific Agent Pattern

This pattern demonstrates how to create specialized agents with deep domain knowledge, constraints, and behaviors for specific fields.

## Overview

The domain-specific agent pattern is useful when:

- Tasks require specialized domain knowledge
- Domain-specific constraints must be enforced
- Context-aware behaviors are needed
- Domain terminology and relationships matter

## Components

### Core Classes

1. **DomainConstraint**

   - Validation rules
   - Domain-specific checks
   - Error messaging
   - Safety enforcement

2. **DomainKnowledge**

   - Domain facts
   - Business rules
   - Terminology
   - Entity relationships

3. **DomainBehavior**

   - Trigger conditions
   - Specialized actions
   - Context awareness
   - Adaptive responses

4. **DomainMemory**

   - Domain context
   - Action history
   - Knowledge updates
   - State tracking

5. **DomainSpecificAgent**
   - Domain-aware planning
   - Constraint validation
   - Behavior triggering
   - Knowledge application

## Example Implementation

The `examples/medical_assistant.py` example demonstrates a medical diagnosis system:

1. Medical Tools:

   - Symptom analysis
   - Medical history
   - Drug interactions
   - Treatment recommendations

2. Domain Features:
   - Medical knowledge base
   - Safety constraints
   - Clinical behaviors
   - Patient context

## Usage

1. Set up the environment:

   ```bash
   pip install -r requirements.txt
   ```

2. Configure API keys:

   ```bash
   export ANTHROPIC_API_KEY=your_key_here
   # or
   export OPENAI_API_KEY=your_key_here
   ```

3. Run the example:

   ```bash
   python -m examples.medical_assistant
   ```

## Implementation Details

### Domain Constraint Definition

```python
DomainConstraint(
    name="constraint_name",
    description="constraint description",
    validation_fn=validation_function,
    error_message="error details"
)
```

### Domain Knowledge Definition

```python
domain_knowledge = {
    "facts": {...},
    "rules": {...},
    "terminology": {...},
    "relationships": {...}
}
```

### Agent Building

```python
agent = (
    DomainAgentBuilder(domain_name)
    .add_tool(...)
    .add_constraint(...)
    .add_behavior(...)
    .with_knowledge(...)
    .build(llm_caller)
)
```
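
To make the ellipses concrete, the sketch below fills in one hypothetical constraint, behavior, and tool. Everything here is illustrative: the `lab_protocols` domain, the `units_are_metric` rule, and both stub handlers are invented for this example and are not part of the repository.

```python
# Hypothetical constraint/behavior wiring; all names are illustrative only.
from agent import DomainAgentBuilder  # adjust to where domain-specific/agent.py lives

async def log_measurement(params: dict) -> dict:
    # Stand-in tool handler; a real tool would persist the value somewhere.
    return {"logged": params.get("value", "")}

async def call_llm(prompt: str) -> str:
    # Stand-in LLM call; a real caller would hit a provider API.
    return '{"actions": []}'

def units_are_metric(params: dict) -> bool:
    # Made-up domain rule: reject parameters that mention imperial units.
    return "fahrenheit" not in str(params).lower()

agent = (
    DomainAgentBuilder("lab_protocols")
    .add_tool(
        name="log_measurement",
        description="Record a measurement in the lab log",
        parameters={"value": "measurement with units"},
        handler=log_measurement,
    )
    .add_constraint(
        name="metric_units_only",
        description="All measurements must use metric units",
        validation_fn=units_are_metric,
        error_message="Measurements must be given in metric units",
    )
    .add_behavior(
        name="escalate_review",
        description="Ask for human review when the context flags high risk",
        trigger_conditions={"risk_level": "high"},
        action_template={"name": "log_measurement", "parameters": {"value": "review requested"}},
    )
    .with_knowledge(terminology={"SI": "International System of Units"})
    .build(llm_caller=call_llm)
)
```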

## Domain Capabilities

1. **Knowledge Management**

   - Fact organization
   - Rule enforcement
   - Terminology mapping
   - Relationship tracking

2. **Constraint Handling**

   - Input validation
   - Safety checks
   - Domain rules
   - Error handling

3. **Behavior Management**

   - Context monitoring
   - Action triggering
   - Response adaptation
   - Pattern recognition

4. **Memory Management**
   - Context preservation
   - History tracking
   - Knowledge updates
   - State management

## Best Practices

1. **Domain Modeling**

   - Clear boundaries
   - Essential concepts
   - Key relationships
   - Core constraints

2. **Knowledge Organization**

   - Structured facts
   - Clear rules
   - Standard terminology
   - Explicit relationships

3. **Constraint Design**

   - Safety first
   - Clear validation
   - Helpful errors
   - Graceful handling

4. **Behavior Implementation**
   - Clear triggers
   - Focused actions
   - Context awareness
   - Measurable outcomes

## Extensions

Consider extending this pattern with:

1. **Advanced Knowledge**

   - Ontologies
   - Expert systems
   - Learning systems
   - Knowledge graphs

2. **Complex Constraints**

   - Multi-step validation
   - Dependency checks
   - Dynamic rules
   - Compliance tracking

3. **Adaptive Behaviors**

   - Pattern learning
   - Strategy evolution
   - Performance optimization
   - Context adaptation

4. **Enhanced Memory**

   - Long-term storage
   - Pattern recognition
   - Knowledge synthesis
   - Context evolution

5. **Integration**
   - External systems
   - Domain services
   - Data sources
   - Monitoring tools

## Domain Examples

The pattern can be applied to various domains:

1. **Medical**

   - Diagnosis
   - Treatment planning
   - Drug interactions
   - Patient monitoring

2. **Financial**

   - Risk assessment
   - Portfolio management
   - Compliance checking
   - Fraud detection

3. **Legal**

   - Document analysis
   - Case research
   - Compliance checking
   - Risk assessment

4. **Engineering**
   - Design validation
   - Safety analysis
   - Performance optimization
   - Quality control

## Safety Considerations

1. **Validation**

   - Input checking
   - Output verification
   - Constraint enforcement
   - Error detection

2. **Monitoring**

   - Action tracking
   - Performance metrics
   - Error patterns
   - Usage analytics

3. **Compliance**

   - Domain regulations
   - Industry standards
   - Best practices
   - Audit trails

4. **Security**
   - Access control
   - Data protection
   - Privacy measures
   - Secure integration

‎agents/domain-specific/agent.py

+260
@@ -0,0 +1,260 @@
from typing import List, Dict, Any, Callable, Optional, Union, Tuple
from pydantic import BaseModel, Field
from enum import Enum
import json
# NOTE: "autonomous-agent" is not a valid Python module name (hyphens cannot
# appear in imports); this assumes the package is importable as
# "autonomous_agent", e.g. after renaming the directory or adding an alias.
from ..autonomous_agent.agent import AutonomousAgent, Tool, Memory, Action, Plan, ActionStatus

class DomainConstraint(BaseModel):
    """Represents a domain-specific constraint"""
    name: str
    description: str
    validation_fn: Callable[[Dict[str, Any]], bool]
    error_message: str

class DomainKnowledge(BaseModel):
    """Represents domain-specific knowledge"""
    facts: Dict[str, Any] = {}
    rules: Dict[str, str] = {}
    terminology: Dict[str, str] = {}
    relationships: Dict[str, List[str]] = {}

class DomainBehavior(BaseModel):
    """Represents domain-specific behavior"""
    name: str
    description: str
    trigger_conditions: Dict[str, Any]
    action_template: Dict[str, Any]

class DomainMemory(Memory):
    """Extended memory with domain-specific information"""
    domain_knowledge: DomainKnowledge = Field(default_factory=DomainKnowledge)
    domain_specific_history: List[Dict[str, Any]] = []

class DomainSpecificAgent(AutonomousAgent):
    """An agent specialized for a specific domain"""

    def __init__(
        self,
        domain_name: str,
        tools: List[Tool],
        llm_caller: Callable[[str], str],
        constraints: List[DomainConstraint],
        behaviors: List[DomainBehavior],
        domain_knowledge: DomainKnowledge,
        memory: Optional[DomainMemory] = None
    ):
        super().__init__(tools, llm_caller, memory or DomainMemory())
        self.domain_name = domain_name
        self.constraints = constraints
        self.behaviors = behaviors
        self.domain_knowledge = domain_knowledge

    async def _validate_action(self, action: Action) -> Tuple[bool, Optional[str]]:
        """Validate action against domain constraints"""
        for constraint in self.constraints:
            try:
                if not constraint.validation_fn(action.parameters):
                    return False, constraint.error_message
            except Exception as e:
                return False, f"Error validating {constraint.name}: {str(e)}"
        return True, None

    async def _enrich_context(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich context with domain-specific knowledge"""
        return {
            **context,
            "domain_knowledge": {
                "facts": self.domain_knowledge.facts,
                "rules": self.domain_knowledge.rules,
                "terminology": self.domain_knowledge.terminology,
                "relationships": self.domain_knowledge.relationships
            }
        }

    async def _check_behaviors(
        self,
        action: Action,
        context: Dict[str, Any]
    ) -> Optional[Action]:
        """Check if any domain-specific behaviors should be triggered"""
        for behavior in self.behaviors:
            matches = all(
                context.get(k) == v
                for k, v in behavior.trigger_conditions.items()
            )
            if matches:
                return Action(
                    id=f"{action.id}_behavior_{behavior.name}",
                    name=behavior.name,
                    description=behavior.description,
                    parameters=behavior.action_template
                )
        return None

    async def _create_plan(
        self,
        goal: str,
        context: Dict[str, Any]
    ) -> Plan:
        """Create a domain-aware plan"""
        # Enrich context with domain knowledge
        enriched_context = await self._enrich_context(context)

        # Add domain-specific prompt elements
        domain_prompt = (
            f"You are a specialized agent for the {self.domain_name} domain.\n"
            "Consider the following domain-specific knowledge:\n"
            f"Facts: {json.dumps(self.domain_knowledge.facts, indent=2)}\n"
            f"Rules: {json.dumps(self.domain_knowledge.rules, indent=2)}\n"
            f"Terminology: {json.dumps(self.domain_knowledge.terminology, indent=2)}\n"
            "Create a plan that adheres to domain constraints and leverages domain knowledge."
        )

        # Create plan with domain awareness
        plan = await super()._create_plan(
            goal=f"{domain_prompt}\n\nGoal: {goal}",
            context=enriched_context
        )

        return plan

    async def _execute_action(self, action: Action) -> Action:
        """Execute action with domain-specific validation and behaviors"""
        try:
            # Validate against domain constraints
            is_valid, error = await self._validate_action(action)
            if not is_valid:
                action.status = ActionStatus.FAILED
                action.result = {"error": error}
                return action

            # Check for triggered behaviors
            behavior_action = await self._check_behaviors(
                action,
                self.memory.context
            )
            if behavior_action:
                # Execute behavior action first
                behavior_result = await super()._execute_action(behavior_action)
                if behavior_result.status == ActionStatus.FAILED:
                    return behavior_result

            # Execute main action
            action = await super()._execute_action(action)

            # Record in domain-specific history
            if isinstance(self.memory, DomainMemory):
                self.memory.domain_specific_history.append({
                    "action": action.dict(),
                    "domain_context": self.memory.context
                })

            return action

        except Exception as e:
            action.status = ActionStatus.FAILED
            action.result = {"error": str(e)}
            return action

class DomainAgentBuilder:
    """Helper class to build domain-specific agents"""

    def __init__(self, domain_name: str):
        self.domain_name = domain_name
        self.tools: List[Tool] = []
        self.constraints: List[DomainConstraint] = []
        self.behaviors: List[DomainBehavior] = []
        self.domain_knowledge = DomainKnowledge()
        self.memory: Optional[DomainMemory] = None

    def add_tool(
        self,
        name: str,
        description: str,
        parameters: Dict[str, str],
        handler: Callable[[Dict[str, Any]], Any]
    ) -> 'DomainAgentBuilder':
        """Add a tool to the agent"""
        self.tools.append(
            Tool(
                name=name,
                description=description,
                parameters=parameters,
                handler=handler
            )
        )
        return self

    def add_constraint(
        self,
        name: str,
        description: str,
        validation_fn: Callable[[Dict[str, Any]], bool],
        error_message: str
    ) -> 'DomainAgentBuilder':
        """Add a domain constraint"""
        self.constraints.append(
            DomainConstraint(
                name=name,
                description=description,
                validation_fn=validation_fn,
                error_message=error_message
            )
        )
        return self

    def add_behavior(
        self,
        name: str,
        description: str,
        trigger_conditions: Dict[str, Any],
        action_template: Dict[str, Any]
    ) -> 'DomainAgentBuilder':
        """Add a domain-specific behavior"""
        self.behaviors.append(
            DomainBehavior(
                name=name,
                description=description,
                trigger_conditions=trigger_conditions,
                action_template=action_template
            )
        )
        return self

    def with_knowledge(
        self,
        facts: Dict[str, Any] = None,
        rules: Dict[str, str] = None,
        terminology: Dict[str, str] = None,
        relationships: Dict[str, List[str]] = None
    ) -> 'DomainAgentBuilder':
        """Add domain knowledge"""
        if facts:
            self.domain_knowledge.facts.update(facts)
        if rules:
            self.domain_knowledge.rules.update(rules)
        if terminology:
            self.domain_knowledge.terminology.update(terminology)
        if relationships:
            self.domain_knowledge.relationships.update(relationships)
        return self

    def with_memory(self, memory: DomainMemory) -> 'DomainAgentBuilder':
        """Set agent's memory"""
        self.memory = memory
        return self

    def build(
        self,
        llm_caller: Callable[[str], str]
    ) -> DomainSpecificAgent:
        """Build the domain-specific agent"""
        return DomainSpecificAgent(
            domain_name=self.domain_name,
            tools=self.tools,
            llm_caller=llm_caller,
            constraints=self.constraints,
            behaviors=self.behaviors,
            domain_knowledge=self.domain_knowledge,
            memory=self.memory
        )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
import asyncio
import os
from typing import Dict, Any
from dotenv import load_dotenv
import anthropic
from ..agent import DomainAgentBuilder, DomainMemory, DomainKnowledge

async def llm_call(prompt: str) -> str:
    """Call Anthropic's Claude API"""
    # Use the async client so the call below can be awaited
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.7,
        system="You are a medical diagnosis assistant.",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

# Domain-specific tool handlers
async def handle_symptom_analysis(params: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze patient symptoms"""
    symptoms = params["symptoms"]
    return {
        "severity": "moderate",
        "urgency": "non-emergency",
        "possible_conditions": [
            "condition1",
            "condition2",
            "condition3"
        ],
        "risk_factors": [
            "risk1",
            "risk2"
        ]
    }

async def handle_medical_history_check(params: Dict[str, Any]) -> Dict[str, Any]:
    """Check patient medical history"""
    patient_id = params["patient_id"]
    return {
        "previous_conditions": [
            "condition1",
            "condition2"
        ],
        "allergies": [
            "allergy1",
            "allergy2"
        ],
        "medications": [
            "medication1",
            "medication2"
        ]
    }

async def handle_drug_interaction_check(params: Dict[str, Any]) -> Dict[str, Any]:
    """Check for potential drug interactions"""
    medications = params["medications"]
    return {
        "interactions": [
            {
                "drugs": ["drug1", "drug2"],
                "severity": "high",
                "recommendation": "avoid combination"
            }
        ],
        "alternatives": [
            "alt_drug1",
            "alt_drug2"
        ]
    }

async def handle_treatment_recommendation(params: Dict[str, Any]) -> Dict[str, Any]:
    """Generate treatment recommendations"""
    condition = params["condition"]
    patient_data = params["patient_data"]
    return {
        "primary_treatment": "treatment1",
        "alternatives": [
            "treatment2",
            "treatment3"
        ],
        "lifestyle_changes": [
            "change1",
            "change2"
        ],
        "follow_up": "2 weeks"
    }

# Domain-specific constraints
def validate_patient_data(params: Dict[str, Any]) -> bool:
    """Validate patient data completeness"""
    required_fields = ["age", "gender", "symptoms"]
    return all(field in params for field in required_fields)

def validate_medication_safety(params: Dict[str, Any]) -> bool:
    """Validate medication safety"""
    if "medications" not in params:
        return True
    # Mock safety check
    unsafe_medications = ["unsafe_drug1", "unsafe_drug2"]
    return not any(med in unsafe_medications for med in params["medications"])

# Medical domain knowledge
medical_knowledge = {
    "facts": {
        "common_conditions": [
            "hypertension",
            "diabetes",
            "asthma"
        ],
        "vital_signs": {
            "normal_bp": "120/80",
            "normal_temp": "98.6F",
            "normal_hr": "60-100"
        }
    },
    "rules": {
        "emergency_symptoms": "Chest pain, difficulty breathing, or severe bleeding require immediate emergency care",
        "prescription_requirements": "Controlled substances require proper documentation and authorization",
        "follow_up_timing": "Critical conditions require follow-up within 24-48 hours"
    },
    "terminology": {
        "bp": "Blood Pressure",
        "hr": "Heart Rate",
        "bmi": "Body Mass Index"
    },
    "relationships": {
        "diabetes": ["blood_sugar", "insulin", "diet"],
        "hypertension": ["blood_pressure", "sodium", "stress"]
    }
}

# Domain-specific behaviors
diagnosis_behavior = {
    "name": "comprehensive_diagnosis",
    "description": "Perform comprehensive diagnosis when multiple symptoms present",
    "trigger_conditions": {
        "symptom_count": ">3",
        "severity": "high"
    },
    "action_template": {
        "name": "detailed_analysis",
        "parameters": {
            "include_specialists": True,
            "run_additional_tests": True
        }
    }
}

async def main():
    # Load environment variables
    load_dotenv()

    # Initialize domain memory
    memory = DomainMemory(
        context={
            "facility_type": "primary_care",
            "available_specialists": ["cardiology", "neurology", "endocrinology"],
            "emergency_protocols": ["protocol1", "protocol2"]
        }
    )

    # Build the medical assistant
    assistant = (
        DomainAgentBuilder("medical_diagnosis")
        .add_tool(
            name="symptom_analysis",
            description="Analyze patient symptoms and determine possible conditions",
            parameters={
                "symptoms": "list of symptoms"
            },
            handler=handle_symptom_analysis
        )
        .add_tool(
            name="medical_history",
            description="Check patient medical history",
            parameters={
                "patient_id": "patient identifier"
            },
            handler=handle_medical_history_check
        )
        .add_tool(
            name="drug_interaction",
            description="Check for potential drug interactions",
            parameters={
                "medications": "list of medications"
            },
            handler=handle_drug_interaction_check
        )
        .add_tool(
            name="treatment_recommendation",
            description="Generate treatment recommendations",
            parameters={
                "condition": "diagnosed condition",
                "patient_data": "patient information"
            },
            handler=handle_treatment_recommendation
        )
        .add_constraint(
            name="patient_data_validation",
            description="Ensure all required patient data is provided",
            validation_fn=validate_patient_data,
            error_message="Missing required patient information"
        )
        .add_constraint(
            name="medication_safety",
            description="Ensure medication safety",
            validation_fn=validate_medication_safety,
            error_message="Unsafe medication detected"
        )
        .add_behavior(
            name=diagnosis_behavior["name"],
            description=diagnosis_behavior["description"],
            trigger_conditions=diagnosis_behavior["trigger_conditions"],
            action_template=diagnosis_behavior["action_template"]
        )
        .with_knowledge(**medical_knowledge)
        .with_memory(memory)
        .build(llm_caller=llm_call)
    )

    # Example cases to diagnose
    cases = [
        {
            "goal": "Diagnose patient with symptoms: fever, cough, fatigue",
            "context": {
                "patient_data": {
                    "age": 45,
                    "gender": "F",
                    "symptoms": ["fever", "cough", "fatigue"],
                    "vitals": {
                        "temperature": "101.2F",
                        "blood_pressure": "128/82",
                        "heart_rate": 88
                    }
                }
            }
        },
        {
            "goal": "Recommend treatment for diagnosed hypertension",
            "context": {
                "patient_data": {
                    "age": 62,
                    "gender": "M",
                    "condition": "hypertension",
                    "medications": ["lisinopril"],
                    "blood_pressure": "158/94"
                }
            }
        }
    ]

    # Process cases
    for case in cases:
        print(f"\n{'-' * 50}")
        print(f"Processing Case: {case['goal']}")
        print(f"{'-' * 50}")

        try:
            # Execute diagnosis/treatment
            actions = await assistant.execute(
                goal=case["goal"],
                context=case["context"]
            )

            # Print results
            print("\nCompleted Actions:")
            for action in actions:
                print(f"\n{action.description}:")
                print(f"Tool: {action.name}")
                print(f"Result: {action.result}")

        except Exception as e:
            print(f"Error processing case: {str(e)}")

if __name__ == "__main__":
    asyncio.run(main())
+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
# Augmented LLM Pattern

This directory demonstrates the Augmented LLM pattern, which enhances a base LLM with additional capabilities like tools and memory.

## Components

### Core Components

- `augmented.py`: The main implementation of the Augmented LLM pattern
  - `Tool`: Abstract base class for implementing tools
  - `AugmentedLLM`: Main class that combines the LLM with tools and memory

### Tools

Located in `tools/basic_tools.py`:

- `WebSearchTool`: Mock implementation of web search
- `CalculatorTool`: Basic calculator functionality
- `WeatherTool`: Mock weather information retrieval

### Examples

Located in `examples/basic_usage.py`:

- Demonstrates how to initialize and use the Augmented LLM
- Shows tool usage and memory capabilities
- Includes a sample conversation flow

## Usage

1. Set up environment variables:

   ```bash
   ANTHROPIC_API_KEY=your_key_here  # For Claude
   OPENAI_API_KEY=your_key_here     # For GPT-4
   ```

2. Install dependencies:

   ```bash
   pip install -r requirements.txt
   ```

3. Run the example:

   ```bash
   python -m examples.basic_usage
   ```

## Key Features

1. **Multiple LLM Provider Support**

   - Anthropic Claude
   - OpenAI GPT-4
   - Extensible for other providers

2. **Tool Integration**

   - Abstract tool interface
   - Easy to add new tools
   - Tool description formatting for prompts

3. **Memory Support**
   - Optional conversation history
   - Contextual awareness in responses
   - Memory formatting for prompts

## Implementation Details

### Tool Integration

Tools are implemented as classes inheriting from the `Tool` base class:

```python
class Tool:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    @abstractmethod
    async def execute(self, **kwargs) -> str:
        pass
```
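
Following the same base-class shape, adding a new capability is just another subclass. The sketch below defines a hypothetical `TimeTool` (not part of this repository) in the style of the tools in `tools/basic_tools.py`.

```python
# Hypothetical extra tool in the style of tools/basic_tools.py.
from datetime import datetime, timezone
from augmented import Tool  # in the repo this would be a relative import, e.g. from ..augmented import Tool

class TimeTool(Tool):
    def __init__(self):
        super().__init__(
            name="current_time",
            description="Get the current UTC time"
        )

    async def execute(self, **kwargs) -> str:
        # No external service needed; returns an ISO-8601 timestamp.
        return datetime.now(timezone.utc).isoformat()
```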

### Memory Management

Memory is implemented as a list of conversation turns:

```python
self.memory = [] if enable_memory else None
```
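
For orientation, here is one way that memory list could feed back into each request. This is an illustrative sketch, not the actual `augmented.py` implementation: the `_format_memory` helper, the turn dictionary shape, and the simplified `process` body are all assumptions.

```python
# Illustrative sketch only: NOT the augmented.py code, just one plausible way
# the conversation memory can be appended and folded into the next prompt.
from typing import Awaitable, Callable, Optional

class MemorySketch:
    def __init__(self, llm: Callable[[str], Awaitable[str]], enable_memory: bool = True):
        self.llm = llm
        self.memory: Optional[list] = [] if enable_memory else None

    def _format_memory(self) -> str:
        # Assumed turn shape: {"user": ..., "assistant": ...}
        if not self.memory:
            return ""
        turns = [f"User: {t['user']}\nAssistant: {t['assistant']}" for t in self.memory]
        return "Previous conversation:\n" + "\n".join(turns) + "\n\n"

    async def process(self, query: str) -> str:
        response = await self.llm(self._format_memory() + query)
        if self.memory is not None:
            self.memory.append({"user": query, "assistant": response})
        return response
```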

### Provider-Specific Processing

Each provider has its own processing method:

- `_process_anthropic`: For Claude
- `_process_openai`: For GPT-4

## Best Practices

1. **Tool Development**

   - Keep tools focused and single-purpose
   - Provide clear descriptions
   - Handle errors gracefully
   - Mock external services in examples

2. **Memory Usage**

   - Enable only when needed
   - Consider memory limitations
   - Clear memory when appropriate

3. **Error Handling**
   - Graceful API error handling
   - Tool execution error handling
   - Clear error messages

## Extensions

Consider extending this pattern with:

1. Tool result caching
2. Parallel tool execution
3. Tool chain orchestration
4. Structured memory management
5. Result validation
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
import asyncio
import os
from dotenv import load_dotenv
from ..augmented import AugmentedLLM
from ..tools.basic_tools import WebSearchTool, CalculatorTool, WeatherTool

async def main():
    # Load environment variables
    load_dotenv()

    # Initialize tools
    tools = [
        WebSearchTool(),
        CalculatorTool(),
        WeatherTool()
    ]

    # Initialize augmented LLM with tools and memory
    llm = AugmentedLLM(
        provider="anthropic",  # or "openai"
        tools=tools,
        enable_memory=True
    )

    # Example conversation
    queries = [
        "What's 234 * 456?",
        "What's the weather like in London?",
        "Can you search for information about quantum computing?",
        "What did I ask about earlier regarding calculations?"
    ]

    for query in queries:
        print(f"\nUser: {query}")
        response = await llm.process(query)
        print(f"Assistant: {response}")

if __name__ == "__main__":
    asyncio.run(main())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
from typing import Dict, Any
import aiohttp
import json
from ..augmented import Tool

class WebSearchTool(Tool):
    def __init__(self):
        super().__init__(
            name="web_search",
            description="Search the web for information about a query"
        )

    async def execute(self, query: str) -> str:
        # This is a mock implementation
        return f"Mock web search results for: {query}"

class CalculatorTool(Tool):
    def __init__(self):
        super().__init__(
            name="calculator",
            description="Perform basic mathematical calculations"
        )

    async def execute(self, expression: str) -> str:
        try:
            # WARNING: eval is used here for demonstration. In production,
            # use a safer method to evaluate mathematical expressions
            result = eval(expression, {"__builtins__": {}})
            return f"Result: {result}"
        except Exception as e:
            return f"Error calculating {expression}: {str(e)}"

class WeatherTool(Tool):
    def __init__(self, api_key: str = None):
        super().__init__(
            name="weather",
            description="Get current weather for a location"
        )
        self.api_key = api_key

    async def execute(self, location: str) -> str:
        # This would normally use a real weather API
        return f"Mock weather data for {location}: 22°C, Partly Cloudy"

‎examples/building-blocks/augmented-llms/README.md

-57
This file was deleted.

‎requirements.txt

+11
@@ -0,0 +1,11 @@
anthropic>=0.7.0
openai>=1.0.0
google-cloud-aiplatform>=1.36.0
azure-openai>=1.0.0
python-dotenv>=1.0.0
aiohttp>=3.9.0
pytest>=7.4.0
pydantic>=2.0.0
tenacity>=8.2.0
typing-extensions>=4.8.0
rich>=13.0.0
+204
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
# Evaluator-Optimizer Pattern

This pattern demonstrates how to implement iterative improvement through evaluation and optimization feedback loops.

## Overview

The evaluator-optimizer pattern is useful when:

- Solutions need iterative refinement
- Quality can be evaluated systematically
- Multiple optimization strategies may be applicable
- Progress can be measured quantitatively

## Components

### Core Classes

1. **Candidate**

   - Represents a potential solution
   - Tracks evaluation metrics
   - Maintains iteration history
   - Stores feedback data

2. **Optimizer**

   - Manages the optimization process
   - Implements multiple strategies
   - Evaluates candidates
   - Generates improvements

3. **OptimizerBuilder**
   - Configures the optimization process
   - Sets strategy parameters
   - Defines evaluation criteria
   - Manages optimization limits

## Example Implementation

The `examples/code_optimization.py` example demonstrates iterative code improvement:

1. Optimization Strategies:

   - Iterative Improvement
   - Parallel Variations
   - Tournament Selection

2. Evaluation Criteria:

   - Performance
   - Readability
   - Maintainability
   - Error Handling

3. Features:
   - Multiple optimization strategies
   - Weighted evaluation criteria
   - Progress tracking
   - Detailed feedback

## Usage

1. Set up the environment:

   ```bash
   pip install -r requirements.txt
   ```

2. Configure API keys:

   ```bash
   export ANTHROPIC_API_KEY=your_key_here
   # or
   export OPENAI_API_KEY=your_key_here
   ```

3. Run the example:

   ```bash
   python -m examples.code_optimization
   ```

## Implementation Details

### Candidate Definition

```python
Candidate(
    id="candidate-id",
    content="solution content",
    score=0.85,
    feedback="evaluation feedback"
)
```

### Optimizer Configuration

```python
optimizer = (
    OptimizerBuilder()
    .set_strategy(strategy)
    .set_max_iterations(5)
    .set_score_threshold(0.95)
    .build(llm_caller)
)
```

### Optimization

```python
best, history = await optimizer.optimize(
    initial_solution,
    evaluation_criteria
)
```
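
As a concrete illustration of the weighted-criteria dictionary that `optimize` consumes, the sketch below assembles one minimal run. The `clarity`/`brevity` criteria are hypothetical, the criteria shape follows `examples/code_optimization.py` rather than a fixed schema, and the `call_llm` stub only returns the evaluation JSON; in practice `llm_caller` would be a real provider call like the Claude-backed `llm_call` in that example.

```python
# Hypothetical single-run setup; criteria names, weights, and the stub LLM are illustrative.
import asyncio
from optimizer import OptimizerBuilder, OptimizationStrategy  # adjust the import path

async def call_llm(prompt: str) -> str:
    # Stand-in for a provider call; returns the evaluation JSON format only.
    return '{"score": 0.92, "feedback": "clear enough", "improvements": []}'

async def main():
    criteria = {
        "clarity": {"description": "The text should be easy to follow", "weight": 0.6},
        "brevity": {"description": "The text should avoid repetition", "weight": 0.4},
    }

    optimizer = (
        OptimizerBuilder()
        .set_strategy(OptimizationStrategy.ITERATIVE)
        .set_max_iterations(3)
        .set_score_threshold(0.9)
        .build(llm_caller=call_llm)
    )

    best, history = await optimizer.optimize("draft text to improve", criteria)
    print(f"{len(history)} iteration(s); best score: {best.score}")

asyncio.run(main())
```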

## Optimization Strategies

1. **Iterative**

   - Sequential improvements
   - Direct feedback incorporation
   - Step-by-step refinement
   - Linear progression

2. **Parallel**

   - Multiple variations
   - Concurrent evaluation
   - Best candidate selection
   - Broader exploration

3. **Tournament**
   - Population-based
   - Competitive selection
   - Genetic-style evolution
   - Diverse solutions

## Best Practices

1. **Evaluation Design**

   - Define clear criteria
   - Use quantitative metrics
   - Weight importance factors
   - Consider trade-offs

2. **Strategy Selection**

   - Match problem characteristics
   - Consider resource constraints
   - Balance exploration/exploitation
   - Adapt to feedback

3. **Termination Conditions**

   - Set realistic thresholds
   - Limit iteration count
   - Monitor improvement rate
   - Handle convergence

4. **Result Analysis**
   - Track improvement history
   - Analyze feedback patterns
   - Document optimizations
   - Validate improvements

## Extensions

Consider extending this pattern with:

1. **Advanced Strategies**

   - Hybrid approaches
   - Adaptive methods
   - Multi-objective optimization
   - Constraint handling

2. **Evaluation Enhancement**

   - Automated testing
   - Performance profiling
   - Quality metrics
   - Validation suites

3. **Learning**

   - Strategy adaptation
   - Pattern recognition
   - Historical learning
   - Meta-optimization

4. **Visualization**

   - Progress tracking
   - Strategy comparison
   - Result analysis
   - Performance metrics

5. **Integration**
   - CI/CD pipelines
   - Version control
   - Issue tracking
   - Documentation generation
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
import asyncio
import os
from typing import Dict, Any
from dotenv import load_dotenv
import anthropic
from ..optimizer import OptimizerBuilder, OptimizationStrategy

async def llm_call(prompt: str) -> str:
    """Call Anthropic's Claude API"""
    # Use the async client so the call below can be awaited
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.7,
        system="You are a helpful code optimization assistant.",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

async def main():
    # Load environment variables
    load_dotenv()

    # Initial code to optimize
    initial_code = """
def fibonacci(n):
    if n <= 0:
        return []
    elif n == 1:
        return [0]

    sequence = [0, 1]
    while len(sequence) < n:
        sequence.append(sequence[-1] + sequence[-2])
    return sequence

def find_fibonacci_sum(n):
    sequence = fibonacci(n)
    return sum(sequence)
"""

    # Evaluation criteria
    criteria = {
        "performance": {
            "description": "Code should be efficient in terms of time and space complexity",
            "weight": 0.4
        },
        "readability": {
            "description": "Code should be clear, well-documented, and follow Python best practices",
            "weight": 0.3
        },
        "maintainability": {
            "description": "Code should be easy to modify and extend",
            "weight": 0.2
        },
        "error_handling": {
            "description": "Code should handle edge cases and invalid inputs gracefully",
            "weight": 0.1
        }
    }

    # Create optimizer with different strategies
    strategies = [
        ("Iterative Optimization", OptimizationStrategy.ITERATIVE),
        ("Parallel Optimization", OptimizationStrategy.PARALLEL),
        ("Tournament Optimization", OptimizationStrategy.TOURNAMENT)
    ]

    for strategy_name, strategy in strategies:
        print(f"\n{'-' * 50}")
        print(f"Running {strategy_name}")
        print(f"{'-' * 50}")

        optimizer = (
            OptimizerBuilder()
            .set_strategy(strategy)
            .set_max_iterations(3)
            .set_score_threshold(0.95)
            .set_population_size(3)
            .build(llm_caller=llm_call)
        )

        try:
            # Optimize code
            best, history = await optimizer.optimize(initial_code, criteria)

            print(f"\nOptimization completed after {len(history)} iterations")
            print(f"Final score: {best.score}")
            print("\nOptimized Code:")
            print(best.content)
            print("\nFeedback:")
            print(best.feedback)

        except Exception as e:
            print(f"Error during optimization: {str(e)}")

if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,314 @@
from typing import List, Dict, Any, Callable, Optional, Union, Tuple
from abc import ABC, abstractmethod
from pydantic import BaseModel
import asyncio
from enum import Enum

class OptimizationStrategy(str, Enum):
    """Strategies for optimization"""
    ITERATIVE = "iterative"
    PARALLEL = "parallel"
    TOURNAMENT = "tournament"

class Candidate(BaseModel):
    """A candidate solution"""
    id: str
    content: str
    score: Optional[float] = None
    feedback: Optional[str] = None
    iteration: int = 0

class EvaluationResult(BaseModel):
    """Result of evaluating a candidate"""
    score: float
    feedback: str
    improvements: List[str]

class OptimizerConfig(BaseModel):
    """Configuration for the optimizer"""
    max_iterations: int = 5
    score_threshold: float = 0.9
    optimization_strategy: OptimizationStrategy = OptimizationStrategy.ITERATIVE
    population_size: int = 3
    tournament_size: int = 2

class Optimizer:
    """Improves solutions through evaluation and optimization"""

    def __init__(
        self,
        llm_caller: Callable[[str], str],
        config: Optional[OptimizerConfig] = None
    ):
        self.llm_caller = llm_caller
        self.config = config or OptimizerConfig()
        self.candidates: List[Candidate] = []

    async def _evaluate_candidate(
        self,
        candidate: Candidate,
        evaluation_criteria: Dict[str, Any]
    ) -> EvaluationResult:
        """Evaluate a candidate solution"""
        prompt = (
            "Evaluate this solution based on the given criteria. Provide:\n"
            "1. A score between 0 and 1\n"
            "2. Specific feedback\n"
            "3. Suggested improvements\n\n"
            f"Solution:\n{candidate.content}\n\n"
            f"Criteria:\n{evaluation_criteria}\n\n"
            "Respond in JSON format:\n"
            "{\n"
            '  "score": 0.8,\n'
            '  "feedback": "Detailed feedback here",\n'
            '  "improvements": ["improvement 1", "improvement 2"]\n'
            "}"
        )

        try:
            response = await self.llm_caller(prompt)
            return EvaluationResult.parse_raw(response)
        except Exception as e:
            raise Exception(f"Error evaluating candidate: {str(e)}")

    async def _optimize_candidate(
        self,
        candidate: Candidate,
        evaluation_result: EvaluationResult
    ) -> Candidate:
        """Generate an improved version of the candidate"""
        prompt = (
            "Improve this solution based on the evaluation feedback and "
            "suggested improvements. Maintain the same format and structure, "
            "but address the identified issues.\n\n"
            f"Original Solution:\n{candidate.content}\n\n"
            f"Feedback:\n{evaluation_result.feedback}\n\n"
            "Suggested Improvements:\n" +
            "\n".join(f"- {imp}" for imp in evaluation_result.improvements) +
            "\n\nImproved Solution:"
        )

        try:
            improved_content = await self.llm_caller(prompt)
            return Candidate(
                id=f"{candidate.id}-v{candidate.iteration + 1}",
                content=improved_content,
                iteration=candidate.iteration + 1
            )
        except Exception as e:
            raise Exception(f"Error optimizing candidate: {str(e)}")

    async def _generate_variations(
        self,
        candidate: Candidate,
        num_variations: int
    ) -> List[Candidate]:
        """Generate multiple variations of a candidate"""
        prompt = (
            f"Generate {num_variations} different variations of this solution. "
            "Each variation should maintain the same basic structure but vary in "
            "approach or implementation details.\n\n"
            f"Original Solution:\n{candidate.content}\n\n"
            "Respond with each variation separated by '---'\n"
        )

        try:
            response = await self.llm_caller(prompt)
            variations = [v.strip() for v in response.split("---")]
            return [
                Candidate(
                    id=f"{candidate.id}-var{i}",
                    content=content,
                    iteration=candidate.iteration
                )
                for i, content in enumerate(variations[:num_variations], 1)
            ]
        except Exception as e:
            raise Exception(f"Error generating variations: {str(e)}")

    async def _run_tournament(
        self,
        candidates: List[Candidate],
        evaluation_criteria: Dict[str, Any]
    ) -> List[Candidate]:
        """Run a tournament to select the best candidates"""
        # Randomly pair candidates for tournament rounds
        import random
        tournament_pairs = []
        available = candidates.copy()
        while len(available) >= 2:
            pair = random.sample(available, 2)
            tournament_pairs.append(pair)
            for p in pair:
                available.remove(p)

        # If odd number, add last one to winners
        winners = available

        # Evaluate pairs and select winners
        for pair in tournament_pairs:
            results = await asyncio.gather(*[
                self._evaluate_candidate(c, evaluation_criteria)
                for c in pair
            ])
            # Add winner to next round
            winner = pair[0] if results[0].score > results[1].score else pair[1]
            winners.append(winner)

        return winners

    async def optimize(
        self,
        initial_solution: str,
        evaluation_criteria: Dict[str, Any]
    ) -> Tuple[Candidate, List[Candidate]]:
        """Optimize a solution through iterative improvement"""
        # Initialize first candidate
        current = Candidate(
            id="solution-v0",
            content=initial_solution,
            iteration=0
        )

        history = [current]

        if self.config.optimization_strategy == OptimizationStrategy.ITERATIVE:
            # Iterative improvement
            for i in range(self.config.max_iterations):
                # Evaluate current candidate
                eval_result = await self._evaluate_candidate(
                    current,
                    evaluation_criteria
                )

                # Update candidate with evaluation results
                current.score = eval_result.score
                current.feedback = eval_result.feedback

                # Check if good enough
                if current.score >= self.config.score_threshold:
                    break

                # Generate improved version
                current = await self._optimize_candidate(current, eval_result)
                history.append(current)

        elif self.config.optimization_strategy == OptimizationStrategy.PARALLEL:
            # Parallel variations and improvement
            for i in range(self.config.max_iterations):
                # Generate variations
                variations = await self._generate_variations(
                    current,
                    self.config.population_size
                )

                # Evaluate all variations
                eval_results = await asyncio.gather(*[
                    self._evaluate_candidate(v, evaluation_criteria)
                    for v in variations
                ])

                # Update variations with scores
                for var, result in zip(variations, eval_results):
                    var.score = result.score
                    var.feedback = result.feedback

                # Select best variation
                best = max(variations, key=lambda x: x.score)
                if best.score >= self.config.score_threshold:
                    current = best
                    history.extend(variations)
                    break

                # Improve best variation
                current = await self._optimize_candidate(
                    best,
                    eval_results[variations.index(best)]
                )
                history.extend(variations + [current])

        else:  # TOURNAMENT
            # Tournament-based optimization
            current_population = [current]

            for i in range(self.config.max_iterations):
                # Generate variations for tournament
                variations = await self._generate_variations(
                    current,
                    self.config.population_size - 1
                )
                current_population.extend(variations)

                # Run tournament
                winners = await self._run_tournament(
                    current_population,
                    evaluation_criteria
                )

                # Select best winner (candidates coming out of pairings may not
                # have been scored yet, so treat a missing score as 0)
                best = winners[0]
                for w in winners[1:]:
                    eval_result = await self._evaluate_candidate(
                        w,
                        evaluation_criteria
                    )
                    w.score = eval_result.score
                    w.feedback = eval_result.feedback
                    if w.score > (best.score or 0):
                        best = w

                if (best.score or 0) >= self.config.score_threshold:
                    current = best
                    history.extend(current_population)
                    break

                # Use best as seed for next generation
                current = best
                current_population = [current]
                history.extend(current_population)

        return current, history

class OptimizerBuilder:
    """Helper class to build optimizers"""

    def __init__(self):
        self.config = OptimizerConfig()

    def set_max_iterations(self, max_iterations: int) -> 'OptimizerBuilder':
        """Set maximum number of optimization iterations"""
        self.config.max_iterations = max_iterations
        return self

    def set_score_threshold(self, threshold: float) -> 'OptimizerBuilder':
        """Set score threshold for early stopping"""
        self.config.score_threshold = threshold
        return self

    def set_strategy(
        self,
        strategy: OptimizationStrategy
    ) -> 'OptimizerBuilder':
        """Set optimization strategy"""
        self.config.optimization_strategy = strategy
        return self

    def set_population_size(self, size: int) -> 'OptimizerBuilder':
        """Set population size for parallel/tournament strategies"""
        self.config.population_size = size
        return self

    def set_tournament_size(self, size: int) -> 'OptimizerBuilder':
        """Set tournament size for tournament strategy"""
        self.config.tournament_size = size
        return self

    def build(
        self,
        llm_caller: Callable[[str], str]
    ) -> Optimizer:
        """Build the optimizer"""
        return Optimizer(
            llm_caller=llm_caller,
            config=self.config
        )
@@ -0,0 +1,192 @@
# Orchestrator-Workers Pattern

This pattern demonstrates how to implement dynamic task decomposition and parallel execution using an orchestrator and specialized workers.

## Overview

The orchestrator-workers pattern is useful when:

- Complex tasks need to be broken down dynamically
- Different subtasks require specialized handling
- Tasks have dependencies and execution order constraints
- Parallel execution can improve performance

## Components

### Core Classes

1. **Task**

   - Represents a unit of work
   - Tracks status and dependencies
   - Stores execution results
   - Manages task metadata

2. **Worker**

   - Handles specific task types
   - Executes task logic
   - Manages concurrency
   - Reports task results

3. **Orchestrator**

   - Decomposes complex tasks
   - Manages task dependencies
   - Delegates to workers
   - Coordinates execution

4. **OrchestratorBuilder**

   - Configures worker pool
   - Sets up task routing
   - Manages concurrency limits
   - Simplifies orchestrator creation

## Example Implementation

The `examples/document_processing.py` example demonstrates document analysis:

1. Worker Types:

   - Text Extraction
   - Language Detection
   - Summarization
   - Topic Extraction
   - Sentiment Analysis
   - Report Formatting

2. Features:

   - Dynamic task decomposition
   - Parallel processing
   - Dependency management
   - Result aggregation

## Usage

1. Set up environment:

   ```bash
   pip install -r requirements.txt
   ```

2. Configure API keys:

   ```bash
   export ANTHROPIC_API_KEY=your_key_here
   # or
   export OPENAI_API_KEY=your_key_here
   ```

3. Run the example:

   ```bash
   python -m examples.document_processing
   ```

## Implementation Details

### Task Definition

```python
Task(
    id="task-id",
    type="task-type",
    input_data={},
    dependencies=["dep-1", "dep-2"]
)
```

### Worker Configuration

```python
Worker(
    name="worker-name",
    task_types=["type-1", "type-2"],
    handler=async_handler_function,
    concurrency_limit=2
)
```
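
The `handler` is expected to be an async callable that receives the task's `input_data` dict and returns a result dict, mirroring the handlers in `examples/document_processing.py`. A minimal sketch with a hypothetical `word_count` task type:

```python
from typing import Any, Dict

async def handle_word_count(data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical handler: counts words in previously extracted text."""
    text = data.get("text", "")
    return {"word_count": len(text.split())}
```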

### Orchestrator Building

```python
orchestrator = (
    OrchestratorBuilder()
    .add_worker(...)
    .add_worker(...)
    .build(llm_caller)
)
```

### Execution

```python
results = await orchestrator.execute(input_data)
```
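
`execute` returns a dict keyed by task ID, containing only the tasks that completed successfully; failed tasks are omitted from the combined result. A minimal sketch of consuming it:

```python
# `results` maps each completed task's ID (e.g. "task-1") to the dict
# returned by the worker handler that executed it.
for task_id, result in results.items():
    print(f"{task_id}: {result}")
```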

## Best Practices

1. **Task Design**

   - Keep tasks focused
   - Define clear interfaces
   - Handle errors gracefully
   - Document dependencies

2. **Worker Implementation**

   - Manage resources efficiently
   - Set appropriate concurrency
   - Handle timeouts
   - Validate inputs/outputs

3. **Dependency Management**

   - Avoid circular dependencies
   - Handle missing dependencies
   - Track task status
   - Clean up resources

4. **Error Handling**

   - Handle worker failures
   - Manage task timeouts
   - Provide fallback options
   - Log execution details

## Extensions

Consider extending this pattern with:

1. **Advanced Orchestration**

   - Dynamic worker allocation
   - Load balancing
   - Priority queues
   - Task scheduling

2. **Monitoring**

   - Task progress tracking
   - Worker performance metrics
   - Resource utilization
   - Error reporting

3. **Resilience**

   - Task retries
   - Circuit breaking
   - Fallback strategies
   - State recovery

4. **Optimization**

   - Task batching
   - Resource pooling
   - Result caching
   - Parallel execution

5. **Integration**

   - External services
   - Message queues
   - Storage systems
   - Monitoring tools
@@ -0,0 +1,149 @@
import asyncio
import os
from typing import Dict, Any
from dotenv import load_dotenv
import anthropic
from ..orchestrator import OrchestratorBuilder

async def llm_call(prompt: str) -> str:
    """Call Anthropic's Claude API"""
    # Use the async client so the request can be awaited
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.7,
        system="You are a helpful task decomposition assistant.",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

# Worker handlers for different document processing tasks
async def handle_text_extraction(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract text from document (mock implementation)"""
    document = data["document"]
    return {
        "text": f"Extracted text from {document}",
        "pages": 5,
        "language": "en"
    }

async def handle_language_detection(data: Dict[str, Any]) -> Dict[str, Any]:
    """Detect language of text (mock implementation)"""
    text = data["text"]
    return {
        "language": "en",
        "confidence": 0.95
    }

async def handle_summarization(data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize text content (mock implementation)"""
    text = data["text"]
    return {
        "summary": f"Summary of: {text}",
        "length": "medium"
    }

async def handle_topic_extraction(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract main topics from text (mock implementation)"""
    text = data["text"]
    return {
        "topics": ["topic1", "topic2", "topic3"],
        "confidence_scores": [0.9, 0.8, 0.7]
    }

async def handle_sentiment_analysis(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze sentiment of text (mock implementation)"""
    text = data["text"]
    return {
        "sentiment": "positive",
        "score": 0.8,
        "aspects": {
            "tone": "professional",
            "emotion": "confident"
        }
    }

async def handle_formatting(data: Dict[str, Any]) -> Dict[str, Any]:
    """Format the final document analysis report"""
    return {
        "report": {
            "summary": data["summary"],
            "topics": data["topics"],
            "sentiment": data["sentiment"],
            "language": data["language"]
        },
        "format": "json"
    }

async def main():
    # Load environment variables
    load_dotenv()

    # Build the document processor
    processor = (
        OrchestratorBuilder()
        .add_worker(
            name="extractor",
            task_types=["text_extraction"],
            handler=handle_text_extraction,
            concurrency_limit=2
        )
        .add_worker(
            name="language_detector",
            task_types=["language_detection"],
            handler=handle_language_detection,
            concurrency_limit=1
        )
        .add_worker(
            name="summarizer",
            task_types=["summarization"],
            handler=handle_summarization,
            concurrency_limit=1
        )
        .add_worker(
            name="topic_extractor",
            task_types=["topic_extraction"],
            handler=handle_topic_extraction,
            concurrency_limit=1
        )
        .add_worker(
            name="sentiment_analyzer",
            task_types=["sentiment_analysis"],
            handler=handle_sentiment_analysis,
            concurrency_limit=1
        )
        .add_worker(
            name="formatter",
            task_types=["formatting"],
            handler=handle_formatting,
            concurrency_limit=1
        )
        .build(llm_caller=llm_call)
    )

    # Example document to process
    document = {
        "document": "sample_document.pdf",
        "options": {
            "include_summary": True,
            "include_topics": True,
            "include_sentiment": True
        }
    }

    try:
        # Process document
        print("\nProcessing document...")
        results = await processor.execute(document)

        # Print results
        print("\nProcessing Results:")
        for task_id, result in results.items():
            print(f"\n{task_id}:")
            print(result)

    except Exception as e:
        print(f"Error processing document: {str(e)}")

if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,223 @@
from typing import List, Dict, Any, Callable, Optional, Union
from abc import ABC, abstractmethod
from pydantic import BaseModel
import asyncio
import json

class Task(BaseModel):
    """A task to be executed by a worker"""
    id: str
    type: str
    input_data: Dict[str, Any]
    dependencies: List[str] = []
    status: str = "pending"
    result: Optional[Dict[str, Any]] = None

class Worker(BaseModel):
    """A worker that can execute specific types of tasks"""
    name: str
    task_types: List[str]
    handler: Callable[[Dict[str, Any]], Any]
    concurrency_limit: int = 1

class Orchestrator:
    """Manages task decomposition, delegation, and execution"""

    def __init__(
        self,
        workers: List[Worker],
        llm_caller: Callable[[str], str]
    ):
        self.workers = workers
        self.llm_caller = llm_caller
        self.tasks: Dict[str, Task] = {}
        self.worker_queues: Dict[str, asyncio.Queue] = {}
        self._validate_config()
        self._setup_queues()

    def _validate_config(self):
        """Validate orchestrator configuration"""
        # Check for duplicate worker names
        names = set()
        for worker in self.workers:
            if worker.name in names:
                raise ValueError(f"Duplicate worker name: {worker.name}")
            names.add(worker.name)

        # Check that all task types have handlers
        task_types = set()
        for worker in self.workers:
            task_types.update(worker.task_types)

    def _setup_queues(self):
        """Set up task queues for each worker"""
        for worker in self.workers:
            self.worker_queues[worker.name] = asyncio.Queue()

    async def _decompose_task(
        self,
        input_data: Dict[str, Any]
    ) -> List[Task]:
        """Use LLM to decompose input into subtasks"""
        prompt = (
            "Decompose this task into smaller subtasks. For each subtask, specify:\n"
            "1. A unique ID\n"
            "2. The task type (one of: " +
            ", ".join(self._get_all_task_types()) + ")\n"
            "3. Input data\n"
            "4. Dependencies (other task IDs)\n\n"
            "Respond in JSON format:\n"
            "{\n"
            '  "tasks": [\n'
            "    {\n"
            '      "id": "task-1",\n'
            '      "type": "task_type",\n'
            '      "input_data": {},\n'
            '      "dependencies": []\n'
            "    }\n"
            "  ]\n"
            "}\n\n"
            f"Task to decompose:\n{json.dumps(input_data, indent=2)}"
        )

        try:
            response = await self.llm_caller(prompt)
            task_data = json.loads(response)
            return [Task(**task) for task in task_data["tasks"]]
        except Exception as e:
            raise Exception(f"Error decomposing task: {str(e)}")

    def _get_all_task_types(self) -> List[str]:
        """Get all supported task types"""
        task_types = set()
        for worker in self.workers:
            task_types.update(worker.task_types)
        return sorted(list(task_types))

    def _get_workers_for_task(self, task_type: str) -> List[Worker]:
        """Get workers that can handle a specific task type"""
        return [
            worker for worker in self.workers
            if task_type in worker.task_types
        ]

    async def _execute_worker(self, worker: Worker):
        """Execute tasks for a specific worker"""
        queue = self.worker_queues[worker.name]
        while True:
            task = await queue.get()
            try:
                # Execute task
                result = await worker.handler(task.input_data)

                # Update task status and result
                task.status = "completed"
                task.result = result

                # Check if any dependent tasks can now run
                await self._check_dependencies()

            except Exception as e:
                task.status = "failed"
                task.result = {"error": str(e)}

            finally:
                queue.task_done()

    async def _check_dependencies(self):
        """Check and queue tasks whose dependencies are met"""
        for task in self.tasks.values():
            if task.status == "pending":
                # Check if all dependencies are completed
                deps_completed = all(
                    self.tasks[dep].status == "completed"
                    for dep in task.dependencies
                )

                if deps_completed:
                    # Find suitable worker and queue task
                    workers = self._get_workers_for_task(task.type)
                    if workers:
                        # Simple round-robin for now
                        worker = workers[0]
                        await self.worker_queues[worker.name].put(task)
                        task.status = "queued"

    async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a complex task using orchestration"""
        try:
            # Decompose task
            tasks = await self._decompose_task(input_data)

            # Store tasks
            for task in tasks:
                self.tasks[task.id] = task

            # Start worker coroutines
            worker_tasks = []
            for worker in self.workers:
                for _ in range(worker.concurrency_limit):
                    worker_tasks.append(
                        asyncio.create_task(self._execute_worker(worker))
                    )

            # Queue initial tasks (those with no dependencies)
            for task in tasks:
                if not task.dependencies:
                    workers = self._get_workers_for_task(task.type)
                    if workers:
                        worker = workers[0]  # Simple round-robin
                        await self.worker_queues[worker.name].put(task)
                        task.status = "queued"

            # Wait for all tasks to complete
            for queue in self.worker_queues.values():
                await queue.join()

            # Cancel worker tasks
            for task in worker_tasks:
                task.cancel()

            # Combine results
            return {
                task.id: task.result
                for task in self.tasks.values()
                if task.status == "completed"
            }

        except Exception as e:
            raise Exception(f"Error in orchestration: {str(e)}")

class OrchestratorBuilder:
    """Helper class to build orchestrators"""

    def __init__(self):
        self.workers: List[Worker] = []

    def add_worker(
        self,
        name: str,
        task_types: List[str],
        handler: Callable[[Dict[str, Any]], Any],
        concurrency_limit: int = 1
    ) -> 'OrchestratorBuilder':
        """Add a worker to the orchestrator"""
        self.workers.append(
            Worker(
                name=name,
                task_types=task_types,
                handler=handler,
                concurrency_limit=concurrency_limit
            )
        )
        return self

    def build(
        self,
        llm_caller: Callable[[str], str]
    ) -> Orchestrator:
        """Build the orchestrator"""
        return Orchestrator(
            workers=self.workers,
            llm_caller=llm_caller
        )

workflows/parallelization/README.md

@@ -0,0 +1,193 @@
# Parallelization Pattern

This pattern demonstrates how to execute multiple LLM tasks in parallel and combine their results using various voting strategies.

## Overview

The parallelization pattern is useful when:

- Multiple perspectives or analyses are needed
- Tasks can be executed independently
- Results need to be combined systematically
- Performance optimization is important

## Components

### Core Classes

1. **ParallelTask**

   - Defines a task to run in parallel
   - Includes prompt template and weight
   - Supports context variables

2. **Parallelizer**

   - Manages parallel task execution
   - Implements voting strategies
   - Handles result combination

3. **ParallelizerBuilder**

   - Fluent interface for configuration
   - Supports task and strategy setup
   - Enables custom result combining

## Example Implementation

The `examples/content_moderation.py` example demonstrates parallel content moderation:

1. Parallel Checks:

   - Toxicity detection
   - Adult content detection
   - Spam detection

2. Features:

   - Weighted voting
   - Custom result combination
   - Detailed explanations
   - Fail-safe design

## Usage

1. Set up environment:

   ```bash
   pip install -r requirements.txt
   ```

2. Configure API keys:

   ```bash
   export ANTHROPIC_API_KEY=your_key_here
   # or
   export OPENAI_API_KEY=your_key_here
   ```

3. Run the example:

   ```bash
   python -m examples.content_moderation
   ```

## Implementation Details

### Task Definition

```python
ParallelTask(
    name="task_name",
    prompt_template="Your prompt with {variables}",
    weight=1.0
)
```

### Parallelizer Building

```python
parallelizer = (
    ParallelizerBuilder()
    .add_task(...)
    .add_task(...)
    .set_voting_strategy(strategy, custom_combiner)
    .build(llm_caller)
)
```

### Execution

```python
result = await parallelizer.execute(context)
```
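
The `context` supplies values for the `{variables}` referenced in each task's `prompt_template`. A minimal sketch, assuming the templates reference a `{content}` variable:

```python
# Every parallel task renders its prompt template against this context.
context = {
    "content": "Text to be analyzed by all parallel checks",
}
result = await parallelizer.execute(context)
print(result)
```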

## Voting Strategies

1. **Majority Voting**

   - Most common response wins
   - Supports weighted votes
   - Default strategy

2. **Unanimous Voting**

   - Requires all tasks to agree
   - Returns all responses if no consensus
   - Good for critical decisions

3. **Weighted Averaging**

   - For numeric responses
   - Considers task weights
   - Falls back to majority if non-numeric

4. **Custom Combining** (see the sketch below)

   - User-defined combination logic
   - Access to all task results
   - Maximum flexibility
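
A custom combiner is passed alongside the strategy via `set_voting_strategy(strategy, custom_combiner)`. The exact signature depends on the `Parallelizer` implementation; as a hedged sketch, assuming the combiner receives each task's name mapped to its raw response:

```python
from typing import Any, Dict

def combine_moderation_results(responses: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical combiner: flag content if any check reports 'unsafe'."""
    flagged = [name for name, answer in responses.items()
               if "unsafe" in answer.lower()]
    return {"allowed": not flagged, "flagged_by": flagged}
```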

## Best Practices

1. **Task Design**

   - Keep tasks independent
   - Use clear prompt templates
   - Assign appropriate weights
   - Handle edge cases

2. **Performance**

   - Optimize number of tasks
   - Balance task complexity
   - Monitor execution times
   - Handle timeouts

3. **Result Combination**

   - Choose appropriate strategy
   - Handle conflicting results
   - Provide clear explanations
   - Consider confidence levels

4. **Error Handling**

   - Handle task failures
   - Implement timeouts
   - Provide fallbacks
   - Log issues

## Extensions

Consider extending this pattern with:

1. **Advanced Execution**

   - Dynamic task creation
   - Conditional execution
   - Task prioritization
   - Resource management

2. **Result Processing**

   - Confidence scoring
   - Result filtering
   - Anomaly detection
   - Quality metrics

3. **Performance**

   - Result caching
   - Task batching
   - Rate limiting
   - Load balancing

4. **Monitoring**

   - Task timing
   - Error rates
   - Result distribution
   - Resource usage

5. **Integration**

   - Multiple LLM providers
   - External services
   - Custom task types
   - Result persistence