diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 7c4f7478..4d3868d0 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -887,9 +887,12 @@ fn start_event_loop_with_transport( tokio::spawn(async move { loop { event_queue.wait_for_events().await; - let batch = event_queue.dequeue_batch(10).await; + loop { + let batch = event_queue.dequeue_configured_batch().await; + if batch.is_empty() { + break; + } - if !batch.is_empty() { for envelope in batch { // Route to internal subscribers (e.g. RemoteSessionStateTracker) // sequentially so that text chunks are appended in order. diff --git a/src/crates/core/src/agentic/events/queue.rs b/src/crates/core/src/agentic/events/queue.rs index 36f32315..35f2ef7b 100644 --- a/src/crates/core/src/agentic/events/queue.rs +++ b/src/crates/core/src/agentic/events/queue.rs @@ -131,6 +131,11 @@ impl EventQueue { batch } + /// Dequeue a batch using the queue's configured batch size. + pub async fn dequeue_configured_batch(&self) -> Vec { + self.dequeue_batch(self.config.batch_size).await + } + /// Clear all events for a session pub async fn clear_session(&self, session_id: &str) -> BitFunResult<()> { // Remove all events for this session from the queue diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 141a8e61..14ef2831 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -1104,18 +1104,20 @@ impl ExecutionEngine { // Emit dialog turn completed event debug!("Preparing to send DialogTurnCompleted event"); - self.emit_event( - AgenticEvent::DialogTurnCompleted { - session_id: context.session_id.clone(), - turn_id: context.dialog_turn_id.clone(), - total_rounds: round_index + 1, - total_tools, - duration_ms, - subagent_parent_info: event_subagent_parent_info, - }, - EventPriority::High, - ) - .await; + let _ = self + .event_queue + .enqueue( + AgenticEvent::DialogTurnCompleted { + session_id: context.session_id.clone(), + turn_id: context.dialog_turn_id.clone(), + total_rounds: round_index + 1, + total_tools, + duration_ms, + subagent_parent_info: event_subagent_parent_info, + }, + None, + ) + .await; debug!("DialogTurnCompleted event sent"); diff --git a/src/crates/core/src/agentic/execution/stream_processor.rs b/src/crates/core/src/agentic/execution/stream_processor.rs index c2b8861e..b8bb9498 100644 --- a/src/crates/core/src/agentic/execution/stream_processor.rs +++ b/src/crates/core/src/agentic/execution/stream_processor.rs @@ -491,7 +491,7 @@ impl StreamProcessor { }, subagent_parent_info: ctx.event_subagent_parent_info.clone(), }, - Some(EventPriority::Normal), + None, ) .await; } else if ctx.tool_call_buffer.tool_name.is_empty() { @@ -522,7 +522,7 @@ impl StreamProcessor { }, subagent_parent_info: ctx.event_subagent_parent_info.clone(), }, - Some(EventPriority::Normal), + None, ) .await; } @@ -557,7 +557,7 @@ impl StreamProcessor { text, subagent_parent_info: ctx.event_subagent_parent_info.clone(), }, - Some(EventPriority::Normal), + None, ) .await; } @@ -582,7 +582,7 @@ impl StreamProcessor { is_end: false, subagent_parent_info: ctx.event_subagent_parent_info.clone(), }, - Some(EventPriority::Normal), + None, ) .await; } diff --git a/src/crates/core/src/agentic/tools/pipeline/state_manager.rs b/src/crates/core/src/agentic/tools/pipeline/state_manager.rs index d0c2f5bf..1e36d7e3 100644 --- a/src/crates/core/src/agentic/tools/pipeline/state_manager.rs +++ b/src/crates/core/src/agentic/tools/pipeline/state_manager.rs @@ -4,7 +4,7 @@ use super::types::ToolTask; use crate::agentic::core::ToolExecutionState; -use crate::agentic::events::{AgenticEvent, EventPriority, EventQueue, ToolEventData}; +use crate::agentic::events::{AgenticEvent, EventQueue, ToolEventData}; use dashmap::DashMap; use log::debug; use std::sync::Arc; @@ -215,25 +215,6 @@ impl ToolStateManager { }, }; - // Determine priority based on tool event type - let priority = match &task.state { - // Critical state change: High priority (user needs to see immediately) - ToolExecutionState::Running { .. } // Start execution - | ToolExecutionState::AwaitingConfirmation { .. } // Need confirmation - | ToolExecutionState::Completed { .. } // Completed - | ToolExecutionState::Failed { .. } // Failed - => EventPriority::High, - - // Cancel event: Critical priority (need immediate feedback) - ToolExecutionState::Cancelled { .. } => EventPriority::Critical, - - // Progress state: Normal priority (avoid blocking critical events) - ToolExecutionState::Queued { .. } - | ToolExecutionState::Waiting { .. } - | ToolExecutionState::Streaming { .. } - => EventPriority::Normal, - }; - let event_subagent_parent_info = task.context.subagent_parent_info.map(|info| info.into()); let event = AgenticEvent::ToolEvent { session_id: task.context.session_id, @@ -242,7 +223,7 @@ impl ToolStateManager { subagent_parent_info: event_subagent_parent_info, }; - let _ = self.event_queue.enqueue(event, Some(priority)).await; + let _ = self.event_queue.enqueue(event, None).await; } /// Get statistics diff --git a/src/crates/events/src/agentic.rs b/src/crates/events/src/agentic.rs index f61264eb..28a7d6f7 100644 --- a/src/crates/events/src/agentic.rs +++ b/src/crates/events/src/agentic.rs @@ -339,21 +339,46 @@ impl AgenticEvent { Self::SessionStateChanged { .. } | Self::SessionTitleGenerated { .. } - | Self::DialogTurnCompleted { .. } | Self::ContextCompressionFailed { .. } => AgenticEventPriority::High, Self::ImageAnalysisStarted { .. } | Self::ImageAnalysisCompleted { .. } | Self::TextChunk { .. } | Self::ThinkingChunk { .. } - | Self::ToolEvent { .. } | Self::ModelRoundStarted { .. } | Self::ModelRoundCompleted { .. } | Self::TokenUsageUpdated { .. } + | Self::DialogTurnCompleted { .. } | Self::ContextCompressionStarted { .. } | Self::ContextCompressionCompleted { .. } => AgenticEventPriority::Normal, + Self::ToolEvent { tool_event, .. } => tool_event.default_priority(), + _ => AgenticEventPriority::Low, } } } + +impl ToolEventData { + /// Get the default priority for a specific tool event variant. + pub fn default_priority(&self) -> AgenticEventPriority { + match self { + Self::Cancelled { .. } => AgenticEventPriority::Critical, + + Self::Started { .. } + | Self::Completed { .. } + | Self::Failed { .. } + | Self::ConfirmationNeeded { .. } => AgenticEventPriority::High, + + Self::EarlyDetected { .. } + | Self::ParamsPartial { .. } + | Self::Queued { .. } + | Self::Waiting { .. } + | Self::Progress { .. } + | Self::Streaming { .. } + | Self::StreamChunk { .. } + | Self::Confirmed { .. } + | Self::Rejected { .. } => AgenticEventPriority::Normal, + } + } +} diff --git a/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx b/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx index 56575039..002ff4b3 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx +++ b/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx @@ -99,7 +99,11 @@ const SessionsSection: React.FC = ({ const running = new Set(); for (const session of flowChatState.sessions.values()) { const machine = stateMachineManager.get(session.sessionId); - if (machine && machine.getCurrentState() === SessionExecutionState.PROCESSING) { + if ( + machine && + (machine.getCurrentState() === SessionExecutionState.PROCESSING || + machine.getCurrentState() === SessionExecutionState.FINISHING) + ) { running.add(session.sessionId); } } diff --git a/src/web-ui/src/app/layout/FloatingMiniChat.tsx b/src/web-ui/src/app/layout/FloatingMiniChat.tsx index 3741f07e..19e14912 100644 --- a/src/web-ui/src/app/layout/FloatingMiniChat.tsx +++ b/src/web-ui/src/app/layout/FloatingMiniChat.tsx @@ -70,7 +70,10 @@ export const FloatingMiniChat: React.FC = () => { } const lastTurn = activeSession.dialogTurns[activeSession.dialogTurns.length - 1]; - const isStreaming = lastTurn.status === 'processing' || lastTurn.status === 'image_analyzing'; + const isStreaming = + lastTurn.status === 'processing' || + lastTurn.status === 'finishing' || + lastTurn.status === 'image_analyzing'; return { isStreaming }; }, [flowChatState]); diff --git a/src/web-ui/src/flow_chat/components/btw/BtwSessionPanel.tsx b/src/web-ui/src/flow_chat/components/btw/BtwSessionPanel.tsx index e3685725..d5fd2eed 100644 --- a/src/web-ui/src/flow_chat/components/btw/BtwSessionPanel.tsx +++ b/src/web-ui/src/flow_chat/components/btw/BtwSessionPanel.tsx @@ -143,7 +143,10 @@ export const BtwSessionPanel: React.FC = ({ const lastModelRound = lastDialogTurn?.modelRounds[lastDialogTurn.modelRounds.length - 1]; const lastItem = lastModelRound?.items[lastModelRound.items.length - 1]; const lastItemContent = lastItem && 'content' in lastItem ? String((lastItem as any).content || '') : ''; - const isTurnProcessing = lastDialogTurn?.status === 'processing' || lastDialogTurn?.status === 'image_analyzing'; + const isTurnProcessing = + lastDialogTurn?.status === 'processing' || + lastDialogTurn?.status === 'finishing' || + lastDialogTurn?.status === 'image_analyzing'; const [isContentGrowing, setIsContentGrowing] = useState(true); const lastContentRef = useRef(lastItemContent); const contentTimeoutRef = useRef | null>(null); diff --git a/src/web-ui/src/flow_chat/components/modern/VirtualMessageList.tsx b/src/web-ui/src/flow_chat/components/modern/VirtualMessageList.tsx index b40e76f6..7f8f8ac7 100644 --- a/src/web-ui/src/flow_chat/components/modern/VirtualMessageList.tsx +++ b/src/web-ui/src/flow_chat/components/modern/VirtualMessageList.tsx @@ -1447,6 +1447,7 @@ export const VirtualMessageList = forwardRef((_, ref) => if ( lastDialogTurn.status === 'processing' || + lastDialogTurn.status === 'finishing' || lastDialogTurn.status === 'image_analyzing' ) { return true; @@ -1781,8 +1782,10 @@ export const VirtualMessageList = forwardRef((_, ref) => : undefined; const content = lastItem && 'content' in lastItem ? (lastItem as any).content : ''; - const isTurnProcessing = lastDialogTurn?.status === 'processing' || - lastDialogTurn?.status === 'image_analyzing'; + const isTurnProcessing = + lastDialogTurn?.status === 'processing' || + lastDialogTurn?.status === 'finishing' || + lastDialogTurn?.status === 'image_analyzing'; return { lastItem, lastDialogTurn, content, isTurnProcessing }; }, [activeSession]); diff --git a/src/web-ui/src/flow_chat/components/toolbar-mode/ToolbarMode.tsx b/src/web-ui/src/flow_chat/components/toolbar-mode/ToolbarMode.tsx index 15a3827d..991a1d53 100644 --- a/src/web-ui/src/flow_chat/components/toolbar-mode/ToolbarMode.tsx +++ b/src/web-ui/src/flow_chat/components/toolbar-mode/ToolbarMode.tsx @@ -118,7 +118,10 @@ export const ToolbarMode: React.FC = () => { const lastTurn = activeSession.dialogTurns[activeSession.dialogTurns.length - 1]; - const isStreaming = lastTurn.status === 'processing' || lastTurn.status === 'image_analyzing'; + const isStreaming = + lastTurn.status === 'processing' || + lastTurn.status === 'finishing' || + lastTurn.status === 'image_analyzing'; if (!isStreaming || !lastTurn.modelRounds || lastTurn.modelRounds.length === 0) { return { isStreaming, toolName: null, content: null }; diff --git a/src/web-ui/src/flow_chat/services/FlowChatManager.ts b/src/web-ui/src/flow_chat/services/FlowChatManager.ts index 836d2159..b49ceaa1 100644 --- a/src/web-ui/src/flow_chat/services/FlowChatManager.ts +++ b/src/web-ui/src/flow_chat/services/FlowChatManager.ts @@ -54,6 +54,7 @@ export class FlowChatManager { eventBatcher: new EventBatcher({ onFlush: (events) => this.processBatchedEvents(events) }), + pendingTurnCompletions: new Map(), contentBuffers: new Map(), activeTextItems: new Map(), saveDebouncers: new Map(), @@ -351,7 +352,11 @@ export class FlowChatManager { const session = this.context.flowChatStore.getState().sessions.get(parentSessionId); const turn = session?.dialogTurns.find(t => t.id === dialogTurnId); if (!turn) return; - if (turn.status !== 'processing' && turn.status !== 'image_analyzing') { + if ( + turn.status !== 'processing' && + turn.status !== 'finishing' && + turn.status !== 'image_analyzing' + ) { // Only inject into an actively streaming turn; otherwise we'd create dangling streaming items. return; } diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts index 71917623..8f6bc688 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts @@ -49,6 +49,25 @@ import { } from './SubagentModule'; const log = createLogger('EventHandlerModule'); +const TURN_COMPLETION_QUIET_WINDOW_MS = 500; + +function isStreamingExecutionState(state: SessionExecutionState): boolean { + return state === SessionExecutionState.PROCESSING || state === SessionExecutionState.FINISHING; +} + +function logDroppedDataEvent( + eventName: string, + sessionId: string, + turnId: string | null, + details: Record +): void { + log.debug('Dropped agentic data event', { + eventName, + sessionId, + turnId, + ...details, + }); +} /** * Event filtering mechanism: determines if an event should be processed @@ -56,10 +75,14 @@ const log = createLogger('EventHandlerModule'); export function shouldProcessEvent( sessionId: string, turnId: string | null, - eventType: 'data' | 'control' | 'state_sync' + eventType: 'data' | 'control' | 'state_sync', + eventName = 'unknown' ): boolean { const machine = stateMachineManager.get(sessionId); if (!machine) { + if (eventType === 'data') { + logDroppedDataEvent(eventName, sessionId, turnId, { reason: 'missing_state_machine' }); + } return false; } @@ -77,16 +100,21 @@ export function shouldProcessEvent( return false; } - if (currentState !== SessionExecutionState.PROCESSING) { + if (!isStreamingExecutionState(currentState)) { + logDroppedDataEvent(eventName, sessionId, turnId, { + reason: 'state_not_accepting_data', + currentState, + currentDialogTurnId: context.currentDialogTurnId, + }); return false; } if (turnId && context.currentDialogTurnId !== turnId) { - log.debug('Event filtered: turnId mismatch', { + logDroppedDataEvent(eventName, sessionId, turnId, { + reason: 'turn_id_mismatch', sessionId, - eventTurnId: turnId, - currentTurnId: context.currentDialogTurnId, - currentState + currentState, + currentDialogTurnId: context.currentDialogTurnId, }); return false; } @@ -267,6 +295,175 @@ function extractEventRemoteSshHost(event?: Record | null): stri return h?.trim() || undefined; } +function clearPendingTurnCompletion( + context: FlowChatContext, + sessionId: string, + turnId?: string +): void { + const pending = context.pendingTurnCompletions.get(sessionId); + if (!pending) { + return; + } + + if (turnId && pending.turnId !== turnId) { + return; + } + + if (pending.timer) { + clearTimeout(pending.timer); + } + + context.pendingTurnCompletions.delete(sessionId); +} + +function touchPendingTurnCompletion( + context: FlowChatContext, + sessionId: string, + turnId: string +): void { + const pending = context.pendingTurnCompletions.get(sessionId); + if (!pending || pending.turnId !== turnId) { + return; + } + + pending.lastActivityAt = Date.now(); + schedulePendingTurnCompletion(context, sessionId, turnId); +} + +function schedulePendingTurnCompletion( + context: FlowChatContext, + sessionId: string, + turnId: string +): void { + const pending = context.pendingTurnCompletions.get(sessionId); + if (!pending || pending.turnId !== turnId) { + return; + } + + if (pending.timer) { + clearTimeout(pending.timer); + } + + pending.timer = setTimeout(() => { + finalizePendingTurnCompletion(context, sessionId, turnId); + }, TURN_COMPLETION_QUIET_WINDOW_MS); +} + +function beginTurnCompletion(context: FlowChatContext, sessionId: string, turnId: string): void { + clearPendingTurnCompletion(context, sessionId); + + context.pendingTurnCompletions.set(sessionId, { + turnId, + lastActivityAt: Date.now(), + timer: null, + }); + + schedulePendingTurnCompletion(context, sessionId, turnId); +} + +function flushPendingBatchedEvents(context: FlowChatContext): void { + if (context.eventBatcher.getBufferSize() > 0) { + context.eventBatcher.flushNow(); + } +} + +function finalizeTurnCompletionState( + context: FlowChatContext, + sessionId: string, + turnId: string +): void { + const store = FlowChatStore.getInstance(); + const session = store.getState().sessions.get(sessionId); + + if (!session) { + clearPendingTurnCompletion(context, sessionId, turnId); + return; + } + + completeActiveTextItems(context, sessionId, turnId); + + const sessionContentBuffer = context.contentBuffers.get(sessionId); + if (sessionContentBuffer) { + sessionContentBuffer.clear(); + } + + context.flowChatStore.markSessionFinished(sessionId); + + context.flowChatStore.updateDialogTurn(sessionId, turnId, turn => { + const updatedModelRounds = turn.modelRounds.map((round) => { + if (round.isStreaming) { + return { + ...round, + isStreaming: false, + isComplete: true, + status: 'completed' as const, + endTime: Date.now() + }; + } + return round; + }); + + return { + ...turn, + modelRounds: updatedModelRounds, + status: 'completed' as const, + endTime: Date.now() + }; + }); + + const currentState = stateMachineManager.getCurrentState(sessionId); + if (isStreamingExecutionState(currentState)) { + stateMachineManager.transition(sessionId, SessionExecutionEvent.FINISHING_SETTLED); + } else { + log.debug('Skipping FINISHING_SETTLED transition', { currentState, sessionId, turnId }); + } + + const dialogTurn = store.getState().sessions.get(sessionId)?.dialogTurns.find(t => t.id === turnId); + if (dialogTurn) { + appendPlanDisplayItemsIfNeeded(context, sessionId, turnId, dialogTurn); + } + + saveDialogTurnToDisk(context, sessionId, turnId).catch(error => { + log.warn('Failed to save dialog turn (non-critical)', { sessionId, turnId, error }); + }); + + clearPendingTurnCompletion(context, sessionId, turnId); +} + +function finalizePendingTurnCompletion( + context: FlowChatContext, + sessionId: string, + turnId: string +): void { + const pending = context.pendingTurnCompletions.get(sessionId); + if (!pending || pending.turnId !== turnId) { + return; + } + + const elapsed = Date.now() - pending.lastActivityAt; + if (elapsed < TURN_COMPLETION_QUIET_WINDOW_MS) { + schedulePendingTurnCompletion(context, sessionId, turnId); + return; + } + + flushPendingBatchedEvents(context); + finalizeTurnCompletionState(context, sessionId, turnId); +} + +function finalizePendingTurnCompletionNow(context: FlowChatContext, sessionId: string): void { + const pending = context.pendingTurnCompletions.get(sessionId); + if (!pending) { + return; + } + + if (pending.timer) { + clearTimeout(pending.timer); + } + + flushPendingBatchedEvents(context); + finalizeTurnCompletionState(context, sessionId, pending.turnId); +} + /** * Handle session title generated event (from AI auto-generation) */ @@ -290,6 +487,7 @@ function handleSessionDeleted(context: FlowChatContext, event: any): void { log.info('Remote session deleted', { sessionId }); removedSessionIds.forEach(id => { + clearPendingTurnCompletion(context, id); pendingImageAnalysisTurns.delete(id); stateMachineManager.delete(id); context.processingManager.clearSessionStatus(id); @@ -313,11 +511,14 @@ function handleSessionStateChanged(event: any): void { const frontendState = mapBackendStateToFrontend(newState); const currentFrontendState = machine.getCurrentState(); + const isExpectedFinishingDrift = + currentFrontendState === SessionExecutionState.FINISHING && + frontendState === SessionExecutionState.IDLE; const context = machine.getContext(); (context as any).backendSyncedAt = Date.now(); - if (currentFrontendState !== frontendState) { + if (currentFrontendState !== frontendState && !isExpectedFinishingDrift) { log.warn('Frontend and backend state mismatch', { sessionId, frontend: currentFrontendState, @@ -471,6 +672,9 @@ function handleDialogTurnStarted(context: FlowChatContext, event: any): void { return; } + finalizePendingTurnCompletionNow(context, sessionId); + clearPendingTurnCompletion(context, sessionId, turnId); + const store = FlowChatStore.getInstance(); // Clean up temp image analysis turn if one exists for this session @@ -591,7 +795,7 @@ function handleTextChunk(context: FlowChatContext, event: any): void { const targetSessionId = parentSessionId || sessionId; const targetTurnId = parentTurnId || turnId; - if (!shouldProcessEvent(targetSessionId, targetTurnId, 'data')) { + if (!shouldProcessEvent(targetSessionId, targetTurnId, 'data', 'TextChunk')) { return; } @@ -612,8 +816,9 @@ function handleTextChunk(context: FlowChatContext, event: any): void { } if (!subagentParentInfo) { + touchPendingTurnCompletion(context, sessionId, turnId); const currentState = stateMachineManager.getCurrentState(sessionId); - if (currentState === SessionExecutionState.PROCESSING) { + if (isStreamingExecutionState(currentState)) { stateMachineManager.transition(sessionId, SessionExecutionEvent.TEXT_CHUNK_RECEIVED, { content: text, }).catch(error => { @@ -750,9 +955,13 @@ function handleToolEvent( const targetSessionId = parentSessionId || sessionId; const targetTurnId = parentTurnId || turnId; - if (!shouldProcessEvent(targetSessionId, targetTurnId, 'data')) { + if (!shouldProcessEvent(targetSessionId, targetTurnId, 'data', 'ToolEvent')) { return; } + + if (!subagentParentInfo) { + touchPendingTurnCompletion(context, sessionId, turnId); + } const eventData: ToolEventData = { sessionId, @@ -812,7 +1021,7 @@ function handleModelRoundStart(context: FlowChatContext, event: any): void { return; } - if (!shouldProcessEvent(sessionId, turnId, 'data')) { + if (!shouldProcessEvent(sessionId, turnId, 'data', 'ModelRoundStarted')) { return; } @@ -830,8 +1039,10 @@ function handleModelRoundStart(context: FlowChatContext, event: any): void { return; } + touchPendingTurnCompletion(context, sessionId, turnId); + const currentState = stateMachineManager.getCurrentState(sessionId); - if (currentState === SessionExecutionState.PROCESSING) { + if (isStreamingExecutionState(currentState)) { stateMachineManager.transition(sessionId, SessionExecutionEvent.MODEL_ROUND_START, { modelRoundId: roundId, }).catch(error => { @@ -1023,58 +1234,25 @@ function handleDialogTurnComplete( return; } - if (context.eventBatcher.getBufferSize() > 0) { - context.eventBatcher.flushNow(); - } - - completeActiveTextItems(context, sessionId, turnId); - - const sessionContentBuffer = context.contentBuffers.get(sessionId); - if (sessionContentBuffer) { - sessionContentBuffer.clear(); - } - - context.flowChatStore.markSessionFinished(sessionId); - context.flowChatStore.updateDialogTurn(sessionId, turnId, turn => { - const updatedModelRounds = turn.modelRounds.map((round) => { - if (round.isStreaming) { - return { - ...round, - isStreaming: false, - isComplete: true, - status: 'completed' as const, - endTime: Date.now() - }; - } - return round; - }); - return { ...turn, - modelRounds: updatedModelRounds, - status: 'completed' as const, - endTime: Date.now() + status: 'finishing' as const, }; }); const currentState = stateMachineManager.getCurrentState(sessionId); if (currentState === SessionExecutionState.PROCESSING) { - stateMachineManager.transition(sessionId, SessionExecutionEvent.STREAM_COMPLETE).catch(error => { - log.error('State machine transition failed on stream complete', { sessionId, error }); - }); + void stateMachineManager + .transition(sessionId, SessionExecutionEvent.BACKEND_STREAM_COMPLETED) + .catch(error => { + log.error('State machine transition failed on backend stream completed', { sessionId, error }); + }); } else { - log.debug('Skipping STREAM_COMPLETE transition', { currentState, sessionId }); - } - - const dialogTurn = session.dialogTurns.find(t => t.id === turnId); - if (dialogTurn) { - appendPlanDisplayItemsIfNeeded(context, sessionId, turnId, dialogTurn); + log.debug('Skipping BACKEND_STREAM_COMPLETED transition', { currentState, sessionId, turnId }); } - - saveDialogTurnToDisk(context, sessionId, turnId).catch(error => { - log.warn('Failed to save dialog turn (non-critical)', { sessionId, turnId, error }); - }); + + beginTurnCompletion(context, sessionId, turnId); } /** @@ -1088,6 +1266,7 @@ function handleDialogTurnFailed(context: FlowChatContext, event: any): void { } log.error('Dialog turn failed', { sessionId, turnId, error }); + clearPendingTurnCompletion(context, sessionId, turnId); const store = FlowChatStore.getInstance(); const session = store.getState().sessions.get(sessionId); @@ -1154,7 +1333,7 @@ function handleDialogTurnFailed(context: FlowChatContext, event: any): void { } const currentState = stateMachineManager.getCurrentState(sessionId); - if (currentState === SessionExecutionState.PROCESSING) { + if (isStreamingExecutionState(currentState)) { stateMachineManager.transition(sessionId, SessionExecutionEvent.ERROR_OCCURRED, { error: error || 'Execution failed' }).catch(err => { @@ -1186,6 +1365,7 @@ function handleDialogTurnCancelled( } log.info('Dialog turn cancelled', { sessionId, turnId }); + clearPendingTurnCompletion(context, sessionId, turnId); const store = FlowChatStore.getInstance(); const session = store.getState().sessions.get(sessionId); @@ -1243,10 +1423,12 @@ function handleDialogTurnCancelled( // so the machine is already IDLE. When cancellation comes from an // external source (mobile remote), the machine is still PROCESSING. const currentState = stateMachineManager.getCurrentState(sessionId); - if (currentState === SessionExecutionState.PROCESSING) { - stateMachineManager.transition(sessionId, SessionExecutionEvent.STREAM_COMPLETE).catch(error => { - log.error('State machine transition failed on cancelled stream complete', { sessionId, error }); - }); + if (isStreamingExecutionState(currentState)) { + void stateMachineManager + .transition(sessionId, SessionExecutionEvent.FINISHING_SETTLED) + .catch(error => { + log.error('State machine transition failed on cancelled finishing settled', { sessionId, error }); + }); } } diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts index ceb58563..eef64c2d 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts @@ -268,6 +268,7 @@ export async function saveAllInProgressTurns(context: FlowChatContext): Promise< if ( lastTurn.status === 'processing' || + lastTurn.status === 'finishing' || lastTurn.status === 'pending' || lastTurn.status === 'image_analyzing' ) { diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts index 6ce4ccf0..0ea0d593 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts @@ -160,6 +160,14 @@ export function cleanupSessionBuffers(context: FlowChatContext, sessionId: strin if (batcherSize > 0) { context.eventBatcher.clear(); } + + const pendingCompletion = context.pendingTurnCompletions.get(sessionId); + if (pendingCompletion) { + if (pendingCompletion.timer) { + clearTimeout(pendingCompletion.timer); + } + context.pendingTurnCompletions.delete(sessionId); + } const contentBuffer = context.contentBuffers.get(sessionId); if (contentBuffer) { @@ -176,6 +184,13 @@ export function cleanupSessionBuffers(context: FlowChatContext, sessionId: strin * Clear all buffers and transient state. */ export function clearAllBuffers(context: FlowChatContext): void { + for (const pendingCompletion of context.pendingTurnCompletions.values()) { + if (pendingCompletion.timer) { + clearTimeout(pendingCompletion.timer); + } + } + context.pendingTurnCompletions.clear(); + context.contentBuffers.clear(); context.activeTextItems.clear(); diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts index 3575101a..0154cfb4 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts @@ -14,6 +14,11 @@ export interface FlowChatContext { flowChatStore: FlowChatStore; processingManager: typeof processingStatusManager; eventBatcher: EventBatcher; + pendingTurnCompletions: Map | null; + }>; /** Content buffers: sessionId -> (roundId -> content) */ contentBuffers: Map>; /** Active text items: sessionId -> (roundId -> textItemId) */ diff --git a/src/web-ui/src/flow_chat/state-machine/SessionStateMachine.ts b/src/web-ui/src/flow_chat/state-machine/SessionStateMachine.ts index 859d0271..5642dc20 100644 --- a/src/web-ui/src/flow_chat/state-machine/SessionStateMachine.ts +++ b/src/web-ui/src/flow_chat/state-machine/SessionStateMachine.ts @@ -5,6 +5,7 @@ import { SessionExecutionState, SessionExecutionEvent, + ProcessingPhase, SessionStateMachine, SessionStateMachineContext, StateTransition, @@ -146,9 +147,15 @@ export class SessionStateMachineImpl { } private updateContext(event: SessionExecutionEvent, payload?: any) { - if (this.currentState === SessionExecutionState.PROCESSING) { + if ( + this.currentState === SessionExecutionState.PROCESSING || + this.currentState === SessionExecutionState.FINISHING + ) { const newPhase = PHASE_TRANSITIONS[event]; - if (newPhase !== null && newPhase !== undefined) { + const shouldUpdatePhase = + this.currentState === SessionExecutionState.PROCESSING || + event === SessionExecutionEvent.BACKEND_STREAM_COMPLETED; + if (shouldUpdatePhase && newPhase !== null && newPhase !== undefined) { this.context.processingPhase = newPhase; } } else { @@ -202,8 +209,12 @@ export class SessionStateMachineImpl { this.context.errorRecovery.recoverable = payload?.recoverable !== false; break; + case SessionExecutionEvent.BACKEND_STREAM_COMPLETED: + this.context.processingPhase = ProcessingPhase.FINALIZING; + break; + case SessionExecutionEvent.USER_CANCEL: - case SessionExecutionEvent.STREAM_COMPLETE: + case SessionExecutionEvent.FINISHING_SETTLED: case SessionExecutionEvent.RESET: if (this.currentState === SessionExecutionState.IDLE) { const queuedInput = this.context.queuedInput; diff --git a/src/web-ui/src/flow_chat/state-machine/derivedState.ts b/src/web-ui/src/flow_chat/state-machine/derivedState.ts index 1a214773..296f1048 100644 --- a/src/web-ui/src/flow_chat/state-machine/derivedState.ts +++ b/src/web-ui/src/flow_chat/state-machine/derivedState.ts @@ -1,5 +1,5 @@ /** - * Derived state computation (three-state design) + * Derived state computation * Computes derived state for UI components based on state machine state * * Design principles: @@ -27,6 +27,7 @@ export function deriveSessionState( const { processingPhase } = context; const draftTrimmed = currentState === SessionExecutionState.PROCESSING || + currentState === SessionExecutionState.FINISHING || currentState === SessionExecutionState.ERROR ? options?.processingInputDraftTrimmed?.trim() ?? '' : ''; @@ -43,15 +44,18 @@ export function deriveSessionState( ? (plannerStats.completed / context.planner!.todos.length) * 100 : 0; - const isProcessing = currentState === SessionExecutionState.PROCESSING; + const isProcessing = + currentState === SessionExecutionState.PROCESSING || + currentState === SessionExecutionState.FINISHING; const isError = currentState === SessionExecutionState.ERROR; const isIdle = currentState === SessionExecutionState.IDLE; + const canCancel = currentState === SessionExecutionState.PROCESSING; return { isInputDisabled: false, showSendButton: !isProcessing, - showCancelButton: isProcessing, + showCancelButton: canCancel, sendButtonMode: getSendButtonMode( currentState, @@ -82,12 +86,13 @@ export function deriveSessionState( progressBarColor: getProgressBarColor(processingPhase), isProcessing, - canCancel: isProcessing, + canCancel, canSendNewMessage: isIdle || isError, hasQueuedInput: (context.queuedInput?.trim()?.length ?? 0) > 0 || ((currentState === SessionExecutionState.PROCESSING || + currentState === SessionExecutionState.FINISHING || currentState === SessionExecutionState.ERROR) && draftTrimmed.length > 0), @@ -109,10 +114,13 @@ function getSendButtonMode( return hasQueued ? 'split' : 'retry'; } - if (state === SessionExecutionState.PROCESSING) { + if (state === SessionExecutionState.PROCESSING || state === SessionExecutionState.FINISHING) { if (phase === ProcessingPhase.TOOL_CONFIRMING || hasPendingConfirmations) { return 'confirm'; } + if (state === SessionExecutionState.FINISHING) { + return 'send'; + } const hasFollowUpDraft = (queuedInput?.trim()?.length ?? 0) > 0 || processingDraftTrimmed.length > 0; return hasFollowUpDraft ? 'split' : 'cancel'; @@ -128,6 +136,9 @@ function getProgressBarMode(phase: ProcessingPhase | null): SessionDerivedState[ case ProcessingPhase.STREAMING: return 'determinate'; + + case ProcessingPhase.FINALIZING: + return 'indeterminate'; default: return 'indeterminate'; @@ -177,6 +188,9 @@ function getProgressBarLabel( ? ((Date.now() - context.stats.startTime) / 1000).toFixed(1) : '0'; return `Generating response (${chars} chars) · ${duration}s`; + + case ProcessingPhase.FINALIZING: + return 'Finalizing response...'; case ProcessingPhase.TOOL_CALLING: const toolsExecuted = context.stats.toolsExecuted; diff --git a/src/web-ui/src/flow_chat/state-machine/transitions.ts b/src/web-ui/src/flow_chat/state-machine/transitions.ts index 2c6838a2..ee9fd097 100644 --- a/src/web-ui/src/flow_chat/state-machine/transitions.ts +++ b/src/web-ui/src/flow_chat/state-machine/transitions.ts @@ -1,15 +1,16 @@ /** - * State transition table definition (three-state design) + * State transition table definition */ import { SessionExecutionState, SessionExecutionEvent, StateTransitionTable, ProcessingPhase } from './types'; /** - * Minimal three-state transition table + * State transition table * * Design philosophy: * - IDLE: idle, can start new task * - PROCESSING: running, can be cancelled or error + * - FINISHING: backend completed, frontend is draining late events before becoming idle * - ERROR: error state, can reset or retry * * Cancellation logic: USER_CANCEL → immediately switch to IDLE (no backend wait) @@ -21,10 +22,11 @@ export const STATE_TRANSITIONS: StateTransitionTable = { [SessionExecutionState.PROCESSING]: { [SessionExecutionEvent.USER_CANCEL]: SessionExecutionState.IDLE, + [SessionExecutionEvent.FINISHING_SETTLED]: SessionExecutionState.IDLE, [SessionExecutionEvent.ERROR_OCCURRED]: SessionExecutionState.ERROR, - [SessionExecutionEvent.STREAM_COMPLETE]: SessionExecutionState.IDLE, + [SessionExecutionEvent.BACKEND_STREAM_COMPLETED]: SessionExecutionState.FINISHING, [SessionExecutionEvent.MODEL_ROUND_START]: SessionExecutionState.PROCESSING, [SessionExecutionEvent.TEXT_CHUNK_RECEIVED]: SessionExecutionState.PROCESSING, @@ -35,6 +37,20 @@ export const STATE_TRANSITIONS: StateTransitionTable = { [SessionExecutionEvent.TOOL_CONFIRMED]: SessionExecutionState.PROCESSING, [SessionExecutionEvent.TOOL_REJECTED]: SessionExecutionState.IDLE, }, + + [SessionExecutionState.FINISHING]: { + [SessionExecutionEvent.USER_CANCEL]: SessionExecutionState.IDLE, + [SessionExecutionEvent.ERROR_OCCURRED]: SessionExecutionState.ERROR, + [SessionExecutionEvent.FINISHING_SETTLED]: SessionExecutionState.IDLE, + [SessionExecutionEvent.MODEL_ROUND_START]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TEXT_CHUNK_RECEIVED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TOOL_DETECTED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TOOL_STARTED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TOOL_COMPLETED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TOOL_CONFIRMATION_NEEDED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TOOL_CONFIRMED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.TOOL_REJECTED]: SessionExecutionState.IDLE, + }, [SessionExecutionState.ERROR]: { [SessionExecutionEvent.RESET]: SessionExecutionState.IDLE, @@ -56,7 +72,8 @@ export const PHASE_TRANSITIONS: Record