Skip to content

Commit ca8fd8d

Browse files
Add interceptor to propagate headers to Nexus operations
1 parent f51ee95 commit ca8fd8d

File tree

6 files changed

+117
-4
lines changed

6 files changed

+117
-4
lines changed

core/src/main/java/io/temporal/samples/nexus/README.MD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ temporal operator namespace create --namespace my-caller-namespace
5353
temporal operator nexus endpoint create \
5454
--name my-nexus-endpoint-name \
5555
--target-namespace my-target-namespace \
56-
--target-task-queue my-handler-task-queue \
56+
--target-task-queue my-handler-task-queue
5757
--description-file ./core/src/main/java/io/temporal/samples/nexus/service/description.md
5858
```
5959

core/src/main/java/io/temporal/samples/nexus/caller/CallerWorker.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
import io.temporal.client.WorkflowClient;
2323
import io.temporal.samples.nexus.options.ClientOptions;
24+
import io.temporal.samples.nexus.propogate.NexusMDCContextPropagation;
2425
import io.temporal.worker.Worker;
2526
import io.temporal.worker.WorkerFactory;
27+
import io.temporal.worker.WorkerFactoryOptions;
2628
import io.temporal.worker.WorkflowImplementationOptions;
2729
import io.temporal.workflow.NexusServiceOptions;
2830
import java.util.Collections;
@@ -33,7 +35,12 @@ public class CallerWorker {
3335
public static void main(String[] args) {
3436
WorkflowClient client = ClientOptions.getWorkflowClient(args);
3537

36-
WorkerFactory factory = WorkerFactory.newInstance(client);
38+
WorkerFactory factory =
39+
WorkerFactory.newInstance(
40+
client,
41+
WorkerFactoryOptions.newBuilder()
42+
.setWorkerInterceptors(new NexusMDCContextPropagation())
43+
.build());
3744

3845
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
3946
worker.registerWorkflowImplementationTypes(

core/src/main/java/io/temporal/samples/nexus/caller/EchoCallerWorkflowImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.temporal.workflow.NexusOperationOptions;
2424
import io.temporal.workflow.NexusServiceOptions;
2525
import io.temporal.workflow.Workflow;
26+
import org.slf4j.MDC;
27+
2628
import java.time.Duration;
2729

2830
public class EchoCallerWorkflowImpl implements EchoCallerWorkflow {
@@ -38,6 +40,7 @@ public class EchoCallerWorkflowImpl implements EchoCallerWorkflow {
3840

3941
@Override
4042
public String echo(String message) {
43+
MDC.put("caller_workflow_id", Workflow.getInfo().getWorkflowId());
4144
return nexusService.echo(new NexusService.EchoInput(message)).getMessage();
4245
}
4346
}

core/src/main/java/io/temporal/samples/nexus/handler/HandlerWorker.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,23 @@
2121

2222
import io.temporal.client.WorkflowClient;
2323
import io.temporal.samples.nexus.options.ClientOptions;
24+
import io.temporal.samples.nexus.propogate.NexusMDCContextPropagation;
2425
import io.temporal.worker.Worker;
2526
import io.temporal.worker.WorkerFactory;
27+
import io.temporal.worker.WorkerFactoryOptions;
2628

2729
public class HandlerWorker {
2830
public static final String DEFAULT_TASK_QUEUE_NAME = "my-handler-task-queue";
2931

3032
public static void main(String[] args) {
3133
WorkflowClient client = ClientOptions.getWorkflowClient(args);
3234

33-
WorkerFactory factory = WorkerFactory.newInstance(client);
35+
WorkerFactory factory =
36+
WorkerFactory.newInstance(
37+
client,
38+
WorkerFactoryOptions.newBuilder()
39+
.setWorkerInterceptors(new NexusMDCContextPropagation())
40+
.build());
3441

3542
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
3643
worker.registerWorkflowImplementationTypes(HelloHandlerWorkflowImpl.class);

core/src/main/java/io/temporal/samples/nexus/handler/NexusServiceImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@
2626
import io.temporal.nexus.Nexus;
2727
import io.temporal.nexus.WorkflowRunOperation;
2828
import io.temporal.samples.nexus.service.NexusService;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import org.slf4j.MDC;
2932

3033
// To create a service implementation, annotate the class with @ServiceImpl and provide the
3134
// interface that the service implements. The service implementation class should have methods that
3235
// return OperationHandler that correspond to the operations defined in the service interface.
3336
@ServiceImpl(service = NexusService.class)
3437
public class NexusServiceImpl {
38+
private static final Logger logger = LoggerFactory.getLogger(NexusServiceImpl.class);
39+
3540
@OperationImpl
3641
public OperationHandler<NexusService.EchoInput, NexusService.EchoOutput> echo() {
3742
// OperationHandler.sync is a meant for exposing simple RPC handlers.
@@ -41,7 +46,12 @@ public OperationHandler<NexusService.EchoInput, NexusService.EchoOutput> echo()
4146
// calling
4247
// Nexus.getOperationContext().getWorkflowClient(ctx) to make arbitrary calls such as
4348
// signaling, querying, or listing workflows.
44-
(ctx, details, input) -> new NexusService.EchoOutput(input.getMessage()));
49+
(ctx, details, input) -> {
50+
if (MDC.get("caller_workflow_id") != null) {
51+
logger.info("Echo called from a workflow with ID : {}", MDC.get("caller_workflow_id"));
52+
}
53+
return new NexusService.EchoOutput(input.getMessage());
54+
});
4555
}
4656

4757
@OperationImpl
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package io.temporal.samples.nexus.propogate;
2+
3+
import io.nexusrpc.OperationException;
4+
import io.nexusrpc.handler.OperationContext;
5+
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
6+
import io.temporal.common.interceptors.WorkerInterceptorBase;
7+
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
8+
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
9+
import org.slf4j.MDC;
10+
11+
import java.util.Map;
12+
13+
/*** Propagates MDC context from the caller workflow to the Nexus service through the headers. */
14+
public class NexusMDCContextPropagation extends WorkerInterceptorBase {
15+
16+
@Override
17+
public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
18+
return new WorkflowInboundCallsInterceptorNexusMDC(next);
19+
}
20+
21+
public static class WorkflowInboundCallsInterceptorNexusMDC
22+
extends io.temporal.common.interceptors.WorkflowInboundCallsInterceptorBase {
23+
private final WorkflowInboundCallsInterceptor next;
24+
25+
public WorkflowInboundCallsInterceptorNexusMDC(WorkflowInboundCallsInterceptor next) {
26+
super(next);
27+
this.next = next;
28+
}
29+
30+
@Override
31+
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
32+
next.init(new WorkflowOutboundCallsInterceptorNexusMDC(outboundCalls));
33+
}
34+
}
35+
36+
public static class WorkflowOutboundCallsInterceptorNexusMDC
37+
extends io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase {
38+
private final WorkflowOutboundCallsInterceptor next;
39+
40+
public WorkflowOutboundCallsInterceptorNexusMDC(WorkflowOutboundCallsInterceptor next) {
41+
super(next);
42+
this.next = next;
43+
}
44+
45+
@Override
46+
public <R>
47+
WorkflowOutboundCallsInterceptor.ExecuteNexusOperationOutput<R> executeNexusOperation(
48+
WorkflowOutboundCallsInterceptor.ExecuteNexusOperationInput<R> input) {
49+
Map<String, String> contextMap = MDC.getCopyOfContextMap();
50+
if (contextMap != null) {
51+
input.getHeaders().putAll(contextMap);
52+
}
53+
return this.next.executeNexusOperation(input);
54+
}
55+
}
56+
57+
@Override
58+
public NexusOperationInboundCallsInterceptor interceptNexusOperation(
59+
OperationContext context, NexusOperationInboundCallsInterceptor next) {
60+
return new NexusOperationInboundCallsInterceptorNexusMDC(next);
61+
}
62+
63+
private static class NexusOperationInboundCallsInterceptorNexusMDC
64+
extends io.temporal.common.interceptors.NexusOperationInboundCallsInterceptorBase {
65+
private final NexusOperationInboundCallsInterceptor next;
66+
67+
public NexusOperationInboundCallsInterceptorNexusMDC(NexusOperationInboundCallsInterceptor next) {
68+
super(next);
69+
this.next = next;
70+
}
71+
72+
@Override
73+
public NexusOperationInboundCallsInterceptor.StartOperationOutput startOperation(
74+
NexusOperationInboundCallsInterceptor.StartOperationInput input) throws OperationException {
75+
input.getOperationContext().getHeaders().forEach(MDC::put);
76+
return next.startOperation(input);
77+
}
78+
79+
@Override
80+
public NexusOperationInboundCallsInterceptor.CancelOperationOutput cancelOperation(
81+
NexusOperationInboundCallsInterceptor.CancelOperationInput input) {
82+
input.getOperationContext().getHeaders().forEach(MDC::put);
83+
return next.cancelOperation(input);
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)