Merged
65 changes: 65 additions & 0 deletions SPEC.md
@@ -559,6 +559,10 @@ This section is intentionally redundant so a coding agent can implement the conf
- `tracker.terminal_states`: list of strings, default `["Closed", "Cancelled", "Canceled", "Duplicate", "Done"]`
- `polling.interval_ms`: integer, default `30000`
- `workspace.root`: path, default `<system-temp>/symphony_workspaces`
- `worker.ssh_hosts` (extension): list of SSH host strings, optional; when omitted, work runs
locally
- `worker.max_concurrent_agents_per_host` (extension): positive integer, optional; shared per-host
cap applied across configured SSH hosts
- `hooks.after_create`: shell script or null
- `hooks.before_run`: shell script or null
- `hooks.after_run`: shell script or null
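
A minimal sketch of the extension keys next to an existing one, shown here as TOML (the concrete config format is an assumption, not specified by this section):

```toml
[worker]
# Omit ssh_hosts entirely to run all work locally.
ssh_hosts = ["agent@worker-1.internal", "agent@worker-2.internal"]
# Shared cap: each host above runs at most this many agents concurrently.
max_concurrent_agents_per_host = 2

[workspace]
# Interpreted on the remote host when SSH workers are configured.
root = "/var/tmp/symphony_workspaces"
```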
@@ -729,6 +733,12 @@ Per-state limit:

The runtime counts issues by their current tracked state in the `running` map.

Optional SSH host limit:

- When `worker.max_concurrent_agents_per_host` is set, each configured SSH host may run at most
  that many concurrent agents.
- Hosts at that cap are skipped for new dispatch until capacity frees up.
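
The cap check above can be sketched as follows, assuming a `running` map of issue id to assigned host (module and function names are illustrative, not the implementation's API):

```elixir
defmodule HostCapSketch do
  # Return only the hosts that still have capacity under the shared cap.
  # `running` maps issue id => assigned host for currently running work.
  def dispatchable_hosts(hosts, running, cap) do
    counts = running |> Map.values() |> Enum.frequencies()
    Enum.reject(hosts, fn host -> Map.get(counts, host, 0) >= cap end)
  end
end
```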

### 8.4 Retry and Backoff

Retry entry creation:
@@ -2108,3 +2118,58 @@ Use the same validation profiles as Section 17:
- Verify hook execution and workflow path resolution on the target host OS/shell environment.
- If the optional HTTP server is shipped, verify the configured port behavior and loopback/default
bind expectations on the target environment.

## Appendix A. SSH Worker Extension (Optional)

This appendix describes a common extension profile in which Symphony keeps one central
orchestrator but executes worker runs on one or more remote hosts over SSH.

### A.1 Execution Model

- The orchestrator remains the single source of truth for polling, claims, retries, and
reconciliation.
- `worker.ssh_hosts` provides the candidate SSH destinations for remote execution.
- Each worker run is assigned to one host at a time, and that host becomes part of the run's
effective execution identity along with the issue workspace.
- `workspace.root` is interpreted on the remote host, not on the orchestrator host.
- The coding-agent app-server is launched over SSH stdio instead of as a local subprocess, so the
orchestrator still owns the session lifecycle even though commands execute remotely.
- Continuation turns inside one worker lifetime should stay on the same host and workspace.
- A remote host should satisfy the same basic contract as a local worker environment: reachable
shell, writable workspace root, coding-agent executable, and any required auth or repository
prerequisites.
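
The local-versus-SSH launch decision can be sketched as an argv builder (the `codex app-server` command and the quoting are assumptions; production code must quote the remote path robustly):

```elixir
defmodule SshLaunchSketch do
  # Build the argv for the app-server process. With no host, the agent is a
  # local subprocess; with a host, the same stdio protocol is tunneled over
  # `ssh`, so the orchestrator keeps both ends of the pipe and therefore the
  # session lifecycle.
  def app_server_argv(nil, _workspace), do: ["codex", "app-server"]

  def app_server_argv(host, workspace) do
    # Naive single-quoting for illustration only.
    ["ssh", host, "cd '#{workspace}' && codex app-server"]
  end
end
```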

### A.2 Scheduling Notes

- SSH hosts may be treated as a pool for dispatch.
- Implementations may prefer the previously used host on retries when that host is still
available.
- `worker.max_concurrent_agents_per_host` is an optional shared per-host cap across configured SSH
hosts.
- When all SSH hosts are at capacity, dispatch should wait rather than silently falling back to a
different execution mode.
- Implementations may fail over to another host when the original host is unavailable before work
has meaningfully started.
- Once a run has already produced side effects, a transparent rerun on another host should be
treated as a new attempt, not as invisible failover.
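
The host-affinity preference for retries might look like this (a sketch with illustrative names, not the implementation's API):

```elixir
defmodule DispatchOrderSketch do
  # Order candidate hosts for a retry: prefer the previously used host when
  # it is still in the pool, then fall back to the remaining hosts in order.
  def candidates(hosts, previous) do
    if previous in hosts do
      [previous | List.delete(hosts, previous)]
    else
      hosts
    end
  end
end
```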

### A.3 Problems to Consider

- Remote environment drift:
- Each host needs the expected shell environment, coding-agent executable, auth, and repository
prerequisites.
- Workspace locality:
- Workspaces are usually host-local, so moving an issue to a different host is typically a cold
restart unless shared storage exists.
- Path and command safety:
- Remote path resolution, shell quoting, and workspace-boundary checks matter more once execution
crosses a machine boundary.
- Startup and failover semantics:
- Implementations should distinguish host-connectivity/startup failures from in-workspace agent
failures so the same ticket is not accidentally re-executed on multiple hosts.
- Host health and saturation:
- A dead or overloaded host should reduce available capacity, not cause duplicate execution or an
accidental fallback to local work.
- Cleanup and observability:
- Operators need to know which host owns a run, where its workspace lives, and whether cleanup
happened on the right machine.
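
The workspace-boundary check called out under path and command safety can be sketched as a minimal orchestrator-side guard (the remote host still needs its own resolution, since the two filesystems can differ):

```elixir
defmodule PathSafetySketch do
  # True only when `path`, resolved against the workspace root, stays inside
  # that root after `..` segments are normalized.
  def within_workspace?(root, path) do
    expanded = Path.expand(path, root)
    expanded == root or String.starts_with?(expanded, root <> "/")
  end
end
```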
10 changes: 0 additions & 10 deletions elixir/Makefile
@@ -34,16 +34,6 @@ dialyzer:
$(MIX) dialyzer --format short

e2e:
@if [ -z "$$LINEAR_API_KEY" ]; then \
echo "LINEAR_API_KEY is required for \`make e2e\`."; \
echo "Export it first, for example:"; \
echo " export LINEAR_API_KEY=\$$(tr -d '\\r\\n' < ~/.linear_api_key)"; \
exit 1; \
fi
@if ! command -v codex >/dev/null 2>&1; then \
echo "\`codex\` must be on PATH for \`make e2e\`."; \
exit 1; \
fi
SYMPHONY_RUN_LIVE_E2E=1 $(MIX) test test/symphony_elixir/live_e2e_test.exs

ci:
21 changes: 16 additions & 5 deletions elixir/README.md
@@ -185,12 +185,23 @@ make e2e
Optional environment variables:

- `SYMPHONY_LIVE_LINEAR_TEAM_KEY` defaults to `SYME2E`
- `SYMPHONY_LIVE_CODEX_COMMAND` defaults to `codex app-server`
- `SYMPHONY_LIVE_SSH_WORKER_HOSTS` is a comma-separated list of SSH hosts; when set, the live
  test targets those hosts

The live test creates a temporary Linear project and issue, writes a temporary `WORKFLOW.md`,
runs a real agent turn, verifies the workspace side effect, requires Codex to comment on and close
the Linear issue, then marks the project completed so the run remains visible in Linear.
`make e2e` fails fast with a clear error if `LINEAR_API_KEY` is unset.
`make e2e` runs two live scenarios:
- one with a local worker
- one with SSH workers

If `SYMPHONY_LIVE_SSH_WORKER_HOSTS` is unset, the SSH scenario uses `docker compose` to start two
disposable SSH workers on `localhost:<port>`. The live test generates a temporary SSH keypair,
mounts the host `~/.codex/auth.json` into each worker, verifies that Symphony can talk to them
over real SSH, then runs the same orchestration flow against those worker addresses. This keeps
the transport representative without depending on long-lived external machines.

Set `SYMPHONY_LIVE_SSH_WORKER_HOSTS` if you want `make e2e` to target real SSH hosts instead.
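
For example (host names are placeholders):

```shell
# Point the live test at two real SSH destinations instead of the
# docker compose workers, then run `make e2e` as usual.
export SYMPHONY_LIVE_SSH_WORKER_HOSTS='agent@worker-1,agent@worker-2'
```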

The live test creates a temporary Linear project and issue, writes a temporary `WORKFLOW.md`, runs
a real agent turn, verifies the workspace side effect, requires Codex to comment on and close the
Linear issue, then marks the project completed so the run remains visible in Linear.

## FAQ

102 changes: 88 additions & 14 deletions elixir/lib/symphony_elixir/agent_runner.ex
@@ -7,28 +7,58 @@ defmodule SymphonyElixir.AgentRunner do
alias SymphonyElixir.Codex.AppServer
alias SymphonyElixir.{Config, Linear.Issue, PromptBuilder, Tracker, Workspace}

@type worker_host :: String.t() | nil

@spec run(map(), pid() | nil, keyword()) :: :ok | no_return()
def run(issue, codex_update_recipient \\ nil, opts \\ []) do
Logger.info("Starting agent run for #{issue_context(issue)}")
worker_hosts =
candidate_worker_hosts(Keyword.get(opts, :worker_host), Config.settings!().worker.ssh_hosts)

Logger.info("Starting agent run for #{issue_context(issue)} worker_hosts=#{inspect(worker_hosts_for_log(worker_hosts))}")

case run_on_worker_hosts(issue, codex_update_recipient, opts, worker_hosts) do
:ok ->
:ok

{:error, reason} ->
Logger.error("Agent run failed for #{issue_context(issue)}: #{inspect(reason)}")
raise RuntimeError, "Agent run failed for #{issue_context(issue)}: #{inspect(reason)}"
end
end

case Workspace.create_for_issue(issue) do
defp run_on_worker_hosts(issue, codex_update_recipient, opts, [worker_host | rest]) do
case run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do
:ok ->
:ok

{:error, reason} when rest != [] ->
Logger.warning("Agent run failed for #{issue_context(issue)} worker_host=#{worker_host_for_log(worker_host)} reason=#{inspect(reason)}; trying next worker host")
run_on_worker_hosts(issue, codex_update_recipient, opts, rest)

{:error, reason} ->
{:error, reason}
end
end

defp run_on_worker_hosts(_issue, _codex_update_recipient, _opts, []), do: {:error, :no_worker_hosts_available}

defp run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do
Logger.info("Starting worker attempt for #{issue_context(issue)} worker_host=#{worker_host_for_log(worker_host)}")

case Workspace.create_for_issue(issue, worker_host) do
{:ok, workspace} ->
send_worker_runtime_info(codex_update_recipient, issue, worker_host, workspace)

try do
with :ok <- Workspace.run_before_run_hook(workspace, issue),
:ok <- run_codex_turns(workspace, issue, codex_update_recipient, opts) do
:ok
else
{:error, reason} ->
Logger.error("Agent run failed for #{issue_context(issue)}: #{inspect(reason)}")
raise RuntimeError, "Agent run failed for #{issue_context(issue)}: #{inspect(reason)}"
with :ok <- Workspace.run_before_run_hook(workspace, issue, worker_host) do
run_codex_turns(workspace, issue, codex_update_recipient, opts, worker_host)
end
after
Workspace.run_after_run_hook(workspace, issue)
Workspace.run_after_run_hook(workspace, issue, worker_host)
end

{:error, reason} ->
Logger.error("Agent run failed for #{issue_context(issue)}: #{inspect(reason)}")
raise RuntimeError, "Agent run failed for #{issue_context(issue)}: #{inspect(reason)}"
{:error, reason}
end
end

@@ -46,11 +76,27 @@ defmodule SymphonyElixir.AgentRunner do

defp send_codex_update(_recipient, _issue, _message), do: :ok

defp run_codex_turns(workspace, issue, codex_update_recipient, opts) do
defp send_worker_runtime_info(recipient, %Issue{id: issue_id}, worker_host, workspace)
when is_binary(issue_id) and is_pid(recipient) and is_binary(workspace) do
send(
recipient,
{:worker_runtime_info, issue_id,
%{
worker_host: worker_host,
workspace_path: workspace
}}
)

:ok
end

defp send_worker_runtime_info(_recipient, _issue, _worker_host, _workspace), do: :ok

defp run_codex_turns(workspace, issue, codex_update_recipient, opts, worker_host) do
max_turns = Keyword.get(opts, :max_turns, Config.settings!().agent.max_turns)
issue_state_fetcher = Keyword.get(opts, :issue_state_fetcher, &Tracker.fetch_issue_states_by_ids/1)

with {:ok, session} <- AppServer.start_session(workspace) do
with {:ok, session} <- AppServer.start_session(workspace, worker_host: worker_host) do
try do
do_run_codex_turns(session, workspace, issue, codex_update_recipient, opts, issue_state_fetcher, 1, max_turns)
after
@@ -142,6 +188,34 @@

defp active_issue_state?(_state_name), do: false

defp candidate_worker_hosts(nil, []), do: [nil]

defp candidate_worker_hosts(preferred_host, configured_hosts) when is_list(configured_hosts) do
hosts =
configured_hosts
|> Enum.map(&String.trim/1)
|> Enum.reject(&(&1 == ""))
|> Enum.uniq()

case preferred_host do
host when is_binary(host) and host != "" ->
[host | Enum.reject(hosts, &(&1 == host))]

_ when hosts == [] ->
[nil]

_ ->
hosts
end
end

defp worker_hosts_for_log(worker_hosts) do
Enum.map(worker_hosts, &worker_host_for_log/1)
end

defp worker_host_for_log(nil), do: "local"
defp worker_host_for_log(worker_host), do: worker_host

defp normalize_issue_state(state_name) when is_binary(state_name) do
state_name
|> String.trim()