Skip to content

Commit b63f28c

Browse files
kabirehsavoie
andauthored
feat!: Use AgentEmitter instead of EventQueue in AgentExecutor methods (#621)
BREAKING CHANGE: AgentEmitter contains the methods from the old TaskUpdater and is now the only way agents send results back to the caller. This hides the EventQueue mechanism from users. Also introduced a check that when placing a full Task object on the queue, which should only be done for calls with no existing Task, that the Task's ID is the one expected for the queue, as calculated by the RequestContext. Added Message and Task builders to AgentEmitter to help with using the proper taskID and contextId. Treat A2AErrors internally as a state transition to FAILED. Register EventConsumer callback before starting agent to fix a race condition/hang when an agent completes before the callback is registered. Fixes #604 🦕 --------- Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com> Co-authored-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent 02ba508 commit b63f28c

File tree

29 files changed

+1465
-527
lines changed

29 files changed

+1465
-527
lines changed

README.md

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public class WeatherAgentCardProducer {
143143
import io.a2a.server.agentexecution.AgentExecutor;
144144
import io.a2a.server.agentexecution.RequestContext;
145145
import io.a2a.server.events.EventQueue;
146-
import io.a2a.server.tasks.TaskUpdater;
146+
import io.a2a.server.tasks.AgentEmitter;
147147
import io.a2a.spec.JSONRPCError;
148148
import io.a2a.spec.Message;
149149
import io.a2a.spec.Part;
@@ -173,14 +173,12 @@ public class WeatherAgentExecutorProducer {
173173
}
174174

175175
@Override
176-
public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
177-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
178-
176+
public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
179177
// mark the task as submitted and start working on it
180178
if (context.getTask() == null) {
181-
updater.submit();
179+
agentEmitter.submit();
182180
}
183-
updater.startWork();
181+
agentEmitter.startWork();
184182

185183
// extract the text from the message
186184
String userMessage = extractTextFromMessage(context.getMessage());
@@ -189,16 +187,16 @@ public class WeatherAgentExecutorProducer {
189187
String response = weatherAgent.chat(userMessage);
190188

191189
// create the response part
192-
TextPart responsePart = new TextPart(response, null);
190+
TextPart responsePart = new TextPart(response);
193191
List<Part<?>> parts = List.of(responsePart);
194192

195193
// add the response as an artifact and complete the task
196-
updater.addArtifact(parts, null, null, null);
197-
updater.complete();
194+
agentEmitter.addArtifact(parts);
195+
agentEmitter.complete();
198196
}
199197

200198
@Override
201-
public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
199+
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
202200
Task task = context.getTask();
203201

204202
if (task.getStatus().state() == TaskState.CANCELED) {
@@ -212,8 +210,7 @@ public class WeatherAgentExecutorProducer {
212210
}
213211

214212
// cancel the task
215-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
216-
updater.cancel();
213+
agentEmitter.cancel();
217214
}
218215

219216
private String extractTextFromMessage(Message message) {

examples/cloud-deployment/README.md

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,7 @@ The agent (`CloudAgentExecutorProducer`) implements a command-based protocol:
224224

225225
```java
226226
@Override
227-
public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
228-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
227+
public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
229228
String messageText = extractTextFromMessage(context.getMessage()).trim().toLowerCase();
230229

231230
// Get pod name from Kubernetes downward API
@@ -234,21 +233,21 @@ public void execute(RequestContext context, EventQueue eventQueue) throws JSONRP
234233
if ("complete".equals(messageText)) {
235234
// Completion trigger - add final artifact and complete
236235
String artifactText = "Completed by " + podName;
237-
List<Part<?>> parts = List.of(new TextPart(artifactText, null));
238-
updater.addArtifact(parts);
239-
updater.complete(); // Transition to COMPLETED state
236+
List<Part<?>> parts = List.of(new TextPart(artifactText));
237+
agentEmitter.addArtifact(parts);
238+
agentEmitter.complete(); // Transition to COMPLETED state
240239
} else if (context.getTask() == null) {
241240
// Initial "start" message - create task in SUBMITTED → WORKING state
242-
updater.submit();
243-
updater.startWork();
241+
agentEmitter.submit();
242+
agentEmitter.startWork();
244243
String artifactText = "Started by " + podName;
245-
List<Part<?>> parts = List.of(new TextPart(artifactText, null));
246-
updater.addArtifact(parts);
244+
List<Part<?>> parts = List.of(new TextPart(artifactText));
245+
agentEmitter.addArtifact(parts);
247246
} else {
248247
// Subsequent "process" messages - add artifacts (fire-and-forget, stays WORKING)
249248
String artifactText = "Processed by " + podName;
250-
List<Part<?>> parts = List.of(new TextPart(artifactText, null));
251-
updater.addArtifact(parts);
249+
List<Part<?>> parts = List.of(new TextPart(artifactText));
250+
agentEmitter.addArtifact(parts);
252251
}
253252
}
254253
```

examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77

88
import io.a2a.server.agentexecution.AgentExecutor;
99
import io.a2a.server.agentexecution.RequestContext;
10-
import io.a2a.server.events.EventQueue;
11-
import io.a2a.server.tasks.TaskUpdater;
10+
import io.a2a.server.tasks.AgentEmitter;
1211
import io.a2a.spec.A2AError;
1312
import io.a2a.spec.InternalError;
1413
import io.a2a.spec.Message;
@@ -46,8 +45,7 @@ public AgentExecutor agentExecutor() {
4645
private static class CloudAgentExecutor implements AgentExecutor {
4746

4847
@Override
49-
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
50-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
48+
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
5149

5250
try {
5351
// Extract user message and normalize
@@ -75,26 +73,26 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
7573
LOGGER.info("Completion requested on pod: {}", podName);
7674
String artifactText = "Completed by " + podName;
7775
List<Part<?>> parts = List.of(new TextPart(artifactText));
78-
updater.addArtifact(parts);
79-
updater.complete();
76+
agentEmitter.addArtifact(parts);
77+
agentEmitter.complete();
8078
LOGGER.info("Task completed on pod: {}", podName);
8179

8280
} else if (context.getTask() == null) {
8381
// Initial message - create task in SUBMITTED → WORKING state
8482
LOGGER.info("Creating new task on pod: {}", podName);
85-
updater.submit();
86-
updater.startWork();
83+
agentEmitter.submit();
84+
agentEmitter.startWork();
8785
String artifactText = "Started by " + podName;
8886
List<Part<?>> parts = List.of(new TextPart(artifactText));
89-
updater.addArtifact(parts);
87+
agentEmitter.addArtifact(parts);
9088
LOGGER.info("Task created and started on pod: {}", podName);
9189

9290
} else {
9391
// Subsequent messages - add artifacts (fire-and-forget, stays in WORKING)
9492
LOGGER.info("Adding artifact on pod: {}", podName);
9593
String artifactText = "Processed by " + podName;
9694
List<Part<?>> parts = List.of(new TextPart(artifactText));
97-
updater.addArtifact(parts);
95+
agentEmitter.addArtifact(parts);
9896
// No state change - task remains in WORKING
9997
LOGGER.info("Artifact added on pod: {}", podName);
10098
}
@@ -109,10 +107,9 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
109107
}
110108

111109
@Override
112-
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
110+
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
113111
LOGGER.info("Task cancellation requested");
114-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
115-
updater.cancel();
112+
agentEmitter.cancel();
116113
}
117114

118115
/**

examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
import jakarta.enterprise.context.ApplicationScoped;
44
import jakarta.enterprise.inject.Produces;
55

6-
import io.a2a.A2A;
76
import io.a2a.server.agentexecution.AgentExecutor;
87
import io.a2a.server.agentexecution.RequestContext;
9-
import io.a2a.server.events.EventQueue;
8+
import io.a2a.server.tasks.AgentEmitter;
109
import io.a2a.spec.A2AError;
1110
import io.a2a.spec.UnsupportedOperationError;
1211

@@ -17,12 +16,12 @@ public class AgentExecutorProducer {
1716
public AgentExecutor agentExecutor() {
1817
return new AgentExecutor() {
1918
@Override
20-
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
21-
eventQueue.enqueueEvent(A2A.toAgentMessage("Hello World"));
19+
public void execute(RequestContext context, AgentEmitter emitter) throws A2AError {
20+
emitter.sendMessage("Hello World");
2221
}
2322

2423
@Override
25-
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
24+
public void cancel(RequestContext context, AgentEmitter emitter) throws A2AError {
2625
throw new UnsupportedOperationError();
2726
}
2827
};

extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreTestAgentExecutor.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@
88

99
import io.a2a.server.agentexecution.AgentExecutor;
1010
import io.a2a.server.agentexecution.RequestContext;
11-
import io.a2a.server.events.EventQueue;
11+
import io.a2a.server.tasks.AgentEmitter;
1212
import io.a2a.server.tasks.PushNotificationSender;
13-
import io.a2a.server.tasks.TaskUpdater;
1413
import io.a2a.spec.A2AError;
1514
import io.a2a.spec.InvalidRequestError;
1615
import io.a2a.spec.Message;
@@ -33,31 +32,29 @@ public class JpaDatabasePushNotificationConfigStoreTestAgentExecutor {
3332
public AgentExecutor agentExecutor() {
3433
return new AgentExecutor() {
3534
@Override
36-
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
37-
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
35+
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
3836
String command = getLastTextPart(context.getMessage());
3937

4038
// Switch based on the command from the test client
4139
switch (command) {
4240
case "create":
43-
taskUpdater.submit();
41+
agentEmitter.submit();
4442
break;
4543
case "update":
4644
// Perform a meaningful update, like adding an artifact.
4745
// This state change is what will trigger the notification.
48-
taskUpdater.addArtifact(List.of(new TextPart("updated-artifact")), "art-1", "test", null);
46+
agentEmitter.addArtifact(List.of(new TextPart("updated-artifact")), "art-1", "test", null);
4947
break;
5048
default:
5149
// On the first message (which might have no text), just submit.
52-
taskUpdater.submit();
50+
agentEmitter.submit();
5351
break;
5452
}
5553
}
5654

5755
@Override
58-
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
59-
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
60-
taskUpdater.cancel();
56+
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
57+
agentEmitter.cancel();
6158
}
6259
};
6360
}

extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx
149149
String taskId = "test-task-2";
150150
EventQueue queue = queueManager.createOrTap(taskId);
151151

152-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
152+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, getTaskStatusUpdateEventWithNewId(taskId));
153153
queueManager.onReplicatedEvent(replicatedEvent);
154154

155155
assertEquals(0, strategy.getCallCount());
@@ -172,7 +172,7 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
172172
assertEquals(taskId, countingStrategy.getLastTaskId());
173173
assertEquals(event, countingStrategy.getLastEvent());
174174

175-
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
175+
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, getTaskStatusUpdateEventWithNewId(taskId));
176176
queueManager.onReplicatedEvent(replicatedEvent);
177177

178178
assertEquals(2, countingStrategy.getCallCount());
@@ -483,7 +483,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup
483483

484484
// Create queue and enqueue an event
485485
EventQueue queue = queueManager.createOrTap(taskId);
486-
queue.enqueueEvent(testEvent);
486+
queue.enqueueEvent(getTaskStatusUpdateEventWithNewId(taskId));
487487

488488
// Dequeue to clear the queue
489489
try {
@@ -607,6 +607,11 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept
607607
"Second event should be QueueClosedEvent");
608608
}
609609

610+
private TaskStatusUpdateEvent getTaskStatusUpdateEventWithNewId(String taskId) {
611+
return TaskStatusUpdateEvent.builder((TaskStatusUpdateEvent) testEvent).taskId(taskId).build();
612+
}
613+
614+
610615
private static class NoOpReplicationStrategy implements ReplicationStrategy {
611616
@Override
612617
public void send(String taskId, Event event) {

extras/queue-manager-replicated/tests-multi-instance/quarkus-common/src/main/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/common/MultiInstanceReplicationAgentExecutor.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
import io.a2a.server.agentexecution.AgentExecutor;
44
import io.a2a.server.agentexecution.RequestContext;
5-
import io.a2a.server.events.EventQueue;
6-
import io.a2a.server.tasks.TaskUpdater;
5+
import io.a2a.server.tasks.AgentEmitter;
76
import io.a2a.spec.A2AError;
87
import io.a2a.spec.Task;
98
import io.a2a.spec.TextPart;
@@ -18,9 +17,8 @@
1817
*/
1918
public class MultiInstanceReplicationAgentExecutor implements AgentExecutor {
2019
@Override
21-
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
20+
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
2221
Task task = context.getTask();
23-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
2422

2523
// Check if message contains "close" signal
2624
boolean shouldClose = context.getMessage().parts().stream()
@@ -30,19 +28,18 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
3028

3129
if (shouldClose) {
3230
// Close the task
33-
updater.complete();
31+
agentEmitter.complete();
3432
} else if (task == null) {
3533
// First message - create task in SUBMITTED state
36-
updater.submit();
34+
agentEmitter.submit();
3735
} else {
3836
// Subsequent messages - add as artifact
39-
updater.addArtifact(context.getMessage().parts());
37+
agentEmitter.addArtifact(context.getMessage().parts());
4038
}
4139
}
4240

4341
@Override
44-
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
45-
TaskUpdater updater = new TaskUpdater(context, eventQueue);
46-
updater.cancel();
42+
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
43+
agentEmitter.cancel();
4744
}
4845
}

extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/ReplicationTestAgentExecutor.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77

88
import io.a2a.server.agentexecution.AgentExecutor;
99
import io.a2a.server.agentexecution.RequestContext;
10-
import io.a2a.server.events.EventQueue;
11-
import io.a2a.server.tasks.TaskUpdater;
10+
import io.a2a.server.tasks.AgentEmitter;
1211
import io.a2a.spec.A2AError;
1312
import io.a2a.spec.InvalidRequestError;
1413
import io.a2a.spec.Message;
@@ -28,37 +27,34 @@ public class ReplicationTestAgentExecutor {
2827
public AgentExecutor agentExecutor() {
2928
return new AgentExecutor() {
3029
@Override
31-
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
32-
33-
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
30+
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
3431
String lastText = getLastTextPart(context.getMessage());
3532

3633
switch (lastText) {
3734
case "create":
3835
// Submit task - this should trigger TaskStatusUpdateEvent
39-
taskUpdater.submit();
36+
agentEmitter.submit();
4037
break;
4138
case "working":
4239
// Move task to WORKING state without completing - keeps queue alive
43-
taskUpdater.submit();
44-
taskUpdater.startWork();
40+
agentEmitter.submit();
41+
agentEmitter.startWork();
4542
break;
4643
case "complete":
4744
// Complete the task - should trigger poison pill generation
48-
taskUpdater.submit();
49-
taskUpdater.startWork();
50-
taskUpdater.addArtifact(List.of(new TextPart("Task completed")));
51-
taskUpdater.complete();
45+
agentEmitter.submit();
46+
agentEmitter.startWork();
47+
agentEmitter.addArtifact(List.of(new TextPart("Task completed")));
48+
agentEmitter.complete();
5249
break;
5350
default:
5451
throw new InvalidRequestError("Unknown command: " + lastText);
5552
}
5653
}
5754

5855
@Override
59-
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
60-
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
61-
taskUpdater.cancel();
56+
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
57+
agentEmitter.cancel();
6258
}
6359
};
6460
}

0 commit comments

Comments
 (0)