2929import com .google .errorprone .annotations .CanIgnoreReturnValue ;
3030import com .google .errorprone .annotations .DoNotCall ;
3131import com .google .genai .types .Content ;
32+ import io .opentelemetry .context .Context ;
3233import io .reactivex .rxjava3 .core .Completable ;
3334import io .reactivex .rxjava3 .core .Flowable ;
3435import io .reactivex .rxjava3 .core .Maybe ;
35- import io .reactivex .rxjava3 .core .Single ;
3636import java .util .ArrayList ;
3737import java .util .HashSet ;
3838import java .util .List ;
@@ -312,38 +312,47 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
312312 private Flowable <Event > run (
313313 InvocationContext parentContext ,
314314 Function <InvocationContext , Flowable <Event >> runImplementation ) {
315+ Context otelParentContext = Context .current ();
316+ InvocationContext invocationContext = createInvocationContext (parentContext );
317+
315318 return Flowable .defer (
316- () -> {
317- InvocationContext invocationContext = createInvocationContext (parentContext );
318-
319- return callCallback (
320- beforeCallbacksToFunctions (
321- invocationContext .pluginManager (), beforeAgentCallback ),
322- invocationContext )
323- .flatMapPublisher (
324- beforeEventOpt -> {
325- if (invocationContext .endInvocation ()) {
326- return Flowable .fromOptional (beforeEventOpt );
327- }
328-
329- Flowable <Event > beforeEvents = Flowable .fromOptional (beforeEventOpt );
330- Flowable <Event > mainEvents =
331- Flowable .defer (() -> runImplementation .apply (invocationContext ));
332- Flowable <Event > afterEvents =
333- Flowable .defer (
334- () ->
335- callCallback (
336- afterCallbacksToFunctions (
337- invocationContext .pluginManager (), afterAgentCallback ),
338- invocationContext )
339- .flatMapPublisher (Flowable ::fromOptional ));
340-
341- return Flowable .concat (beforeEvents , mainEvents , afterEvents );
342- })
343- .compose (
344- Tracing .traceAgent (
345- "invoke_agent " + name (), name (), description (), invocationContext ));
346- });
319+ () -> {
320+ return callCallback (
321+ beforeCallbacksToFunctions (
322+ invocationContext .pluginManager (), beforeAgentCallback ),
323+ invocationContext )
324+ .flatMapPublisher (
325+ beforeEvent -> {
326+ if (invocationContext .endInvocation ()) {
327+ return Flowable .just (beforeEvent );
328+ }
329+
330+ return Flowable .just (beforeEvent )
331+ .concatWith (runMainAndAfter (invocationContext , runImplementation ));
332+ })
333+ .switchIfEmpty (
334+ Flowable .defer (() -> runMainAndAfter (invocationContext , runImplementation )));
335+ })
336+ .compose (
337+ Tracing .traceAgent (
338+ otelParentContext ,
339+ "invoke_agent " + name (),
340+ name (),
341+ description (),
342+ invocationContext ));
343+ }
344+
345+ private Flowable <Event > runMainAndAfter (
346+ InvocationContext invocationContext ,
347+ Function <InvocationContext , Flowable <Event >> runImplementation ) {
348+ Flowable <Event > mainEvents = runImplementation .apply (invocationContext );
349+ Flowable <Event > afterEvents =
350+ callCallback (
351+ afterCallbacksToFunctions (invocationContext .pluginManager (), afterAgentCallback ),
352+ invocationContext )
353+ .flatMapPublisher (Flowable ::just );
354+
355+ return Flowable .concat (mainEvents , afterEvents );
347356 }
348357
349358 /**
@@ -383,13 +392,13 @@ private <T> ImmutableList<Function<CallbackContext, Maybe<Content>>> callbacksTo
383392 *
384393 * @param agentCallbacks Callback functions.
385394 * @param invocationContext Current invocation context.
386- * @return single emitting first event, or empty if none.
395+ * @return Maybe emitting first event, or empty if none.
387396 */
388- private Single < Optional < Event > > callCallback (
397+ private Maybe < Event > callCallback (
389398 List <Function <CallbackContext , Maybe <Content >>> agentCallbacks ,
390399 InvocationContext invocationContext ) {
391400 if (agentCallbacks .isEmpty ()) {
392- return Single . just ( Optional . empty () );
401+ return Maybe . empty ();
393402 }
394403
395404 CallbackContext callbackContext =
@@ -398,27 +407,25 @@ private Single<Optional<Event>> callCallback(
398407 return Flowable .fromIterable (agentCallbacks )
399408 .concatMap (
400409 callback -> {
401- Maybe <Content > maybeContent = callback .apply (callbackContext );
402-
403- return maybeContent
410+ return callback
411+ .apply (callbackContext )
404412 .map (
405413 content -> {
406414 invocationContext .setEndInvocation (true );
407- return Optional .of (
408- Event .builder ()
409- .id (Event .generateEventId ())
410- .invocationId (invocationContext .invocationId ())
411- .author (name ())
412- .branch (invocationContext .branch ().orElse (null ))
413- .actions (callbackContext .eventActions ())
414- .content (content )
415- .build ());
415+ return Event .builder ()
416+ .id (Event .generateEventId ())
417+ .invocationId (invocationContext .invocationId ())
418+ .author (name ())
419+ .branch (invocationContext .branch ().orElse (null ))
420+ .actions (callbackContext .eventActions ())
421+ .content (content )
422+ .build ();
416423 })
417424 .toFlowable ();
418425 })
419426 .firstElement ()
420427 .switchIfEmpty (
421- Single .defer (
428+ Maybe .defer (
422429 () -> {
423430 if (callbackContext .state ().hasDelta ()) {
424431 Event .Builder eventBuilder =
@@ -429,9 +436,9 @@ private Single<Optional<Event>> callCallback(
429436 .branch (invocationContext .branch ().orElse (null ))
430437 .actions (callbackContext .eventActions ());
431438
432- return Single .just (Optional . of ( eventBuilder .build () ));
439+ return Maybe .just (eventBuilder .build ());
433440 } else {
434- return Single . just ( Optional . empty () );
441+ return Maybe . empty ();
435442 }
436443 }));
437444 }
0 commit comments