Skip to content

Commit 902591b

Browse files
authored
Sample: Auto Heartbeat via Interceptor (#745)
* Sample: Auto Heartbeat via Interceptor Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * updates Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * updates per recommendations Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * formatting fix Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> --------- Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 38372fa commit 902591b

10 files changed

Lines changed: 615 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ See the README.md file in each main sample directory for cut/paste Gradle comman
7979
- [**HelloDelayedStart**](/core/src/main/java/io/temporal/samples/hello/HelloDelayedStart.java): Demonstrates how to use delayed start config option when starting a Workflow Executions.
8080
- [**HelloSignalWithTimer**](/core/src/main/java/io/temporal/samples/hello/HelloSignalWithTimer.java): Demonstrates how to use collect signals for certain amount of time and then process last one.
8181
- [**HelloWorkflowTimer**](/core/src/main/java/io/temporal/samples/hello/HelloWorkflowTimer.java): Demonstrates how we can use workflow timer to restrict duration of workflow execution instead of workflow run/execution timeouts.
82+
- [**Auto-Heartbeating**](/core/src/main/java/io/temporal/samples/autoheartbeat/): Demonstrates use of Auto-heartbeating utility via activity interceptor.
8283

8384

8485
#### Scenario-based samples
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.activity.ActivityExecutionContext;
23+
import java.time.Instant;
24+
import java.time.ZoneId;
25+
import java.time.format.DateTimeFormatter;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
31+
public class AutoHeartbeatUtil {
32+
private final long period;
33+
private final long initialDelay;
34+
private final TimeUnit periodTimeUnit;
35+
private final ScheduledExecutorService timerService =
36+
Executors.newSingleThreadScheduledExecutor();
37+
private final ActivityExecutionContext context;
38+
private final Object details;
39+
private String heartbeaterId;
40+
41+
public AutoHeartbeatUtil(
42+
long period,
43+
long initialDelay,
44+
TimeUnit periodTimeUnit,
45+
ActivityExecutionContext context,
46+
Object details) {
47+
this.period = period;
48+
this.initialDelay = initialDelay;
49+
this.periodTimeUnit = periodTimeUnit;
50+
this.context = context;
51+
this.details = details;
52+
// Set to activity id better, for sample we just use type
53+
heartbeaterId = context.getInfo().getActivityType();
54+
}
55+
56+
public ScheduledFuture<?> start() {
57+
System.out.println("Autoheartbeater[" + heartbeaterId + "] starting...");
58+
return timerService.scheduleAtFixedRate(
59+
() -> {
60+
// try {
61+
System.out.println(
62+
"Autoheartbeater["
63+
+ heartbeaterId
64+
+ "]"
65+
+ "heartbeating at: "
66+
+ printShortCurrentTime());
67+
context.heartbeat(details);
68+
},
69+
initialDelay,
70+
period,
71+
periodTimeUnit);
72+
}
73+
74+
public void stop() {
75+
System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop.");
76+
// Try not to execute another heartbeat that could have been queued up
77+
// Note this can at times take a second or two so make sure to test this out on your workers
78+
// So can set best heartbeat timeout (sometimes might need larger value to accomodate)
79+
timerService.shutdownNow();
80+
}
81+
82+
private String printShortCurrentTime() {
83+
return DateTimeFormatter.ofPattern("HH:mm:ss")
84+
.withZone(ZoneId.systemDefault())
85+
.format(Instant.now());
86+
}
87+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Auto-heartbeating sample for activities that define HeartbeatTimeout
2+
3+
This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor to all
4+
activities where you define HeartbeatTimeout. Use case where this can be helpful include situations where you have
5+
long-running activities where you want to heartbeat but its difficult to explicitly call heartbeat api in activity code
6+
directly.
7+
Another useful scenario for this is where you have activity that at times can complete in very short amount of time,
8+
but then at times can take for example minutes. In this case you have to set longer StartToClose timeout
9+
but you might not want first heartbeat to be sent right away but send it after the "shorter" duration of activity
10+
execution.
11+
12+
Warning: make sure to test this sample for your use case. This includes load testing. This sample was not
13+
tested on large scale workloads. In addition note that it is recommended to heartbeat from activity code itself. Using
14+
this type of autoheartbeating utility does have disatvantage that activity code itself can continue running after
15+
a handled activity cancelation. Please be aware of these warnings when applying this sample.
16+
17+
1. Start the Sample:
18+
19+
```bash
20+
./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter
21+
```
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.client.WorkflowOptions;
24+
import io.temporal.client.WorkflowStub;
25+
import io.temporal.failure.CanceledFailure;
26+
import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl;
27+
import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor;
28+
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow;
29+
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl;
30+
import io.temporal.serviceclient.WorkflowServiceStubs;
31+
import io.temporal.worker.Worker;
32+
import io.temporal.worker.WorkerFactory;
33+
import io.temporal.worker.WorkerFactoryOptions;
34+
35+
public class Starter {
36+
static final String TASK_QUEUE = "AutoheartbeatTaskQueue";
37+
static final String WORKFLOW_ID = "AutoHeartbeatWorkflow";
38+
39+
public static void main(String[] args) {
40+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
41+
WorkflowClient client = WorkflowClient.newInstance(service);
42+
43+
// Configure our auto heartbeat workflow interceptor which will apply
44+
// AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat
45+
// timeout configured
46+
WorkerFactoryOptions wfo =
47+
WorkerFactoryOptions.newBuilder()
48+
.setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor())
49+
.build();
50+
51+
WorkerFactory factory = WorkerFactory.newInstance(client, wfo);
52+
Worker worker = factory.newWorker(TASK_QUEUE);
53+
54+
worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class);
55+
worker.registerActivitiesImplementations(new AutoActivitiesImpl());
56+
57+
factory.start();
58+
59+
// first run completes execution with autoheartbeat utils
60+
firstRun(client);
61+
// second run cancels running (pending) activity via signal (specific scope cancel)
62+
secondRun(client);
63+
// third run cancels running execution which cancels activity as well
64+
thirdRun(client);
65+
// fourth run turns off autoheartbeat for activities and lets activity time out on heartbeat
66+
// timeout
67+
fourthRun(client);
68+
69+
System.exit(0);
70+
}
71+
72+
@SuppressWarnings("unused")
73+
private static void firstRun(WorkflowClient client) {
74+
System.out.println("**** First Run: run workflow to completion");
75+
AutoWorkflow firstRun =
76+
client.newWorkflowStub(
77+
AutoWorkflow.class,
78+
WorkflowOptions.newBuilder()
79+
.setWorkflowId(WORKFLOW_ID)
80+
.setTaskQueue(TASK_QUEUE)
81+
.build());
82+
83+
try {
84+
String firstRunResult = firstRun.exec("Auto heartbeating is cool");
85+
System.out.println("First run result: " + firstRunResult);
86+
} catch (Exception e) {
87+
System.out.println("First run - Workflow exec exception: " + e.getClass().getName());
88+
}
89+
}
90+
91+
@SuppressWarnings("unused")
92+
private static void secondRun(WorkflowClient client) {
93+
System.out.println("\n\n**** Second Run: cancel activities via signal");
94+
AutoWorkflow secondRun =
95+
client.newWorkflowStub(
96+
AutoWorkflow.class,
97+
WorkflowOptions.newBuilder()
98+
.setWorkflowId(WORKFLOW_ID)
99+
.setTaskQueue(TASK_QUEUE)
100+
.build());
101+
WorkflowClient.start(secondRun::exec, "Auto heartbeating is cool");
102+
doSleeps(4);
103+
secondRun.cancelActivity();
104+
105+
try {
106+
String secondRunResult = WorkflowStub.fromTyped(secondRun).getResult(String.class);
107+
System.out.println("Second run result: " + secondRunResult);
108+
} catch (Exception e) {
109+
System.out.println("Second run - Workflow exec exception: " + e.getClass().getName());
110+
}
111+
}
112+
113+
@SuppressWarnings("unused")
114+
private static void thirdRun(WorkflowClient client) {
115+
System.out.println("\n\n**** Third Run: cancel workflow execution");
116+
AutoWorkflow thirdRun =
117+
client.newWorkflowStub(
118+
AutoWorkflow.class,
119+
WorkflowOptions.newBuilder()
120+
.setWorkflowId(WORKFLOW_ID)
121+
.setTaskQueue(TASK_QUEUE)
122+
.build());
123+
WorkflowClient.start(thirdRun::exec, "Auto heartbeating is cool");
124+
doSleeps(10);
125+
try {
126+
WorkflowStub.fromTyped(thirdRun).cancel();
127+
String thirdRunResult = WorkflowStub.fromTyped(thirdRun).getResult(String.class);
128+
System.out.println("Third run result: " + thirdRunResult);
129+
} catch (Exception e) {
130+
// we are expecting workflow cancelation
131+
if (e.getCause() instanceof CanceledFailure) {
132+
System.out.println("Third run - Workflow execution canceled.");
133+
} else {
134+
System.out.println("Third run - Workflow exec exception: " + e.getMessage());
135+
}
136+
}
137+
}
138+
139+
@SuppressWarnings("unused")
140+
private static void fourthRun(WorkflowClient client) {
141+
System.out.println("\n\n**** Fourth Run: cause heartbeat timeout");
142+
// we disable autoheartbeat via env var
143+
System.setProperty("sample.disableAutoHeartbeat", "true");
144+
AutoWorkflow fourth =
145+
client.newWorkflowStub(
146+
AutoWorkflow.class,
147+
WorkflowOptions.newBuilder()
148+
.setWorkflowId(WORKFLOW_ID)
149+
.setTaskQueue(TASK_QUEUE)
150+
.build());
151+
152+
try {
153+
String fourthRunResult = fourth.exec("Auto heartbeating is cool");
154+
System.out.println("Fourth run result: " + fourthRunResult);
155+
} catch (Exception e) {
156+
System.out.println("Fourth run - Workflow exec exception: " + e.getClass().getName());
157+
}
158+
}
159+
160+
private static void doSleeps(int seconds) {
161+
try {
162+
Thread.sleep(seconds * 1000L);
163+
} catch (Exception e) {
164+
System.out.println(e.getMessage());
165+
}
166+
}
167+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat.activities;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
24+
@ActivityInterface
25+
public interface AutoActivities {
26+
String runActivityOne(String input);
27+
28+
String runActivityTwo(String input);
29+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.autoheartbeat.activities;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
public class AutoActivitiesImpl implements AutoActivities {
25+
26+
@Override
27+
public String runActivityOne(String input) {
28+
return runActivity("runActivityOne - " + input, 10);
29+
}
30+
31+
@Override
32+
public String runActivityTwo(String input) {
33+
return runActivity("runActivityTwo - " + input, 5);
34+
}
35+
36+
@SuppressWarnings("FutureReturnValueIgnored")
37+
private String runActivity(String input, int seconds) {
38+
for (int i = 0; i < seconds; i++) {
39+
sleep(1);
40+
}
41+
return "Activity completed: " + input;
42+
}
43+
44+
private void sleep(int seconds) {
45+
try {
46+
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
47+
} catch (InterruptedException ee) {
48+
// Empty
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)