From 69a5c4d53d91b30d4f4461ac807dc98476009b99 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Sun, 12 May 2024 13:31:55 -0400 Subject: [PATCH] Update task decorator --- README.md | 8 ++--- examples/documentation.py | 8 ++--- examples/readme_example.py | 6 ++-- src/control_flow/__init__.py | 6 ++-- src/control_flow/core/task.py | 59 +++++++++++++++++++++++++++++++++++ src/control_flow/dx.py | 20 ++++++------ 6 files changed, 83 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 5906cf26..fa299a83 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ This targeted approach results in AI systems that are easier to build, maintain, - **Flow**: A container for an AI-enhanced workflow, defined using the `@flow` decorator. Flows maintain consistent context and history across tasks. -- **Task**: A discrete objective for AI agents to solve, defined using the `@ai_task` decorator or declared inline. Tasks specify the expected inputs and outputs, acting as a bridge between AI agents and traditional code. +- **Task**: A discrete objective for AI agents to solve, defined using the `@task` decorator or declared inline. Tasks specify the expected inputs and outputs, acting as a bridge between AI agents and traditional code. - **Agent**: An AI agent that can be assigned tasks. Agents are powered by specialized AI models that excel at specific tasks, such as text generation or decision making based on unstructured data. @@ -47,7 +47,7 @@ pip install . ## Example ```python -from control_flow import Agent, Task, flow, ai_task, instructions +from control_flow import Agent, Task, flow, task, instructions from pydantic import BaseModel @@ -56,12 +56,12 @@ class Name(BaseModel): last_name: str -@ai_task(user_access=True) +@task(user_access=True) def get_user_name() -> Name: pass -@ai_task(agents=[Agent(name="poetry-bot", instructions="loves limericks")]) +@task(agents=[Agent(name="poetry-bot", instructions="loves limericks")]) def write_poem_about_user(name: Name, interests: list[str]) -> str: """write a poem based on the provided `name` and `interests`""" pass diff --git a/examples/documentation.py b/examples/documentation.py index 75a708f0..fdae7aa0 100644 --- a/examples/documentation.py +++ b/examples/documentation.py @@ -2,7 +2,7 @@ from pathlib import Path import control_flow -from control_flow import ai_task, flow +from control_flow import flow, task from marvin.beta.assistants import Assistant, Thread from marvin.tools.filesystem import read, write @@ -27,7 +27,7 @@ def glob(pattern: str) -> list[str]: ) -@ai_task(model="gpt-3.5-turbo") +@task(model="gpt-3.5-turbo") def examine_source_code(source_dir: Path, extensions: list[str]): """ Load all matching files in the root dir and all subdirectories and @@ -35,14 +35,14 @@ def examine_source_code(source_dir: Path, extensions: list[str]): """ -@ai_task(model="gpt-3.5-turbo") +@task(model="gpt-3.5-turbo") def read_docs(docs_dir: Path): """ Read all documentation in the docs dir and subdirectories, if any. """ -@ai_task +@task def write_docs(docs_dir: Path, instructions: str = None): """ Write new documentation based on the provided instructions. diff --git a/examples/readme_example.py b/examples/readme_example.py index 63b4b1b1..390537fa 100644 --- a/examples/readme_example.py +++ b/examples/readme_example.py @@ -1,4 +1,4 @@ -from control_flow import Agent, Task, ai_task, flow, instructions +from control_flow import Agent, Task, flow, instructions, task from pydantic import BaseModel @@ -7,12 +7,12 @@ class Name(BaseModel): last_name: str -@ai_task(user_access=True) +@task(user_access=True) def get_user_name() -> Name: pass -@ai_task(agents=[Agent(name="poetry-bot", instructions="loves limericks")]) +@task(agents=[Agent(name="poetry-bot", instructions="loves limericks")]) def write_poem_about_user(name: Name, interests: list[str]) -> str: """write a poem based on the provided `name` and `interests`""" pass diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index ec97d3d6..9e0b44b1 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,11 +1,11 @@ from .settings import settings -# from .agent_old import ai_task, Agent, run_ai +# from .agent_old import task, Agent, run_ai from .core.flow import Flow, reset_global_flow as _reset_global_flow, flow from .core.agent import Agent -from .core.task import Task +from .core.task import Task, task from .core.controller.controller import Controller from .instructions import instructions -from .dx import run_ai, ai_task +from .dx import run_ai _reset_global_flow() diff --git a/src/control_flow/core/task.py b/src/control_flow/core/task.py index bc4cfc34..bd69b635 100644 --- a/src/control_flow/core/task.py +++ b/src/control_flow/core/task.py @@ -1,4 +1,6 @@ import datetime +import functools +import inspect import uuid from contextlib import contextmanager from enum import Enum @@ -23,6 +25,7 @@ model_validator, ) +from control_flow.core.agent import Agent from control_flow.core.flow import get_flow from control_flow.instructions import get_instructions from control_flow.utilities.context import ctx @@ -377,3 +380,59 @@ def any_failed(tasks: list[Task]) -> bool: def none_failed(tasks: list[Task]) -> bool: return not any_failed(tasks) + + +def task( + fn=None, + *, + objective: str = None, + instructions: str = None, + agents: list[Agent] = None, + tools: list[AssistantTool | Callable] = None, + user_access: bool = None, +): + """ + A decorator that turns a Python function into a Task. The Task objective is + set to the function name, and the instructions are set to the function + docstring. When the function is called, the arguments are provided to the + task as context, and the task is run to completion. If successful, the task + result is returned; if failed, an error is raised. + """ + + if fn is None: + return functools.partial( + task, + objective=objective, + instructions=instructions, + agents=agents, + tools=tools, + user_access=user_access, + ) + + sig = inspect.signature(fn) + + if objective is None: + objective = fn.__name__ + + if instructions is None: + instructions = fn.__doc__ + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + # first process callargs + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + + task = Task( + objective=objective, + agents=agents, + context=bound.arguments, + result_type=fn.__annotations__.get("return"), + user_access=user_access or False, + tools=tools or [], + ) + + task.run() + return task.result + + return wrapper diff --git a/src/control_flow/dx.py b/src/control_flow/dx.py index 3a2e8823..f3c2d453 100644 --- a/src/control_flow/dx.py +++ b/src/control_flow/dx.py @@ -15,7 +15,7 @@ NOT_PROVIDED = object() -def ai_task( +def task( fn=None, *, objective: str = None, @@ -30,7 +30,7 @@ def ai_task( if fn is None: return functools.partial( - ai_task, + task, objective=objective, agents=agents, tools=tools, @@ -90,7 +90,7 @@ def run_ai( ) -> T | list[T]: """ Create and run an agent to complete a task with the given objective and - context. This function is similar to an inline version of the @ai_task + context. This function is similar to an inline version of the @task decorator. This inline version is useful when you want to create and run an ad-hoc AI @@ -116,7 +116,7 @@ def run_ai( # create tasks if tasks: - ai_tasks = [ + tasks = [ Task( objective=t, context=context or {}, @@ -126,7 +126,7 @@ def run_ai( for t in tasks ] else: - ai_tasks = [] + tasks = [] # create agent if agents is None: @@ -135,17 +135,17 @@ def run_ai( # create Controller from control_flow.core.controller.controller import Controller - controller = Controller(tasks=ai_tasks, agents=agents, flow=flow) + controller = Controller(tasks=tasks, agents=agents, flow=flow) controller.run() - if ai_tasks: - if all(task.status == TaskStatus.SUCCESSFUL for task in ai_tasks): - result = [task.result for task in ai_tasks] + if tasks: + if all(task.status == TaskStatus.SUCCESSFUL for task in tasks): + result = [task.result for task in tasks] if single_result: result = result[0] return result elif failed_tasks := [ - task for task in ai_tasks if task.status == TaskStatus.FAILED + task for task in tasks if task.status == TaskStatus.FAILED ]: raise ValueError( f'Failed tasks: {", ".join([task.objective for task in failed_tasks])}'