
fix(agents): detect silently stalled ParallelAgent branches via branch_idle_timeout_secs #5465

Open
STHITAPRAJNAS wants to merge 2 commits into google:main from STHITAPRAJNAS:fix/parallel-agent-silent-stall-5455


@STHITAPRAJNAS

Summary

Fixes #5455: the /run_sse stream hangs indefinitely when a ParallelAgent branch silently stalls.

Root cause

In _merge_agent_run (and the pre-3.11 variant), each branch runs as an asyncio.Task via process_an_agent. That coroutine iterates events_for_one_agent — an async generator rooted in llm.generate_content_async. If the upstream model backend (LiteLLM/vLLM in the report) stops emitting chunks without closing the TCP connection, __anext__() awaits forever: no exception, no StopAsyncIteration. The shared queue.get() in the merge loop then also blocks indefinitely.
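To make the failure mode concrete, the sketch below reproduces the stall in isolation. It does not use ADK code: `stalled_stream`, `branch`, and `demo` are hypothetical names, and an `asyncio.Event` that is never set stands in for a model connection that stays open but goes silent.

```python
import asyncio
from typing import AsyncGenerator, List


async def stalled_stream() -> AsyncGenerator[str, None]:
    """Hypothetical stand-in for a model stream that goes quiet after one chunk."""
    yield "chunk-1"
    # Never set: mimics a connection that stays open but sends nothing more.
    await asyncio.Event().wait()


async def branch(queue: asyncio.Queue) -> None:
    """Forward events to the shared queue, then signal completion with None."""
    async for event in stalled_stream():
        await queue.put(event)
    await queue.put(None)  # sentinel; never reached because the stream stalls


async def demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(branch(queue))
    seen = [await queue.get()]  # the first chunk arrives normally
    try:
        # The outer timeout exists only so this demo terminates; without it,
        # this get() would wait forever with no exception and no sentinel.
        await asyncio.wait_for(queue.get(), timeout=0.1)
    except asyncio.TimeoutError:
        seen.append("merge loop still blocked")
    finally:
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The branch task never fails and never finishes, which is why the consumer of the queue has nothing to react to.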

Fix

_iter_with_idle_timeout(gen, timeout_secs, branch_name) — a thin wrapper that calls asyncio.wait_for(gen.__anext__(), timeout=timeout_secs) for each chunk. If no chunk arrives within the window it logs a WARNING with the branch name and raises asyncio.TimeoutError, which propagates out of the branch task and unblocks the parent.
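A minimal sketch of such a wrapper, written from the description above rather than copied from the PR diff, might look like the following. The function name without the leading underscore, the log wording, the error message, and the `stalls_after_first` and `demo` helpers are all assumptions made for illustration.

```python
import asyncio
import logging
from typing import AsyncGenerator, AsyncIterator, List, Optional, Tuple

logger = logging.getLogger(__name__)


async def iter_with_idle_timeout(
    gen: AsyncGenerator[str, None],
    timeout_secs: Optional[float],
    branch_name: str,
) -> AsyncIterator[str]:
    """Yield items from gen; fail if the gap between items exceeds timeout_secs."""
    while True:
        try:
            # The timeout applies to each item separately, not to the whole stream.
            item = await asyncio.wait_for(gen.__anext__(), timeout=timeout_secs)
        except StopAsyncIteration:
            return  # the wrapped generator finished normally
        except asyncio.TimeoutError:
            logger.warning(
                "Branch %r produced no event for %s s", branch_name, timeout_secs
            )
            raise asyncio.TimeoutError(
                f"branch {branch_name!r} idle for more than {timeout_secs}s"
            ) from None
        yield item


async def stalls_after_first() -> AsyncGenerator[str, None]:
    """Hypothetical branch: one event, then silence."""
    yield "first"
    await asyncio.Event().wait()  # never set, so the generator never resumes


async def demo() -> Tuple[List[str], str]:
    seen: List[str] = []
    try:
        async for item in iter_with_idle_timeout(
            stalls_after_first(), 0.05, "sleep_agent"
        ):
            seen.append(item)
    except asyncio.TimeoutError as exc:
        return seen, str(exc)
    return seen, ""


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the window restarts with every item, a branch that is slow but keeps emitting events is not interrupted; only a gap longer than `timeout_secs` triggers the error.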

ParallelAgent.branch_idle_timeout_secs: Optional[float] = None — opt-in field (default None preserves existing behaviour). Example:

ParallelAgent(
    name="recommend",
    sub_agents=[workout_agent, sleep_agent],
    branch_idle_timeout_secs=120.0,
)

Pre-3.11 bug fix (bonus): _merge_agent_run_pre_3_11 could silently drop a branch exception when the branch put its sentinel before propagate_exceptions() ran in the next loop iteration. Added one more propagate_exceptions() call after the while loop exits.
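The race can be illustrated with a simplified merge loop. This is a sketch of the pattern only, not the ADK implementation: `merge`, `run_branch`, the `final_check` flag, and the demo helpers are hypothetical, and only the name `propagate_exceptions` is borrowed from the description above.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, List

_SENTINEL = object()


async def merge(
    branches: List[AsyncGenerator[str, None]], final_check: bool = True
) -> AsyncIterator[str]:
    """Simplified merge loop: one task per branch, one shared queue."""
    queue: asyncio.Queue = asyncio.Queue()

    async def run_branch(agen: AsyncGenerator[str, None]) -> None:
        try:
            async for event in agen:
                await queue.put(event)
        finally:
            queue.put_nowait(_SENTINEL)  # enqueued even when the branch raises

    tasks = [asyncio.create_task(run_branch(b)) for b in branches]

    def propagate_exceptions() -> None:
        for task in tasks:
            if task.done() and not task.cancelled() and task.exception():
                raise task.exception()

    remaining = len(tasks)
    try:
        while remaining:
            propagate_exceptions()
            item = await queue.get()
            if item is _SENTINEL:
                remaining -= 1  # a failing branch can land here first
            else:
                yield item
        if final_check:
            propagate_exceptions()  # catches a failure seen only via its sentinel
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def failing_branch() -> AsyncGenerator[str, None]:
    raise ValueError("branch failed")
    yield "never"  # unreachable; its presence makes this an async generator


async def drain(final_check: bool) -> str:
    try:
        async for _ in merge([failing_branch()], final_check=final_check):
            pass
    except ValueError:
        return "raised"
    return "swallowed"


if __name__ == "__main__":
    print(asyncio.run(drain(False)), asyncio.run(drain(True)))
```

In this model the last sentinel ends the `while` loop before the in-loop check runs again, so without the trailing call the branch error is never surfaced to the caller.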

put_nowait in finally blocks: changed await queue.put(sentinel) → queue.put_nowait(sentinel) so the sentinel is always enqueued even if the task is cancelled mid-finally (the queue is unbounded, so put_nowait never blocks or raises).
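As a small illustration of that guarantee, the sketch below cancels a hypothetical branch task while it is waiting and checks that the sentinel still reaches the queue. `branch`, `demo`, and `SENTINEL` are illustrative names, not identifiers from the PR.

```python
import asyncio
from typing import Tuple

SENTINEL = object()


async def branch(queue: asyncio.Queue) -> None:
    try:
        await asyncio.Event().wait()  # never set: the branch is stuck here
    finally:
        # put_nowait is synchronous, so the finally block contains no
        # suspension point at which a cancellation could be delivered.
        queue.put_nowait(SENTINEL)


async def demo() -> Tuple[bool, bool]:
    queue: asyncio.Queue = asyncio.Queue()  # maxsize=0 means unbounded
    task = asyncio.create_task(branch(queue))
    await asyncio.sleep(0)  # let the branch start and block
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled(), queue.get_nowait() is SENTINEL


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

On a bounded queue `put_nowait` could raise `asyncio.QueueFull`, so this pattern relies on the queue being unbounded, as the description states.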

Test plan

  • test_iter_with_idle_timeout_passes_events_through — normal generator passes through unchanged
  • test_iter_with_idle_timeout_raises_on_stall — stalled generator raises TimeoutError with branch name in message
  • test_iter_with_idle_timeout_stops_on_empty_generator — empty generator exits cleanly
  • test_merge_timeout_stalled_branch_raises — parametrised over both merge functions (3.11+ and pre-3.11)
  • test_merge_timeout_does_not_fire_for_fast_branches — generous timeout does not interrupt normal branches
  • test_merge_timeout_none_disables_guard — None preserves original behaviour
  • test_parallel_agent_stalled_branch_raises_timeout — end-to-end: one branch stalls after first event (matches the production pattern in issue #5455, "/run_sse stream stays open when a ParallelAgent branch has no terminal event")
  • test_parallel_agent_stalled_from_start_raises_timeout — branch stalls before emitting any event
  • test_parallel_agent_slow_but_not_stalled_completes — slow (but non-stalled) branch is not incorrectly cancelled
  • test_parallel_agent_timeout_logged_as_warning — branch name appears in caplog records
  • test_parallel_agent_timeout_error_is_timeout_in_exception_group — 3.11+: ExceptionGroup wraps TimeoutError
  • All 8 existing test_parallel_agent.py tests still pass (no regression)
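Since the timeout may surface either directly or wrapped in an ExceptionGroup depending on the Python version, a caller that wants to react to it could use a version-agnostic check along these lines. `contains_timeout` is a hypothetical helper, not part of the PR; it relies only on the `exceptions` attribute that exception groups expose.

```python
import asyncio


def contains_timeout(exc: BaseException) -> bool:
    """Return True if exc is a timeout or an exception group containing one."""
    if isinstance(exc, asyncio.TimeoutError):
        return True
    # Exception groups (Python 3.11+) carry their members in `.exceptions`;
    # plain exceptions do not have this attribute.
    nested = getattr(exc, "exceptions", None)
    if nested:
        return any(contains_timeout(inner) for inner in nested)
    return False


if __name__ == "__main__":
    print(contains_timeout(asyncio.TimeoutError("branch idle")))
    print(contains_timeout(ValueError("unrelated")))
```

On Python 3.11 and later, `except* asyncio.TimeoutError` is the more idiomatic way to handle the grouped case; the helper above is only useful where code must also run on older interpreters.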

STHITAPRAJNAS and others added 2 commits April 23, 2026 23:50
…view

- Replace <= with == in the truncation test: the guard is deterministic
  so the result should be exactly 255 chars, not merely at most 255
- Assert the concrete expected string ("x" * 241 + "...[truncated]")
  rather than importing private module symbols, reducing coupling to
  internal implementation details
- Add test_from_event_with_none_error_message to cover the falsy branch
  of the truncation condition and document that None passes through
  unchanged
…silent stalls

When an upstream model stream (LiteLLM/vLLM or any backend) silently stops
producing chunks without closing the connection, process_an_agent in
_merge_agent_run blocks forever on __anext__() with no exception or EOF.
The queue.get() in the merge loop then also waits indefinitely, leaving the
/run_sse HTTP stream open until an external gateway times out (issue google#5455).

Root cause: there is no per-chunk idle timeout anywhere in the parallel branch
merge loop, so a silently stalled model connection is indistinguishable from
a legitimately slow one.

Fix: add _iter_with_idle_timeout(), which wraps each __anext__() call with
asyncio.wait_for(). If no event arrives within branch_idle_timeout_secs, it
logs a warning and raises asyncio.TimeoutError, unblocking both the branch
task and the merge loop so the SSE stream can close with an explicit error
rather than hanging.

Additional fix (pre-3.11 path): _merge_agent_run_pre_3_11 could silently
swallow a branch exception when the branch put its sentinel into the queue
before propagate_exceptions() ran. Added a final propagate_exceptions() call
after the while loop exits to catch that race.

Also: changed await queue.put(sentinel) → queue.put_nowait(sentinel) in all
finally blocks. The queue is unbounded so put_nowait never blocks, and it
cannot be interrupted by CancelledError during task cancellation.

Usage:
  ParallelAgent(
      name='recommend',
      sub_agents=[workout_agent, sleep_agent],
      branch_idle_timeout_secs=120.0,  # raise if no event for 2 min
  )

Fixes google#5455
@google-cla

google-cla Bot commented Apr 24, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@adk-bot
Collaborator

adk-bot commented Apr 24, 2026

Response from ADK Triaging Agent

Hello @STHITAPRAJNAS, thank you for your contribution!

Before we can merge this pull request, you'll need to sign the Contributor License Agreement (CLA). You can find more information at https://cla.developers.google.com/.

Once the CLA is signed, the 'cla/google' check will pass, and we can proceed with the review. Thanks!

@adk-bot adk-bot added the core [Component] This issue is related to the core interface and implementation label Apr 24, 2026
@rohityan rohityan self-assigned this Apr 24, 2026
@rohityan rohityan added the request clarification [Status] The maintainer need clarification or more information from the author label Apr 24, 2026
@rohityan
Collaborator

Hi @STHITAPRAJNAS, thank you for your contribution! It appears you haven't yet signed the Contributor License Agreement (CLA). Please visit https://cla.developers.google.com/ to complete the signing process. Once the CLA is signed, we'll be able to proceed with the review of your PR. Thank you!

@STHITAPRAJNAS
Author

@rohityan Actually, I did sign, and when I try again it says that I have already signed. Can we check now whether the workflow detects the CLA?


Development

Successfully merging this pull request may close these issues.

/run_sse stream stays open when a ParallelAgent branch has no terminal event

4 participants