Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/core/event_bus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,40 @@ pub enum DomainEvent {
thread_id: Option<String>,
client_id: Option<String>,
},
/// An artifact record has been **created** (`ArtifactStatus::Pending`)
/// but no bytes are on disk yet — the producing tool has only just
/// reserved the row. Published by
/// [`crate::openhuman::artifacts::store::create_artifact`].
/// Bridged to the web channel as an `artifact_pending` socket event
/// so the frontend can render an in-progress / "Generating…" card the
/// moment the tool dispatches, instead of waiting until the file
/// arrives via [`Self::ArtifactReady`]. The pending card is replaced
/// in place when the matching `ArtifactReady` / `ArtifactFailed`
/// event with the same `artifact_id` arrives. Sub-task #3162 of #1535.
ArtifactPending {
/// UUID of the freshly-created artifact record.
artifact_id: String,
/// Lowercase variant of `ArtifactKind` (`presentation`,
/// `document`, `image`, `other`).
kind: String,
/// Human-readable title (also the on-disk filename stem).
title: String,
/// Absolute workspace root the artifact belongs to — see
/// [`Self::ArtifactReady::workspace_dir`] for rationale.
workspace_dir: String,
/// Relative path under `<workspace>/artifacts/` where the file
/// *will* land. The frontend uses it to render a stable card key
/// so subsequent `ArtifactReady` can swap the same surface in
/// place without flicker.
path: String,
/// Chat thread the artifact belongs to, when the producing turn
/// carried an `APPROVAL_CHAT_CONTEXT`. `None` for CLI / cron /
/// sub-agent paths — no client to fan out to.
thread_id: Option<String>,
/// Socket.IO client id (room) to surface the card to, when known.
/// `None` for non-chat callers.
client_id: Option<String>,
},

