Skip to content

Commit

Permalink
Merge pull request #23 from jlowin/flow
Browse files Browse the repository at this point in the history
Streamline flow decorator
  • Loading branch information
jlowin authored May 12, 2024
2 parents bb1d009 + a7b2cba commit 38b9d4b
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 74 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ This targeted approach results in AI systems that are easier to build, maintain,

## Key Concepts

- **Flow**: A container for an AI-enhanced workflow, defined using the `@ai_flow` decorator. Flows maintain consistent context and history across tasks.
- **Flow**: A container for an AI-enhanced workflow, defined using the `@flow` decorator. Flows maintain consistent context and history across tasks.

- **Task**: A discrete objective for AI agents to solve, defined using the `@ai_task` decorator or declared inline. Tasks specify the expected inputs and outputs, acting as a bridge between AI agents and traditional code.

Expand Down Expand Up @@ -47,7 +47,7 @@ pip install .
## Example

```python
from control_flow import Agent, Task, ai_flow, ai_task, instructions
from control_flow import Agent, Task, flow, ai_task, instructions
from pydantic import BaseModel


Expand All @@ -67,7 +67,7 @@ def write_poem_about_user(name: Name, interests: list[str]) -> str:
pass


@ai_flow()
@flow()
def demo():
# set instructions that will be used for multiple tasks
with instructions("talk like a pirate"):
Expand Down
4 changes: 2 additions & 2 deletions examples/choose_a_number.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from control_flow import Agent, Task, ai_flow
from control_flow import Agent, Task, flow

a1 = Agent(name="A1", instructions="You struggle to make decisions.")
a2 = Agent(
Expand All @@ -7,7 +7,7 @@
)


@ai_flow
@flow
def demo():
task = Task("choose a number between 1 and 100", agents=[a1, a2], result_type=int)
return task.run()
Expand Down
4 changes: 2 additions & 2 deletions examples/documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pathlib import Path

import control_flow
from control_flow import ai_flow, ai_task
from control_flow import ai_task, flow
from marvin.beta.assistants import Assistant, Thread
from marvin.tools.filesystem import read, write

Expand Down Expand Up @@ -49,7 +49,7 @@ def write_docs(docs_dir: Path, instructions: str = None):
"""


@ai_flow(assistant=assistant)
@flow(assistant=assistant)
def docs_flow(instructions: str):
examine_source_code(ROOT / "src", extensions=[".py"])
# read_docs(ROOT / "docs")
Expand Down
4 changes: 2 additions & 2 deletions examples/multi_agent_conversation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from control_flow import Agent, Task, ai_flow
from control_flow import Agent, Task, flow
from control_flow.core.controller.moderators import Moderator

jerry = Agent(
Expand Down Expand Up @@ -62,7 +62,7 @@
)


@ai_flow
@flow
def demo():
topic = "milk and cereal"
task = Task(
Expand Down
4 changes: 2 additions & 2 deletions examples/pineapple_pizza.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from control_flow import Agent, Task, ai_flow
from control_flow import Agent, Task, flow
from control_flow.instructions import instructions

a1 = Agent(
Expand All @@ -23,7 +23,7 @@
a3 = Agent(name="Moderator")


@ai_flow
@flow
def demo():
topic = "pineapple on pizza"

Expand Down
4 changes: 2 additions & 2 deletions examples/readme_example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from control_flow import Agent, Task, ai_flow, ai_task, instructions
from control_flow import Agent, Task, ai_task, flow, instructions
from pydantic import BaseModel


Expand All @@ -18,7 +18,7 @@ def write_poem_about_user(name: Name, interests: list[str]) -> str:
pass


@ai_flow()
@flow()
def demo():
# set instructions that will be used for multiple tasks
with instructions("talk like a pirate"):
Expand Down
4 changes: 2 additions & 2 deletions examples/teacher_student.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from control_flow import Agent, Task
from control_flow.dx import ai_flow
from control_flow.dx import flow
from control_flow.instructions import instructions

teacher = Agent(name="teacher")
student = Agent(name="student")


@ai_flow
@flow
def demo():
with Task("Teach a class by asking and answering 3 questions") as task:
for _ in range(3):
Expand Down
4 changes: 2 additions & 2 deletions src/control_flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .settings import settings

# from .agent_old import ai_task, Agent, run_ai
from .core.flow import Flow, reset_global_flow as _reset_global_flow
from .core.flow import Flow, reset_global_flow as _reset_global_flow, flow
from .core.agent import Agent
from .core.task import Task
from .core.controller.controller import Controller
from .instructions import instructions
from .dx import ai_flow, run_ai, ai_task
from .dx import run_ai, ai_task

_reset_global_flow()
51 changes: 51 additions & 0 deletions src/control_flow/core/flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import functools
from contextlib import contextmanager
from typing import TYPE_CHECKING, Callable, Literal

import prefect
from marvin.beta.assistants import Thread
from openai.types.beta.threads import Message
from prefect import task as prefect_task
Expand All @@ -9,6 +11,7 @@
import control_flow
from control_flow.utilities.context import ctx
from control_flow.utilities.logging import get_logger
from control_flow.utilities.marvin import patch_marvin
from control_flow.utilities.types import AssistantTool, ControlFlowModel

if TYPE_CHECKING:
Expand Down Expand Up @@ -98,3 +101,51 @@ def get_flow_messages(limit: int = None) -> list[Message]:
"""
flow = get_flow()
return flow.thread.get_messages(limit=limit)


def flow(
fn=None,
*,
thread: Thread = None,
tools: list[AssistantTool | Callable] = None,
agents: list["Agent"] = None,
):
"""
A decorator that runs a function as a Flow
"""

if fn is None:
return functools.partial(
flow,
thread=thread,
tools=tools,
agents=agents,
)

@functools.wraps(fn)
def wrapper(
*args,
flow_kwargs: dict = None,
**kwargs,
):
flow_kwargs = flow_kwargs or {}

if thread is not None:
flow_kwargs.setdefault("thread", thread)
if tools is not None:
flow_kwargs.setdefault("tools", tools)
if agents is not None:
flow_kwargs.setdefault("agents", agents)

p_fn = prefect.flow(fn)

flow_obj = Flow(**flow_kwargs)

logger.info(
f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"'
)

with ctx(flow=flow_obj), patch_marvin():
return p_fn(*args, **kwargs)

return wrapper
51 changes: 1 addition & 50 deletions src/control_flow/dx.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,19 @@
import inspect
from typing import Callable, TypeVar

from prefect import flow as prefect_flow
from prefect import task as prefect_task

from control_flow.core.agent import Agent
from control_flow.core.flow import Flow
from control_flow.core.task import Task, TaskStatus
from control_flow.utilities.context import ctx
from control_flow.utilities.logging import get_logger
from control_flow.utilities.marvin import patch_marvin
from control_flow.utilities.types import AssistantTool, Thread
from control_flow.utilities.types import AssistantTool

logger = get_logger(__name__)
T = TypeVar("T")
NOT_PROVIDED = object()


def ai_flow(
fn=None,
*,
thread: Thread = None,
tools: list[AssistantTool | Callable] = None,
model: str = None,
):
"""
Prepare a function to be executed as a Control Flow flow.
"""

if fn is None:
return functools.partial(
ai_flow,
thread=thread,
tools=tools,
model=model,
)

@functools.wraps(fn)
def wrapper(
*args,
flow_kwargs: dict = None,
**kwargs,
):
p_fn = prefect_flow(fn)

flow_obj = Flow(
**{
"thread": thread,
"tools": tools or [],
"model": model,
**(flow_kwargs or {}),
}
)

logger.info(
f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"'
)

with ctx(flow=flow_obj), patch_marvin():
return p_fn(*args, **kwargs)

return wrapper


def ai_task(
fn=None,
*,
Expand Down
4 changes: 2 additions & 2 deletions tests/flows/test_sign_guestbook.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from control_flow import Agent, ai_flow, run_ai
from control_flow import Agent, flow, run_ai

# define assistants

Expand All @@ -25,7 +25,7 @@ def view_guestbook():
# define flow


@ai_flow
@flow
def guestbook_flow():
run_ai(
"""
Expand Down
10 changes: 5 additions & 5 deletions tests/flows/test_user_access.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from control_flow import Agent, ai_flow, run_ai
from control_flow import Agent, flow, run_ai

# define assistants

Expand All @@ -8,7 +8,7 @@


def test_no_user_access_fails():
@ai_flow
@flow
def user_access_flow():
run_ai(
"This task requires human user access. Inform the user that today is a good day.",
Expand All @@ -20,7 +20,7 @@ def user_access_flow():


def test_user_access_agent_succeeds():
@ai_flow
@flow
def user_access_flow():
run_ai(
"This task requires human user access. Inform the user that today is a good day.",
Expand All @@ -31,7 +31,7 @@ def user_access_flow():


def test_user_access_task_succeeds():
@ai_flow
@flow
def user_access_flow():
run_ai(
"This task requires human user access. Inform the user that today is a good day.",
Expand All @@ -43,7 +43,7 @@ def user_access_flow():


def test_user_access_agent_and_task_succeeds():
@ai_flow
@flow
def user_access_flow():
run_ai(
"This task requires human user access. Inform the user that today is a good day.",
Expand Down

0 comments on commit 38b9d4b

Please sign in to comment.