Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/packages/core/agent_framework/_workflows/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ async def run_until_convergence(self) -> AsyncGenerator[WorkflowEvent, None]:
logger.info("Skipping 'after_initial_execution' checkpoint because we resumed from a checkpoint")

while self._iteration < self._max_iterations:
# Check if there are any messages to process before starting iteration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this prevent the workflow from ever getting started because there may not be messages in the queue at the beginning?

# This prevents unnecessary iterations when resuming from completed checkpoints
if not await self._ctx.has_messages():
logger.info("No messages to process; workflow is idle")
break

logger.info(f"Starting superstep {self._iteration + 1}")

# Run iteration concurrently with live event streaming: we poll
Expand Down
203 changes: 106 additions & 97 deletions python/packages/core/agent_framework/_workflows/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ async def _run_workflow_with_tracing(
saw_request = False
emitted_in_progress_pending = False
try:
# Reset context for a new run if supported
if reset_context:
self._runner.reset_iteration_count()
self._runner.context.reset_for_new_run()
await self._shared_state.clear()

# Set streaming mode after reset
self._runner_context.set_streaming(streaming)

# Add workflow started event (telemetry + surface state to consumers)
span.add_event(OtelAttr.WORKFLOW_STARTED)
# Emit explicit start/status events to the stream
Expand All @@ -313,15 +322,6 @@ async def _run_workflow_with_tracing(
in_progress = WorkflowStatusEvent(WorkflowRunState.IN_PROGRESS)
yield in_progress

# Reset context for a new run if supported
if reset_context:
self._runner.reset_iteration_count()
self._runner.context.reset_for_new_run()
await self._shared_state.clear()

# Set streaming mode after reset
self._runner_context.set_streaming(streaming)

# Execute initial setup if provided
if initial_executor_fn:
await initial_executor_fn()
Expand Down Expand Up @@ -423,57 +423,61 @@ async def run_stream_from_checkpoint(
"""
self._ensure_not_running()
try:
# Restore checkpoint and process any pending messages
has_checkpointing = self._runner.context.has_checkpointing()

async def checkpoint_restoration() -> None:
has_checkpointing = self._runner.context.has_checkpointing()
if not has_checkpointing and checkpoint_storage is None:
raise ValueError(
"Cannot restore from checkpoint: either provide checkpoint_storage parameter "
"or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)."
)

if not has_checkpointing and checkpoint_storage is None:
raise ValueError(
"Cannot restore from checkpoint: either provide checkpoint_storage parameter "
"or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)."
)
restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage)

restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage)

if not restored:
raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}")

# Process any pending messages from the checkpoint first
# This ensures that RequestInfoExecutor state is properly populated
# before we try to handle responses
if await self._runner.context.has_messages():
# Run one iteration to process pending messages
# This will populate RequestInfoExecutor._request_events properly
await self._runner._run_iteration() # type: ignore

if responses:
request_info_executor = self._find_request_info_executor()
if request_info_executor:
for request_id, response_data in responses.items():
ctx: WorkflowContext[Any] = WorkflowContext(
request_info_executor.id,
[self.__class__.__name__],
self._shared_state,
self._runner.context,
trace_contexts=None, # No parent trace context for new workflow span
source_span_ids=None, # No source span for response handling
)
if not restored:
raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}")

# Process any pending messages from the checkpoint first
had_messages_before = await self._runner.context.has_messages()
if had_messages_before:
# Run one iteration to process pending messages
await self._runner._run_iteration() # type: ignore

# Check if workflow is already complete
# Only return early if checkpoint had NO messages to begin with
if not had_messages_before and not await self._runner.context.has_messages():
return

if not await request_info_executor.has_pending_request(request_id, ctx):
logger.debug(
f"Skipping pre-supplied response for request {request_id}; "
f"no pending request found after checkpoint restoration."
)
continue

await request_info_executor.handle_response(
response_data,
request_id,
ctx,
# Handle any pre-supplied responses
if responses:
request_info_executor = self._find_request_info_executor()
if request_info_executor:
for request_id, response_data in responses.items():
ctx: WorkflowContext[Any] = WorkflowContext(
request_info_executor.id,
[self.__class__.__name__],
self._shared_state,
self._runner.context,
trace_contexts=None, # No parent trace context for new workflow span
source_span_ids=None, # No source span for response handling
)

if not await request_info_executor.has_pending_request(request_id, ctx):
logger.debug(
f"Skipping pre-supplied response for request {request_id}; "
f"no pending request found after checkpoint restoration."
)
continue

await request_info_executor.handle_response(
response_data,
request_id,
ctx,
)

# Continue workflow execution from restored state
async for event in self._run_workflow_with_tracing(
initial_executor_fn=checkpoint_restoration,
initial_executor_fn=None, # Already restored above
reset_context=False, # Don't reset context when resuming from checkpoint
streaming=True,
):
Expand Down Expand Up @@ -603,59 +607,64 @@ async def run_from_checkpoint(
"""
self._ensure_not_running()
try:
# Restore checkpoint and process any pending messages
has_checkpointing = self._runner.context.has_checkpointing()

async def checkpoint_restoration() -> None:
has_checkpointing = self._runner.context.has_checkpointing()
if not has_checkpointing and checkpoint_storage is None:
raise ValueError(
"Cannot restore from checkpoint: either provide checkpoint_storage parameter "
"or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)."
)

if not has_checkpointing and checkpoint_storage is None:
raise ValueError(
"Cannot restore from checkpoint: either provide checkpoint_storage parameter "
"or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)."
)
restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage)

restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage)

if not restored:
raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}")

