Skip to content

Commit c4f81e0

Browse files
authored
added client interceptor (temporalio#361)
1 parent 680846f commit c4f81e0

11 files changed

Lines changed: 335 additions & 21 deletions
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.interceptor;
21+
22+
import java.util.AbstractMap;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.Stream;
28+
29+
/** Simple counter class. */
30+
public class ClientCounter {
31+
private static final String NUM_OF_GET_RESULT = "numOfGetResult";
32+
private static final String NUM_OF_WORKFLOW_EXECUTIONS = "numOfWorkflowExec";
33+
private static final String NUM_OF_SIGNALS = "numOfSignals";
34+
private static final String NUM_OF_QUERIES = "numOfQueries";
35+
private static final Map<String, Map<String, Integer>> perWorkflowIdMap =
36+
Collections.synchronizedMap(new HashMap<>());
37+
38+
public String getInfo() {
39+
StringBuilder stringBuilder = new StringBuilder();
40+
for (String workflowRunId : perWorkflowIdMap.keySet()) {
41+
stringBuilder.append("\n** Workflow ID: " + workflowRunId);
42+
Map<String, Integer> info = perWorkflowIdMap.get(workflowRunId);
43+
stringBuilder.append(
44+
"\n\tTotal Number of Workflow Exec: " + info.get(NUM_OF_WORKFLOW_EXECUTIONS));
45+
stringBuilder.append("\n\tTotal Number of Signals: " + info.get(NUM_OF_SIGNALS));
46+
stringBuilder.append("\n\tTotal Number of Queries: " + info.get(NUM_OF_QUERIES));
47+
stringBuilder.append("\n\tTotal Number of GetResult: " + info.get(NUM_OF_GET_RESULT));
48+
}
49+
50+
return stringBuilder.toString();
51+
}
52+
53+
private void add(String workflowId, String type) {
54+
if (!perWorkflowIdMap.containsKey(workflowId)) {
55+
perWorkflowIdMap.put(workflowId, getDefaultInfoMap());
56+
}
57+
58+
if (perWorkflowIdMap.get(workflowId).get(type) == null) {
59+
perWorkflowIdMap.get(workflowId).put(type, 1);
60+
} else {
61+
int current = perWorkflowIdMap.get(workflowId).get(type).intValue();
62+
int next = current + 1;
63+
perWorkflowIdMap.get(workflowId).put(type, next);
64+
}
65+
}
66+
67+
public int getNumOfWorkflowExecutions(String workflowId) {
68+
return perWorkflowIdMap.get(workflowId).get(NUM_OF_WORKFLOW_EXECUTIONS);
69+
}
70+
71+
public int getNumOfGetResults(String workflowId) {
72+
return perWorkflowIdMap.get(workflowId).get(NUM_OF_GET_RESULT);
73+
}
74+
75+
public int getNumOfSignals(String workflowId) {
76+
return perWorkflowIdMap.get(workflowId).get(NUM_OF_SIGNALS);
77+
}
78+
79+
public int getNumOfQueries(String workflowId) {
80+
return perWorkflowIdMap.get(workflowId).get(NUM_OF_QUERIES);
81+
}
82+
83+
/**
84+
* Creates a default counter info map for a workflowid
85+
*
86+
* @return default counter info map
87+
*/
88+
private Map<String, Integer> getDefaultInfoMap() {
89+
return Stream.of(
90+
new AbstractMap.SimpleImmutableEntry<>(NUM_OF_WORKFLOW_EXECUTIONS, 0),
91+
new AbstractMap.SimpleImmutableEntry<>(NUM_OF_SIGNALS, 0),
92+
new AbstractMap.SimpleImmutableEntry<>(NUM_OF_GET_RESULT, 0),
93+
new AbstractMap.SimpleImmutableEntry<>(NUM_OF_QUERIES, 0))
94+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
95+
}
96+
97+
public void addStartInvocation(String workflowId) {
98+
add(workflowId, NUM_OF_WORKFLOW_EXECUTIONS);
99+
}
100+
101+
public void addSignalInvocation(String workflowId) {
102+
add(workflowId, NUM_OF_SIGNALS);
103+
}
104+
105+
public void addGetResultInvocation(String workflowId) {
106+
add(workflowId, NUM_OF_GET_RESULT);
107+
}
108+
109+
public void addQueryInvocation(String workflowId) {
110+
add(workflowId, NUM_OF_QUERIES);
111+
}
112+
}

src/main/java/io/temporal/samples/interceptor/InterceptorStarter.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
package io.temporal.samples.interceptor;
2121

2222
import io.temporal.client.WorkflowClient;
23+
import io.temporal.client.WorkflowClientOptions;
2324
import io.temporal.client.WorkflowOptions;
2425
import io.temporal.client.WorkflowStub;
26+
import io.temporal.common.interceptors.WorkflowClientInterceptor;
2527
import io.temporal.samples.interceptor.activities.MyActivitiesImpl;
2628
import io.temporal.samples.interceptor.workflow.MyChildWorkflowImpl;
2729
import io.temporal.samples.interceptor.workflow.MyWorkflow;
@@ -35,19 +37,25 @@
3537

3638
public class InterceptorStarter {
3739

38-
public static SimpleCountWorkerInterceptor interceptor = new SimpleCountWorkerInterceptor();
40+
public static SimpleCountWorkerInterceptor workerInterceptor = new SimpleCountWorkerInterceptor();
3941
private static final String TEST_QUEUE = "test-queue";
4042
private static final String WORKFLOW_ID = "TestInterceptorWorkflow";
4143

4244
private static final Logger logger = LoggerFactory.getLogger(SimpleCountWorkerInterceptor.class);
4345

4446
public static void main(String[] args) {
47+
48+
final ClientCounter clientCounter = new ClientCounter();
49+
final WorkflowClientInterceptor clientInterceptor = new SimpleClientInterceptor(clientCounter);
50+
4551
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
46-
WorkflowClient client = WorkflowClient.newInstance(service);
52+
WorkflowClient client =
53+
WorkflowClient.newInstance(
54+
service, WorkflowClientOptions.newBuilder().setInterceptors(clientInterceptor).build());
4755

4856
WorkerFactoryOptions wfo =
4957
WorkerFactoryOptions.newBuilder()
50-
.setWorkerInterceptors(interceptor)
58+
.setWorkerInterceptors(workerInterceptor)
5159
.validateAndBuildWithDefaults();
5260

5361
WorkerFactory factory = WorkerFactory.newInstance(client, wfo);
@@ -84,9 +92,13 @@ public static void main(String[] args) {
8492
logger.info("Name: " + name);
8593
logger.info("Title: " + title);
8694

87-
// Print the Counter Info
88-
logger.info("Collected Counter Info: ");
89-
logger.info(Counter.getInfo());
95+
// Print the Worker Counter Info
96+
logger.info("Collected Worker Counter Info: ");
97+
logger.info(WorkerCounter.getInfo());
98+
99+
// Print the Client Counter Info
100+
logger.info("Collected Client Counter Info: ");
101+
logger.info(clientCounter.getInfo());
90102

91103
System.exit(0);
92104
}

src/main/java/io/temporal/samples/interceptor/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Demo Workflow Interceptor
22

3-
The sample demonstrates the use of a simple Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions as well as the number of Signals and Queries.
3+
The sample demonstrates:
4+
- the use of a simple Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions as well as the number of Signals and Queries.
5+
- the use of a simple Client Workflow Interceptor that counts the number of Workflow Executions as well as the number of Signals, Queries and GetResult invocations.
46

57
Run the following command to start the sample:
68

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.interceptor;
21+
22+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
23+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
24+
import java.util.concurrent.TimeoutException;
25+
26+
public class SimpleClientCallsInterceptor extends WorkflowClientCallsInterceptorBase {
27+
private ClientCounter clientCounter;
28+
29+
public SimpleClientCallsInterceptor(
30+
WorkflowClientCallsInterceptor next, ClientCounter clientCounter) {
31+
super(next);
32+
this.clientCounter = clientCounter;
33+
}
34+
35+
@Override
36+
public WorkflowStartOutput start(WorkflowStartInput input) {
37+
clientCounter.addStartInvocation(input.getWorkflowId());
38+
return super.start(input);
39+
}
40+
41+
@Override
42+
public WorkflowSignalOutput signal(WorkflowSignalInput input) {
43+
clientCounter.addSignalInvocation(input.getWorkflowExecution().getWorkflowId());
44+
return super.signal(input);
45+
}
46+
47+
@Override
48+
public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutException {
49+
clientCounter.addGetResultInvocation(input.getWorkflowExecution().getWorkflowId());
50+
return super.getResult(input);
51+
}
52+
53+
@Override
54+
public <R> QueryOutput<R> query(QueryInput<R> input) {
55+
clientCounter.addQueryInvocation(input.getWorkflowExecution().getWorkflowId());
56+
return super.query(input);
57+
}
58+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.interceptor;
21+
22+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
23+
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
24+
25+
public class SimpleClientInterceptor extends WorkflowClientInterceptorBase {
26+
27+
private ClientCounter clientCounter;
28+
29+
public SimpleClientInterceptor(ClientCounter clientCounter) {
30+
this.clientCounter = clientCounter;
31+
}
32+
33+
@Override
34+
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
35+
WorkflowClientCallsInterceptor next) {
36+
return new SimpleClientCallsInterceptor(next, clientCounter);
37+
}
38+
}

src/main/java/io/temporal/samples/interceptor/SimpleCountActivityInboundCallsInterceptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public void init(ActivityExecutionContext context) {
4040

4141
@Override
4242
public ActivityOutput execute(ActivityInput input) {
43-
Counter.add(
43+
WorkerCounter.add(
4444
this.activityExecutionContext.getInfo().getWorkflowId(),
45-
Counter.NUM_OF_ACTIVITY_EXECUTIONS);
45+
WorkerCounter.NUM_OF_ACTIVITY_EXECUTIONS);
4646
return super.execute(input);
4747
}
4848
}

src/main/java/io/temporal/samples/interceptor/SimpleCountWorkflowInboundCallsInterceptor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,19 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
4242

4343
@Override
4444
public WorkflowOutput execute(WorkflowInput input) {
45-
Counter.add(this.workflowInfo.getWorkflowId(), Counter.NUM_OF_WORKFLOW_EXECUTIONS);
45+
WorkerCounter.add(this.workflowInfo.getWorkflowId(), WorkerCounter.NUM_OF_WORKFLOW_EXECUTIONS);
4646
return super.execute(input);
4747
}
4848

4949
@Override
5050
public void handleSignal(SignalInput input) {
51-
Counter.add(this.workflowInfo.getWorkflowId(), Counter.NUM_OF_SIGNALS);
51+
WorkerCounter.add(this.workflowInfo.getWorkflowId(), WorkerCounter.NUM_OF_SIGNALS);
5252
super.handleSignal(input);
5353
}
5454

5555
@Override
5656
public QueryOutput handleQuery(QueryInput input) {
57-
Counter.add(this.workflowInfo.getWorkflowId(), Counter.NUM_OF_QUERIES);
57+
WorkerCounter.add(this.workflowInfo.getWorkflowId(), WorkerCounter.NUM_OF_QUERIES);
5858
return super.handleQuery(input);
5959
}
6060
}

src/main/java/io/temporal/samples/interceptor/SimpleCountWorkflowOutboundCallsInterceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public SimpleCountWorkflowOutboundCallsInterceptor(WorkflowOutboundCallsIntercep
3232

3333
@Override
3434
public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
35-
Counter.add(Workflow.getInfo().getWorkflowId(), Counter.NUM_OF_CHILD_WORKFLOW_EXECUTIONS);
35+
WorkerCounter.add(
36+
Workflow.getInfo().getWorkflowId(), WorkerCounter.NUM_OF_CHILD_WORKFLOW_EXECUTIONS);
3637
return super.executeChildWorkflow(input);
3738
}
3839
}

src/main/java/io/temporal/samples/interceptor/Counter.java renamed to src/main/java/io/temporal/samples/interceptor/WorkerCounter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* Simple counter class. Static impl just for the sake of the sample. Note: in your applications you
3131
* should use CDI for example instead.
3232
*/
33-
public class Counter {
33+
public class WorkerCounter {
3434
private static Map<String, Map<String, Integer>> perWorkflowIdMap =
3535
Collections.synchronizedMap(new HashMap<>());
3636

0 commit comments

Comments
 (0)