Skip to content

Commit

Permalink
Improve moderation
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin committed May 10, 2024
1 parent ccf1f22 commit 9c44d7b
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 53 deletions.
26 changes: 19 additions & 7 deletions examples/multi_agent_conversation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from control_flow import Agent, Task, ai_flow
from control_flow.core.controller.collaboration import Moderator
from control_flow.core.controller.moderators import Moderator

jerry = Agent(
name="Jerry",
Expand Down Expand Up @@ -50,15 +50,27 @@
""",
)

newman = Agent(
name="Newman",
description="The antagonist and foil to Jerry.",
instructions="""
You are Newman from the show Seinfeld. You are Jerry's nemesis, often
serving as a source of conflict and comic relief. Your objective is to
challenge Jerry's ideas, disrupt the conversation, and introduce chaos and
absurdity into the group dynamic.
""",
)


@ai_flow
def demo():
with Task("Discuss a topic", agents=[jerry, george, elaine, kramer]):
finish = Task(
"Finish the conversation after everyone speaks at least once",
agents=[jerry],
)
finish.run_until_complete(moderator=Moderator())
topic = "milk and cereal"
task = Task(
"Discuss a topic; everyone should speak at least once",
agents=[jerry, george, elaine, kramer, newman],
context=dict(topic=topic),
)
task.run_until_complete(moderator=Moderator())


demo()
3 changes: 3 additions & 0 deletions src/control_flow/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ async def run_async(self, tasks: list[Task] | Task | None = None):

controller = Controller(agents=[self], tasks=tasks or [], flow=get_flow())
return await controller.run_agent_async(agent=self)

def __hash__(self):
return id(self)
7 changes: 4 additions & 3 deletions src/control_flow/core/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ async def _run_agent(self, agent: Agent, thread: Thread = None) -> Run:
)

instructions = instructions_template.render()
breakpoint()

tools = self.flow.tools + agent.get_tools()

for task in self.tasks:
tools = tools + task.get_tools()
# add tools for any inactive tasks that the agent is assigned to
for task in self.all_tasks():
if task.is_incomplete() and agent in task.agents:
tools = tools + task.get_tools()

# filter tools because duplicate names are not allowed
final_tools = []
Expand Down
3 changes: 2 additions & 1 deletion src/control_flow/core/controller/instruction_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class TasksTemplate(Template):
You must complete the objective even if the task doesn't require a
result. For example, a task that asks you to choose, discuss, or perform
an action must be completed by posting messages before the task is
marked complete.
marked complete. The objective may require participation from multiple
agents. Do not mark a task as complete until the objective is fully met.
A "parent" is a task that spawned another task as a subtask. Generally,
the subtasks will need to be completed BEFORE the parent task. If you
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from typing import TYPE_CHECKING, Any, Generator

import marvin
from pydantic import BaseModel
from pydantic import BaseModel, Field

from control_flow.core.agent import Agent
from control_flow.core.flow import get_flow_messages
from control_flow.core.flow import Flow, get_flow_messages
from control_flow.core.task import Task
from control_flow.instructions import get_instructions

Expand All @@ -29,14 +29,45 @@ def __call__(
yield from self.run(agents=agents, tasks=tasks)


class AgentModerator(BaseModerator):
agent: Agent
participate: bool = Field(
False,
description="If True, the moderator can participate in the conversation. Default is False.",
)

def __init__(self, agent: Agent, **kwargs):
super().__init__(agent=agent, **kwargs)

def run(self, agents: list[Agent], tasks: list[Task]) -> Generator[Any, Any, Agent]:
while True:
history = get_flow_messages()

with Flow():
task = Task(
"Choose the next agent that should speak.",
instructions="""
You are acting as a moderator. Choose the next agent to
speak. Complete the task and stay silent. Do not post
any messages, even to confirm marking the task as
successful.
""",
result_type=[a.name for a in agents],
context=dict(agents=agents, history=history, tasks=tasks),
agents=[self.agent],
parent=None,
)
agent_name = task.run_until_complete()
yield next(a for a in agents if a.name == agent_name)


class Moderator(BaseModerator):
model: str = None

def run(self, agents: list[Agent], tasks: list[Task]) -> Generator[Any, Any, Agent]:
while True:
instructions = get_instructions()
history = get_flow_messages()

context = dict(
tasks=tasks, messages=history, global_instructions=instructions
)
Expand All @@ -46,7 +77,7 @@ def run(self, agents: list[Agent], tasks: list[Task]) -> Generator[Any, Any, Age
instructions="""
Given the conversation context, choose the AI agent most
qualified to take the next turn at completing the tasks. Take into
account any tasks, instructions, and tools.
account any tasks, history, instructions, and tools.
""",
model_kwargs=dict(model=self.model) if self.model else None,
)
Expand Down
13 changes: 13 additions & 0 deletions src/control_flow/core/flow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from contextlib import contextmanager
from typing import Callable, Literal

from marvin.beta.assistants import Thread
Expand Down Expand Up @@ -34,6 +35,18 @@ def _load_thread_from_ctx(cls, v):
def add_message(self, message: str, role: Literal["user", "assistant"] = None):
prefect_task(self.thread.add)(message, role=role)

@contextmanager
def _context(self):
with ctx(flow=self, tasks=[]):
yield self

def __enter__(self):
self.__cm = self._context()
return self.__cm.__enter__()

def __exit__(self, *exc_info):
return self.__cm.__exit__(*exc_info)


def get_flow() -> Flow:
"""
Expand Down
94 changes: 56 additions & 38 deletions src/control_flow/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ class TaskStatus(Enum):
SKIPPED = "skipped"


NOTSET = "__notset__"


class Task(ControlFlowModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4().hex[:4]))
objective: str
instructions: str | None = None
agents: list["Agent"] = []
context: dict = {}
parent: "Task | None" = Field(
None,
NOTSET,
description="The task that spawned this task.",
validate_default=True,
)
Expand Down Expand Up @@ -83,7 +86,7 @@ def _turn_list_into_literal_result_type(cls, v):

