Skip to content

Commit 973f887

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Fixing the spans produced by agent calls to have the right parent spans
PiperOrigin-RevId: 881003835
1 parent 143b656 commit 973f887

6 files changed

Lines changed: 887 additions & 508 deletions

File tree

core/src/main/java/com/google/adk/agents/BaseAgent.java

Lines changed: 57 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import com.google.errorprone.annotations.CanIgnoreReturnValue;
3030
import com.google.errorprone.annotations.DoNotCall;
3131
import com.google.genai.types.Content;
32+
import io.opentelemetry.context.Context;
3233
import io.reactivex.rxjava3.core.Completable;
3334
import io.reactivex.rxjava3.core.Flowable;
3435
import io.reactivex.rxjava3.core.Maybe;
35-
import io.reactivex.rxjava3.core.Single;
3636
import java.util.ArrayList;
3737
import java.util.HashSet;
3838
import java.util.List;
@@ -312,38 +312,47 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
312312
private Flowable<Event> run(
313313
InvocationContext parentContext,
314314
Function<InvocationContext, Flowable<Event>> runImplementation) {
315+
Context otelParentContext = Context.current();
316+
InvocationContext invocationContext = createInvocationContext(parentContext);
317+
315318
return Flowable.defer(
316-
() -> {
317-
InvocationContext invocationContext = createInvocationContext(parentContext);
318-
319-
return callCallback(
320-
beforeCallbacksToFunctions(
321-
invocationContext.pluginManager(), beforeAgentCallback),
322-
invocationContext)
323-
.flatMapPublisher(
324-
beforeEventOpt -> {
325-
if (invocationContext.endInvocation()) {
326-
return Flowable.fromOptional(beforeEventOpt);
327-
}
328-
329-
Flowable<Event> beforeEvents = Flowable.fromOptional(beforeEventOpt);
330-
Flowable<Event> mainEvents =
331-
Flowable.defer(() -> runImplementation.apply(invocationContext));
332-
Flowable<Event> afterEvents =
333-
Flowable.defer(
334-
() ->
335-
callCallback(
336-
afterCallbacksToFunctions(
337-
invocationContext.pluginManager(), afterAgentCallback),
338-
invocationContext)
339-
.flatMapPublisher(Flowable::fromOptional));
340-
341-
return Flowable.concat(beforeEvents, mainEvents, afterEvents);
342-
})
343-
.compose(
344-
Tracing.traceAgent(
345-
"invoke_agent " + name(), name(), description(), invocationContext));
346-
});
319+
() -> {
320+
return callCallback(
321+
beforeCallbacksToFunctions(
322+
invocationContext.pluginManager(), beforeAgentCallback),
323+
invocationContext)
324+
.flatMapPublisher(
325+
beforeEvent -> {
326+
if (invocationContext.endInvocation()) {
327+
return Flowable.just(beforeEvent);
328+
}
329+
330+
return Flowable.just(beforeEvent)
331+
.concatWith(runMainAndAfter(invocationContext, runImplementation));
332+
})
333+
.switchIfEmpty(
334+
Flowable.defer(() -> runMainAndAfter(invocationContext, runImplementation)));
335+
})
336+
.compose(
337+
Tracing.traceAgent(
338+
otelParentContext,
339+
"invoke_agent " + name(),
340+
name(),
341+
description(),
342+
invocationContext));
343+
}
344+
345+
private Flowable<Event> runMainAndAfter(
346+
InvocationContext invocationContext,
347+
Function<InvocationContext, Flowable<Event>> runImplementation) {
348+
Flowable<Event> mainEvents = runImplementation.apply(invocationContext);
349+
Flowable<Event> afterEvents =
350+
callCallback(
351+
afterCallbacksToFunctions(invocationContext.pluginManager(), afterAgentCallback),
352+
invocationContext)
353+
.flatMapPublisher(Flowable::just);
354+
355+
return Flowable.concat(mainEvents, afterEvents);
347356
}
348357

349358
/**
@@ -383,13 +392,13 @@ private <T> ImmutableList<Function<CallbackContext, Maybe<Content>>> callbacksTo
383392
*
384393
* @param agentCallbacks Callback functions.
385394
* @param invocationContext Current invocation context.
386-
* @return single emitting first event, or empty if none.
395+
* @return Maybe emitting first event, or empty if none.
387396
*/
388-
private Single<Optional<Event>> callCallback(
397+
private Maybe<Event> callCallback(
389398
List<Function<CallbackContext, Maybe<Content>>> agentCallbacks,
390399
InvocationContext invocationContext) {
391400
if (agentCallbacks.isEmpty()) {
392-
return Single.just(Optional.empty());
401+
return Maybe.empty();
393402
}
394403

395404
CallbackContext callbackContext =
@@ -398,27 +407,25 @@ private Single<Optional<Event>> callCallback(
398407
return Flowable.fromIterable(agentCallbacks)
399408
.concatMap(
400409
callback -> {
401-
Maybe<Content> maybeContent = callback.apply(callbackContext);
402-
403-
return maybeContent
410+
return callback
411+
.apply(callbackContext)
404412
.map(
405413
content -> {
406414
invocationContext.setEndInvocation(true);
407-
return Optional.of(
408-
Event.builder()
409-
.id(Event.generateEventId())
410-
.invocationId(invocationContext.invocationId())
411-
.author(name())
412-
.branch(invocationContext.branch().orElse(null))
413-
.actions(callbackContext.eventActions())
414-
.content(content)
415-
.build());
415+
return Event.builder()
416+
.id(Event.generateEventId())
417+
.invocationId(invocationContext.invocationId())
418+
.author(name())
419+
.branch(invocationContext.branch().orElse(null))
420+
.actions(callbackContext.eventActions())
421+
.content(content)
422+
.build();
416423
})
417424
.toFlowable();
418425
})
419426
.firstElement()
420427
.switchIfEmpty(
421-
Single.defer(
428+
Maybe.defer(
422429
() -> {
423430
if (callbackContext.state().hasDelta()) {
424431
Event.Builder eventBuilder =
@@ -429,9 +436,9 @@ private Single<Optional<Event>> callCallback(
429436
.branch(invocationContext.branch().orElse(null))
430437
.actions(callbackContext.eventActions());
431438

432-
return Single.just(Optional.of(eventBuilder.build()));
439+
return Maybe.just(eventBuilder.build());
433440
} else {
434-
return Single.just(Optional.empty());
441+
return Maybe.empty();
435442
}
436443
}));
437444
}

0 commit comments

Comments
 (0)