2 changes: 2 additions & 0 deletions .agents/skills/debug-openshell-cluster/SKILL.md
@@ -182,6 +182,8 @@ Component images (server, sandbox) can reach kubelet via two paths:

**Local/external pull mode** (default local via `mise run cluster`): Local images are tagged to the configured local registry base (default `127.0.0.1:5000/openshell/*`), pushed to that registry, and pulled by k3s via `registries.yaml` mirror endpoint (typically `host.docker.internal:5000`). The `cluster` task pushes prebuilt local tags (`openshell/*:dev`, falling back to `localhost:5000/openshell/*:dev` or `127.0.0.1:5000/openshell/*:dev`).

Gateway image builds now stage a partial Rust workspace from `deploy/docker/Dockerfile.images`. If cargo fails with a missing manifest under `/build/crates/...`, verify that every crate the gateway currently depends on (including `openshell-driver-kubernetes`) is copied into the staged workspace in that Dockerfile.

```bash
# Verify image refs currently used by openshell deployment
openshell doctor exec -- kubectl -n openshell get statefulset openshell -o jsonpath="{.spec.template.spec.containers[*].image}"
26 changes: 23 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

48 changes: 26 additions & 22 deletions architecture/gateway.md
@@ -70,19 +70,21 @@ graph TD
| Persistence | `crates/openshell-server/src/persistence/mod.rs` | `Store` enum (SQLite/Postgres), generic object CRUD, protobuf codec |
| Persistence: SQLite | `crates/openshell-server/src/persistence/sqlite.rs` | `SqliteStore` with sqlx |
| Persistence: Postgres | `crates/openshell-server/src/persistence/postgres.rs` | `PostgresStore` with sqlx |
| Sandbox K8s | `crates/openshell-server/src/sandbox/mod.rs` | `SandboxClient`, CRD creation/deletion, Kubernetes watcher, phase derivation |
| Compute runtime | `crates/openshell-server/src/compute/mod.rs` | `ComputeRuntime`, gateway-owned sandbox lifecycle orchestration over a compute backend |
| Compute driver: Kubernetes | `crates/openshell-driver-kubernetes/src/driver.rs` | Kubernetes CRD create/delete, endpoint resolution, watch stream, pod template translation |
| Sandbox index | `crates/openshell-server/src/sandbox_index.rs` | `SandboxIndex` -- in-memory name/pod-to-id correlation |
| Watch bus | `crates/openshell-server/src/sandbox_watch.rs` | `SandboxWatchBus`, `PlatformEventBus`, Kubernetes event tailer |
| Watch bus | `crates/openshell-server/src/sandbox_watch.rs` | `SandboxWatchBus` -- in-memory broadcast for persisted sandbox updates |
| Tracing bus | `crates/openshell-server/src/tracing_bus.rs` | `TracingLogBus` -- captures tracing events keyed by `sandbox_id` |

Proto definitions consumed by the gateway:

| Proto file | Package | Defines |
|------------|---------|---------|
| `proto/openshell.proto` | `openshell.v1` | `OpenShell` service, sandbox/provider/SSH/watch messages |
| `proto/openshell.proto` | `openshell.v1` | `OpenShell` service, public sandbox resource model, provider/SSH/watch messages |
| `proto/compute_driver.proto` | `openshell.compute.v1` | Internal `ComputeDriver` service, driver-native sandbox observations, endpoint resolution, compute watch stream envelopes |
| `proto/inference.proto` | `openshell.inference.v1` | `Inference` service: `SetClusterInference`, `GetClusterInference`, `GetInferenceBundle` |
| `proto/datamodel.proto` | `openshell.datamodel.v1` | `Sandbox`, `SandboxSpec`, `SandboxStatus`, `Provider`, `SandboxPhase` |
| `proto/sandbox.proto` | `openshell.sandbox.v1` | `SandboxPolicy`, `NetworkPolicyRule`, `SettingValue`, `EffectiveSetting`, `SettingScope`, `PolicySource`, `GetSandboxSettingsRequest/Response`, `GetGatewaySettingsRequest/Response` |
| `proto/datamodel.proto` | `openshell.datamodel.v1` | `Provider` |
| `proto/sandbox.proto` | `openshell.sandbox.v1` | Sandbox supervisor policy, settings, and config messages |

## Startup Sequence

@@ -94,11 +96,10 @@ The gateway boots in `main()` (`crates/openshell-server/src/main.rs`) and procee
4. **Build `Config`** -- Assembles an `openshell_core::Config` from the parsed arguments.
5. **Call `run_server()`** (`crates/openshell-server/src/lib.rs`):
1. Connect to the persistence store (`Store::connect`), which auto-detects SQLite vs Postgres from the URL prefix and runs migrations.
2. Create `SandboxClient` (initializes a `kube::Client` from in-cluster or kubeconfig).
2. Create `ComputeRuntime` with the in-process Kubernetes compute backend (`KubernetesComputeDriver`).
3. Build `ServerState` (shared via `Arc<ServerState>` across all handlers).
4. **Spawn background tasks**:
- `spawn_sandbox_watcher` -- watches Kubernetes Sandbox CRDs and syncs state to the store.
- `spawn_kube_event_tailer` -- watches Kubernetes Events in the sandbox namespace and publishes them to the `PlatformEventBus`.
- `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, updates persisted sandbox records, and republishes platform events.
5. Create `MultiplexService`.
6. Bind `TcpListener` on `config.bind_address`.
7. Optionally create `TlsAcceptor` from cert/key files.
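The URL-prefix auto-detection in step 1 can be sketched as below. This is an illustrative stand-in, not the real `Store::connect` (which lives in `crates/openshell-server/src/persistence/mod.rs` and also runs migrations); the function name and the exact set of accepted prefixes are assumptions.

```rust
/// Hypothetical sketch: pick a persistence backend from the database URL scheme.
/// The real `Store::connect` returns a `Store` enum variant (SQLite/Postgres).
fn backend_for_url(database_url: &str) -> Result<&'static str, String> {
    if database_url.starts_with("sqlite:") {
        Ok("sqlite")
    } else if database_url.starts_with("postgres:") || database_url.starts_with("postgresql:") {
        Ok("postgres")
    } else {
        // Unknown schemes fail fast instead of being guessed at.
        Err(format!("unrecognized database URL scheme: {database_url}"))
    }
}
```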
@@ -137,7 +138,7 @@ All handlers share an `Arc<ServerState>` (`crates/openshell-server/src/lib.rs`):
pub struct ServerState {
pub config: Config,
pub store: Arc<Store>,
pub sandbox_client: SandboxClient,
pub compute: ComputeRuntime,
pub sandbox_index: SandboxIndex,
pub sandbox_watch_bus: SandboxWatchBus,
pub tracing_log_bus: TracingLogBus,
@@ -148,10 +149,10 @@ pub struct ServerState {
```

- **`store`** -- persistence backend (SQLite or Postgres) for all object types.
- **`sandbox_client`** -- Kubernetes client scoped to the sandbox namespace; creates/deletes CRDs and resolves pod IPs.
- **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Used by the event tailer to correlate Kubernetes events.
- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles orphaned `Provisioning` records that no longer have a backing compute resource.
- **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Updated from compute-driver sandbox snapshots.
- **`sandbox_watch_bus`** -- `broadcast`-based notification bus keyed by sandbox ID. Producers call `notify(&id)` when the persisted sandbox record changes; consumers in `WatchSandbox` streams receive `()` signals and re-read the record.
- **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for Kubernetes events.
- **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for compute-driver platform events.
- **`settings_mutex`** -- serializes settings mutations (global and sandbox) to prevent read-modify-write races. Held for the duration of any setting set/delete or global policy set/delete operation. See [Gateway Settings Channel](gateway-settings.md#global-policy-lifecycle).
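The notify-then-re-read pattern behind `sandbox_watch_bus` can be sketched with a minimal std-only bus. This is an assumption-laden illustration: the real `SandboxWatchBus` uses `tokio::sync::broadcast` senders keyed by sandbox ID, not `std::sync::mpsc`, and the type and method names here are invented for the sketch.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Minimal sketch of the notify bus: subscribers receive `()` signals and
/// re-read the persisted record themselves, so no payload travels on the bus.
#[derive(Default)]
struct WatchBus {
    subscribers: HashMap<String, Vec<Sender<()>>>,
}

impl WatchBus {
    fn subscribe(&mut self, sandbox_id: &str) -> Receiver<()> {
        let (tx, rx) = channel();
        self.subscribers.entry(sandbox_id.to_string()).or_default().push(tx);
        rx
    }

    fn notify(&self, sandbox_id: &str) {
        if let Some(senders) = self.subscribers.get(sandbox_id) {
            for tx in senders {
                let _ = tx.send(()); // a gone receiver is not an error
            }
        }
    }

    /// Mirrors `remove(sandbox_id)`: dropping the senders closes every
    /// active receiver, which is how watch streams learn the sandbox is gone.
    fn remove(&mut self, sandbox_id: &str) {
        self.subscribers.remove(sandbox_id);
    }
}
```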

## Protocol Multiplexing
@@ -380,7 +381,7 @@ All buses use `tokio::sync::broadcast` channels keyed by sandbox ID. Buffer size

Broadcast lag is translated to `Status::resource_exhausted` via `broadcast_to_status()`.
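A hedged sketch of that translation, with local enums standing in for `tokio::sync::broadcast::error::RecvError` and `tonic::Status` (the `Closed -> Aborted` arm and the message texts are assumptions; the document only specifies the lag case):

```rust
#[derive(Debug, PartialEq)]
enum RecvErr {
    Lagged(u64), // receiver fell behind; `u64` messages were overwritten
    Closed,      // all senders dropped
}

#[derive(Debug, PartialEq)]
enum StatusKind {
    ResourceExhausted,
    Aborted,
}

fn broadcast_to_status(err: RecvErr) -> (StatusKind, String) {
    match err {
        // A lagged receiver means the subscriber could not keep up with the
        // bounded broadcast buffer, so the stream reports resource exhaustion.
        RecvErr::Lagged(skipped) => (
            StatusKind::ResourceExhausted,
            format!("watch stream lagged; {skipped} updates were dropped"),
        ),
        // Assumed mapping: a closed channel simply ends the stream.
        RecvErr::Closed => (StatusKind::Aborted, "watch stream closed".to_string()),
    }
}
```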

**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into both the `handle_deleted` reconciler (Kubernetes watcher) and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic orphan sweep for stale `Provisioning` records, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.

**Validation:** `WatchSandbox` validates that the sandbox exists before subscribing to any bus, preventing entries from being created for non-existent IDs. `PushSandboxLogs` validates sandbox existence once on the first batch of the stream.

Expand All @@ -392,7 +393,7 @@ The `ExecSandbox` RPC (`crates/openshell-server/src/grpc.rs`) executes a command

1. Validate request: `sandbox_id`, `command`, and environment key format (`^[A-Za-z_][A-Za-z0-9_]*$`).
2. Verify sandbox exists and is in `Ready` phase.
3. Resolve target: prefer agent pod IP (via `sandbox_client.agent_pod_ip()`), fall back to Kubernetes service DNS (`<name>.<namespace>.svc.cluster.local`).
3. Resolve target: prefer agent pod IP, fall back to Kubernetes service DNS (`<name>.<namespace>.svc.cluster.local`). If the sandbox is not connectable yet (for example the pod exists but has no IP), the gateway returns `FAILED_PRECONDITION` instead of surfacing the condition as an internal server fault.
4. Build the remote command string: sort environment variables, shell-escape all values, prepend `cd <workdir> &&` if `workdir` is set.
5. **Start a single-use SSH proxy**: binds an ephemeral local TCP port, accepts one connection, performs the NSSH1 handshake with the sandbox, and bidirectionally copies data.
6. **Connect via `russh`**: establishes an SSH connection through the local proxy, authenticates with `none` auth as user `sandbox`, opens a session channel, and executes the command.
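Steps 1 and 4 above can be sketched as follows. The helper names, the exact transient-value escaping, and the `KEY=VALUE` prefix placement are assumptions for illustration; only the behaviors themselves (the `^[A-Za-z_][A-Za-z0-9_]*$` key rule, sorted env, shell-escaped values, leading `cd`) come from the text.

```rust
/// Enforce the env key format `^[A-Za-z_][A-Za-z0-9_]*$` without a regex crate.
fn valid_env_key(key: &str) -> bool {
    let mut chars = key.chars();
    matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_')
        && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Classic POSIX single-quote escaping: close the quote, emit a literal
/// apostrophe, reopen the quote.
fn shell_escape(value: &str) -> String {
    format!("'{}'", value.replace('\'', r"'\''"))
}

/// Sketch of step 4: sort env vars for determinism, escape values, and
/// prepend `cd <workdir> &&` when a workdir is set.
fn build_remote_command(
    env: &mut Vec<(String, String)>,
    workdir: Option<&str>,
    command: &str,
) -> String {
    env.sort();
    let mut parts = Vec::new();
    if let Some(dir) = workdir {
        parts.push(format!("cd {} &&", shell_escape(dir)));
    }
    for (key, value) in env.iter() {
        parts.push(format!("{key}={}", shell_escape(value)));
    }
    parts.push(command.to_string());
    parts.join(" ")
}
```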
@@ -499,27 +500,30 @@ The Helm chart template is at `deploy/helm/openshell/templates/statefulset.yaml`

### Sandbox CRD Management

`SandboxClient` (`crates/openshell-server/src/sandbox/mod.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs.
`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface.

- **Create**: Translates a `Sandbox` proto into a Kubernetes `DynamicObject` with labels (`openshell.ai/sandbox-id`, `openshell.ai/managed-by: openshell`) and a spec that includes the pod template, environment variables, and gateway-required env vars (`OPENSHELL_SANDBOX_ID`, `OPENSHELL_ENDPOINT`, `OPENSHELL_SSH_LISTEN_ADDR`, etc.). When callers do not provide custom `volumeClaimTemplates`, the server injects a default `workspace` PVC and mounts it at `/sandbox` so the default sandbox home/workdir survives pod rescheduling.
- **Get**: `GetSandbox` looks up a sandbox CRD by name and returns a driver-native platform observation (`openshell.compute.v1.DriverSandbox`) with raw status and condition data from the object.
- **List**: `ListSandboxes` enumerates sandbox CRDs and returns driver-native platform observations for each, sorted by name for stable results.
- **Create**: Translates an internal `openshell.compute.v1.DriverSandbox` message into a Kubernetes `DynamicObject` with labels (`openshell.ai/sandbox-id`, `openshell.ai/managed-by: openshell`) and a spec that includes the pod template, environment variables, and gateway-required env vars (`OPENSHELL_SANDBOX_ID`, `OPENSHELL_ENDPOINT`, `OPENSHELL_SSH_LISTEN_ADDR`, etc.). When callers do not provide custom `volumeClaimTemplates`, the driver injects a default `workspace` PVC and mounts it at `/sandbox` so the default sandbox home/workdir survives pod rescheduling.
- **Delete**: Calls the Kubernetes API to delete the CRD by name. Returns `false` if already gone (404).
- **Stop**: `proto/compute_driver.proto` now reserves `StopSandbox` for a non-destructive lifecycle transition. Resume is intentionally not a dedicated compute-driver RPC; the gateway is expected to auto-resume a stopped sandbox when a client connects or executes into it.
- **Pod IP resolution**: `agent_pod_ip()` fetches the agent pod and reads `status.podIP`.

### Sandbox Watcher

`spawn_sandbox_watcher()` (`crates/openshell-server/src/sandbox/mod.rs`) runs a Kubernetes watcher on `Sandbox` CRDs and processes three event types:
The Kubernetes driver emits `WatchSandboxes` events through `proto/compute_driver.proto`. `ComputeRuntime` consumes that stream, translates the driver-native snapshots into public `openshell.v1.Sandbox` resources, derives the public phase, and applies the results to the store.

- **Applied**: Extracts the sandbox ID from labels (or falls back to name prefix stripping), reads the CRD status, derives the phase, and upserts the sandbox record in the store. Notifies the watch bus.
- **Applied**: Extracts the sandbox ID from labels (or falls back to name prefix stripping), reads the CRD status, emits a driver-native snapshot, and lets the gateway translate that into the stored public sandbox record. Notifies the watch bus.
- **Deleted**: Removes the sandbox record from the store and the index. Notifies the watch bus.
- **Restarted**: Re-processes all objects (full resync).

### Phase Derivation
### Gateway Phase Derivation

`derive_phase()` maps Kubernetes condition state to `SandboxPhase`:
`ComputeRuntime::derive_phase()` (`crates/openshell-server/src/compute/mod.rs`) maps driver-native compute status to the public `SandboxPhase` exposed by `proto/openshell.proto`:

| Condition | Phase |
|-----------|-------|
| `deletionTimestamp` is set | `Deleting` |
| Driver status `deleting=true` | `Deleting` |
| Ready condition `status=True` | `Ready` |
| Ready condition `status=False`, terminal reason | `Error` |
| Ready condition `status=False`, transient reason | `Provisioning` |
@@ -530,7 +534,7 @@ All other `Ready=False` reasons are treated as terminal failures (`Error` phase)
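The table above can be sketched as the following decision function. The status struct, the transient-reason set, and the `None -> Provisioning` arm (no Ready condition reported yet) are assumptions; the real `ComputeRuntime::derive_phase` consumes driver-native compute status, not these illustrative types.

```rust
#[derive(Debug, PartialEq)]
enum SandboxPhase {
    Provisioning,
    Ready,
    Error,
    Deleting,
}

/// Illustrative flattening of the driver-native status the gateway inspects.
struct DriverStatus<'a> {
    deleting: bool,
    ready: Option<bool>,          // Ready condition status, if reported
    ready_reason: Option<&'a str>, // Ready condition reason, if reported
}

// Assumed transient reasons; all other Ready=False reasons map to Error.
const TRANSIENT_REASONS: &[&str] = &["PodPending", "ContainersNotReady"];

fn derive_phase(status: &DriverStatus) -> SandboxPhase {
    if status.deleting {
        return SandboxPhase::Deleting; // deletion wins over everything else
    }
    match status.ready {
        Some(true) => SandboxPhase::Ready,
        Some(false) => {
            let transient = status
                .ready_reason
                .map_or(false, |r| TRANSIENT_REASONS.contains(&r));
            if transient {
                SandboxPhase::Provisioning
            } else {
                SandboxPhase::Error // terminal reason
            }
        }
        None => SandboxPhase::Provisioning, // no Ready condition yet (assumed)
    }
}
```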

### Kubernetes Event Tailer

`spawn_kube_event_tailer()` (`crates/openshell-server/src/sandbox_watch.rs`) watches all Kubernetes `Event` objects in the sandbox namespace and correlates them to sandbox IDs using `SandboxIndex`:
The Kubernetes driver also watches namespace-scoped Kubernetes `Event` objects and correlates them to sandbox IDs before emitting them as compute-driver platform events:

- Events involving `kind: Sandbox` are correlated by sandbox name.
- Events involving `kind: Pod` are correlated by agent pod name.
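The correlation step above can be sketched with a two-map index. The field and method names are invented for illustration; the real `SandboxIndex` is the in-memory bidirectional index described earlier, kept current from compute-driver sandbox snapshots.

```rust
use std::collections::HashMap;

/// Sketch: resolve a Kubernetes event's involved object to a sandbox ID.
#[derive(Default)]
struct SandboxIndex {
    by_sandbox_name: HashMap<String, String>, // sandbox CR name -> sandbox ID
    by_pod_name: HashMap<String, String>,     // agent pod name  -> sandbox ID
}

impl SandboxIndex {
    fn correlate(&self, involved_kind: &str, involved_name: &str) -> Option<&String> {
        match involved_kind {
            "Sandbox" => self.by_sandbox_name.get(involved_name),
            "Pod" => self.by_pod_name.get(involved_name),
            _ => None, // events about unrelated kinds are dropped
        }
    }
}
```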
1 change: 1 addition & 0 deletions crates/openshell-core/build.rs
@@ -32,6 +32,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"../../proto/openshell.proto",
"../../proto/datamodel.proto",
"../../proto/sandbox.proto",
"../../proto/compute_driver.proto",
"../../proto/inference.proto",
"../../proto/test.proto",
];
94 changes: 94 additions & 0 deletions crates/openshell-core/src/config.rs
@@ -4,8 +4,48 @@
//! Configuration management for OpenShell components.

use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;

/// Compute backends the gateway can orchestrate sandboxes through.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ComputeDriverKind {
Kubernetes,
Podman,
}

impl ComputeDriverKind {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Kubernetes => "kubernetes",
Self::Podman => "podman",
}
}
}

impl fmt::Display for ComputeDriverKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}

impl FromStr for ComputeDriverKind {
type Err = String;

fn from_str(value: &str) -> Result<Self, Self::Err> {
match value.trim().to_ascii_lowercase().as_str() {
"kubernetes" => Ok(Self::Kubernetes),
"podman" => Ok(Self::Podman),
other => Err(format!(
"unsupported compute driver '{other}'. expected one of: kubernetes, podman"
)),
}
}
}

/// Server configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -24,6 +64,14 @@ pub struct Config {
/// Database URL for persistence.
pub database_url: String,

/// Compute drivers configured for the gateway.
///
/// The config shape allows multiple drivers so the gateway can evolve
/// toward multi-backend routing. Current releases require exactly one
/// configured driver.
#[serde(default = "default_compute_drivers")]
pub compute_drivers: Vec<ComputeDriverKind>,

/// Kubernetes namespace for sandboxes.
#[serde(default = "default_sandbox_namespace")]
pub sandbox_namespace: String,
@@ -120,6 +168,7 @@ impl Config {
log_level: default_log_level(),
tls,
database_url: String::new(),
compute_drivers: default_compute_drivers(),
sandbox_namespace: default_sandbox_namespace(),
sandbox_image: String::new(),
sandbox_image_pull_policy: String::new(),
@@ -157,6 +206,16 @@ impl Config {
self
}

/// Create a new configuration with the configured compute drivers.
#[must_use]
pub fn with_compute_drivers<I>(mut self, drivers: I) -> Self
where
I: IntoIterator<Item = ComputeDriverKind>,
{
self.compute_drivers = drivers.into_iter().collect();
self
}

/// Create a new configuration with a sandbox namespace.
#[must_use]
pub fn with_sandbox_namespace(mut self, namespace: impl Into<String>) -> Self {
@@ -261,6 +320,10 @@ fn default_sandbox_namespace() -> String {
"default".to_string()
}

fn default_compute_drivers() -> Vec<ComputeDriverKind> {
vec![ComputeDriverKind::Kubernetes]
}

fn default_ssh_gateway_host() -> String {
"127.0.0.1".to_string()
}
@@ -284,3 +347,34 @@ const fn default_ssh_handshake_skew_secs() -> u64 {
const fn default_ssh_session_ttl_secs() -> u64 {
86400 // 24 hours
}

#[cfg(test)]
mod tests {
use super::{ComputeDriverKind, Config};

#[test]
fn compute_driver_kind_parses_supported_values() {
assert_eq!(
"kubernetes".parse::<ComputeDriverKind>().unwrap(),
ComputeDriverKind::Kubernetes
);
assert_eq!(
"podman".parse::<ComputeDriverKind>().unwrap(),
ComputeDriverKind::Podman
);
}

#[test]
fn compute_driver_kind_rejects_unknown_values() {
let err = "docker".parse::<ComputeDriverKind>().unwrap_err();
assert!(err.contains("unsupported compute driver 'docker'"));
}

#[test]
fn config_defaults_to_kubernetes_driver() {
assert_eq!(
Config::new(None).compute_drivers,
vec![ComputeDriverKind::Kubernetes]
);
}
}