Skip to content

Commit cc3ba8a

Browse files
committed
refactor(workflow): Make JoinNode stateless by moving join logic to orchestrator
Moved the stateful aggregation logic from JoinNode to the Workflow orchestrator to simplify node implementation and prevent state leaks in loops. Removed in_nodes from Context as it is no longer needed. Change-Id: If0677f65c735ee3bce985478f7990b4a3335b7df
1 parent 546ca73 commit cc3ba8a

9 files changed

Lines changed: 142 additions & 229 deletions

File tree

contributing/workflow_samples/fan_out_fan_in/tests/go.json

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -57,56 +57,9 @@
5757
"output": "og"
5858
},
5959
{
60-
"actions": {
61-
"stateDelta": {
62-
"root_agent@1/join_for_results@1_join_state": {
63-
"make_uppercase": {
64-
"branch": "make_uppercase@1",
65-
"input": "GO"
66-
}
67-
}
68-
}
69-
},
7060
"author": "root_agent",
71-
"branch": "make_uppercase@1",
7261
"id": "e-5",
7362
"invocationId": "i-1",
74-
"nodeInfo": {
75-
"path": "root_agent@1/join_for_results@1"
76-
}
77-
},
78-
{
79-
"actions": {
80-
"stateDelta": {
81-
"root_agent@1/join_for_results@1_join_state": {
82-
"count_characters": {
83-
"branch": "count_characters@1",
84-
"input": 2
85-
},
86-
"make_uppercase": {
87-
"branch": "make_uppercase@1",
88-
"input": "GO"
89-
}
90-
}
91-
}
92-
},
93-
"author": "root_agent",
94-
"branch": "count_characters@1",
95-
"id": "e-6",
96-
"invocationId": "i-1",
97-
"nodeInfo": {
98-
"path": "root_agent@1/join_for_results@1"
99-
}
100-
},
101-
{
102-
"actions": {
103-
"stateDelta": {
104-
"root_agent@1/join_for_results@1_join_state": null
105-
}
106-
},
107-
"author": "root_agent",
108-
"id": "e-7",
109-
"invocationId": "i-1",
11063
"nodeInfo": {
11164
"outputFor": [
11265
"root_agent@1/join_for_results@1"
@@ -129,7 +82,7 @@
12982
],
13083
"role": "user"
13184
},
132-
"id": "e-8",
85+
"id": "e-6",
13386
"invocationId": "i-1",
13487
"nodeInfo": {
13588
"path": "root_agent@1/aggregate@1"

contributing/workflow_samples/nested_workflow/tests/1984.json

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"content": {
3737
"parts": [
3838
{
39-
"text": "LeBron James"
39+
"text": "Scarlett Johansson"
4040
}
4141
],
4242
"role": "model"
@@ -58,7 +58,7 @@
5858
"content": {
5959
"parts": [
6060
{
61-
"text": "LeBron James, often hailed as \"King James,\" is a legendary basketball player widely considered one of the greatest of all time. With four NBA championships, four MVP awards, and the league's all-time scoring record, he has consistently shattered expectations and redefined player longevity. Beyond the court, James is a powerful voice for social justice and a successful entrepreneur and philanthropist through initiatives like the I PROMISE program."
61+
"text": "Scarlett Johansson is an acclaimed actress renowned for her distinctive voice and versatile performances across a wide range of genres. From her captivating early roles in films like \"Lost in Translation\" to her iconic portrayal of Black Widow in the Marvel Cinematic Universe, she has consistently delivered powerful performances. A four-time Golden Globe nominee and two-time Academy Award nominee, she remains one of Hollywood's most bankable and respected stars."
6262
}
6363
],
6464
"role": "model"
@@ -81,7 +81,7 @@
8181
"content": {
8282
"parts": [
8383
{
84-
"text": "The Bhopal disaster occurred in December when a deadly leak of methyl isocyanate gas from a Union Carbide pesticide plant instantly killed thousands and sickened hundreds of thousands more. This catastrophic industrial accident remains one of the world's worst, profoundly influencing international environmental regulations and corporate accountability."
84+
"text": "In December 1984, the Union Carbide chemical plant in Bhopal, India, experienced a catastrophic gas leak, releasing deadly methyl isocyanate. This disaster killed thousands instantly and caused hundreds of thousands of injuries, becoming one of the world's worst industrial accidents and a lasting symbol of corporate negligence."
8585
}
8686
],
8787
"role": "model"
@@ -98,55 +98,31 @@
9898
}
9999
},
100100
{
101-
"actions": {
102-
"stateDelta": {
103-
"root_agent@1/join_for_aggregation@1_join_state": {
104-
"find_famous_person": {
105-
"branch": "find_famous_person@1",
106-
"input": "LeBron James, often hailed as \"King James,\" is a legendary basketball player widely considered one of the greatest of all time. With four NBA championships, four MVP awards, and the league's all-time scoring record, he has consistently shattered expectations and redefined player longevity. Beyond the court, James is a powerful voice for social justice and a successful entrepreneur and philanthropist through initiatives like the I PROMISE program."
107-
}
108-
}
109-
}
110-
},
111101
"author": "root_agent",
112-
"branch": "find_famous_person@1",
113102
"id": "e-6",
114103
"invocationId": "i-1",
115-
"nodeInfo": {
116-
"path": "root_agent@1/join_for_aggregation@1"
117-
}
118-
},
119-
{
120-
"actions": {
121-
"stateDelta": {
122-
"root_agent@1/join_for_aggregation@1_join_state": null
123-
}
124-
},
125-
"author": "root_agent",
126-
"id": "e-7",
127-
"invocationId": "i-1",
128104
"nodeInfo": {
129105
"outputFor": [
130106
"root_agent@1/join_for_aggregation@1"
131107
],
132108
"path": "root_agent@1/join_for_aggregation@1"
133109
},
134110
"output": {
135-
"find_famous_person": "LeBron James, often hailed as \"King James,\" is a legendary basketball player widely considered one of the greatest of all time. With four NBA championships, four MVP awards, and the league's all-time scoring record, he has consistently shattered expectations and redefined player longevity. Beyond the court, James is a powerful voice for social justice and a successful entrepreneur and philanthropist through initiatives like the I PROMISE program.",
136-
"find_historical_event": "The Bhopal disaster occurred in December when a deadly leak of methyl isocyanate gas from a Union Carbide pesticide plant instantly killed thousands and sickened hundreds of thousands more. This catastrophic industrial accident remains one of the world's worst, profoundly influencing international environmental regulations and corporate accountability."
111+
"find_famous_person": "Scarlett Johansson is an acclaimed actress renowned for her distinctive voice and versatile performances across a wide range of genres. From her captivating early roles in films like \"Lost in Translation\" to her iconic portrayal of Black Widow in the Marvel Cinematic Universe, she has consistently delivered powerful performances. A four-time Golden Globe nominee and two-time Academy Award nominee, she remains one of Hollywood's most bankable and respected stars.",
112+
"find_historical_event": "In December 1984, the Union Carbide chemical plant in Bhopal, India, experienced a catastrophic gas leak, releasing deadly methyl isocyanate. This disaster killed thousands instantly and caused hundreds of thousands of injuries, becoming one of the world's worst industrial accidents and a lasting symbol of corporate negligence."
137113
}
138114
},
139115
{
140116
"author": "root_agent",
141117
"content": {
142118
"parts": [
143119
{
144-
"text": "# Year: 1984\n\n## Famous Person Bio:\n\nLeBron James, often hailed as \"King James,\" is a legendary basketball player widely considered one of the greatest of all time. With four NBA championships, four MVP awards, and the league's all-time scoring record, he has consistently shattered expectations and redefined player longevity. Beyond the court, James is a powerful voice for social justice and a successful entrepreneur and philanthropist through initiatives like the I PROMISE program.\n\n## Historical Event:\n\nThe Bhopal disaster occurred in December when a deadly leak of methyl isocyanate gas from a Union Carbide pesticide plant instantly killed thousands and sickened hundreds of thousands more. This catastrophic industrial accident remains one of the world's worst, profoundly influencing international environmental regulations and corporate accountability."
120+
"text": "# Year: 1984\n\n## Famous Person Bio:\n\nScarlett Johansson is an acclaimed actress renowned for her distinctive voice and versatile performances across a wide range of genres. From her captivating early roles in films like \"Lost in Translation\" to her iconic portrayal of Black Widow in the Marvel Cinematic Universe, she has consistently delivered powerful performances. A four-time Golden Globe nominee and two-time Academy Award nominee, she remains one of Hollywood's most bankable and respected stars.\n\n## Historical Event:\n\nIn December 1984, the Union Carbide chemical plant in Bhopal, India, experienced a catastrophic gas leak, releasing deadly methyl isocyanate. This disaster killed thousands instantly and caused hundreds of thousands of injuries, becoming one of the world's worst industrial accidents and a lasting symbol of corporate negligence."
145121
}
146122
],
147123
"role": "user"
148124
},
149-
"id": "e-8",
125+
"id": "e-7",
150126
"invocationId": "i-1",
151127
"nodeInfo": {
152128
"path": "root_agent@1/aggregate_results@1"

src/google/adk/agents/context.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ def __init__(
8585
node_path: str = '',
8686
run_id: str = '',
8787
triggered_by: str = '',
88-
in_nodes: set[str] | None = None,
8988
resume_inputs: dict[str, Any] | None = None,
9089
schedule_dynamic_node_internal: ScheduleDynamicNodeInternal | None = None,
9190
node_rerun_on_resume: bool = True,
@@ -107,7 +106,6 @@ def __init__(
107106
node_path: The path of the current node in the workflow graph.
108107
run_id: The execution ID of the current node.
109108
triggered_by: The name of the node that triggered the current node.
110-
in_nodes: Names of predecessor nodes.
111109
resume_inputs: Inputs for resuming node, keyed by interrupt id.
112110
node_rerun_on_resume: Whether the node reruns on resume.
113111
retry_count: Number of times this node has been retried.
@@ -134,9 +132,6 @@ def __init__(
134132
self._node_path = node_path
135133
self._run_id = run_id
136134
self._triggered_by = triggered_by
137-
self._in_nodes = (
138-
frozenset(in_nodes) if in_nodes is not None else frozenset()
139-
)
140135
self._resume_inputs = resume_inputs or {}
141136
self._workflow_scheduler = schedule_dynamic_node_internal
142137
self._node_rerun_on_resume = node_rerun_on_resume
@@ -227,11 +222,6 @@ def attempt_count(self) -> int:
227222
"""Returns the current attempt number (1-based)."""
228223
return self._attempt_count
229224

230-
@property
231-
def in_nodes(self) -> frozenset[str]:
232-
"""Returns names of nodes that are predecessors of the current node."""
233-
return self._in_nodes
234-
235225
@property
236226
def resume_inputs(self) -> dict[str, Any]:
237227
"""Returns inputs for resuming node, keyed by interrupt id."""

src/google/adk/workflow/_base_node.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,11 @@ async def _run_impl(
243243
)
244244
yield # AsyncGenerator requires at least one yield statement
245245

246+
@property
247+
def _requires_all_predecessors(self) -> bool:
248+
"""If True, the node waits for all predecessors to complete before running."""
249+
return False
250+
246251

247252
START = BaseNode(name='__START__')
248253
"""Sentinel node marking the entry point of a workflow graph.

src/google/adk/workflow/_join_node.py

Lines changed: 19 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
from __future__ import annotations
1818

19-
import logging
2019
from collections.abc import AsyncGenerator
20+
import logging
2121
from typing import Any
2222

2323
from typing_extensions import override
@@ -48,10 +48,20 @@ class JoinNode(BaseNode):
4848
"""A node that waits for all specified predecessors to trigger it before
4949
outputting."""
5050

51-
wait_for_output: bool = True
51+
@property
52+
@override
53+
def _requires_all_predecessors(self) -> bool:
54+
return True
5255

53-
def _get_state_key(self, node_path: str) -> str:
54-
return f'{node_path}_join_state'
56+
@override
57+
def _validate_input_data(self, data: Any) -> Any:
58+
"""Validates individual trigger inputs against input_schema."""
59+
if self.input_schema and isinstance(data, dict):
60+
return {
61+
k: self._validate_schema(v, self.input_schema)
62+
for k, v in data.items()
63+
}
64+
return super()._validate_input_data(data)
5565

5666
@override
5767
async def _run_impl(
@@ -60,59 +70,8 @@ async def _run_impl(
6070
ctx: Context,
6171
node_input: Any,
6272
) -> AsyncGenerator[Any, None]:
63-
if not ctx.in_nodes:
64-
raise ValueError(
65-
f'JoinNode {self.name} has no predecessors defined in graph.'
66-
)
67-
68-
state_key = self._get_state_key(ctx.node_path)
69-
join_state = (ctx.state.get(state_key) or {}).copy()
70-
71-
triggering_node = ctx.triggered_by
72-
if not triggering_node:
73-
logger.warning(
74-
'JoinNode %s received trigger from node with no name. Ignoring.',
75-
self.name,
76-
)
77-
return
78-
79-
if triggering_node not in ctx.in_nodes:
80-
logger.warning(
81-
'JoinNode %s received trigger from unexpected node %s. Ignoring.',
82-
self.name,
83-
triggering_node,
84-
)
85-
return
86-
87-
# Recording the output and branch from previous node.
88-
join_state[triggering_node] = {
89-
'input': node_input,
90-
'branch': ctx._invocation_context.branch,
91-
}
92-
93-
if set(join_state.keys()) == ctx.in_nodes:
94-
# Extract outputs and branches
95-
outputs = {}
96-
branches = []
97-
for k, v in join_state.items():
98-
if isinstance(v, dict) and 'input' in v:
99-
outputs[k] = v['input']
100-
branches.append(v.get('branch') or '')
101-
else:
102-
# Fallback for old state structure
103-
outputs[k] = v
104-
105-
common_branch = _get_common_branch_prefix(branches)
106-
107-
yield Event(
108-
output=outputs,
109-
branch=common_branch,
110-
# Clear state for future runs
111-
state={state_key: None},
112-
)
113-
else:
114-
# Update state with recorded outputs from previous nodes and wait for
115-
# more triggers.
116-
yield Event(
117-
state={state_key: join_state},
118-
)
73+
"""JoinNode simply passes through the aggregated inputs provided by the orchestrator."""
74+
yield Event(
75+
output=node_input,
76+
branch=ctx._invocation_context.branch,
77+
)

src/google/adk/workflow/_node_runner.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ def __init__(
5757
run_id: str | None = None,
5858
# Graph context
5959
triggered_by: str = "",
60-
in_nodes: set[str] | None = None,
6160
# Output delegation (use_as_output)
6261
additional_output_for_ancestor: str | None = None,
6362
# Resume state from a previous run
@@ -75,7 +74,6 @@ def __init__(
7574
counter string ("1", "2", …) unique per node path.
7675
Falls back to "1" if not provided.
7776
triggered_by: Name of the node that triggered this run.
78-
in_nodes: Names of predecessor nodes in the graph.
7977
additional_output_for_ancestor: Ancestor node path whose
8078
output this node's output also represents (use_as_output).
8179
prior_output: Output from a previous run, carried
@@ -96,7 +94,6 @@ def __init__(
9694

9795
# Graph context
9896
self._triggered_by = triggered_by
99-
self._in_nodes = in_nodes
10097

10198
# Output delegation
10299
self._additional_output_for_ancestor = additional_output_for_ancestor
@@ -246,7 +243,6 @@ def _create_child_context(
246243
schedule_dynamic_node_internal=scheduler,
247244
node_rerun_on_resume=self._node.rerun_on_resume,
248245
triggered_by=self._triggered_by,
249-
in_nodes=self._in_nodes,
250246
output_for_ancestors=ancestors,
251247
event_author=self._parent_ctx.event_author,
252248
state_schema=self._node.state_schema

0 commit comments

Comments
 (0)