Skip to content

Commit dc2adf9

Browse files
committed
feat(workflow): Use SingleAgentReactNode for leaf single_turn agents in new Workflow
In the new _workflow_class.py Workflow, leaf single_turn LlmAgents now run through SingleAgentReactNode instead of _SingleLlmAgent. This enables proper event deduplication via _output_delegated — only the LlmCallNode content event appears in the stream, not a duplicate wrapper output event. The old _workflow.py Workflow retains the original _SingleLlmAgent path. Runtime detection uses event_queue presence on InvocationContext to distinguish the two paths. Also updates the use_as_output sample to use an LLM agent as the delegated child node, and rewrites wrapper tests per adk-style guidelines covering both old and new workflow paths. Change-Id: I96aea3f12fe432d0449268eb75580c900fd047d7
1 parent ca32732 commit dc2adf9

5 files changed

Lines changed: 401 additions & 352 deletions

File tree

contributing/workflow_samples/use_as_output/README.md

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ This sample demonstrates how to use `ctx.run_node(node, use_as_output=True)` to
66

77
When `use_as_output=True` is set, the child node's output replaces the parent's output. The parent's own output event is suppressed to avoid duplication, and the child's output flows downstream through the graph as if the parent produced it.
88

9+
The child node can be any node type — this sample uses a single_turn LLM agent (`summarizer`) as the delegated child.
10+
911
## Sample Inputs
1012

11-
- Any text input (e.g. `hello world`)
13+
- Any text input (e.g. `The quick brown fox jumped over the lazy dog near the riverbank on a warm summer afternoon`)
1214

1315
## Graph
1416

@@ -17,33 +19,38 @@ When `use_as_output=True` is set, the child node's output replaces the parent's
1719
|
1820
v
1921
[orchestrate]
20-
| (delegates output to [transform] via ctx.run_node)
22+
| (delegates output to [summarizer] via ctx.run_node)
2123
v
2224
[finalize]
2325
```
2426

2527
## How To
2628

27-
1. **Mark the orchestrator as rerun_on_resume**: The parent node that calls `ctx.run_node` must use `@node(rerun_on_resume=True)`.
29+
1. **Define the child node**: The child can be a function or an LLM agent. Its output becomes the parent's output and flows to downstream nodes.
30+
31+
```python
32+
from google.adk import Agent
33+
34+
summarizer = Agent(
35+
name='summarizer',
36+
model='gemini-2.5-flash',
37+
instruction='Summarize the following text in one sentence.',
38+
)
39+
```
40+
41+
2. **Mark the orchestrator as rerun_on_resume**: The parent node that calls `ctx.run_node` must use `@node(rerun_on_resume=True)`.
2842

2943
```python
3044
from google.adk.workflow import node
3145

3246
@node(rerun_on_resume=True)
3347
async def orchestrate(ctx: Context, node_input: str) -> str:
3448
return await ctx.run_node(
35-
transform, node_input=node_input, use_as_output=True
49+
summarizer, node_input=node_input, use_as_output=True
3650
)
3751
```
3852

39-
2. **Define the child node**: The child is a plain function. Its output becomes the parent's output and flows to downstream nodes.
40-
41-
```python
42-
def transform(node_input: str) -> str:
43-
return node_input.upper()
44-
```
45-
46-
3. **Downstream receives delegated output**: The `finalize` node receives the child's output (`"HELLO WORLD"`) as its `node_input`, not the parent's.
53+
3. **Downstream receives delegated output**: The `finalize` node receives the LLM's summary as its `node_input`, not the parent's.
4754

4855
```python
4956
def finalize(node_input: str) -> str:

contributing/workflow_samples/use_as_output/agent.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,24 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from google.adk import Agent
1516
from google.adk import Context
1617
from google.adk.workflow import node
1718
from google.adk.workflow._base_node import START
1819
from google.adk.workflow._workflow_class import Workflow
1920

2021

21-
def transform(node_input: str) -> str:
22-
return node_input.upper()
22+
summarizer = Agent(
23+
name='summarizer',
24+
model='gemini-2.5-flash',
25+
instruction='Summarize the following text in one sentence.',
26+
)
2327

