Python: [Bug]: WorkflowExecutor re-sends already-answered RequestInfoEvents after checkpoint restore · Issue #3255 · microsoft/agent-framework

@Sherlocco

Environment

  • Package: agent-framework[azure]==1.0.0b260106
  • Python: 3.12
  • OS: Windows 11

Description

When using request_response() within a sub-workflow that is managed by a WorkflowExecutor, resuming from a checkpoint causes already-answered requests to be re-sent to the parent workflow. This results in either:

  1. ValueError: Response provided for unknown request ID: <uuid> when trying to reply to duplicate requests
  2. Workflow hanging because expected_response_count is incorrect
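
The first symptom can be pictured with a toy model. This is a minimal sketch, not the framework's code: ToyResponseRouter and its methods are hypothetical, and the only thing taken from this report is the error text. It assumes that a response is accepted only while its request ID is still pending, so a reply to a re-sent, already-answered request is rejected.

```python
class ToyResponseRouter:
    """Hypothetical router: accepts a response only for a currently pending ID."""

    def __init__(self) -> None:
        self._pending: set[str] = set()

    def register(self, request_id: str) -> None:
        # Record a request that is waiting for an answer.
        self._pending.add(request_id)

    def respond(self, request_id: str, response: str) -> str:
        # Reject answers for IDs that are not (or no longer) pending.
        if request_id not in self._pending:
            raise ValueError(f"Response provided for unknown request ID: {request_id}")
        self._pending.remove(request_id)
        return response


router = ToyResponseRouter()
router.register("confirm-1")
router.respond("confirm-1", "yes")  # the first answer is accepted

# Answering the same request again, as happens when it is re-sent after restore.
try:
    router.respond("confirm-1", "yes")
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

In this model the second reply fails because the ID was consumed by the first one, which matches the shape of the error reported above.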

Reproduction Steps

  1. Create a parent workflow with an Orchestrator executor
  2. Create a sub-workflow (e.g., TicketingWorkflow) wrapped in a WorkflowExecutor
  3. In the sub-workflow, use request_response() to pause and wait for user input (e.g., ticket confirmation)
  4. Save a checkpoint when the workflow becomes idle with pending requests
  5. Resume the workflow from checkpoint and provide a response
  6. The sub-workflow continues and makes another request_response() call (e.g., for ticket description)
  7. Bug occurs: The parent receives BOTH the old (already-answered) request AND the new request

Root Cause Analysis

The issue is in WorkflowExecutor._process_workflow_result() (file: _workflow_executor.py):

async def _process_workflow_result(
    self,
    result: WorkflowRunResult,
    execution_context: ExecutionContext,
    ctx: WorkflowContext,
) -> None:
    # Get all pending request events from the workflow result
    request_info_events = result.get_request_info_events()  # <-- Returns ALL events, including old ones
    
    for event in request_info_events:
        if event.request_id not in execution_context.pending_requests:
            execution_context.pending_requests[event.request_id] = event
            # ...
    
    execution_context.expected_response_count = len(execution_context.pending_requests)  # <-- Incorrect count

The problem:

  1. result.get_request_info_events() returns ALL RequestInfoEvents from the workflow's event stream, including ones that were already answered before the checkpoint
  2. After checkpoint restore, when the sub-workflow continues and yields a NEW request, the old request is ALSO in the event stream
  3. This causes expected_response_count to be 2 when only 1 new request actually exists
  4. The parent workflow receives duplicate SubWorkflowRequestMessages for requests it already handled
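
The miscount can be reproduced without the framework. The following is a toy model, not agent-framework code: ToyRequestEvent, ToyExecutionContext and process_result are hypothetical stand-ins that copy only the guard and the count from the snippet above. It also assumes that an answered request has already been removed from pending_requests by the time the next result is processed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRequestEvent:
    """Hypothetical stand-in for RequestInfoEvent; only the ID matters here."""
    request_id: str


@dataclass
class ToyExecutionContext:
    """Hypothetical stand-in for the executor's per-execution bookkeeping."""
    pending_requests: dict[str, ToyRequestEvent] = field(default_factory=dict)
    expected_response_count: int = 0


def process_result(events: list[ToyRequestEvent], ctx: ToyExecutionContext) -> list[str]:
    """Register every event that is not currently pending and return the forwarded IDs."""
    forwarded: list[str] = []
    for event in events:
        if event.request_id not in ctx.pending_requests:
            ctx.pending_requests[event.request_id] = event
            forwarded.append(event.request_id)
    ctx.expected_response_count = len(ctx.pending_requests)
    return forwarded


ctx = ToyExecutionContext()

# Run 1: the sub-workflow asks for confirmation.
first_forwarded = process_result([ToyRequestEvent("confirm")], ctx)

# The parent answers, so the request leaves the pending table (assumption).
ctx.pending_requests.pop("confirm")

# Run 2 after restore: the event stream carries the old event plus the new one.
second_forwarded = process_result(
    [ToyRequestEvent("confirm"), ToyRequestEvent("describe")], ctx
)

print(second_forwarded)             # both IDs are forwarded again
print(ctx.expected_response_count)  # 2, although only one request is new
```

Because the guard only checks the pending table, an ID that was answered and removed looks new again. The executor then waits for two responses while the parent can legitimately supply only one, which is consistent with the hang described above.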

Expected Behavior

After checkpoint restore, _process_workflow_result should only process NEW RequestInfoEvents that haven't been answered yet. Already-processed request IDs should be tracked and filtered out.
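
One way to express this filtering is sketched below. It is an illustration under stated assumptions rather than the framework's fix: select_new_request_ids and the answered_ids set are hypothetical names, and the sketch works on plain request-ID strings instead of real RequestInfoEvent objects.

```python
from collections.abc import Iterable


def select_new_request_ids(
    event_ids: Iterable[str],
    pending_ids: set[str],
    answered_ids: set[str],
) -> list[str]:
    """Return IDs that are neither pending nor already answered, in stream order."""
    seen: set[str] = set()
    fresh: list[str] = []
    for request_id in event_ids:
        if request_id in pending_ids or request_id in answered_ids or request_id in seen:
            continue  # stale, in flight, or repeated within this stream
        seen.add(request_id)
        fresh.append(request_id)
    return fresh


# State after the confirmation was answered and the workflow was restored.
answered = {"confirm"}
pending: set[str] = set()

# The restored event stream still contains the old request next to the new one.
fresh = select_new_request_ids(["confirm", "describe"], pending, answered)
pending.update(fresh)
expected_response_count = len(pending)

print(fresh)
print(expected_response_count)
```

Here only "describe" is forwarded and the count is 1. For this to survive a restore, the answered set would presumably need to be saved with the checkpoint state; whether the actual fix takes that form is not stated in this report.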

Code Sample

"""
Minimal reproduction: WorkflowExecutor re-sends already-answered RequestInfoEvents after checkpoint restore

This demonstrates a bug in agent-framework where resuming from a checkpoint causes
already-answered requests to be re-sent when a sub-workflow has multiple request_response() calls.

Based on: https://github.com/microsoft/agent-framework/blob/main/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py

Bug: After checkpoint restore, when the sub-workflow continues and makes a SECOND
request_response() call, the parent workflow receives BOTH the old (already-answered)
request AND the new request, causing:
- ValueError: Response provided for unknown request ID
- OR incorrect expected_response_count

Requirements:
    pip install "agent-framework[azure]==1.0.0b260106"
"""

import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Any, override

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    RequestInfoEvent,
    SubWorkflowRequestMessage,
    SubWorkflowResponseMessage,
    Workflow,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowExecutor,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)

CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "bug_repro_checkpoints"


# =============================================================================
# Request/Response models
# =============================================================================

@dataclass
class StartRequest:
    """Initial request to start the workflow."""
    topic: str


@dataclass
class ConfirmationRequest:
    """FIRST request_response() - asks user for confirmation."""
    id: str = ""
    prompt: str = "Do you want to proceed? (yes/no)"
    
    def __post_init__(self):
        if not self.id:
            import uuid
            self.id = str(uuid.uuid4())


@dataclass
class DescriptionRequest:
    """SECOND request_response() - asks user for description."""
    id: str = ""
    prompt: str = "Please provide a description:"
    
    def __post_init__(self):
        if not self.id:
            import uuid
            self.id = str(uuid.uuid4())


@dataclass
class FinalOutput:
    """Final output from the workflow."""
    topic: str
    confirmed: bool
    description: str


# =============================================================================
# Sub-workflow executors
# =============================================================================

class MultiStepExecutor(Executor):
    """
    Executor with TWO request_info() calls - this triggers the bug.
    
    The bug occurs when:
    1. First request_info() (confirmation) - checkpoint saved
    2. User responds, checkpoint restored
    3. Second request_info() (description) - BUG: old confirmation request re-sent!
    """

    def __init__(self) -> None:
        super().__init__(id="multi_step_executor")
        self._topic: str = ""
        self._confirmed: bool = False

    @handler
    async def handle_start(
        self, 
        request: StartRequest, 
        ctx: WorkflowContext
    ) -> None:
        print(f"\n[SubWorkflow] Received start request: {request.topic}")
        self._topic = request.topic
        
        # =====================================================================
        # FIRST request_info() - Confirmation
        # =====================================================================
        print("[SubWorkflow] Sending FIRST request (confirmation)...")
        await ctx.request_info(
            request_data=ConfirmationRequest(prompt=f"Confirm processing '{request.topic}'? (yes/no)"),
            response_type=str,
        )

    @response_handler
    async def handle_confirmation_response(
        self,
        original_request: ConfirmationRequest,
        response: str,
        ctx: WorkflowContext,
    ) -> None:
        """Handle confirmation response and send second request."""
        print(f"[SubWorkflow] Received confirmation response: {response}")
        
        if response.lower().strip() not in ("yes", "y"):
            await ctx.yield_output(FinalOutput(
                topic=self._topic,
                confirmed=False,
                description="",
            ))
            return
        
        self._confirmed = True
        
        # =====================================================================
        # SECOND request_info() - Description
        # This is where the BUG manifests after checkpoint restore
        # =====================================================================
        print("[SubWorkflow] Sending SECOND request (description)...")
        await ctx.request_info(
            request_data=DescriptionRequest(prompt=f"Provide description for '{self._topic}':"),
            response_type=str,
        )

    @response_handler
    async def handle_description_response(
        self,
        original_request: DescriptionRequest,
        response: str,
        ctx: WorkflowContext[None, FinalOutput],
    ) -> None:
        """Handle description response and output final result."""
        print(f"[SubWorkflow] Received description response: {response}")
        
        await ctx.yield_output(FinalOutput(
            topic=self._topic,
            confirmed=True,
            description=response,
        ))

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"topic": self._topic, "confirmed": self._confirmed}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._topic = state.get("topic", "")
        self._confirmed = state.get("confirmed", False)
        print(f"[SubWorkflow] Restored state: topic={self._topic}, confirmed={self._confirmed}")


# =============================================================================
# Parent workflow executor
# =============================================================================

class Coordinator(Executor):
    """
    Parent workflow coordinator that handles sub-workflow requests.
    
    This executor intercepts SubWorkflowRequestMessage and re-emits
    the request to the outer workflow for user interaction.
    """

    def __init__(self) -> None:
        super().__init__(id="coordinator")
        self._pending_requests: dict[str, SubWorkflowRequestMessage] = {}

    @handler
    async def kick_off(self, topic: str, ctx: WorkflowContext[StartRequest]) -> None:
        """Start the workflow with a topic."""
        print(f"\n[Coordinator] Starting workflow with topic: {topic}")
        await ctx.send_message(StartRequest(topic=topic))

    @handler
    async def collect_output(
        self, 
        output: FinalOutput, 
        ctx: WorkflowContext[None, FinalOutput]
    ) -> None:
        """Collect and yield the final output."""
        print(f"\n[Coordinator] Received final output: {output}")
        await ctx.yield_output(output)

    @handler
    async def handle_sub_workflow_request(
        self,
        request: SubWorkflowRequestMessage,
        ctx: WorkflowContext,
    ) -> None:
        """
        Handle requests from the sub-workflow.
        
        This is where we can observe the bug: after checkpoint restore,
        this handler receives BOTH the old confirmation request AND the new
        description request.
        """
        data = request.source_event.data
        
        if not isinstance(data, (ConfirmationRequest, DescriptionRequest)):
            raise TypeError(f"Unexpected request type: {type(data)}")
        
        print("\n[Coordinator] Received sub-workflow request:")
        print(f"  - Type: {type(data).__name__}")
        print(f"  - ID: {data.id}")
        print(f"  - Prompt: {data.prompt}")
        
        # Track pending request
        self._pending_requests[data.id] = request
        print(f"  - Total pending requests: {len(self._pending_requests)}")
        
        # Forward to outer workflow
        await ctx.request_info(request_data=data, response_type=str)

    @response_handler
    async def handle_request_response(
        self,
        original_request: ConfirmationRequest | DescriptionRequest,
        response: str,
        ctx: WorkflowContext[SubWorkflowResponseMessage],
    ) -> None:
        """Process response and send back to sub-workflow."""
        print(f"\n[Coordinator] Handling response for request ID: {original_request.id}")
        print(f"  - Response: {response}")
        
        request_message = self._pending_requests.pop(original_request.id, None)
        
        if request_message is None:
            raise ValueError(
                f"No matching pending request found for ID: {original_request.id}"
            )
        
        await ctx.send_message(request_message.create_response(response))

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"pending_requests": self._pending_requests}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._pending_requests = state.get("pending_requests", {})
        print(f"\n[Coordinator] Restored {len(self._pending_requests)} pending requests")


# =============================================================================
# Workflow construction
# =============================================================================

def build_sub_workflow() -> WorkflowExecutor:
    """Build the sub-workflow with the multi-step executor."""
    sub_workflow = (
        WorkflowBuilder()
        .register_executor(MultiStepExecutor, name="multi_step")
        .set_start_executor("multi_step")
        .build()
    )
    return WorkflowExecutor(sub_workflow, id="sub_workflow")


def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow:
    """Build the parent workflow that embeds the sub-workflow."""
    return (
        WorkflowBuilder()
        .register_executor(Coordinator, name="coordinator")
        .register_executor(build_sub_workflow, name="sub_executor")
        .set_start_executor("coordinator")
        .add_edge("coordinator", "sub_executor")
        .add_edge("sub_executor", "coordinator")
        .with_checkpointing(storage)
        .build()
    )


# =============================================================================
# Main - demonstrates the bug
# =============================================================================

async def main() -> None:
    # Setup
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    for file in CHECKPOINT_DIR.glob("*.json"):
        file.unlink()
    
    storage = FileCheckpointStorage(CHECKPOINT_DIR)
    
    # =========================================================================
    # STEP 1: Run until first request_response() (confirmation)
    # =========================================================================
    print("\n" + "=" * 70)
    print("STEP 1: Run until first request (confirmation)")
    print("=" * 70)
    
    workflow = build_parent_workflow(storage)
    
    request_id: str | None = None
    async for event in workflow.run_stream("Test Topic"):
        if isinstance(event, RequestInfoEvent):
            request_id = event.request_id
            print(f"\n>>> Captured request ID: {request_id}")
            print(f">>> Request data: {event.data}")
        
        if (isinstance(event, WorkflowStatusEvent) and 
            event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS):
            print("\n>>> Workflow idle with pending requests - checkpoint saved")
            break
    
    if request_id is None:
        raise RuntimeError("No request captured!")
    
    # Get checkpoint
    checkpoints = await storage.list_checkpoints(workflow.id)
    checkpoints.sort(key=lambda cp: cp.timestamp)
    resume_checkpoint = checkpoints[-1]
    print(f"\n>>> Checkpoint ID: {resume_checkpoint.checkpoint_id}")
    
    # =========================================================================
    # STEP 2: Resume and respond to first request (confirmation)
    # =========================================================================
    print("\n" + "=" * 70)
    print("STEP 2: Resume from checkpoint and respond 'yes' to confirmation")
    print("=" * 70)
    
    workflow2 = build_parent_workflow(storage)
    
    # Resume from checkpoint
    request_info_event: RequestInfoEvent | None = None
    async for event in workflow2.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id):
        if isinstance(event, RequestInfoEvent):
            request_info_event = event
            print(f"\n>>> Captured request after resume: {event.data}")
    
    if request_info_event is None:
        raise RuntimeError("No request_info_event captured after resume!")
    
    # Send response "yes" - this should trigger the second request_response()
    print("\n>>> Sending response 'yes' to confirmation...")
    
    second_request_id: str | None = None
    async for event in workflow2.send_responses_streaming({request_info_event.request_id: "yes"}):
        if isinstance(event, RequestInfoEvent):
            second_request_id = event.request_id
            print(f"\n>>> Captured SECOND request ID: {second_request_id}")
            print(f">>> Request data: {event.data}")
        
        if (isinstance(event, WorkflowStatusEvent) and 
            event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS):
            print("\n>>> Workflow idle with pending requests - checkpoint saved")
            break
        
        if isinstance(event, WorkflowOutputEvent):
            print(f"\n>>> Got output: {event.data}")
            break
    
    if second_request_id is None:
        raise RuntimeError("No second request captured!")
    
    # =========================================================================
    # BUG ANALYSIS
    # =========================================================================
    print("\n" + "=" * 70)
    print("BUG ANALYSIS")
    print("=" * 70)
    print("""
The bug was demonstrated in STEP 2 above.

After resuming from checkpoint and responding to the FIRST request (confirmation),
the sub-workflow continued and made a SECOND request (description).

However, the Coordinator received TWO requests:
1. ConfirmationRequest (ID: 5efdd296...) - THIS WAS ALREADY ANSWERED!
2. DescriptionRequest (ID: fbe52803...) - This is the new, legitimate request

The log shows:
  - Total pending requests: 2   <-- BUG! Should be 1

Root cause: WorkflowExecutor._process_workflow_result() calls 
result.get_request_info_events() which returns ALL RequestInfoEvents,
including ones that were already answered before the checkpoint.

This causes:
- expected_response_count = 2 (should be 1)
- Duplicate SubWorkflowRequestMessage sent to parent
- ValueError when trying to respond to duplicate request
""")


if __name__ == "__main__":
    asyncio.run(main())

Error Messages / Stack Traces

Package Versions

agent-framework[azure]==1.0.0b260106

Python Version

Python 3.12

Additional Context

No response

Metadata

Labels: bug, python, workflows

Status: Done