Skip to content

Commit

Permalink
Merge pull request #1046 from PrefectHQ/planning
Browse files Browse the repository at this point in the history
Add planning
  • Loading branch information
jlowin authored Jan 31, 2025
2 parents e2fcda3 + 1eab12f commit a037e80
Show file tree
Hide file tree
Showing 12 changed files with 856 additions and 194 deletions.
102 changes: 92 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,105 @@ We believe working with AI should spark joy (and maybe a few "wow" moments):

- 🧩 **Task-Centric Architecture**: Break complex AI workflows into manageable, observable steps.
- 🤖 **Specialized Agents**: Deploy task-specific AI agents for efficient problem-solving.
- 🔒 **Type-Safe Results**: Bridge the gap between AI and traditional software with type-safe, validated outputs
- 🔒 **Type-Safe Results**: Bridge the gap between AI and traditional software with type-safe, validated outputs.
- 🎛️ **Flexible Control**: Continuously tune the balance of control and autonomy in your workflows.
- 🕹️ **Multi-Agent Orchestration**: Coordinate multiple AI agents within a single workflow or task.
- 🧵 **Thread management**: Manage the agentic loop by composing tasks into customizable threads.
- 🧵 **Thread Management**: Manage the agentic loop by composing tasks into customizable threads.
- 🔗 **Ecosystem Integration**: Seamlessly work with your existing code, tools, and the broader AI ecosystem.
- 🚀 **Developer Speed:** Start simple, scale up, sleep well
- 🚀 **Developer Speed**: Start simple, scale up, sleep well.

## Core Abstractions

Marvin is built around a few powerful abstractions that make it easy to work with AI:

### Tasks

Tasks are the fundamental unit of work in Marvin. Each task represents a clear objective that can be accomplished by an AI agent:

```python
# The simplest way to run a task
result = marvin.run("Write a haiku about coding")

# Create a task with more control
task = marvin.Task(
instructions="Write a haiku about coding",
result_type=str,
tools=[my_custom_tool]
)
```

Tasks are:
- 🎯 **Objective-Focused**: Each task has clear instructions and a type-safe result
- 🛠️ **Tool-Enabled**: Tasks can use custom tools to interact with your code and data
- 📊 **Observable**: Monitor progress, inspect results, and debug failures
- 🔄 **Composable**: Build complex workflows by connecting tasks together

### Agents and Teams

Agents are portable LLM configurations that can be assigned to tasks. They encapsulate everything an AI needs to work effectively:

```python
# Create a specialized agent
writer = marvin.Agent(
name="Technical Writer",
instructions="Write clear, engaging content for developers"
)

# Create a team of agents that work together
team = marvin.Swarm([
writer,
marvin.Agent("Editor"),
marvin.Agent("Fact Checker")
])

# Use agents with tasks
result = marvin.run(
"Write a blog post about Python type hints",
agents=[team] # or team
)
```

Agents are:
- 📝 **Specialized**: Give agents specific instructions and personalities
- 🎭 **Portable**: Reuse agent configurations across different tasks
- 🤝 **Collaborative**: Form teams of agents that work together
- 🔧 **Customizable**: Configure model, temperature, and other settings

### Planning and Orchestration

Marvin makes it easy to break down complex objectives into manageable tasks:

```python
# Let Marvin plan a complex workflow
tasks = marvin.plan("Create a blog post about AI trends")
marvin.run_tasks(tasks)

# Or orchestrate tasks manually
with marvin.Thread() as thread:
research = marvin.run("Research recent AI developments")
outline = marvin.run("Create an outline", context={"research": research})
draft = marvin.run("Write the first draft", context={"outline": outline})
```

Planning features:
- 📋 **Smart Planning**: Break down complex objectives into discrete, dependent tasks
- 🔄 **Task Dependencies**: Tasks can depend on each other's outputs
- 📈 **Progress Tracking**: Monitor the execution of your workflow
- 🧵 **Thread Management**: Share context and history between tasks

## Keep it Simple

Marvin includes high-level functions for the most common tasks, like summarizing text, classifying data, extracting structured information, and more.