2428

2529
@node(rerun_on_resume=True)
2630
async def orchestrate(ctx: Context, node_input: str) -> str:
2731
return await ctx.run_node(
28-
transform, node_input=node_input, use_as_output=True
32+
summarizer, node_input=node_input, use_as_output=True
2933
)
3034

3135

src/google/adk/agents/llm/_single_agent_react_node.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ async def _run_impl(
8383
ctx: Context,
8484
node_input: Any,
8585
) -> AsyncGenerator[Any, None]:
86-
# Ensure IC has the agent — needed by downstream code
87-
# (e.g. _build_response_event reads invocation_context.agent.name).
88-
# When run via Runner(node=...), ic.agent is None.
86+
# Set IC agent — needed by downstream code
87+
# (e.g. _build_basic_request reads agent.canonical_model).
88+
# When run inside a Workflow, ic.agent may be the Workflow itself
89+
# which lacks canonical_model. Always override.
8990
# TODO: remove this dependency.
90-
if ctx._invocation_context.agent is None:
91-
ctx._invocation_context.agent = self.agent
91+
ctx._invocation_context.agent = self.agent
9292

9393
# Always provide our own scheduler so child nodes
9494
# (LlmCallNode, ParallelToolCallNode) are managed by this node,

src/google/adk/workflow/_llm_agent_wrapper.py

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
1717
- Sets a branch for content isolation (single_turn mode only)
1818
- Converts node_input to user content (single_turn mode only)
19+
- Runs leaf single_turn agents via SingleAgentReactNode (new Workflow)
20+
or _SingleLlmAgent (old Workflow)
1921
- Re-emits finish_task output so the outer node_runner can route it
2022
"""
2123

@@ -57,8 +59,12 @@ class _LlmAgentWrapper(BaseNode):
5759
"""Adapts a task/single_turn LlmAgent for use as a workflow graph node.
5860
5961
Output handling by mode:
60-
single_turn (leaf, no sub_agents): Bypasses Mesh by running
61-
_SingleLlmAgent directly. Output is extracted via
62+
single_turn (leaf, no sub_agents, new Workflow): Runs the ReAct
63+
loop via SingleAgentReactNode. LlmCallNode events are enqueued
64+
internally; the wrapper post-processes the final text output
65+
(output_schema validation, output_key storage).
66+
single_turn (leaf, no sub_agents, old Workflow): Bypasses Mesh by
67+
running _SingleLlmAgent directly. Output is extracted via
6268
LlmAgent._maybe_save_output_to_state and emitted as a
6369
separate Event before END_OF_AGENT.
6470
single_turn (with sub_agents): Runs the full LlmAgent which
@@ -72,6 +78,7 @@ class _LlmAgentWrapper(BaseNode):
7278
agent: LlmAgent = Field(...)
7379
rerun_on_resume: bool = Field(default=True)
7480
_single: Any = PrivateAttr(default=None)
81+
_react: Any = PrivateAttr(default=None)
7582

7683
@model_validator(mode='before')
7784
@classmethod
@@ -96,12 +103,21 @@ def _validate_and_default_mode(self) -> _LlmAgentWrapper:
96103
if self.agent.mode == 'task':
97104
self.wait_for_output = True
98105

99-
# For leaf single_turn agents, use _SingleLlmAgent directly,
100-
# bypassing the _Mesh orchestration layer.
106+
# For leaf single_turn agents, prepare both execution paths.
107+
# The old Workflow (uses _node_runner.py, no event_queue) needs
108+
# _SingleLlmAgent. The new Workflow (_workflow_class.py, sets
109+
# event_queue) uses SingleAgentReactNode. The choice is made at
110+
# runtime in _run_impl based on event_queue presence.
101111
if self.agent.mode == 'single_turn' and not self.agent.sub_agents:
112+
from ..agents.llm._single_agent_react_node import (
113+
SingleAgentReactNode,
114+
)
102115
from ..agents.llm._single_llm_agent import _SingleLlmAgent
103116

104117
self._single = _SingleLlmAgent.from_base_llm_agent(self.agent)
118+
self._react = SingleAgentReactNode(
119+
name=self.agent.name, agent=self.agent
120+
)
105121

