Skip to content

Commit

Permalink
Merge pull request #21 from jlowin/agent-selection
Browse files Browse the repository at this point in the history
Update agent selection
  • Loading branch information
jlowin authored May 11, 2024
2 parents 674e3b0 + 354df84 commit 7922a6c
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 113 deletions.
30 changes: 30 additions & 0 deletions examples/teacher_student.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from control_flow import Agent, Task
from control_flow.dx import ai_flow
from control_flow.instructions import instructions

teacher = Agent(name="teacher")
student = Agent(name="student")


@ai_flow
def demo():
with Task("Teach a class by asking and answering 3 questions") as task:
for _ in range(3):
question = Task(
"ask the student a question. Wait for the student to answer your question before asking another one.",
str,
agents=[teacher],
)
with instructions("one sentence max"):
Task(
"answer the question",
str,
agents=[student],
context=dict(question=question),
)

task.run()
return task


t = demo()
2 changes: 1 addition & 1 deletion examples/write_and_critique_paper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def write_paper(topic: str) -> str:
draft = Task(
"produce a 3-sentence draft on the topic",
str,
agents=[writer],
# agents=[writer],
context=dict(topic=topic),
)
edits = Task("edit the draft", str, agents=[editor], depends_on=[draft])
Expand Down
36 changes: 18 additions & 18 deletions src/control_flow/core/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ def _validate_tasks(cls, v):
return v

def _create_end_run_tool(self) -> FunctionTool:
@marvin.utilities.tools.tool_from_function
def end_run():
"""
End your turn if you have no tasks to work on. Only call this tool
if necessary; otherwise you can end your turn normally.
"""
raise EndRun()

return marvin.utilities.tools.tool_from_function(
end_run,
description="End your turn if you have no tasks to work on. Only call this tool in an emergency; otherwise you can end your turn normally.",
)
return end_run

async def _run_agent(
self, agent: Agent, tasks: list[Task] = None, thread: Thread = None
Expand Down Expand Up @@ -148,34 +150,32 @@ async def _run_agent(agent: Agent, tasks: list[Task], thread: Thread = None):

@expose_sync_method("run_once")
async def run_once_async(self):
"""
Run the controller for a single iteration of the provided tasks. An agent will be selected to run the tasks.
"""
# get the tasks to run
if self.run_dependencies:
tasks = self.graph.upstream_dependencies(self.tasks)
else:
tasks = self.tasks
tasks = self.graph.upstream_dependencies(self.tasks)

# get the agents
agent_candidates = {a for t in tasks for a in t.agents if t.is_ready()}
if self.agents:
agents = self.agents
agents = list(agent_candidates.intersection(self.agents))
else:
# if we are running dependencies, only load agents for tasks that are ready
if self.run_dependencies:
agents = list({a for t in tasks for a in t.agents if t.is_ready()})
else:
agents = list({a for t in tasks for a in t.agents})
agents = list(agent_candidates)

# select the next agent
if len(agents) == 0:
agent = Agent()
raise ValueError(
"No agents were provided that are assigned to tasks that are ready to be run."
)
elif len(agents) == 1:
agent = agents[0]
else:
agent = marvin_moderator(
agents=agents,
tasks=tasks,
context=dict(
history=get_flow_messages(), instructions=get_instructions()
),
history=get_flow_messages(),
instructions=get_instructions(),
)

return await self._run_agent(agent, tasks=tasks)
Expand Down
23 changes: 12 additions & 11 deletions src/control_flow/core/controller/instruction_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,18 @@ class TasksTemplate(Template):
template: str = """
## Tasks
### Your assignments
You have been assigned to complete certain tasks. Each task has an
objective and criteria for success. Your job is to perform any required
actions and then mark each assigned task as successful. If a task
requires a result, you must provide it. Only work on tasks that are
assigned to you.
A task may have a "parent", meaning it is a subtask and should be
completed before its parent. You can `skip` a subtask if you are able to
complete the parent task first.
A task may have tasks it "depends_on", meaning it must be completed
after the tasks it depends on. Tasks will error if you try to complete
them before their dependencies are met.
### Dependencies
Tasks may have explicit (`depends_on`) or implicit (`subtasks`)
dependencies. Tasks complete (or skip) dependencies before completing a
downstream task; tasks will error if explicit dependencies are not met.
### Current tasks
At the start of your turn, these are all the tasks being worked on. This
Expand All @@ -97,7 +93,12 @@ class TasksTemplate(Template):
marked complete. The objective may require participation from multiple
agents. Do not mark a task as complete until the objective is fully met.
If a task does take a result, do not also write the result in a message.
You can only complete tasks you are assigned to.
If a task requires a result, do not repeat the result in a message. For
example, if the task is to ask a question and accepts a string response,
only put the question in the task result; do not also post the question
in a message.
Some tasks may require collaboration with other agents to be completed;
others may take you multiple attempts. A task can only be marked
Expand Down
4 changes: 3 additions & 1 deletion src/control_flow/core/controller/moderators.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ def run(self, agents: list[Agent], tasks: list[Task]) -> Generator[Any, Any, Age
def marvin_moderator(
agents: list[Agent],
tasks: list[Task],
history: list = None,
instructions: list[str] = None,
context: dict = None,
model: str = None,
) -> Agent:
context = context or {}
context.update(tasks=tasks)
context.update(tasks=tasks, history=history, instructions=instructions)
agent = marvin.classify(
context,
agents,
Expand Down
32 changes: 16 additions & 16 deletions src/control_flow/core/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ class EdgeType(Enum):
## write draft based on outline
Edges:
outline -> paper # child_of (outline is a child of paper)
draft -> paper # child_of (draft is a child of paper)
outline -> draft # dependency_of (outline is a dependency of draft)
outline -> paper # SUBTASK (outline is a subtask of paper)
draft -> paper # SUBTASK (draft is a subtask of paper)
outline -> draft # DEPENDENCY (outline is a dependency of draft)
"""

DEPENDENCY_OF = "dependency_of"
CHILD_OF = "child_of"
DEPENDENCY = "dependency"
SUBTASK = "subtask"


class Edge(BaseModel):
Expand Down Expand Up @@ -60,23 +60,23 @@ def add_task(self, task: Task):
if task in self.tasks:
return
self.tasks.add(task)
if task.parent:
for subtask in task.subtasks:
self.add_edge(
Edge(
upstream=task.parent,
upstream=subtask,
downstream=task,
type=EdgeType.CHILD_OF,
type=EdgeType.SUBTASK,
)
)
if task.depends_on:
for upstream in task.depends_on:
self.add_edge(
Edge(
upstream=upstream,
downstream=task,
type=EdgeType.DEPENDENCY_OF,
)

for upstream in task.depends_on:
self.add_edge(
Edge(
upstream=upstream,
downstream=task,
type=EdgeType.DEPENDENCY,
)
)
self._cache.clear()

def add_edge(self, edge: Edge):
Expand Down
Loading

0 comments on commit 7922a6c

Please sign in to comment.