Skip to content

Commit

Permalink
Merge pull request #37 from jlowin/flow
Browse files Browse the repository at this point in the history
update utility functions for status
  • Loading branch information
jlowin authored May 15, 2024
2 parents 06f7ac5 + 07d6809 commit 302fb44
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 80 deletions.
85 changes: 5 additions & 80 deletions src/controlflow/core/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import datetime
import functools
import inspect
import uuid
from contextlib import contextmanager
from enum import Enum
Expand Down Expand Up @@ -29,7 +27,11 @@
from controlflow.utilities.context import ctx
from controlflow.utilities.logging import get_logger
from controlflow.utilities.prefect import wrap_prefect_tool
from controlflow.utilities.tasks import collect_tasks, visit_task_collection
from controlflow.utilities.tasks import (
all_complete,
collect_tasks,
visit_task_collection,
)
from controlflow.utilities.types import (
NOTSET,
AssistantTool,
Expand Down Expand Up @@ -431,80 +433,3 @@ def mark_skipped(self):
self._parent.mark_successful(validate=False)

return f"{self.friendly_name()} marked skipped. Updated task definition: {self.model_dump()}"


def any_incomplete(tasks: list[Task]) -> bool:
return any(t.status == TaskStatus.INCOMPLETE for t in tasks)


def all_complete(tasks: list[Task]) -> bool:
return all(t.status != TaskStatus.INCOMPLETE for t in tasks)


def all_successful(tasks: list[Task]) -> bool:
return all(t.status == TaskStatus.SUCCESSFUL for t in tasks)


def any_failed(tasks: list[Task]) -> bool:
return any(t.status == TaskStatus.FAILED for t in tasks)


def none_failed(tasks: list[Task]) -> bool:
return not any_failed(tasks)


def task(
fn=None,
*,
objective: str = None,
instructions: str = None,
agents: list["Agent"] = None,
tools: list[ToolType] = None,
user_access: bool = None,
):
"""
A decorator that turns a Python function into a Task. The Task objective is
set to the function name, and the instructions are set to the function
docstring. When the function is called, the arguments are provided to the
task as context, and the task is run to completion. If successful, the task
result is returned; if failed, an error is raised.
"""

if fn is None:
return functools.partial(
task,
objective=objective,
instructions=instructions,
agents=agents,
tools=tools,
user_access=user_access,
)

sig = inspect.signature(fn)

if objective is None:
objective = fn.__name__

if instructions is None:
instructions = fn.__doc__

@functools.wraps(fn)
def wrapper(*args, **kwargs):
# first process callargs
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()

task = Task(
objective=objective,
instructions=instructions,
agents=agents,
context=bound.arguments,
result_type=fn.__annotations__.get("return"),
user_access=user_access or False,
tools=tools or [],
)

task.run()
return task.result

return wrapper
20 changes: 20 additions & 0 deletions src/controlflow/utilities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,23 @@ def visit_task(task: "Task"):
return task.run()

return visit_task_collection(val, visit_task)


def any_incomplete(tasks: list["Task"]) -> bool:
return any(t.is_incomplete() for t in tasks)


def all_complete(tasks: list["Task"]) -> bool:
return all(t.is_complete() for t in tasks)


def all_successful(tasks: list["Task"]) -> bool:
return all(t.is_successful() for t in tasks)


def any_failed(tasks: list["Task"]) -> bool:
return any(t.is_failed() for t in tasks)


def none_failed(tasks: list["Task"]) -> bool:
return not any_failed(tasks)

0 comments on commit 302fb44

Please sign in to comment.