feat(flows): tinyflows Workflows (B1) — engine seam + flows domain#4433
…ation breakage
The 'finish TinyAgents harness migration (tinyhumansai#4249)' merge left main not compiling:
- orchestration/graph/mod.rs + orchestration/ops.rs reference the removed
  SqlRunLedgerCheckpointer (0 defs in tree) -> use the crate's
  tinyagents::graph::SqliteCheckpointer (same swap agent_orchestration already
  made), at a dedicated orchestration_graph_checkpoints.db.
- agent_registry/agents/loader.rs passes a bare graph_fn where the field is now
  Option<_> -> wrap in Some(..).
- mcp_server/resources.rs missing frontend_agent/reasoning_agent catalog entries
  (failing the resource-catalog test) -> add them.
Isolated from the flows feature so it can be cherry-picked into a hotfix.
Embed the published tinyflows 0.2 workflow-engine crate as the Workflows
feature and wire it to real OpenHuman services.
- Cargo.toml: tinyflows = "0.2" (single tinyagents 1.3.0 / rusqlite 0.40.0).
- Seam src/openhuman/tinyflows/: 5 capability adapters (LlmProvider->inference,
ToolInvoker->composio, HttpClient->HttpRequestTool, CodeRunner->sandbox,
StateStore->flow_state), a RunObserver, build_capabilities(), and
open_flow_checkpointer() reusing the crate's tinyagents::SqliteCheckpointer.
- Domain src/openhuman/flows/: Flow entity + flow_definitions/flow_state store,
validate-on-save via tinyflows::validate, CRUD + enable/disable, and flows_run
(compile -> run_with_checkpointer over the real adapters).
- Registered in core/all.rs, openhuman/mod.rs, channels startup (trigger
subscriber skeleton; real trigger->run bridge is B2).
RPC: openhuman.flows_{create,get,list,update,delete,set_enabled,run}.
Leaves the existing SKILL-bundle src/openhuman/workflows/ untouched.
No actionable comments were generated in the recent review. 🎉
📝 Walkthrough
Changes:
- Flows domain and TinyFlows capability seam
- Orchestration and agent/resource wiring
Estimated code review effort: 4 (Complex) | ~75 minutes
🚥 Pre-merge checks | ✅ 5 passed
Verification
HTTP adapter test note
OpenHuman's real …
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ea280c6f9e
    let result = tool
        .execute(request)
Route HTTP flow nodes through the approval gate
When flows_run is invoked from the agent-facing controller, an http_request node reaches this direct HttpRequestTool::execute call outside the ApprovalSecurityMiddleware; HttpRequestTool::execute only checks can_act()/rate limits, while the Prompt decision for Network actions is normally made before tool execution. In Supervised or Full autonomy, a flow can therefore make outbound network calls without the user approval that Network-class tools require.
Valid, and this is the explicit B1→B2 boundary. B1 deliberately defers the trust/approval wiring — TrustedAutomationSource::Workflow plus routing flow tool/HTTP nodes through the approval gate — to B2 (see the PR description's scope note). In B1, flows_run is a direct RPC (no auto-triggering, no UI), and the autonomy-tier gate (can_act()), the domain allowlist, and the DNS/SSRF guard all still apply inside HttpRequestTool::execute — so a readonly tier blocks outbound network entirely. What's missing is the per-call Network approval prompt in supervised/full, which B2 adds when it wires the workflow trust source. Tracking it there to keep the trust model in one place rather than duplicating gate logic in the seam.
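The gap discussed above can be pictured as a small decision function that runs before any tool executes. The following is a minimal sketch, not the project's middleware: the enum names, the `gate` function, and the idea that a trusted automation source skips the per-call prompt are all assumptions made for illustration.

```rust
/// Autonomy tiers named in the thread; the variant names are assumptions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Autonomy {
    ReadOnly,
    Supervised,
    Full,
}

/// Hypothetical action classes; only `Network` matters for HTTP nodes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ActionClass {
    Read,
    Network,
}

/// Outcome of the gate, decided before the tool itself runs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Decision {
    Allow,
    Prompt,
    Deny,
}

/// Decide what happens to an action before execution.
/// `trusted_source` models a pre-approved automation source (assumption).
pub fn gate(tier: Autonomy, class: ActionClass, trusted_source: bool) -> Decision {
    match (tier, class) {
        // The tier check blocks outbound network entirely in read-only mode.
        (Autonomy::ReadOnly, ActionClass::Network) => Decision::Deny,
        // A trusted automation source skips the per-call prompt.
        (_, ActionClass::Network) if trusted_source => Decision::Allow,
        // Otherwise Network-class actions need a per-call approval.
        (_, ActionClass::Network) => Decision::Prompt,
        (_, ActionClass::Read) => Decision::Allow,
    }
}
```

Calling the tool directly corresponds to skipping `gate` and relying only on the tier check inside the tool, which is why Supervised and Full runs would miss the `Prompt` step.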
    state: Arc::new(caps::FlowStateStore {
        config,
        namespace: "default".to_string(),
Isolate state-store keys per flow
Every run builds FlowStateStore with the same "default" namespace, and the backing flow_state table is keyed only by (namespace, key). When two saved flows use a state node with the same key, one flow will read or overwrite the other's state; pass at least the flow_id (or run/thread id, depending on the intended durability) into the capabilities namespace to keep workflow state isolated.
Fixed in e58c527 — build_capabilities now takes a per-flow state_namespace and flows_run passes flow:<id>, so two saved flows no longer share the default namespace and can't read/overwrite each other's state. (Store-level isolation is already covered by flow_state_store_round_trips_and_is_namespace_scoped.)
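The collision and its fix can be reproduced with an in-memory table keyed by (namespace, key). This is a sketch under assumptions: `StateTable`, `ScopedStore`, and `flow_namespace` are hypothetical names, and a `HashMap` stands in for the SQLite-backed `flow_state` table; only the `flow:<id>` namespace format comes from the reply above.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a table keyed by (namespace, key).
#[derive(Default)]
pub struct StateTable {
    rows: HashMap<(String, String), String>,
}

/// A view of the table bound to one namespace.
pub struct ScopedStore<'a> {
    table: &'a mut StateTable,
    namespace: String,
}

impl StateTable {
    /// Borrow the table through a single namespace.
    pub fn scoped(&mut self, namespace: impl Into<String>) -> ScopedStore<'_> {
        ScopedStore { table: self, namespace: namespace.into() }
    }
}

impl ScopedStore<'_> {
    /// Write a value under this store's namespace only.
    pub fn set(&mut self, key: &str, value: &str) {
        self.table
            .rows
            .insert((self.namespace.clone(), key.to_string()), value.to_string());
    }

    /// Read a value; keys written under other namespaces are invisible.
    pub fn get(&self, key: &str) -> Option<String> {
        self.table
            .rows
            .get(&(self.namespace.clone(), key.to_string()))
            .cloned()
    }
}

/// Namespace format taken from the reply above ("flow:<id>").
pub fn flow_namespace(flow_id: &str) -> String {
    format!("flow:{flow_id}")
}
```

With a shared `"default"` namespace, two flows writing the key `counter` would overwrite each other; with `flow_namespace(id)` each flow gets its own row.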
    let outcome =
        tinyflows::engine::run_with_checkpointer(&compiled, input, &caps, checkpointer, &thread_id)
            .await
            .map_err(|e| e.to_string())?;
Record failed runs before returning engine errors
If the engine or a capability fails during execution, this map_err returns before store::record_run, so the attempted run leaves last_run_at and last_status unchanged even though Flow::last_status includes "failed". For example, an http_request node that is blocked without on_error: continue makes the latest run fail, but flows_get/flows_list will still show the previous success or pending state.
Fixed in e58c527 — flows_run now records a failed run (updating last_run_at/last_status) before returning the engine/capability error, so flows_get/flows_list reflect the failed attempt instead of the prior success/pending state.
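The shape of that fix is to hold the engine result, write the run record on both branches, and only then propagate. A minimal synchronous sketch follows; `RunRecord`, `run_and_record`, and the `"success"` status string are assumptions for illustration, and the timestamp update is omitted.

```rust
/// Minimal stand-in for the persisted run metadata on a flow.
#[derive(Debug, Default, PartialEq)]
pub struct RunRecord {
    pub last_status: Option<String>,
    pub run_count: u32,
}

/// Run `engine` and record the outcome on both branches before returning.
pub fn run_and_record<T>(
    record: &mut RunRecord,
    engine: impl FnOnce() -> Result<T, String>,
) -> Result<T, String> {
    let result = engine();
    // Record first, then propagate: applying `?` to `engine()` directly
    // would return on error and skip this write.
    let status = if result.is_ok() { "success" } else { "failed" };
    record.last_status = Some(status.to_string());
    record.run_count += 1;
    result
}
```

The caller still sees the original error; the only change is that the record no longer shows the previous run's state after a failure.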
    let command = format!(
        "{} {} {}",
        shell_quote(interpreter),
        shell_quote(&script_path.to_string_lossy()),
        shell_quote(&input_path.to_string_lossy()),
    );
Use container paths for Docker code runs
When the sandbox policy resolves to Docker, docker_exec mounts action_dir at /workspace and runs inside that container path, but this command passes host-absolute script_path/input_path values. In Docker-backed runtimes those paths do not exist in the container, so every JavaScript/Python code node fails before user code starts; build the command with container-relative paths for the Docker backend.
Fixed in e58c527 — the code command now references the script/input by action_dir-relative paths and passes action_dir as the sandbox working dir. That resolves under both backends: Local uses working_dir as the host cwd, and Docker bind-mounts action_dir at /workspace with -w /workspace, so the relative paths exist inside the container.
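A relative-path command of this kind can be built with `Path::strip_prefix`. The sketch below is an illustration, not the PR's code: `build_command` is a hypothetical helper, and `shell_quote` here is a plain POSIX single-quote escaper standing in for whatever the real function does.

```rust
use std::path::Path;

/// POSIX single-quote escaping; a stand-in for the PR's `shell_quote`.
pub fn shell_quote(s: &str) -> String {
    format!("'{}'", s.replace('\'', "'\\''"))
}

/// Build the command using paths relative to `action_dir`, so it resolves
/// wherever that directory is the working directory (host cwd or a mount).
/// Returns `None` if either file lives outside `action_dir`.
pub fn build_command(
    interpreter: &str,
    action_dir: &Path,
    script_path: &Path,
    input_path: &Path,
) -> Option<String> {
    let script = script_path.strip_prefix(action_dir).ok()?;
    let input = input_path.strip_prefix(action_dir).ok()?;
    Some(format!(
        "{} {} {}",
        shell_quote(interpreter),
        shell_quote(&script.to_string_lossy()),
        shell_quote(&input.to_string_lossy()),
    ))
}
```

Because the command no longer embeds host-absolute paths, the same string works when the directory is bind-mounted at a different location inside a container.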
Actionable comments posted: 7
🧹 Nitpick comments (2)
src/openhuman/orchestration/graph/mod.rs (1)
384-393: 🚀 Performance & Scalability | 🔵 Trivial | 💤 Low value
Cache the orchestration checkpointer: this opens a new `SqliteCheckpointer` on every wake cycle; a lazy shared `Arc` would avoid reopening SQLite on the hot path.
src/openhuman/flows/ops.rs (1)
82-90: 🚀 Performance & Scalability | 🔵 Trivial
Orphaned state/checkpoint data on flow deletion.
`flows_delete` removes the `flow_definitions` row but doesn't clean up the corresponding `flow_state` KV rows or `checkpoints.db` entries for that flow. This is more of a lifecycle/retention gap than a defect; worth tracking for a future cleanup pass so deleted flows don't leave orphaned state/checkpoint data behind indefinitely.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/flows/schemas.rs`:
- Around line 308-315: handle_run currently awaits ops::flows_run directly, so
the RPC can hang indefinitely if the underlying flow stalls. Update handle_run
to enforce a bounded timeout around the flows_run call, or thread a
deadline/timeout through ops::flows_run into
tinyflows::engine::run_with_checkpointer so long-running HTTP/LLM/capability
steps fail predictably. Use the handle_run and ops::flows_run symbols to locate
the call chain and apply the timeout at the appropriate boundary.
In `@src/openhuman/flows/store.rs`:
- Around line 181-202: The update_flow_graph path is doing a full
read-modify-write via upsert_flow, which can overwrite concurrent changes to
fields like enabled, last_run_at, and last_status. Refactor update_flow_graph to
issue a targeted SQL update for only name, graph_json, and updated_at, using the
existing get_flow lookup only to confirm the flow exists and preserve
created_at/other untouched columns. Keep the Flow struct reconstruction out of
the persistence write path and avoid calling upsert_flow here.
- Around line 26-62: `with_connection` in `store.rs` is creating a fresh SQLite
connection and re-initializing the schema on every call, which makes
`kv_get`/`kv_set` and other flow-store operations prone to immediate SQLITE_BUSY
under concurrent use. Update the connection setup in `with_connection` to
configure SQLite for concurrent access (set a `busy_timeout` and enable
WAL/journal settings before use), and avoid repeating expensive schema
initialization on each invocation by moving one-time setup behind a reusable
initialization path or shared connection lifecycle. Use the existing
`with_connection` helper and the `Connection::open`/`execute_batch` flow as the
place to apply the fix.
In `@src/openhuman/flows/types.rs`:
- Around line 29-30: `last_status` is documented and typed as allowing failed,
but the run flow never persists that value because errors return before
`record_run`. Update the run/error handling in the flow execution path that
writes `flows_run` and sets `last_status` so compile/run failures explicitly
store failed, or if that state is not intended, remove failed from the
`last_status` docs and surrounding handling to match the actual behavior.
In `@src/openhuman/tinyflows/caps.rs`:
- Around line 312-332: The work_dir cleanup in the code path around
execute_in_sandbox only runs after a successful call, so any hard error returned
via the current map_err/? path leaves the temporary directory behind. Update the
flow in caps.rs so cleanup is performed regardless of outcome, including when
execute_in_sandbox fails, while keeping the existing non-fatal cleanup logging
behavior. Use the execute_in_sandbox call, the cleanup/remove_dir_all block, and
the work_dir variable as the key points to restructure the control flow.
In `@src/openhuman/tinyflows/mod.rs`:
- Around line 26-78: Move the logic in build_capabilities and
open_flow_checkpointer out of mod.rs into a dedicated module such as caps.rs or
a new sibling file, since mod.rs should stay export-focused. Keep the external
API unchanged by re-exporting these symbols from mod.rs with pub use, and
preserve the current behavior around SecurityPolicy setup, adapter wiring, and
SqliteCheckpointer creation.
- Around line 34-52: The FlowStateStore namespace is hardcoded to "default" in
build_capabilities, causing all flow runs to share the same state keys. Update
build_capabilities to accept flow_id and use it when constructing
caps::FlowStateStore so each flow gets an isolated namespace. Keep the change
localized to build_capabilities and the FlowStateStore initialization in
tinyflows::mod.
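The bounded-timeout suggestion for `handle_run` in the list above can be illustrated with the standard library alone. This is only a sketch of the shape: the real call chain is async and would more plausibly use its runtime's timeout facility, and `run_with_timeout` is a hypothetical helper. Note that this thread-based version abandons the worker rather than cancelling it.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `work` on a helper thread and give up waiting after `limit`.
/// The worker is not cancelled; it is simply no longer awaited.
pub fn run_with_timeout<T, F>(limit: Duration, work: F) -> Result<T, String>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may be gone if the caller already timed out.
        let _ = tx.send(work());
    });
    match rx.recv_timeout(limit) {
        Ok(value) => Ok(value),
        Err(mpsc::RecvTimeoutError::Timeout) => {
            Err(format!("run timed out after {limit:?}"))
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            Err("worker exited without producing a result".to_string())
        }
    }
}
```

The point of the sketch is the boundary: the caller gets a predictable error after `limit` instead of waiting on a stalled HTTP, LLM, or tool step indefinitely.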
---
Nitpick comments:
In `@src/openhuman/flows/ops.rs`:
- Around line 82-90: `flows_delete` only removes the flow definition and leaves
behind related `flow_state` and checkpoint data, so extend the deletion path to
clean up all flow-owned runtime data as part of the same lifecycle. Update
`flows_delete` to call the appropriate state/checkpoint cleanup helpers after
`store::remove_flow`, using the flow id to remove any matching `flow_state` KV
entries and `checkpoints.db` records, and keep the existing `RpcOutcome` success
response unchanged.
In `@src/openhuman/orchestration/graph/mod.rs`:
- Around line 384-393: The orchestration checkpointer is being reopened on every
wake cycle instead of being reused. Update the orchestration graph setup around
the SqliteCheckpointer::open path to cache a single lazy shared
Arc<SqliteCheckpointer<OrchestrationState>> (for example via a OnceLock or
similar shared holder) and return the existing instance on subsequent wake
cycles rather than constructing a new one each time.
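The `OnceLock` idea from the last nitpick could look roughly like the following. It is a sketch with stand-ins: `Checkpointer` and `open_checkpointer` are hypothetical and infallible, whereas opening a real SQLite-backed checkpointer can fail, so real code would need to decide how to handle an error on first use.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Stand-in for an expensive handle such as a SQLite-backed checkpointer.
pub struct Checkpointer {
    pub path: String,
}

/// Counts how many times the expensive open actually ran.
pub static OPEN_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical, infallible open; a real one would return a `Result`.
fn open_checkpointer(path: &str) -> Checkpointer {
    OPEN_CALLS.fetch_add(1, Ordering::SeqCst);
    Checkpointer { path: path.to_string() }
}

/// Return the shared instance, opening it on first use only.
pub fn shared_checkpointer() -> Arc<Checkpointer> {
    static CELL: OnceLock<Arc<Checkpointer>> = OnceLock::new();
    CELL.get_or_init(|| Arc::new(open_checkpointer("orchestration_graph_checkpoints.db")))
        .clone()
}
```

Every wake cycle then clones an `Arc` instead of reopening the database; the trade-off is that the handle lives for the whole process.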
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 986486f9-f320-4b99-9988-183276522898
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (20)
- Cargo.toml
- src/core/all.rs
- src/openhuman/agent_registry/agents/loader.rs
- src/openhuman/channels/runtime/startup.rs
- src/openhuman/flows/bus.rs
- src/openhuman/flows/mod.rs
- src/openhuman/flows/ops.rs
- src/openhuman/flows/ops_tests.rs
- src/openhuman/flows/schemas.rs
- src/openhuman/flows/store.rs
- src/openhuman/flows/store_tests.rs
- src/openhuman/flows/types.rs
- src/openhuman/mcp_server/resources.rs
- src/openhuman/mod.rs
- src/openhuman/orchestration/graph/mod.rs
- src/openhuman/orchestration/ops.rs
- src/openhuman/tinyflows/caps.rs
- src/openhuman/tinyflows/mod.rs
- src/openhuman/tinyflows/observability.rs
- src/openhuman/tinyflows/tests.rs
…ng, Docker paths)
- FlowStateStore: scope per-flow ("flow:<id>") instead of a shared "default"
namespace, so two flows sharing a state key no longer collide
(build_capabilities now takes a state_namespace; flows_run passes flow_id).
- flows_run: record a "failed" run before returning an engine/capability error,
so last_run_at/last_status reflect a failed attempt instead of the prior state.
- CodeRunner: reference the script/input by action_dir-relative paths and pass
action_dir as the sandbox working dir, so code nodes work under the Docker
backend (action_dir mounted at /workspace) as well as Local.
♻️ Duplicate comments (1)
src/openhuman/tinyflows/mod.rs (1)
30-60: 📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win
`mod.rs` still contains business logic; move it out per coding guidelines.
`build_capabilities` and `open_flow_checkpointer` still contain real construction logic (SecurityPolicy setup, five-adapter wiring, filesystem/DB path handling) rather than export-focused wiring. This was already raised in a prior review and remains unresolved. As per coding guidelines, `src/openhuman/**/mod.rs`: "mod.rs files should be export-focused only: mod/pub mod, pub use, and controller schema wiring; no business logic." Move both functions into `caps.rs` (or a new sibling module) and re-export via `pub use` to keep `crate::openhuman::tinyflows::build_capabilities` / `open_flow_checkpointer` call sites unaffected.
Also applies to: 72-85
Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@src/openhuman/tinyflows/mod.rs`:
- Around line 30-60: The issue is that src/openhuman/tinyflows/mod.rs still
contains construction/business logic in build_capabilities and
open_flow_checkpointer instead of being export-only. Move the full
implementations of build_capabilities and open_flow_checkpointer into caps.rs or
a new sibling module, keeping the SecurityPolicy setup, adapter wiring, and
filesystem/DB path handling there. Then re-export those functions from mod.rs
with pub use so existing call sites like
crate::openhuman::tinyflows::build_capabilities and open_flow_checkpointer
continue to work unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cd01b318-a970-4cf5-98c7-25c6c1cab7f6
📒 Files selected for processing (4)
- src/openhuman/flows/ops.rs
- src/openhuman/tinyflows/caps.rs
- src/openhuman/tinyflows/mod.rs
- src/openhuman/tinyflows/tests.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/openhuman/tinyflows/tests.rs
- src/openhuman/flows/ops.rs
- src/openhuman/tinyflows/caps.rs
A tool_call with no slug errors under the default on_error:stop, so flows_run fails deterministically (no external services) and must stamp last_status=failed / last_run_at — covering the new error branch.
- flows_run: wrap the engine run in a bounded timeout (FLOW_RUN_TIMEOUT_SECS) so
  a hung LLM/tool/HTTP call can't block the RPC indefinitely; records a failed
  run on timeout.
- store: set PRAGMA busy_timeout + WAL journal on each connection so concurrent
  run/state writes retry instead of erroring SQLITE_BUSY.
- store: update_flow_graph now issues a targeted UPDATE of
  name/graph_json/updated_at instead of a full-row upsert, so it can't clobber a
  concurrent set_enabled / record_run.
- CodeRunner: clean up the temp work dir even when execute_in_sandbox itself
  errors (cleanup moved before the ?).
- mod.rs: move build_capabilities/open_flow_checkpointer into caps.rs and
  re-export them, keeping mod.rs export-only per the repo convention.
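The "cleanup moved before the ?" item describes a common pattern: keep the fallible result in a variable, clean up unconditionally, then return. A minimal standard-library sketch follows; `run_in_temp_dir` is a hypothetical helper, not the adapter's actual code, and it does not cover panics (a `Drop` guard would be needed for that).

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Run `work` inside `work_dir`, then remove the directory on every path.
pub fn run_in_temp_dir<T>(
    work_dir: &Path,
    work: impl FnOnce(&Path) -> io::Result<T>,
) -> io::Result<T> {
    fs::create_dir_all(work_dir)?;
    // Hold the result instead of applying `?`, so the cleanup below
    // runs for both Ok and Err.
    let result = work(work_dir);
    if let Err(e) = fs::remove_dir_all(work_dir) {
        // Cleanup failure is non-fatal: log it and keep the original result.
        eprintln!("cleanup of {} failed: {e}", work_dir.display());
    }
    result
}
```

Writing `work(work_dir)?` on the line that produces `result` would reintroduce the leak, because the early return happens before `remove_dir_all`.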
…n review fixes
Rebased onto current main (8aab529), which already carries the
tinyhumansai#4399/tinyhumansai#4430 compile reconciliation (graph_fn Some,
SqliteCheckpointer migration, MCP catalog) via tinyhumansai#4433, so this is
slimmed to the fixes main still lacks:
- app_state: bypass the runtime-snapshot cache when OPENHUMAN_SERVICE_MOCK is
  active. tinyhumansai#4399 raised RUNTIME_SNAPSHOT_TTL 2s->10s, so the
  mock-service e2e (which ages the cache with a 2.1s sleep) reads a stale
  snapshot and never sees the injected failure. Bypassing read+write under the
  mock fixes it (full config_auth_app_state_connectivity_e2e binary: 50 passed).
- orchestration/ops: split steering out of seed_state into apply_cycle_steering,
  invoked only after the has_new_work guard, so no-op wakes don't consume a
  cycle tick and prematurely expire steering (CodeRabbit).
- orchestration/store: user_version-gated world-diff migration: drop legacy
  non-unique index, de-dupe race rows, unique index, reconcile terminal_state
  (M3gA-Mind + CodeRabbit) + test.
- orchestration/graph/state: agent-scoped cycle_id (avoid cross-peer collision)
  + migration-seam note (Codex + M3gA-Mind).
- orchestration/schemas: ingest-health MAX(last_message_at) excludes pinned
  master/subconscious windows (CodeRabbit).
- frontend: orchestration socket listeners via socketService.on/off (queues
  through startup/reconnect) + messages Retry -> refresh() (CodeRabbit/Codex).
- docs: orchestration.md fenced-block language (markdownlint).
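The steering item in the list above is about ordering: the guard comes first, and the tick is consumed only on wakes that do real work. A minimal sketch, assuming a hypothetical `Steering` directive with a cycle budget; only the names `apply_cycle_steering` and `has_new_work` come from the commit message, and their real signatures are not shown in this thread.

```rust
/// Hypothetical steering directive that expires after a number of cycles.
#[derive(Debug)]
pub struct Steering {
    pub note: String,
    pub cycles_left: u32,
}

/// Minimal stand-in for the per-agent orchestration state.
#[derive(Debug, Default)]
pub struct CycleState {
    pub steering: Option<Steering>,
    pub cycles_run: u32,
}

/// Consume one steering tick; drop the directive once it is used up.
fn apply_cycle_steering(state: &mut CycleState) {
    if let Some(s) = state.steering.as_mut() {
        s.cycles_left = s.cycles_left.saturating_sub(1);
        if s.cycles_left == 0 {
            state.steering = None;
        }
    }
}

/// One wake: return early on no-op wakes, before any steering is consumed.
pub fn wake(state: &mut CycleState, has_new_work: bool) -> bool {
    if !has_new_work {
        return false;
    }
    apply_cycle_steering(state);
    state.cycles_run += 1;
    true
}
```

If the tick were consumed while seeding state, before the guard, a run of idle wakes would expire the directive without it ever influencing a real cycle.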
Summary
Adds the Workflows feature (B1): embeds the published `tinyflows` workflow-engine crate and wires it to OpenHuman's real services through a thin adapter seam, plus a new `flows::` domain that stores workflow graphs, validates them on save, and runs them end-to-end.
tinyflows is a host-agnostic Rust workflow engine (typed node graph → validate → compile → run on tinyagents). Everything that touches the outside world is a capability trait the host implements; this PR implements those five traits over OpenHuman's inference stack, curated Composio tools, allowlisted HTTP, and the code sandbox, and adds durable, human-in-the-loop-capable execution.
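The capability-trait seam can be illustrated with a toy trait and a host adapter. This is a sketch under stated assumptions: the trait shape, `AllowlistedHttp`, and `run_http_node` are invented for illustration and are not the tinyflows API; canned responses replace real network I/O.

```rust
use std::collections::HashMap;

/// Hypothetical capability: the engine only sees this trait, not the host.
pub trait HttpClient {
    fn get(&self, url: &str) -> Result<String, String>;
}

/// Host-side adapter that enforces a domain allowlist before "fetching".
pub struct AllowlistedHttp {
    pub allowed_hosts: Vec<String>,
    /// Canned responses stand in for real network I/O in this sketch.
    pub responses: HashMap<String, String>,
}

impl HttpClient for AllowlistedHttp {
    fn get(&self, url: &str) -> Result<String, String> {
        // Naive host extraction: the text between "://" and the next "/".
        let host = url
            .split("://")
            .nth(1)
            .and_then(|rest| rest.split('/').next())
            .ok_or_else(|| format!("malformed url: {url}"))?;
        if !self.allowed_hosts.iter().any(|h| h == host) {
            return Err(format!("host not allowlisted: {host}"));
        }
        self.responses
            .get(url)
            .cloned()
            .ok_or_else(|| format!("no response for {url}"))
    }
}

/// Engine-side code depends on the trait object only.
pub fn run_http_node(client: &dyn HttpClient, url: &str) -> Result<String, String> {
    client.get(url)
}
```

The design choice this shows is that host policy (here, the allowlist) lives in the adapter, so the engine stays host-agnostic and can be tested with a fake implementation.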
Consumption mirrors the existing `tinyplace = "1.0.1"` crates.io dependency. Since the repo already migrated to `tinyagents 1.3` (#4249), tinyflows pulls the same tinyagents version (no conflict), and the run checkpointer reuses the crate's own `tinyagents::SqliteCheckpointer` rather than a bespoke one.
What's in this PR (B1)
- `Cargo.toml`: `tinyflows = "0.2"` (resolves to a single tinyagents 1.3.0 / rusqlite 0.40.0).
- `src/openhuman/tinyflows/`: five capability adapters (`LlmProvider`, `ToolInvoker`, `HttpClient`, `CodeRunner`, `StateStore`) over the real services, a `RunObserver`, and `build_capabilities()` / `open_flow_checkpointer()` (the crate SQLite checkpointer at `<workspace>/flows/checkpoints.db`).
- `src/openhuman/flows/`: `Flow` entity + `flow_definitions`/`flow_state` SQLite store, CRUD + enable/disable, validate-on-save via `tinyflows::validate`, and a `flows_run` controller that compiles a stored graph and runs it through the real adapters with `run_with_checkpointer`.
- Registered in `src/core/all.rs`, `src/openhuman/mod.rs`, and the channels startup (a trigger-subscriber skeleton; the real trigger→run bridge is B2).
RPC surface: `openhuman.flows_{create,get,list,update,delete,set_enabled,run}`.
`main` build break (first commit)
While building this, I found `upstream/main` does not currently compile: the "finish TinyAgents harness migration (#4249)" merge left leftovers:
- `orchestration/graph/mod.rs` + `orchestration/ops.rs` still reference the removed `SqlRunLedgerCheckpointer` (0 definitions in the tree) → swapped to the crate's `tinyagents::graph::SqliteCheckpointer` (the same pattern `agent_orchestration/delegation.rs` already uses), at a dedicated `orchestration_graph_checkpoints.db`.
- `agent_registry/agents/loader.rs` passes a bare `graph_fn` where the field is now `Option<_>` → wrapped in `Some(..)`.
- `mcp_server/resources.rs` was missing catalog entries for `frontend_agent`/`reasoning_agent` (failing the resource-catalog test) → added.
These are isolated in the first commit (`fix: unblock main build …`) so they can be cherry-picked into a separate hotfix if maintainers prefer. B1 can't have a green build without them. Flag for maintainers: if a fix for this is already in flight, drop the first commit.
Scope / non-goals
B1 is the engine + RPC only. Out of scope (later phases): trigger→run bridge, `flows_resume`, `TrustedAutomationSource::Workflow` + tool curation, connection-ref resolution (B2); canvas/list-page/nav UI (B3–B5). Leaves the existing `src/openhuman/workflows/` SKILL-bundle runner untouched (distinct feature).
Testing
- … `engine::run` smoke over `trigger → http_request`); code-runner test is `#[ignore]` when no node/python runtime is present.
- … `flows_run` end-to-end.
- `GGML_NATIVE=OFF cargo check` green; `openhuman-core schema | grep flows_` lists 7 controllers.
Notes
- … `tinyagents::SqliteCheckpointer` (the migration-blessed pattern in `agent_orchestration/delegation.rs`), so no bespoke checkpoint store is added.
- `connection_ref` is carried but not yet resolved to a specific account, and tool curation/scope is not yet enforced; both are deliberately deferred to B2 (documented).
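The validate-on-save behaviour described under "What's in this PR" can be sketched with a toy graph type. Everything here is an assumption for illustration: `Node`, `Edge`, `Graph`, the specific rules (unique ids, a `trigger` node, edges pointing at known nodes), and the `save` helper are hypothetical and do not reflect what `tinyflows::validate` actually checks.

```rust
use std::collections::HashSet;

pub struct Node {
    pub id: String,
    pub kind: String,
}

pub struct Edge {
    pub from: String,
    pub to: String,
}

pub struct Graph {
    pub nodes: Vec<Node>,
    pub edges: Vec<Edge>,
}

/// Collect every structural problem instead of stopping at the first one.
pub fn validate(graph: &Graph) -> Result<(), Vec<String>> {
    let mut errors = Vec::new();
    let mut ids = HashSet::new();
    for node in &graph.nodes {
        if !ids.insert(node.id.as_str()) {
            errors.push(format!("duplicate node id: {}", node.id));
        }
    }
    // Assumed rule: a runnable graph needs at least one trigger node.
    if !graph.nodes.iter().any(|n| n.kind == "trigger") {
        errors.push("graph has no trigger node".to_string());
    }
    for edge in &graph.edges {
        for end in [&edge.from, &edge.to] {
            if !ids.contains(end.as_str()) {
                errors.push(format!("edge references unknown node: {end}"));
            }
        }
    }
    if errors.is_empty() { Ok(()) } else { Err(errors) }
}

/// Persist only graphs that pass validation.
pub fn save(store: &mut Vec<Graph>, graph: Graph) -> Result<(), Vec<String>> {
    validate(&graph)?;
    store.push(graph);
    Ok(())
}
```

Rejecting at save time means a stored graph is always one that validation accepted, so the run path can assume well-formed input.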
Summary by CodeRabbit