- 📖 **Summarize**: Get a quick summary of a text
- 🏷️ **Classify**: Categorize data into predefined classes
- 🔍 **Extract**: Extract structured information from a text
- 🪄 **Cast**: Transform data into a different type
-**Generate**: Create structured data from a description
- 💬 **Say**: Converse with an LLM
- 🦾 **`@fn`**: Write custom AI functions without source code
- 🚀 **`marvin.run`**: Execute any task with an AI agent
- 📖 **`marvin.summarize`**: Get a quick summary of a text
- 🏷️ **`marvin.classify`**: Categorize data into predefined classes
- 🔍 **`marvin.extract`**: Extract structured information from a text
- 🪄 **`marvin.cast`**: Transform data into a different type
-**`marvin.generate`**: Create structured data from a description
- 💬 **`marvin.say`**: Converse with an LLM
- 🧠 **`marvin.plan`**: Break down complex objectives into tasks
- 🦾 **`@marvin.fn`**: Write custom AI functions without source code

All Marvin functions have thread management built-in, meaning they can be composed into chains of tasks that share context and history.

Expand Down Expand Up @@ -200,3 +280,5 @@ print(f"# {article.title}\n\n{article.content}")
>- Used by major companies like Google, Mozilla, and Unity
>```
</details>
## Keep it Simple
28 changes: 28 additions & 0 deletions docs/concepts/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,33 @@ task = marvin.Task(

Tasks can be configured with a parent task. Creating hierarchies of tasks can help agents understand the relationships between different tasks and to better understand the task they are working on.

### Task Planning

When a task is created with `plan=True`, the agent is given the ability to break down complex tasks into smaller, more manageable subtasks. This is particularly useful for handling large or complex objectives that benefit from being tackled incrementally.

The agent can create subtasks at any point during task execution, and these subtasks become part of the task hierarchy. Each subtask is treated as a dependency of the parent task, ensuring that all subtasks are completed before the parent task is considered finished.

```python
import marvin

task = marvin.Task(
instructions="Write a comprehensive report about AI",
plan=True # Enable task planning
)
```

When planning is enabled, the agent can:
- Create one or more subtasks at any time
- Define clear, focused objectives for each subtask
- Establish dependencies between subtasks
- Monitor progress through the task hierarchy

This feature is especially valuable for:
- Breaking down complex tasks into manageable pieces
- Creating checkpoints in long-running processes
- Enabling parallel work on independent subtasks
- Maintaining clear progress tracking

### Depends on

Tasks can be configured with a list of tasks that they depend on. This information helps Marvin prioritize work and avoid running a task before its upstream dependencies are complete.
Expand All @@ -266,6 +293,7 @@ Marvin tasks support several additional options to control their behavior:
- `allow_fail`: Whether the task is allowed to be marked as failed (default: False)
- `allow_skip`: Whether the task is allowed to be skipped (default: False)
- `cli`: Whether to enable CLI interaction tools for the agent (default: False)
- `plan`: Whether to enable task planning capabilities, allowing the agent to create subtasks to help complete the parent task (default: False)

## Runtime properties

Expand Down
120 changes: 120 additions & 0 deletions docs/functions/plan.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
---
title: Plan
description: Break down complex objectives into manageable tasks
icon: list-check
---

The `plan` function helps you break down complex objectives into a series of smaller, manageable tasks.
It's particularly useful for handling large or complex goals that benefit from being tackled incrementally.

For simple tasks, you can use `marvin.run()` directly. The `plan` function is designed for complex
workflows that benefit from being broken down into steps - see [Task Planning](/concepts/tasks#task-planning)
for more details.

## Usage

Create a simple task plan:

```python
import marvin