@field_validator("parent", mode="before")
def _load_parent_task_from_ctx(cls, v):
if v is None:
if v is NOTSET:
v = ctx.get("tasks", None)
if v:
# get the most recently-added task
Expand Down Expand Up @@ -156,33 +159,6 @@ def dependency_agents(self) -> list["Agent"]:
agents.extend(task.agents)
return agents

def run_iter(
self,
agents: list["Agent"] = None,
moderator: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
):
from control_flow.core.controller.collaboration import round_robin

if moderator is None:
moderator = round_robin

if agents is None:
agents = self.dependency_agents()

if not agents:
raise ValueError(
f"Task {self.id} has no agents assigned to it or its children."
"Please specify agents to run the task, or assign agents to the task."
)

all_tasks = self.trace_dependencies()

for agent in moderator(agents, tasks=all_tasks):
if self.is_complete():
break
agent.run(tasks=all_tasks)
yield True

def run(self, agent: "Agent" = None):
"""
Runs the task with provided agent. If no agent is provided, a default agent is used.
Expand All @@ -198,11 +174,10 @@ def run(self, agent: "Agent" = None):
else:
raise ValueError(
f"Task {self.id} has multiple agents assigned to it or its "
"children. Please specify one to run the task, or call task.run_iter() "
"or task.run_until_complete() to use all agents."
"children. Please specify one to run the task or call run_until_complete()."
)

run_gen = self.run_iter(agents=[agent])
run_gen = run_iter(tasks=[self], agents=[agent])
return next(run_gen)

def run_until_complete(
Expand All @@ -214,13 +189,10 @@ def run_until_complete(
Runs the task with provided agents until it is complete.
"""

for _ in self.run_iter(agents=agents, moderator=moderator):
continue

if self.is_successful():
return self.result
elif self.is_failed():
run_until_complete(tasks=[self], agents=agents, moderator=moderator)
if self.is_failed():
raise ValueError(f"Task {self.id} failed: {self.error}")
return self.result

@contextmanager
def _context(self):
Expand Down Expand Up @@ -345,3 +317,49 @@ def any_failed(tasks: list[Task]) -> bool:

def none_failed(tasks: list[Task]) -> bool:
return not any_failed(tasks)


def run_iter(
tasks: list["Task"],
agents: list["Agent"] = None,
moderator: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
):
from control_flow.core.controller.moderators import round_robin

if moderator is None:
moderator = round_robin

if agents is None:
agents = list(set([a for t in tasks for a in t.dependency_agents()]))

if not agents:
raise ValueError("Tasks have no agents assigned. Please specify agents.")

all_tasks = list(set([a for t in tasks for a in t.trace_dependencies()]))

for agent in moderator(agents, tasks=all_tasks):
if any(t.is_failed() for t in tasks):
break
elif all(t.is_complete() for t in tasks):
break
agent.run(tasks=all_tasks)
yield True


def run_until_complete(
tasks: list["Task"],
agents: list["Agent"] = None,
moderator: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
raise_on_error: bool = True,
) -> T:
"""
Runs the task with provided agents until it is complete.
"""

for _ in run_iter(tasks=tasks, agents=agents, moderator=moderator):
continue

if raise_on_error and any(t.is_failed() for t in tasks):
raise ValueError(
f"At least one task failed: {', '.join(t.id for t in tasks if t.is_failed())}"
)

0 comments on commit 9c44d7b

Please sign in to comment.