fix(agents): detect silently stalled ParallelAgent branches via branch_idle_timeout_secs #5465
…view
- Replace <= with == in the truncation test: the guard is deterministic
so the result should be exactly 255 chars, not merely at most 255
- Assert the concrete expected string ("x" * 241 + "...[truncated]")
rather than importing private module symbols, reducing coupling to
internal implementation details
- Add test_from_event_with_none_error_message to cover the falsy branch
of the truncation condition and document that None passes through
unchanged
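
The truncation behaviour these tests pin down can be illustrated with a small sketch. The helper name `truncate_error_message` and the constant names are hypothetical and are not taken from the PR. Only the 255-character limit, the `...[truncated]` suffix, and the `None` pass-through come from the commit message above.

```python
from typing import Optional

# Assumed constants: the 255-char cap and the suffix are quoted in the
# commit message; the names themselves are hypothetical.
MAX_ERROR_MESSAGE_LEN = 255
TRUNCATION_SUFFIX = "...[truncated]"  # 14 characters


def truncate_error_message(message: Optional[str]) -> Optional[str]:
    """Cap a message at MAX_ERROR_MESSAGE_LEN characters.

    Falsy values (None or an empty string) pass through unchanged,
    which is the branch test_from_event_with_none_error_message covers.
    """
    if message and len(message) > MAX_ERROR_MESSAGE_LEN:
        keep = MAX_ERROR_MESSAGE_LEN - len(TRUNCATION_SUFFIX)  # 241
        return message[:keep] + TRUNCATION_SUFFIX
    return message
```

Because the guard is deterministic, an over-long input always yields exactly 255 characters, which is why an equality assertion is a tighter check than `<=`.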
…silent stalls

When an upstream model stream (LiteLLM/vLLM or any backend) silently stops producing chunks without closing the connection, process_an_agent in _merge_agent_run blocks forever on __anext__() with no exception or EOF. The queue.get() in the merge loop then also waits indefinitely, leaving the /run_sse HTTP stream open until an external gateway times out (issue google#5455).

Root cause: there is no per-chunk idle timeout anywhere in the parallel branch merge loop, so a silently stalled model connection is indistinguishable from a legitimately slow one.

Fix: add _iter_with_idle_timeout(), which wraps each __anext__() call with asyncio.wait_for(). If no event arrives within branch_idle_timeout_secs, it logs a warning and raises asyncio.TimeoutError, unblocking both the branch task and the merge loop so the SSE stream can close with an explicit error rather than hanging.

Additional fix (pre-3.11 path): _merge_agent_run_pre_3_11 could silently swallow a branch exception when the branch put its sentinel into the queue before propagate_exceptions() ran. Added a final propagate_exceptions() call after the while loop exits to catch that race.

Also: changed await queue.put(sentinel) → queue.put_nowait(sentinel) in all finally blocks. The queue is unbounded so put_nowait never blocks, and it cannot be interrupted by CancelledError during task cancellation.

Usage:

    ParallelAgent(
        name='recommend',
        sub_agents=[workout_agent, sleep_agent],
        branch_idle_timeout_secs=120.0,  # raise if no event for 2 min
    )

Fixes google#5455
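
For readers who want to see the shape of the idle-timeout wrapper described in this commit message, here is a minimal sketch. It is not the PR's actual code: the function name (written without the leading underscore), the log wording, and the handling of `timeout_secs=None` are assumptions based only on the description above.

```python
import asyncio
import logging
from typing import AsyncGenerator, Optional, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


async def iter_with_idle_timeout(
    gen: AsyncGenerator[T, None],
    timeout_secs: Optional[float],
    branch_name: str,
) -> AsyncGenerator[T, None]:
    """Re-yield items from `gen`, failing if it goes idle for too long.

    The timeout applies to each individual __anext__() call, so a slow
    but steadily progressing generator is never interrupted.
    """
    while True:
        try:
            if timeout_secs is None:
                # Assumed behaviour: None disables the guard entirely.
                item = await gen.__anext__()
            else:
                item = await asyncio.wait_for(
                    gen.__anext__(), timeout=timeout_secs
                )
        except StopAsyncIteration:
            return  # the wrapped generator finished normally
        except asyncio.TimeoutError:
            logger.warning(
                "Branch %r produced no event for %s seconds",
                branch_name,
                timeout_secs,
            )
            raise asyncio.TimeoutError(
                f"branch {branch_name!r} idle for {timeout_secs}s"
            )
        yield item
```

The design point is that the clock restarts after every event, which is what lets the guard distinguish a stalled stream from a long-running one.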
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.

Response from ADK Triaging Agent

Hello @STHITAPRAJNAS, thank you for your contribution! Before we can merge this pull request, you'll need to sign the Contributor License Agreement (CLA). You can find more information at https://cla.developers.google.com/. Once the CLA is signed, the 'cla/google' check will pass, and we can proceed with the review. Thanks!

Hi @STHITAPRAJNAS, thank you for your contribution! It appears you haven't yet signed the Contributor License Agreement (CLA). Please visit https://cla.developers.google.com/ to complete the signing process. Once the CLA is signed, we'll be able to proceed with the review of your PR. Thank you!

@rohityan Actually, I did sign, and when I try again it says that I have already signed. Can we check now whether the workflow detects the CLA?

Summary
Fixes #5455: the /run_sse stream hangs indefinitely when a ParallelAgent branch silently stalls.

Root cause

In _merge_agent_run (and the pre-3.11 variant), each branch runs as an asyncio.Task via process_an_agent. That coroutine iterates events_for_one_agent, an async generator rooted in llm.generate_content_async. If the upstream model backend (LiteLLM/vLLM in the report) stops emitting chunks without closing the TCP connection, __anext__() awaits forever: no exception, no StopAsyncIteration. The shared queue.get() in the merge loop then also blocks indefinitely.

Fix

- _iter_with_idle_timeout(gen, timeout_secs, branch_name): a thin wrapper that calls asyncio.wait_for(gen.__anext__(), timeout=timeout_secs) for each chunk. If no chunk arrives within the window it logs a WARNING with the branch name and raises asyncio.TimeoutError, which propagates out of the branch task and unblocks the parent.
- ParallelAgent.branch_idle_timeout_secs: Optional[float] = None: opt-in field (default None preserves existing behaviour). Example: ParallelAgent(name='recommend', sub_agents=[workout_agent, sleep_agent], branch_idle_timeout_secs=120.0).
- Pre-3.11 bug fix (bonus): _merge_agent_run_pre_3_11 could silently drop a branch exception when the branch put its sentinel before propagate_exceptions() ran in the next loop iteration. Added one more propagate_exceptions() call after the while loop exits.
- put_nowait in finally blocks: changed await queue.put(sentinel) → queue.put_nowait(sentinel) so the sentinel is always enqueued even if the task is cancelled mid-finally (the queue is unbounded, so put_nowait never blocks or raises).

Test plan

- test_iter_with_idle_timeout_passes_events_through: normal generator passes through unchanged
- test_iter_with_idle_timeout_raises_on_stall: stalled generator raises TimeoutError with branch name in message
- test_iter_with_idle_timeout_stops_on_empty_generator: empty generator exits cleanly
- test_merge_timeout_stalled_branch_raises: parametrised over both merge functions (3.11+ and pre-3.11)
- test_merge_timeout_does_not_fire_for_fast_branches: generous timeout does not interrupt normal branches
- test_merge_timeout_none_disables_guard: None preserves original behaviour
- test_parallel_agent_stalled_branch_raises_timeout: end-to-end: one branch stalls after first event (matches the production pattern in issue #5455, "/run_sse stream stays open when a ParallelAgent branch has no terminal event")
- test_parallel_agent_stalled_from_start_raises_timeout: branch stalls before emitting any event
- test_parallel_agent_slow_but_not_stalled_completes: slow (but non-stalled) branch is not incorrectly cancelled
- test_parallel_agent_timeout_logged_as_warning: branch name appears in caplog records
- test_parallel_agent_timeout_error_is_timeout_in_exception_group: 3.11+: ExceptionGroup wraps TimeoutError
- test_parallel_agent.py tests still pass (no regression)
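
The sentinel and exception-propagation changes described in the Fix section can be illustrated with a heavily simplified merge loop. This is a sketch under assumptions, not the ADK implementation: `merge_branches` and `pump` are hypothetical names, and the real merge functions coordinate with the consumer in ways this example leaves out.

```python
import asyncio
from typing import Any, AsyncGenerator, List


async def merge_branches(
    branches: List[AsyncGenerator[Any, None]],
) -> AsyncGenerator[Any, None]:
    """Interleave items from several async generators via one queue."""
    queue: asyncio.Queue = asyncio.Queue()  # unbounded: put_nowait never blocks
    sentinel = object()

    async def pump(gen: AsyncGenerator[Any, None]) -> None:
        try:
            async for item in gen:
                queue.put_nowait(item)
        finally:
            # Synchronous, so cancellation cannot interrupt it and the
            # merge loop is always told that this branch is finished.
            queue.put_nowait(sentinel)

    tasks = [asyncio.create_task(pump(g)) for g in branches]

    def propagate_exceptions() -> None:
        for task in tasks:
            if task.done() and not task.cancelled():
                exc = task.exception()
                if exc is not None:
                    raise exc

    try:
        remaining = len(tasks)
        while remaining:
            propagate_exceptions()
            item = await queue.get()
            if item is sentinel:
                remaining -= 1
            else:
                yield item
        # Final check: if the last sentinel came from a failing branch,
        # the loop exits without running the check at the top again.
        propagate_exceptions()
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

Without the check after the loop, a branch that fails while delivering the last outstanding sentinel would end the merge as if it had succeeded, which is the race the pre-3.11 fix addresses.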