|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | from __future__ import annotations |
16 | | -from google.adk import Agent, Context, Event, Workflow |
17 | | -from google.adk.workflow import node |
| 16 | + |
18 | 17 | import asyncio |
19 | 18 |
|
| 19 | +from google.adk import Agent |
| 20 | +from google.adk import Context |
| 21 | +from google.adk import Event |
| 22 | +from google.adk import Workflow |
| 23 | +from google.adk.workflow import node |
| 24 | + |
20 | 25 | # Worker agent to generate a headline for a single topic |
21 | 26 | generator = Agent( |
22 | 27 | name="generator", |
23 | 28 | model="gemini-2.5-flash", |
24 | | - instruction="Write a catchy headline about the topic provided in the user message.", |
| 29 | + instruction=( |
| 30 | + "Write a catchy one-line headline about the topic provided in the user" |
| 31 | + " message." |
| 32 | + ), |
25 | 33 | ) |
26 | 34 |
|
| 35 | + |
27 | 36 | @node(rerun_on_resume=True) |
28 | 37 | async def orchestrator(ctx: Context, node_input: str) -> str: |
29 | | - """Orchestrator node that performs dynamic fan-out and fan-in.""" |
30 | | - # Split input comma-separated string into topics |
31 | | - topics = [t.strip() for t in node_input.split(",") if t.strip()] |
32 | | - yield Event(message=f"Processing {len(topics)} topics in parallel.") |
33 | | - |
34 | | - # Fan-out: Schedule a dynamic node for each topic |
35 | | - tasks = [] |
36 | | - for i, topic in enumerate(topics): |
37 | | - tasks.append( |
38 | | - ctx.run_node( |
39 | | - generator, |
40 | | - node_input=topic, |
41 | | - sub_branch=f"branch_{i}" # Isolate events to prevent context mess-up |
42 | | - ) |
| 38 | + """Orchestrator node that performs dynamic fan-out and fan-in.""" |
| 39 | + # Split input comma-separated string into topics |
| 40 | + topics = [t.strip() for t in node_input.split(",") if t.strip()] |
| 41 | + yield Event(message=f"Processing {len(topics)} topics in parallel.") |
| 42 | + |
| 43 | + # Fan-out: Schedule a dynamic node for each topic |
| 44 | + tasks = [] |
| 45 | + for i, topic in enumerate(topics): |
| 46 | + tasks.append( |
| 47 | + ctx.run_node( |
| 48 | + generator, |
| 49 | + node_input=topic, |
| 50 | + is_parallel=True, |
43 | 51 | ) |
| 52 | + ) |
| 53 | + |
| 54 | + # Wait for all tasks to complete |
| 55 | + results = await asyncio.gather(*tasks) |
44 | 56 |
|
45 | | - # Wait for all tasks to complete |
46 | | - results = await asyncio.gather(*tasks) |
| 57 | + # Fan-in: Aggregate results |
| 58 | + aggregated = "### Aggregated Headlines\n\n" |
| 59 | + aggregated += "| Topic | Headline |\n" |
| 60 | + aggregated += "| :--- | :--- |\n" |
| 61 | + for topic, headline in zip(topics, results): |
| 62 | + aggregated += f"| {topic} | {headline} |\n" |
47 | 63 |
|
48 | | - # Fan-in: Aggregate results |
49 | | - aggregated = "Aggregated Headlines:\n" |
50 | | - for topic, headline in zip(topics, results): |
51 | | - aggregated += f"- Topic [{topic}]: {headline}\n" |
| 64 | + yield Event(message=aggregated) |
52 | 65 |
|
53 | | - yield Event(output=aggregated) |
54 | 66 |
|
55 | 67 | root_agent = Workflow( |
56 | 68 | name="dynamic_fan_out_fan_in", |
|
0 commit comments