# Process any pending messages from the checkpoint first
# This ensures that RequestInfoExecutor state is properly populated
# before we try to handle responses
if await self._runner.context.has_messages():
# Run one iteration to process pending messages
# This will populate RequestInfoExecutor._request_events properly
await self._runner._run_iteration() # type: ignore

if responses:
request_info_executor = self._find_request_info_executor()
if request_info_executor:
for request_id, response_data in responses.items():
ctx: WorkflowContext[Any] = WorkflowContext(
request_info_executor.id,
[self.__class__.__name__],
self._shared_state,
self._runner.context,
trace_contexts=None, # No parent trace context for new workflow span
source_span_ids=None, # No source span for response handling
)
if not restored:
raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}")

if not await request_info_executor.has_pending_request(request_id, ctx):
logger.debug(
f"Skipping pre-supplied response for request {request_id}; "
f"no pending request found after checkpoint restoration."
)
continue

await request_info_executor.handle_response(
response_data,
request_id,
ctx,
# Process any pending messages from the checkpoint first
had_messages_before = await self._runner.context.has_messages()
if had_messages_before:
# Run one iteration to process pending messages
await self._runner._run_iteration() # type: ignore

# Check if workflow is already complete
# Only return early if checkpoint had NO messages to begin with
if not had_messages_before and not await self._runner.context.has_messages():
# Return empty result - workflow was already complete
Copy link

Copilot AI Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The empty WorkflowRunResult([], []) doesn't preserve any status events. If the workflow was previously completed, there might be value in returning the final status from the checkpoint. Consider whether the return value should include the completion status or document why an empty result is appropriate.

Suggested change
# Return empty result - workflow was already complete
# Return result with final status event if available
final_status = self._runner.context.get_status() if hasattr(self._runner.context, "get_status") else None
if final_status is not None:
status_event = WorkflowStatusEvent(final_status)
return WorkflowRunResult([], [status_event])

Copilot uses AI. Check for mistakes.
return WorkflowRunResult([], [])

# Handle any pre-supplied responses
if responses:
request_info_executor = self._find_request_info_executor()
if request_info_executor:
for request_id, response_data in responses.items():
ctx: WorkflowContext[Any] = WorkflowContext(
request_info_executor.id,
[self.__class__.__name__],
self._shared_state,
self._runner.context,
trace_contexts=None, # No parent trace context for new workflow span
source_span_ids=None, # No source span for response handling
)

if not await request_info_executor.has_pending_request(request_id, ctx):
logger.debug(
f"Skipping pre-supplied response for request {request_id}; "
f"no pending request found after checkpoint restoration."
)
continue

await request_info_executor.handle_response(
response_data,
request_id,
ctx,
)

# Continue workflow execution from restored state
events = [
event
async for event in self._run_workflow_with_tracing(
initial_executor_fn=checkpoint_restoration,
initial_executor_fn=None, # Already restored above
reset_context=False, # Don't reset context when resuming from checkpoint
)
]
Expand Down
Loading