// ── Webhooks ────────────────────────────────────────────────────────
/// An incoming webhook request from the transport layer, ready for routing.
Expand Down Expand Up @@ -956,7 +990,9 @@ impl DomainEvent {

Self::ApprovalRequested { .. } | Self::ApprovalDecided { .. } => "approval",

Self::ArtifactReady { .. } | Self::ArtifactFailed { .. } => "artifact",
Self::ArtifactReady { .. }
| Self::ArtifactFailed { .. }
| Self::ArtifactPending { .. } => "artifact",

Self::McpServerInstalled { .. }
| Self::McpServerConnected { .. }
Expand Down Expand Up @@ -1056,6 +1092,7 @@ impl DomainEvent {
Self::ApprovalDecided { .. } => "ApprovalDecided",
Self::ArtifactReady { .. } => "ArtifactReady",
Self::ArtifactFailed { .. } => "ArtifactFailed",
Self::ArtifactPending { .. } => "ArtifactPending",
Self::McpServerInstalled { .. } => "McpServerInstalled",
Self::McpServerConnected { .. } => "McpServerConnected",
Self::McpServerDisconnected { .. } => "McpServerDisconnected",
Expand Down
28 changes: 28 additions & 0 deletions src/openhuman/artifacts/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,17 @@ fn sanitize_filename_stem(title: &str) -> String {
/// `extension` is the file extension WITHOUT the leading dot
/// (e.g. `"pptx"`, `"pdf"`). Used to build the rendered filename
/// under the artifact directory.
///
/// Publishes [`DomainEvent::ArtifactPending`] on the global bus the
/// moment the row is reserved so the chat surface can render an
/// in-progress / "Generating…" card immediately (#3162). When the
/// matching [`finalize_artifact`] / [`fail_artifact`] later fires it
/// reuses the same `artifact_id`, so the card swaps in place without
/// flicker. Same chat-context routing rules as the Ready/Failed pair —
/// `thread_id` / `client_id` come from the
/// [`crate::openhuman::approval::ApprovalChatContext`] task-local and
/// are `None` for CLI / cron / sub-agent paths, in which case the web
/// bridge silently drops the event for lack of a routing target.
pub async fn create_artifact(
workspace_dir: &Path,
kind: super::types::ArtifactKind,
Expand Down Expand Up @@ -342,6 +353,23 @@ pub async fn create_artifact(
meta.kind.as_str(),
absolute_path
);

// Surface the "Generating…" card the moment the row is reserved so
// the user doesn't stare at an empty composer until the tool finishes
// (#3162). When `finalize_artifact` / `fail_artifact` later fires the
// matching Ready/Failed event with the same `artifact_id`, the
// frontend can swap the card in place.
let (thread_id, client_id) = current_chat_context();
crate::core::event_bus::publish_global(crate::core::event_bus::DomainEvent::ArtifactPending {
artifact_id: meta.id.clone(),
kind: meta.kind.as_str().to_string(),
title: meta.title.clone(),
workspace_dir: workspace_dir.to_string_lossy().into_owned(),
path: meta.path.clone(),
thread_id,
client_id,
});

Ok((meta, absolute_path))
}

Expand Down
136 changes: 136 additions & 0 deletions src/openhuman/artifacts/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,139 @@ async fn validate_artifact_id_rejects_slashes() {
"unexpected error: {err}"
);
}

// ── create_artifact event publication (#3162) ─────────────────────────────

use crate::core::event_bus::{
init_global, subscribe_global, DomainEvent, EventHandler, SubscriptionHandle,
};
use async_trait::async_trait;
use std::sync::{Arc, Mutex as StdMutex};

#[derive(Clone)]
struct PendingCollector {
events: Arc<StdMutex<Vec<DomainEvent>>>,
}

impl PendingCollector {
fn new() -> Self {
Self {
events: Arc::new(StdMutex::new(Vec::new())),
}
}

fn subscribe(&self) -> Option<SubscriptionHandle> {
subscribe_global(Arc::new(self.clone()))
}

fn snapshot(&self) -> Vec<DomainEvent> {
self.events.lock().unwrap().clone()
}
}

#[async_trait]
impl EventHandler for PendingCollector {
fn name(&self) -> &str {
"test::pending_collector"
}

/// Filter at the bus boundary so the broadcast channel never delivers
/// non-artifact traffic to this subscriber — keeps the per-test
/// receive buffer small even when other tests pump unrelated events
/// in parallel.
fn domains(&self) -> Option<&[&str]> {
Some(&["artifact"])
}

async fn handle(&self, event: &DomainEvent) {
// domains() filter guarantees only artifact-domain variants
// arrive, but match defensively in case the enum grows.
if matches!(
event,
DomainEvent::ArtifactPending { .. }
| DomainEvent::ArtifactReady { .. }
| DomainEvent::ArtifactFailed { .. }
) {
self.events.lock().unwrap().push(event.clone());
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/// #3162: `create_artifact` publishes `DomainEvent::ArtifactPending`
/// the moment the row is reserved, so the chat surface can render an
/// in-progress "Generating…" card before the file lands on disk.
#[tokio::test]
async fn create_artifact_publishes_artifact_pending_event() {
init_global(256);
let collector = PendingCollector::new();
let _handle = collector.subscribe();

let tmp = TempDir::new().unwrap();
let (meta, _path) = create_artifact(tmp.path(), ArtifactKind::Presentation, "Q3 Deck", "pptx")
.await
.expect("create_artifact succeeds");
let expected_workspace = tmp.path().to_string_lossy().into_owned();

// The bus is broadcast-based and processed off-task — wait until the
// subscriber's tokio task delivers OUR event. Filtering by
// artifact_id + workspace_dir keeps the test robust against parallel
// `cargo test` runs that may publish unrelated artifact lifecycle
// events into the same process-wide bus.
let matches_this_artifact = |event: &DomainEvent| {
matches!(
event,
DomainEvent::ArtifactPending {
artifact_id,
workspace_dir,
..
} if artifact_id == &meta.id && workspace_dir == &expected_workspace
)
};
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
if collector.snapshot().iter().any(matches_this_artifact) {
break;
}
if std::time::Instant::now() >= deadline {
panic!(
"ArtifactPending event for id={} was not observed within 2s",
meta.id
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

let mine: Vec<DomainEvent> = collector
.snapshot()
.into_iter()
.filter(matches_this_artifact)
.collect();
assert_eq!(
mine.len(),
1,
"exactly one ArtifactPending for {} expected, got {mine:?}",
meta.id
);
let DomainEvent::ArtifactPending {
artifact_id,
kind,
title,
workspace_dir,
path,
thread_id,
client_id,
} = &mine[0]
else {
unreachable!("filter pinned us to ArtifactPending");
};
assert_eq!(*artifact_id, meta.id);
assert_eq!(kind, "presentation");
assert_eq!(title, "Q3 Deck");
assert_eq!(*workspace_dir, expected_workspace);
assert_eq!(*path, meta.path);
// No chat context bound on this test task, so the routing fields are
// None — the web bridge drops the event in that case, which is the
// intended degradation path for CLI / cron / sub-agent callers.
assert!(thread_id.is_none(), "thread_id leaked, got {thread_id:?}");
assert!(client_id.is_none(), "client_id leaked, got {client_id:?}");
}
57 changes: 49 additions & 8 deletions src/openhuman/channels/providers/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,22 @@ pub fn register_approval_surface_subscriber() {
static ARTIFACT_SURFACE_HANDLE: OnceLock<SubscriptionHandle> = OnceLock::new();

/// Bridge artifact lifecycle events onto the web channel.
/// `DomainEvent::ArtifactReady` / `ArtifactFailed` (published by
/// `artifacts::store::{finalize,fail}_artifact` from #2778's producer
/// surface) carry the thread_id + client_id when the producing turn
/// ran under an `APPROVAL_CHAT_CONTEXT`. When present, fan out as an
/// `artifact_ready` / `artifact_failed` socket event so the frontend
/// `chatRuntimeSlice` can upsert the snapshot and the `ArtifactCard`
/// can render in the message timeline. Sub-task #2779 of #1535.
/// `DomainEvent::ArtifactPending` / `ArtifactReady` / `ArtifactFailed`
/// (published by `artifacts::store::{create,finalize,fail}_artifact`)
/// carry the thread_id + client_id when the producing turn ran under an
/// `APPROVAL_CHAT_CONTEXT`. When present, fan out as an
/// `artifact_pending` / `artifact_ready` / `artifact_failed` socket
/// event so the frontend `chatRuntimeSlice` can upsert the snapshot and
/// the `ArtifactCard` can render in the message timeline:
///
/// - `artifact_pending` → render an in-progress "Generating…" card the
/// moment the producing tool dispatches (#3162).
/// - `artifact_ready` → swap the same card to a download surface when
/// the file lands (#2779).
/// - `artifact_failed` → swap to a retry-hint card on producer error.
///
/// The card is keyed on `artifact_id`, so the Pending → Ready/Failed
/// transition reuses the same surface instead of flickering a new one.
/// Idempotent. No-op for non-chat events (thread/client id absent).
pub fn register_artifact_surface_subscriber() {
if ARTIFACT_SURFACE_HANDLE.get().is_some() {
Expand All @@ -83,7 +92,7 @@ pub fn register_artifact_surface_subscriber() {
Some(handle) => {
let _ = ARTIFACT_SURFACE_HANDLE.set(handle);
log::info!(
"[web-channel] artifact-surface subscriber registered (domain=artifact) — will bridge ArtifactReady/Failed → artifact_ready/artifact_failed socket events"
"[web-channel] artifact-surface subscriber registered (domain=artifact) — will bridge ArtifactPending/Ready/Failed → artifact_pending/artifact_ready/artifact_failed socket events"
);
}
None => {
Expand Down Expand Up @@ -175,6 +184,38 @@ impl EventHandler for ArtifactSurfaceSubscriber {
..Default::default()
});
}
DomainEvent::ArtifactPending {
artifact_id,
kind,
title,
workspace_dir,
path,
thread_id,
client_id,
} => {
let (Some(thread_id), Some(client_id)) = (thread_id, client_id) else {
log::debug!(
"[web-channel] artifact-surface skip ArtifactPending id={artifact_id}: no chat context"
);
return;
};
log::info!(
"[web-channel] artifact-surface emitting artifact_pending id={artifact_id} kind={kind} thread_id={thread_id} client_id={client_id}"
);
publish_web_channel_event(WebChannelEvent {
event: "artifact_pending".to_string(),
client_id: client_id.clone(),
thread_id: thread_id.clone(),
args: Some(serde_json::json!({
"artifact_id": artifact_id,
"kind": kind,
"title": title,
"workspace_dir": workspace_dir,
"path": path,
})),
..Default::default()
});
}
_ => {}
}
}
Expand Down
Loading