# Create and execute a series of tasks
tasks = marvin.plan("Create a new blog post about AI trends")
marvin.run_tasks(tasks)
```

## Parameters

- `instructions`: The objective to achieve
- `agent`: Optional agent to use for planning
- `thread`: Optional thread for conversation context
- `context`: Optional dict of additional context
- `available_agents`: Optional list of agents that can be used for planning
- `tools`: Optional list of tools that can be used for planning
- `parent_task`: Optional parent task for the planned tasks

## Returns

Returns a list of `Task` objects that, when completed, will achieve the specified objective.

## Async Support

The function is also available in an async version:

```python
import marvin
import asyncio

async def main():
tasks = await marvin.plan_async(
"Create a new blog post about AI trends"
)
await marvin.run_tasks_async(tasks)

asyncio.run(main())
```

## Examples

### Multiple Agents

Assign different tasks to specialized agents:

```python
import marvin

# Create specialized agents
researcher = marvin.Agent(name="Researcher", model="openai:gpt-4")
writer = marvin.Agent(name="Writer", model="openai:gpt-4")
editor = marvin.Agent(name="Editor", model="openai:gpt-4")

# Plan tasks with multiple agents
tasks = marvin.plan(
"Create a comprehensive research report on quantum computing",
available_agents=[researcher, writer, editor]
)
marvin.run_tasks(tasks)
```

### With Context

Provide additional requirements:

```python
import marvin

# Provide detailed context for planning
context = {
"target_audience": "technical professionals",
"word_count": 2000,
"key_topics": ["neural networks", "transformers", "reinforcement learning"]
}

tasks = marvin.plan(
"Write a technical blog post about AI advancements",
context=context
)
marvin.run_tasks(tasks)
```

### With Tools

Make specific tools available to tasks:

```python
import marvin

def fetch_papers(topic: str, limit: int = 5) -> list[str]:
"""Fetch latest research papers on a topic."""
pass

def analyze_sentiment(text: str) -> float:
"""Analyze sentiment of text."""
pass

tasks = marvin.plan(
"Create a sentiment analysis report for recent AI papers",
tools=[fetch_papers, analyze_sentiment]
)
marvin.run_tasks(tasks)
```
3 changes: 3 additions & 0 deletions src/marvin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from marvin.fns.fn import fn
from marvin.fns.say import say, say_async
from marvin.fns.summarize import summarize, summarize_async
from marvin.fns.plan import plan, plan_async

ensure_tables_exist()

Expand All @@ -53,6 +54,8 @@
"generate_schema",
"generate_schema_async",
"instructions",
"plan",
"plan_async",
"run",
"run_async",
"run_tasks",
Expand Down
37 changes: 36 additions & 1 deletion src/marvin/agents/actor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from contextvars import ContextVar
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Sequence
from typing import TYPE_CHECKING, Any, Optional, Sequence

from pydantic_ai.result import RunResult

Expand All @@ -18,6 +20,12 @@
from marvin.engine.end_turn import EndTurn
from marvin.engine.orchestrator import Orchestrator

# Global context var for current actor
_current_actor: ContextVar[Optional["Actor"]] = ContextVar(
"current_actor",
default=None,
)


@dataclass(kw_only=True)
class Actor(ABC):
Expand Down Expand Up @@ -46,9 +54,27 @@ class Actor(ABC):

prompt: str | Path = field(repr=False)

_tokens: list[Any] = field(default_factory=list, init=False, repr=False)

def __hash__(self) -> int:
return hash(self.id)

def __enter__(self):
"""Set this actor as the current actor in context."""
token = _current_actor.set(self)
self._tokens.append(token)
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
"""Reset the current actor in context."""
if self._tokens: # Only reset if we have tokens
_current_actor.reset(self._tokens.pop())

@classmethod
def get_current(cls) -> Optional["Actor"]:
"""Get the current actor from context."""
return _current_actor.get()

@abstractmethod
async def _run(
self,
Expand Down Expand Up @@ -140,3 +166,12 @@ def as_team(self) -> "Team":
raise NotImplementedError(
"Subclass must implement as_team in order to be properly orchestrated.",
)


def get_current_actor() -> Actor | None:
"""Get the currently active actor from context.
Returns:
The current Actor instance or None if no actor is active.
"""
return Actor.get_current()
Loading

0 comments on commit a037e80

Please sign in to comment.