Skip to content

Commit c1bdd9b

Browse files
committed
refactor: format dyanmic fan out fan in sample better.
Change-Id: Ifbab67f85122a1400f67df189b0bad24a3eb848b
1 parent 2cc100a commit c1bdd9b

1 file changed

Lines changed: 36 additions & 24 deletions

File tree

  • contributing/workflow_samples/dynamic_fan_out_fan_in

contributing/workflow_samples/dynamic_fan_out_fan_in/agent.py

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,44 +13,56 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16-
from google.adk import Agent, Context, Event, Workflow
17-
from google.adk.workflow import node
16+
1817
import asyncio
1918

19+
from google.adk import Agent
20+
from google.adk import Context
21+
from google.adk import Event
22+
from google.adk import Workflow
23+
from google.adk.workflow import node
24+
2025
# Worker agent to generate a headline for a single topic
2126
generator = Agent(
2227
name="generator",
2328
model="gemini-2.5-flash",
24-
instruction="Write a catchy headline about the topic provided in the user message.",
29+
instruction=(
30+
"Write a catchy one-line headline about the topic provided in the user"
31+
" message."
32+
),
2533
)
2634

35+
2736
@node(rerun_on_resume=True)
2837
async def orchestrator(ctx: Context, node_input: str) -> str:
29-
"""Orchestrator node that performs dynamic fan-out and fan-in."""
30-
# Split input comma-separated string into topics
31-
topics = [t.strip() for t in node_input.split(",") if t.strip()]
32-
yield Event(message=f"Processing {len(topics)} topics in parallel.")
33-
34-
# Fan-out: Schedule a dynamic node for each topic
35-
tasks = []
36-
for i, topic in enumerate(topics):
37-
tasks.append(
38-
ctx.run_node(
39-
generator,
40-
node_input=topic,
41-
sub_branch=f"branch_{i}" # Isolate events to prevent context mess-up
42-
)
38+
"""Orchestrator node that performs dynamic fan-out and fan-in."""
39+
# Split input comma-separated string into topics
40+
topics = [t.strip() for t in node_input.split(",") if t.strip()]
41+
yield Event(message=f"Processing {len(topics)} topics in parallel.")
42+
43+
# Fan-out: Schedule a dynamic node for each topic
44+
tasks = []
45+
for i, topic in enumerate(topics):
46+
tasks.append(
47+
ctx.run_node(
48+
generator,
49+
node_input=topic,
50+
is_parallel=True,
4351
)
52+
)
53+
54+
# Wait for all tasks to complete
55+
results = await asyncio.gather(*tasks)
4456

45-
# Wait for all tasks to complete
46-
results = await asyncio.gather(*tasks)
57+
# Fan-in: Aggregate results
58+
aggregated = "### Aggregated Headlines\n\n"
59+
aggregated += "| Topic | Headline |\n"
60+
aggregated += "| :--- | :--- |\n"
61+
for topic, headline in zip(topics, results):
62+
aggregated += f"| {topic} | {headline} |\n"
4763

48-
# Fan-in: Aggregate results
49-
aggregated = "Aggregated Headlines:\n"
50-
for topic, headline in zip(topics, results):
51-
aggregated += f"- Topic [{topic}]: {headline}\n"
64+
yield Event(message=aggregated)
5265

53-
yield Event(output=aggregated)
5466

5567
root_agent = Workflow(
5668
name="dynamic_fan_out_fan_in",

0 commit comments

Comments
 (0)