Skip to content

Commit

Permalink
Merge pull request #24 from jlowin/task
Browse files Browse the repository at this point in the history
Update task decorator
  • Loading branch information
jlowin authored May 12, 2024
2 parents 38b9d4b + 69a5c4d commit a8f2dbb
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 24 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ This targeted approach results in AI systems that are easier to build, maintain,

- **Flow**: A container for an AI-enhanced workflow, defined using the `@flow` decorator. Flows maintain consistent context and history across tasks.

- **Task**: A discrete objective for AI agents to solve, defined using the `@ai_task` decorator or declared inline. Tasks specify the expected inputs and outputs, acting as a bridge between AI agents and traditional code.
- **Task**: A discrete objective for AI agents to solve, defined using the `@task` decorator or declared inline. Tasks specify the expected inputs and outputs, acting as a bridge between AI agents and traditional code.

- **Agent**: An AI agent that can be assigned tasks. Agents are powered by specialized AI models that excel at specific tasks, such as text generation or decision making based on unstructured data.

Expand Down Expand Up @@ -47,7 +47,7 @@ pip install .
## Example

```python
from control_flow import Agent, Task, flow, ai_task, instructions
from control_flow import Agent, Task, flow, task, instructions
from pydantic import BaseModel


Expand All @@ -56,12 +56,12 @@ class Name(BaseModel):
last_name: str


@ai_task(user_access=True)
@task(user_access=True)
def get_user_name() -> Name:
pass


@ai_task(agents=[Agent(name="poetry-bot", instructions="loves limericks")])
@task(agents=[Agent(name="poetry-bot", instructions="loves limericks")])
def write_poem_about_user(name: Name, interests: list[str]) -> str:
"""write a poem based on the provided `name` and `interests`"""
pass
Expand Down
8 changes: 4 additions & 4 deletions examples/documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pathlib import Path

import control_flow
from control_flow import ai_task, flow
from control_flow import flow, task
from marvin.beta.assistants import Assistant, Thread
from marvin.tools.filesystem import read, write

Expand All @@ -27,22 +27,22 @@ def glob(pattern: str) -> list[str]:
)


@ai_task(model="gpt-3.5-turbo")
@task(model="gpt-3.5-turbo")
def examine_source_code(source_dir: Path, extensions: list[str]):
"""
Load all matching files in the root dir and all subdirectories and
read them carefully.
"""


@ai_task(model="gpt-3.5-turbo")
@task(model="gpt-3.5-turbo")
def read_docs(docs_dir: Path):
"""
Read all documentation in the docs dir and subdirectories, if any.
"""


@ai_task
@task
def write_docs(docs_dir: Path, instructions: str = None):
"""
Write new documentation based on the provided instructions.
Expand Down
6 changes: 3 additions & 3 deletions examples/readme_example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from control_flow import Agent, Task, ai_task, flow, instructions
from control_flow import Agent, Task, flow, instructions, task
from pydantic import BaseModel


Expand All @@ -7,12 +7,12 @@ class Name(BaseModel):
last_name: str


@ai_task(user_access=True)
@task(user_access=True)
def get_user_name() -> Name:
pass


@ai_task(agents=[Agent(name="poetry-bot", instructions="loves limericks")])
@task(agents=[Agent(name="poetry-bot", instructions="loves limericks")])
def write_poem_about_user(name: Name, interests: list[str]) -> str:
"""write a poem based on the provided `name` and `interests`"""
pass
Expand Down
6 changes: 3 additions & 3 deletions src/control_flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .settings import settings

# from .agent_old import ai_task, Agent, run_ai
# from .agent_old import task, Agent, run_ai
from .core.flow import Flow, reset_global_flow as _reset_global_flow, flow
from .core.agent import Agent
from .core.task import Task
from .core.task import Task, task
from .core.controller.controller import Controller
from .instructions import instructions
from .dx import run_ai, ai_task
from .dx import run_ai

_reset_global_flow()
59 changes: 59 additions & 0 deletions src/control_flow/core/task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import functools
import inspect
import uuid
from contextlib import contextmanager
from enum import Enum
Expand All @@ -23,6 +25,7 @@
model_validator,
)

from control_flow.core.agent import Agent
from control_flow.core.flow import get_flow
from control_flow.instructions import get_instructions
from control_flow.utilities.context import ctx
Expand Down Expand Up @@ -377,3 +380,59 @@ def any_failed(tasks: list[Task]) -> bool:

def none_failed(tasks: list[Task]) -> bool:
return not any_failed(tasks)


def task(
fn=None,
*,
objective: str = None,
instructions: str = None,
agents: list[Agent] = None,
tools: list[AssistantTool | Callable] = None,
user_access: bool = None,
):
"""
A decorator that turns a Python function into a Task. The Task objective is
set to the function name, and the instructions are set to the function
docstring. When the function is called, the arguments are provided to the
task as context, and the task is run to completion. If successful, the task
result is returned; if failed, an error is raised.
"""

if fn is None:
return functools.partial(
task,
objective=objective,
instructions=instructions,
agents=agents,
tools=tools,
user_access=user_access,
)

sig = inspect.signature(fn)

if objective is None:
objective = fn.__name__

if instructions is None:
instructions = fn.__doc__

@functools.wraps(fn)
def wrapper(*args, **kwargs):
# first process callargs
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()

task = Task(
objective=objective,
agents=agents,
context=bound.arguments,
result_type=fn.__annotations__.get("return"),
user_access=user_access or False,
tools=tools or [],
)

task.run()
return task.result

return wrapper
20 changes: 10 additions & 10 deletions src/control_flow/dx.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
NOT_PROVIDED = object()


def ai_task(
def task(
fn=None,
*,
objective: str = None,
Expand All @@ -30,7 +30,7 @@ def ai_task(

if fn is None:
return functools.partial(
ai_task,
task,
objective=objective,
agents=agents,
tools=tools,
Expand Down Expand Up @@ -90,7 +90,7 @@ def run_ai(
) -> T | list[T]:
"""
Create and run an agent to complete a task with the given objective and
context. This function is similar to an inline version of the @ai_task
context. This function is similar to an inline version of the @task
decorator.
This inline version is useful when you want to create and run an ad-hoc AI
Expand All @@ -116,7 +116,7 @@ def run_ai(

# create tasks
if tasks:
ai_tasks = [
tasks = [
Task(
objective=t,
context=context or {},
Expand All @@ -126,7 +126,7 @@ def run_ai(
for t in tasks
]
else:
ai_tasks = []
tasks = []

# create agent
if agents is None:
Expand All @@ -135,17 +135,17 @@ def run_ai(
# create Controller
from control_flow.core.controller.controller import Controller

controller = Controller(tasks=ai_tasks, agents=agents, flow=flow)
controller = Controller(tasks=tasks, agents=agents, flow=flow)
controller.run()

if ai_tasks:
if all(task.status == TaskStatus.SUCCESSFUL for task in ai_tasks):
result = [task.result for task in ai_tasks]
if tasks:
if all(task.status == TaskStatus.SUCCESSFUL for task in tasks):
result = [task.result for task in tasks]
if single_result:
result = result[0]
return result
elif failed_tasks := [
task for task in ai_tasks if task.status == TaskStatus.FAILED
task for task in tasks if task.status == TaskStatus.FAILED
]:
raise ValueError(
f'Failed tasks: {", ".join([task.objective for task in failed_tasks])}'
Expand Down

0 comments on commit a8f2dbb

Please sign in to comment.