feat: add Temporal durable execution layer#32

Open
Bajuzjefe wants to merge 2 commits into masumi-network:main from Bajuzjefe:feat/temporal-durability

Conversation

@Bajuzjefe

Summary

  • Add opt-in Temporal durable execution via KODO_EXECUTION_MODE=temporal
  • Default KODO_EXECUTION_MODE=direct — zero change for existing deployments
  • AgentWorkflow wraps Runner with automatic retries (3 attempts), heartbeat monitoring (120s), and crash recovery
  • execute_agent activity calls create_runner() — all events, forms, locks work identically
  • Signals: pause, resume, cancel; Queries: get_status, is_paused, get_error
  • koco temporal-worker CLI command to run the worker process
  • 16 unit tests covering dataclasses, workflow structure, config, and CLI

Addresses issue #8 (scale spooler) — durability layer.

Test plan

  • Verify default behavior unchanged (KODO_EXECUTION_MODE=direct or unset)
  • Set KODO_EXECUTION_MODE=temporal + start Temporal server → executions go through Temporal
  • Run koco temporal-worker → worker connects and processes agent jobs
  • Kill temporal worker mid-execution → Temporal retries on another worker
  • Run pytest tests/test_temporal.py -v → all 16 tests pass

Research and plan for 4 scaling contributions to Kodosumi:
- PR 1: PostgreSQL support (alongside SQLite)
- PR 2: Redis Streams event transport (alongside Ray queue polling)
- PR 3: Temporal durable execution (alongside direct execution)
- PR 4: Docker Compose development environment

All features are opt-in via config with zero impact on default behavior.
Addresses upstream issues masumi-network#8 (scale spooler) and masumi-network#11 (provide containers).

Add opt-in Temporal workflow wrapping via KODO_EXECUTION_MODE=temporal.
When unset (default "direct"), existing direct Ray execution is unchanged.

- Add AgentWorkflow with pause/resume/cancel signals and status queries
- Add execute_agent activity wrapping create_runner() with heartbeats
- Add temporal_worker module and `koco temporal-worker` CLI command
- Update Launch() to branch to Temporal path when mode is "temporal"
- Add temporalio as optional dependency group
- 16 unit tests covering dataclasses, workflow structure, config, and CLI

Addresses issue masumi-network#8 (scale spooler) — durability layer.
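
The mode switch described above amounts to a plain env-var branch. A minimal stdlib sketch (KODO_EXECUTION_MODE is the PR's variable; the launch function and its return values are illustrative stand-ins, not the PR's actual code):

```python
import os

def launch(entry_point: str) -> str:
    # Opt-in branch: anything other than "temporal" falls through to the
    # existing direct execution path, so unset/default behavior is unchanged.
    mode = os.environ.get("KODO_EXECUTION_MODE", "direct")
    if mode == "temporal":
        return f"temporal:{entry_point}"  # would hand off to the Temporal client
    return f"direct:{entry_point}"        # existing direct Ray execution

os.environ.pop("KODO_EXECUTION_MODE", None)
print(launch("my_agent"))                 # direct:my_agent
os.environ["KODO_EXECUTION_MODE"] = "temporal"
print(launch("my_agent"))                 # temporal:my_agent
```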

@cursor (bot) left a comment

Cursor Bugbot has reviewed your changes and found 4 potential issues.

Bugbot Autofix is ON, but it could not run because the branch was deleted or merged before autofix could start.

Comment @cursor review or bugbot run to trigger another review on this PR

Comment thread: kodosumi/activities.py

            error="Activity cancelled by Temporal")
    except Exception as e:
        return AgentJobResult(
            fid=fid, status="failed", error=str(e))

Activity swallows exceptions, preventing Temporal retry policy

High Severity

The execute_agent activity catches all exceptions (both Exception and asyncio.CancelledError) and returns an AgentJobResult instead of letting them propagate. Temporal's retry policy only triggers when an activity raises an exception — a successful return value, even one containing status="failed", is treated as a completed activity. This means the RetryPolicy(maximum_attempts=3) configured in AgentWorkflow.run will never activate for code-level failures (e.g., create_runner failing, Ray errors). Only process-level crashes detected via heartbeat timeout would trigger retries, defeating a core stated goal of the integration.
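
To see why, here is a stdlib-only simulation of that retry semantics (the retry function is a crude stand-in for Temporal's RetryPolicy; no temporalio calls appear here): a handler that converts the exception into a "failed" return value is never retried, while one that lets it propagate is.

```python
def retry(activity, max_attempts=3):
    # Crude stand-in for a retry policy: retries happen only when the
    # activity raises; a normal return, whatever it contains, is "success".
    for attempt in range(1, max_attempts + 1):
        try:
            return attempt, activity()
        except RuntimeError:
            if attempt == max_attempts:
                raise

calls = {"swallow": 0, "raise": 0}

def swallowing_activity():
    calls["swallow"] += 1
    try:
        raise RuntimeError("runner crashed")
    except RuntimeError as e:
        return {"status": "failed", "error": str(e)}  # looks like success

def raising_activity():
    calls["raise"] += 1
    if calls["raise"] < 3:
        raise RuntimeError("runner crashed")  # propagates, so it is retried
    return {"status": "finished"}

attempts, result = retry(swallowing_activity)
# attempts == 1: the retrier saw a normal return and stopped immediately
attempts, result = retry(raising_activity)
# attempts == 3: the raises triggered two retries before success
```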

Comment thread: kodosumi/workflows.py

    @workflow.signal
    async def resume(self):
        self._paused = False
        self._status = "running"

Pause/resume signals have no effect on execution

High Severity

The pause and resume signal handlers only mutate _paused and _status state variables, but run() never checks _paused — it immediately starts execute_activity and awaits completion. Without a workflow.wait_condition(lambda: not self._paused) call, sending a pause signal changes queryable state but does not actually pause activity execution. Similarly, cancel_workflow is only checked once before the activity starts; once execute_activity is running, the signal has no effect.
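
The missing gate can be sketched with stdlib asyncio (the class, step list, and method names here are illustrative; in temporalio the equivalent would be an `await workflow.wait_condition(...)` between activity steps):

```python
import asyncio

class PausableRun:
    # Illustrative workflow body: run() waits at a gate before each step,
    # so a pause signal actually stops progress instead of only flipping
    # queryable state.
    def __init__(self):
        self._resumed = asyncio.Event()
        self._resumed.set()              # starts unpaused
        self.log = []

    def pause(self):                     # signal handler
        self._resumed.clear()

    def resume(self):                    # signal handler
        self._resumed.set()

    async def run(self, steps):
        for step in steps:
            await self._resumed.wait()   # the gate the PR's run() is missing
            self.log.append(step)

async def demo():
    wf = PausableRun()
    wf.pause()                           # pause before any step runs
    task = asyncio.create_task(wf.run(["step-1", "step-2"]))
    await asyncio.sleep(0)               # let run() reach the gate
    paused_log = list(wf.log)            # [] -- nothing executed while paused
    wf.resume()
    await task
    return paused_log, wf.log

paused_log, final_log = asyncio.run(demo())
```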

Comment thread: kodosumi/runner/main.py

        jwt=request.cookies.get(TOKEN_KEY) or request.headers.get(
            HEADER_KEY),
        inputs=inputs_dict,
        extra=extra,

Unsanitized extra dict breaks Temporal JSON serialization

High Severity

_launch_temporal carefully converts entry_point (callable→string) and inputs (BaseModel→dict) for Temporal JSON serialization, but passes extra through unmodified. For endpoints defined with app.enter(), _method_lookup stores a Model instance (a non-serializable custom class) in extra['model']. Temporal's DataConverter will fail to JSON-serialize the AgentJobInput dataclass, causing a runtime error. Since enter() is the primary way to define interactive agent endpoints, this breaks the Temporal path for the main Kodosumi use case.
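
One hedged way to address this: sanitize extra the same way entry_point and inputs are converted, stringifying anything the JSON converter cannot handle. The sanitize_extra helper and the Model stand-in below are hypothetical, not Kodosumi's actual code:

```python
import json

def sanitize_extra(extra):
    # Keep JSON-serializable values as-is; replace anything else with its
    # repr (or the key could be dropped entirely, if the value is unneeded
    # on the worker side).
    clean = {}
    for key, value in (extra or {}).items():
        try:
            json.dumps(value)
            clean[key] = value
        except (TypeError, ValueError):
            clean[key] = repr(value)
    return clean

class Model:  # stand-in for the non-serializable instance in extra['model']
    pass

extra = {"model": Model(), "tags": ["demo"]}
safe = sanitize_extra(extra)
json.dumps(safe)  # now serializes without raising
```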


Comment thread: kodosumi/activities.py

        jwt=job_input.jwt,
        panel_url=job_input.panel_url,
        fid=job_input.fid,
    )

Missing actor cleanup makes retry always fail

High Severity

When a Temporal worker crashes, the detached Ray Runner actor (created with lifetime="detached" and name=fid) survives in the Ray cluster. On retry, create_runner() is called with the same fid via fid=job_input.fid, which attempts to create a new detached actor with an identical name — this raises a ValueError from Ray because the actor already exists. There is no cleanup of the previous actor before creation. The existing kill_runner() helper could handle this, but it's never called. This makes crash recovery — the stated primary goal of the Temporal integration — non-functional whenever the Ray cluster survives the worker failure.
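
The suggested kill-then-create pattern can be simulated with stdlib only (the registry dict stands in for Ray's named-actor registry; in real code the cleanup step would use the existing kill_runner() helper rather than a dict pop):

```python
registry = {}  # stand-in for Ray's named detached-actor registry

def create_runner(fid):
    # Mimics Ray's behavior: creating a named detached actor whose name
    # already exists raises ValueError.
    if fid in registry:
        raise ValueError(f"actor {fid!r} already exists")
    registry[fid] = object()
    return registry[fid]

def create_runner_idempotent(fid):
    # Cleanup first, tolerating absence -- the kill_runner(fid) equivalent --
    # so a retry after a worker crash does not collide with the survivor.
    registry.pop(fid, None)
    return create_runner(fid)

create_runner("job-1")                      # first attempt; worker then crashes
runner = create_runner_idempotent("job-1")  # retry succeeds instead of raising
```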


@nori-masumi (bot) left a comment

DevRel Review — Temporal Durable Execution Layer

The approach here is solid. Temporal is a natural fit for long-running agent jobs that need crash recovery — Ray's own fault tolerance is actor-level, not workflow-level, so this fills a real gap for production deployments.

What looks good:

  • Opt-in via KODO_EXECUTION_MODE=temporal with zero behavioural change when unset — safe to merge alongside existing deployments
  • Signal/query surface (pause, resume, cancel, get_status, is_paused, get_error) matches what operators need for observability
  • 16 unit tests covering dataclasses, config, workflow structure, and CLI is a reasonable baseline
  • koco temporal-worker CLI aligns with the existing koco CLI pattern

Questions for maintainers to consider before merge:

  1. Retry configuration — max_attempts=3 and heartbeat_timeout=120s are currently hardcoded. For production operators running long inference jobs (>2 min), a 120s heartbeat timeout may be too tight. Are these intended to be env-var configurable, or is that a follow-up PR?

  2. Ray interaction — When EXECUTION_MODE=temporal, the Temporal worker still calls create_runner() which presumably uses Ray under the hood. Is there any risk of double-scheduling if both Ray's actor supervision and Temporal's retry logic try to recover the same execution? The SCALING.md doesn't address this interaction.

  3. Temporal server dependency — The test plan assumes a local Temporal server (tctl/temporal server start-dev). The Docker Compose PR (#33) should probably include Temporal server as a service to make the dev loop self-contained — worth coordinating with @Bajuzjefe.

  4. Worker process lifecycle — If koco temporal-worker exits, does Temporal queue jobs for when it reconnects, or do in-flight executions need manual retrigger? Worth documenting the recovery guarantees clearly in SCALING.md.

Doc note: If this is merged, the Kodosumi installation guide at docs.kodosumi.io will need a new section covering Temporal worker setup, env vars, and the koco temporal-worker command. Happy to help draft that once the PR lands.

Overall this is a well-structured contribution addressing a real production gap. The test coverage and opt-in design are exactly right.