106122
return self
107123

@@ -122,6 +138,10 @@ def model_copy(
122138
copied._single = copied._single.model_copy(
123139
update={'name': update['name']}
124140
)
141+
if copied._react is not None:
142+
copied._react = copied._react.model_copy(
143+
update={'name': update['name'], 'agent': copied.agent}
144+
)
125145
return copied
126146

127147
def _validate_input(self, node_input: Any) -> None:
@@ -153,13 +173,28 @@ def _prepare_input(
153173
ic = ctx._invocation_context.model_copy(
154174
update={'branch': branch},
155175
)
176+
# event_queue is excluded from model_copy; propagate manually
177+
# so child NodeRunners can enqueue events.
178+
ic.event_queue = ctx._invocation_context.event_queue
156179
agent_ctx = Context(
157180
invocation_context=ic,
158181
node_path=ctx.node_path,
159182
execution_id=ctx.execution_id,
160183
)
161184
return agent_ctx, agent_input
162185

186+
def _use_react_path(self, ctx: Context) -> bool:
187+
"""Returns True if we should use SingleAgentReactNode (new Workflow).
188+
189+
The new Workflow (_workflow_class.py) runs via Runner._run_node_async
190+
which sets ic.event_queue. The old Workflow (_workflow.py) runs via
191+
Runner.run_async (agent path) which does not set event_queue.
192+
"""
193+
return (
194+
self._react is not None
195+
and ctx._invocation_context.event_queue is not None
196+
)
197+
163198
@override
164199
async def _run_impl(
165200
self,
@@ -171,6 +206,47 @@ async def _run_impl(
171206
self._validate_input(node_input)
172207
agent_ctx, agent_input = self._prepare_input(ctx, node_input)
173208

209+
if self._use_react_path(ctx):
210+
# New Workflow: leaf single_turn via SingleAgentReactNode.
211+
# Inject input as user content in session, then run the react
212+
# loop. LlmCallNode events are enqueued to event_queue internally;
213+
# only the final text output comes through the generator.
214+
if agent_input is not None:
215+
ic = agent_ctx._invocation_context
216+
ic.session.events.append(
217+
Event(
218+
invocation_id=ic.invocation_id,
219+
author='user',
220+
branch=ic.branch,
221+
content=agent_input,
222+
)
223+
)
224+
output = None
225+
async for event in self._react.run(
226+
ctx=agent_ctx, node_input=None
227+
):
228+
if isinstance(event, Event) and event.output is not None:
229+
output = event.output
230+
231+
if output is not None:
232+
if isinstance(output, str) and self.agent.output_schema:
233+
if not output.strip():
234+
return
235+
from ..utils._schema_utils import validate_schema
236+
237+
output = validate_schema(self.agent.output_schema, output)
238+
if self.agent.output_key:
239+
ctx.actions.state_delta[self.agent.output_key] = output
240+
# Mark output as delegated so the wrapper's NodeRunner
241+
# captures the value without enqueuing a separate output
242+
# event. The LlmCallNode content event (already enqueued)
243+
# carries the visible response; this avoids duplication.
244+
ctx._output_delegated = True
245+
yield output
246+
return
247+
yield # noqa: unreachable — keeps this an async generator
248+
249+
# Determine inner runner: _SingleLlmAgent for leaf, agent for others.
174250
inner = self._single if self._single is not None else self.agent
175251

176252
# When the agent has parallel_worker=True, call run_node_impl()
@@ -182,8 +258,8 @@ async def _run_impl(
182258

183259
if self.agent.mode == 'single_turn':
184260
if self._single is not None:
185-
# Leaf agent bypass: since we skip LlmAgent.run_node_impl(),
186-
# replicate its output handling here.
261+
# Old Workflow: leaf agent bypass. Since we skip
262+
# LlmAgent.run_node_impl(), replicate its output handling.
187263
# _maybe_save_output_to_state applies output_schema/output_key
188264
# and clears content on the final response. We emit the output
189265
# as a pathless Event before END_OF_AGENT.

0 commit comments

Comments
 (0)