Skip to content

Commit 5d2cd75

Browse files
authored
refactor: Update A2A protocol to align with spec revision (#633)
- Move protocol version from AgentCard to AgentInterface level - Simplify resource identifiers from hierarchical names to flat IDs (e.g., "tasks/123" -> "123") - Restructure Part types: remove FilePart/DataPart in favor of unified Part with url/raw/data variants - Add OAuth flow support (ImplicitOAuthFlow, PasswordOAuthFlow) - Rename Security to SecurityRequirement for clarity - Standardize spelling: CANCELLED -> CANCELED - Update all client transports (gRPC, JSON-RPC, REST) and tests - Rename resubscribe to subscribeToTask Breaking changes in proto API affecting: - Task operations (get, cancel, subscribe) - Push notification config management - Message part structure Fixes #632 🦕 Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent 18d2abf commit 5d2cd75

142 files changed

Lines changed: 7092 additions & 5569 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -612,24 +612,24 @@ client.deleteTaskPushNotificationConfigurations(
612612
new DeleteTaskPushNotificationConfigParams("task-1234", "config-4567", clientCallContext);
613613
```
614614

615-
#### Resubscribe to a task
615+
#### Subscribe to a task
616616

617617
```java
618-
// Resubscribe to an ongoing task with id "task-1234" using configured consumers
618+
// Subscribe to an ongoing task with id "task-1234" using configured consumers
619619
TaskIdParams taskIdParams = new TaskIdParams("task-1234");
620-
client.resubscribe(taskIdParams);
620+
client.subscribeToTask(taskIdParams);
621621

622-
// Or resubscribe with custom consumers and error handler
622+
// Or subscribe with custom consumers and error handler
623623
List<BiConsumer<ClientEvent, AgentCard>> customConsumers = List.of(
624-
(event, card) -> System.out.println("Resubscribe event: " + event)
624+
(event, card) -> System.out.println("Subscribe event: " + event)
625625
);
626626
Consumer<Throwable> customErrorHandler = error ->
627-
System.err.println("Resubscribe error: " + error.getMessage());
627+
System.err.println("Subscribe error: " + error.getMessage());
628628

629-
client.resubscribe(taskIdParams, customConsumers, customErrorHandler);
629+
client.subscribeToTask(taskIdParams, customConsumers, customErrorHandler);
630630

631631
// You can also optionally specify a ClientCallContext with call-specific config to use
632-
client.resubscribe(taskIdParams, clientCallContext);
632+
client.subscribeToTask(taskIdParams, clientCallContext);
633633
```
634634

635635
#### Retrieve details about the server agent that this client agent is communicating with

client/base/src/main/java/io/a2a/client/AbstractClient.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public abstract void deleteTaskPushNotificationConfigurations(
318318
@Nullable ClientCallContext context) throws A2AClientException;
319319

320320
/**
321-
* Resubscribe to a task's event stream.
321+
* Subscribe to a task's event stream.
322322
* This is only available if both the client and server support streaming.
323323
* The configured client consumers will be used to handle messages, tasks,
324324
* and update events received from the remote agent. The configured streaming
@@ -327,12 +327,12 @@ public abstract void deleteTaskPushNotificationConfigurations(
327327
* @param request the parameters specifying which task's notification configs to delete
328328
* @throws A2AClientException if resubscribing fails for any reason
329329
*/
330-
public void resubscribe(@NonNull TaskIdParams request) throws A2AClientException {
331-
resubscribe(request, consumers, streamingErrorHandler, null);
330+
public void subscribeToTask(@NonNull TaskIdParams request) throws A2AClientException {
331+
subscribeToTask(request, consumers, streamingErrorHandler, null);
332332
}
333333

334334
/**
335-
* Resubscribe to a task's event stream.
335+
* Subscribe to a task's event stream.
336336
* This is only available if both the client and server support streaming.
337337
* The configured client consumers will be used to handle messages, tasks,
338338
* and update events received from the remote agent. The configured streaming
@@ -342,13 +342,13 @@ public void resubscribe(@NonNull TaskIdParams request) throws A2AClientException
342342
* @param context optional client call context for the request
343343
* @throws A2AClientException if resubscribing fails for any reason
344344
*/
345-
public void resubscribe(@NonNull TaskIdParams request,
345+
public void subscribeToTask(@NonNull TaskIdParams request,
346346
@Nullable ClientCallContext context) throws A2AClientException {
347-
resubscribe(request, consumers, streamingErrorHandler, context);
347+
subscribeToTask(request, consumers, streamingErrorHandler, context);
348348
}
349349

350350
/**
351-
* Resubscribe to a task's event stream.
351+
* Subscribe to a task's event stream.
352352
* This is only available if both the client and server support streaming.
353353
* The specified client consumers will be used to handle messages, tasks, and
354354
* update events received from the remote agent. The specified streaming error
@@ -359,14 +359,14 @@ public void resubscribe(@NonNull TaskIdParams request,
359359
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
360360
* @throws A2AClientException if resubscribing fails for any reason
361361
*/
362-
public void resubscribe(@NonNull TaskIdParams request,
362+
public void subscribeToTask(@NonNull TaskIdParams request,
363363
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
364364
@Nullable Consumer<Throwable> streamingErrorHandler) throws A2AClientException {
365-
resubscribe(request, consumers, streamingErrorHandler, null);
365+
subscribeToTask(request, consumers, streamingErrorHandler, null);
366366
}
367367

368368
/**
369-
* Resubscribe to a task's event stream.
369+
* Subscribe to a task's event stream.
370370
* This is only available if both the client and server support streaming.
371371
* The specified client consumers will be used to handle messages, tasks, and
372372
* update events received from the remote agent. The specified streaming error
@@ -378,7 +378,7 @@ public void resubscribe(@NonNull TaskIdParams request,
378378
* @param context optional client call context for the request
379379
* @throws A2AClientException if resubscribing fails for any reason
380380
*/
381-
public abstract void resubscribe(@NonNull TaskIdParams request,
381+
public abstract void subscribeToTask(@NonNull TaskIdParams request,
382382
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
383383
@Nullable Consumer<Throwable> streamingErrorHandler,
384384
@Nullable ClientCallContext context) throws A2AClientException;

client/base/src/main/java/io/a2a/client/Client.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@
152152
*
153153
* // Later, reconnect and resume receiving events
154154
* String taskId = "task-123"; // From original request
155-
* client.resubscribe(
155+
* client.subscribeToTask(
156156
* new TaskIdParams(taskId),
157157
* List.of((event, card) -> {
158158
* // Process events from where we left off
@@ -529,7 +529,7 @@ public void deleteTaskPushNotificationConfigurations(
529529
}
530530

531531
/**
532-
* Resubscribe to an existing task to receive remaining events.
532+
* Subscribe to an existing task to receive remaining events.
533533
* <p>
534534
* This method is useful when a client disconnects during a long-running task and wants to
535535
* resume receiving events without starting a new task. The agent will deliver any events
@@ -551,27 +551,27 @@ public void deleteTaskPushNotificationConfigurations(
551551
* // ... client1 disconnects ...
552552
*
553553
* // Later, reconnect (client2)
554-
* client2.resubscribe(
554+
* client2.subscribeToTask(
555555
* new TaskIdParams(taskId),
556556
* List.of((event, card) -> {
557557
* if (event instanceof TaskUpdateEvent tue) {
558558
* System.out.println("Resumed - status: " +
559559
* tue.getTask().status().state());
560560
* }
561561
* }),
562-
* throwable -> System.err.println("Resubscribe error: " + throwable),
562+
* throwable -> System.err.println("Subscribe error: " + throwable),
563563
* null
564564
* );
565565
* }</pre>
566566
*
567-
* @param request the task ID to resubscribe to
567+
* @param request the task ID to subscribe to
568568
* @param consumers the event consumers for processing events (required)
569569
* @param streamingErrorHandler error handler for streaming errors (optional)
570570
* @param context custom call context for request interceptors (optional)
571-
* @throws A2AClientException if resubscription is not supported or if the task cannot be found
571+
* @throws A2AClientException if subscription is not supported or if the task cannot be found
572572
*/
573573
@Override
574-
public void resubscribe(@NonNull TaskIdParams request,
574+
public void subscribeToTask(@NonNull TaskIdParams request,
575575
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
576576
@Nullable Consumer<Throwable> streamingErrorHandler,
577577
@Nullable ClientCallContext context) throws A2AClientException {
@@ -588,7 +588,7 @@ public void resubscribe(@NonNull TaskIdParams request,
588588
overriddenErrorHandler.accept(e);
589589
}
590590
};
591-
clientTransport.resubscribe(request, eventHandler, overriddenErrorHandler, context);
591+
clientTransport.subscribeToTask(request, eventHandler, overriddenErrorHandler, context);
592592
}
593593

594594
/**

client/base/src/test/java/io/a2a/client/AuthenticationAuthorizationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.a2a.client;
22

3-
import static io.a2a.spec.AgentCard.CURRENT_PROTOCOL_VERSION;
43
import static org.junit.jupiter.api.Assertions.assertThrows;
54
import static org.junit.jupiter.api.Assertions.assertTrue;
65
import static org.mockserver.model.HttpRequest.request;
@@ -90,7 +89,6 @@ public void setUp() {
9089
.description("Test skill")
9190
.tags(Collections.singletonList("test"))
9291
.build()))
93-
.protocolVersions(CURRENT_PROTOCOL_VERSION)
9492
.supportedInterfaces(java.util.Arrays.asList(
9593
new AgentInterface(TransportProtocol.JSONRPC.asString(), AGENT_URL),
9694
new AgentInterface(TransportProtocol.HTTP_JSON.asString(), AGENT_URL),

client/base/src/test/java/io/a2a/client/ClientBuilderTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.a2a.client;
22

3-
import static io.a2a.spec.AgentCard.CURRENT_PROTOCOL_VERSION;
43

54
import java.util.Collections;
65
import java.util.List;
@@ -41,7 +40,6 @@ public class ClientBuilderTest {
4140
.tags(Collections.singletonList("hello world"))
4241
.examples(List.of("hi", "hello world"))
4342
.build()))
44-
.protocolVersions(CURRENT_PROTOCOL_VERSION)
4543
.supportedInterfaces(List.of(
4644
new AgentInterface(TransportProtocol.JSONRPC.asString(), "http://localhost:9999")))
4745
.build();

client/transport/grpc/src/main/java/io/a2a/client/transport/grpc/GrpcTransport.java

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEv
142142
public Task getTask(TaskQueryParams request, @Nullable ClientCallContext context) throws A2AClientException {
143143
checkNotNullParam("request", request);
144144
io.a2a.grpc.GetTaskRequest.Builder requestBuilder = io.a2a.grpc.GetTaskRequest.newBuilder();
145-
requestBuilder.setName("tasks/" + request.id());
145+
requestBuilder.setId(request.id());
146146
if (request.historyLength() != null) {
147147
requestBuilder.setHistoryLength(request.historyLength());
148148
}
@@ -164,7 +164,7 @@ public Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context
164164
checkNotNullParam("request", request);
165165

166166
io.a2a.grpc.CancelTaskRequest cancelTaskRequest = io.a2a.grpc.CancelTaskRequest.newBuilder()
167-
.setName("tasks/" + request.id())
167+
.setId(request.id())
168168
.setTenant(resolveTenant(request.tenant()))
169169
.build();
170170
PayloadAndHeaders payloadAndHeaders = applyInterceptors(CANCEL_TASK_METHOD, cancelTaskRequest, agentCard, context);
@@ -230,13 +230,13 @@ public ListTasksResult listTasks(ListTasksParams request, @Nullable ClientCallCo
230230

231231
@Override
232232
public TaskPushNotificationConfig createTaskPushNotificationConfiguration(TaskPushNotificationConfig request,
233-
@Nullable ClientCallContext context) throws A2AClientException {
233+
@Nullable ClientCallContext context) throws A2AClientException {
234234
checkNotNullParam("request", request);
235235

236236
String configId = request.pushNotificationConfig().id();
237237
io.a2a.grpc.CreateTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.CreateTaskPushNotificationConfigRequest.newBuilder()
238-
.setParent("tasks/" + request.taskId())
239-
.setConfig(ToProto.taskPushNotificationConfig(request))
238+
.setTaskId(request.taskId())
239+
.setConfig(ToProto.taskPushNotificationConfig(request).getPushNotificationConfig())
240240
.setConfigId(configId != null ? configId : request.taskId())
241241
.setTenant(resolveTenant(request.tenant()))
242242
.build();
@@ -251,14 +251,18 @@ public TaskPushNotificationConfig createTaskPushNotificationConfiguration(TaskPu
251251
}
252252

253253
@Override
254-
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(
255-
GetTaskPushNotificationConfigParams request,
254+
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams request,
256255
@Nullable ClientCallContext context) throws A2AClientException {
257256
checkNotNullParam("request", request);
257+
checkNotNullParam("taskId", request.id());
258+
if(request.pushNotificationConfigId() == null) {
259+
throw new IllegalArgumentException("Id must not be null");
260+
}
258261

259262
io.a2a.grpc.GetTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.GetTaskPushNotificationConfigRequest.newBuilder()
260-
.setName(getTaskPushNotificationConfigName(request))
263+
.setTaskId(request.id())
261264
.setTenant(resolveTenant(request.tenant()))
265+
.setId(request.pushNotificationConfigId())
262266
.build();
263267
PayloadAndHeaders payloadAndHeaders = applyInterceptors(GET_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, grpcRequest, agentCard, context);
264268

@@ -277,7 +281,7 @@ public ListTaskPushNotificationConfigResult listTaskPushNotificationConfiguratio
277281
checkNotNullParam("request", request);
278282

279283
io.a2a.grpc.ListTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.ListTaskPushNotificationConfigRequest.newBuilder()
280-
.setParent("tasks/" + request.id())
284+
.setTaskId(request.id())
281285
.setTenant(resolveTenant(request.tenant()))
282286
.setPageSize(request.pageSize())
283287
.setPageToken(request.pageToken())
@@ -300,7 +304,8 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
300304
checkNotNullParam("request", request);
301305

302306
io.a2a.grpc.DeleteTaskPushNotificationConfigRequest grpcRequest = io.a2a.grpc.DeleteTaskPushNotificationConfigRequest.newBuilder()
303-
.setName(getTaskPushNotificationConfigName(request.id(), request.pushNotificationConfigId()))
307+
.setTaskId(request.id())
308+
.setId(request.pushNotificationConfigId())
304309
.setTenant(resolveTenant(request.tenant()))
305310
.build();
306311
PayloadAndHeaders payloadAndHeaders = applyInterceptors(DELETE_TASK_PUSH_NOTIFICATION_CONFIG_METHOD, grpcRequest, agentCard, context);
@@ -314,14 +319,14 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
314319
}
315320

316321
@Override
317-
public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
322+
public void subscribeToTask(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
318323
Consumer<Throwable> errorConsumer, @Nullable ClientCallContext context) throws A2AClientException {
319324
checkNotNullParam("request", request);
320325
checkNotNullParam("eventConsumer", eventConsumer);
321326

322327
io.a2a.grpc.SubscribeToTaskRequest grpcRequest = io.a2a.grpc.SubscribeToTaskRequest.newBuilder()
323328
.setTenant(resolveTenant(request.tenant()))
324-
.setName("tasks/" + request.id())
329+
.setId(request.id())
325330
.build();
326331
PayloadAndHeaders payloadAndHeaders = applyInterceptors(SUBSCRIBE_TO_TASK_METHOD, grpcRequest, agentCard, context);
327332

@@ -331,12 +336,13 @@ public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> event
331336
A2AServiceStub stubWithMetadata = createAsyncStubWithMetadata(context, payloadAndHeaders);
332337
stubWithMetadata.subscribeToTask(grpcRequest, streamObserver);
333338
} catch (StatusRuntimeException e) {
334-
throw GrpcErrorMapper.mapGrpcError(e, "Failed to resubscribe task push notification config: ");
339+
throw GrpcErrorMapper.mapGrpcError(e, "Failed to subscribe task push notification config: ");
335340
}
336341
}
337342

338343
/**
339344
* Ensure tenant is set, using agent default if not provided in request
345+
*
340346
* @param request the initial request.
341347
* @return the updated request with the tenant set.
342348
*/
@@ -447,24 +453,6 @@ private A2AServiceStub createAsyncStubWithMetadata(@Nullable ClientCallContext c
447453
return asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
448454
}
449455

450-
private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams params) {
451-
return getTaskPushNotificationConfigName(params.id(), params.pushNotificationConfigId());
452-
}
453-
454-
private String getTaskPushNotificationConfigName(String taskId, @Nullable String pushNotificationConfigId) {
455-
StringBuilder name = new StringBuilder();
456-
name.append("tasks/");
457-
name.append(taskId);
458-
if (pushNotificationConfigId != null) {
459-
name.append("/pushNotificationConfigs/");
460-
name.append(pushNotificationConfigId);
461-
}
462-
//name.append("/pushNotificationConfigs/");
463-
// Use taskId as default config ID if none provided
464-
//name.append(pushNotificationConfigId != null ? pushNotificationConfigId : taskId);
465-
return name.toString();
466-
}
467-
468456
private PayloadAndHeaders applyInterceptors(String methodName, Object payload,
469457
AgentCard agentCard, @Nullable ClientCallContext clientCallContext) {
470458
PayloadAndHeaders payloadAndHeaders = new PayloadAndHeaders(payload,

client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
255255
}
256256

257257
@Override
258-
public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
258+
public void subscribeToTask(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
259259
Consumer<Throwable> errorConsumer, @Nullable ClientCallContext context) throws A2AClientException {
260260
checkNotNullParam("request", request);
261261
checkNotNullParam("eventConsumer", eventConsumer);

0 commit comments

Comments
 (0)