Skip to content

Commit fc6bf5b

Browse files
author
Keith Tenzer
committed
Initial checkin for reset workflows sample
Signed-off-by: Keith Tenzer <ktenzer@keiths-mbp.lan>
1 parent 30c0355 commit fc6bf5b

File tree

7 files changed

+429
-0
lines changed

7 files changed

+429
-0
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Demo List Workflows
2+
3+
The sample demonstrates:
4+
1) Setting custom search attributes for a Workflow
5+
2) Using ListWorkflowExecutionsRequest and custom Search Attribute query to list
6+
Workflow Executions that match that query
7+
8+
## Running
9+
10+
1. Unlike the other examples, this one has to be started with Elasticsearch
11+
capabilities enabled. If you are using docker you can do that with:
12+
13+
```bash
14+
git clone https://github.com/temporalio/docker-compose.git
15+
cd docker-compose
16+
docker-compose -f docker-compose-cas-es.yml up
17+
```
18+
19+
2.
20+
Run the following command to start the sample:
21+
22+
```bash
23+
./gradlew -q execute -PmainClass=io.temporal.samples.listworkflows.Starter
24+
```
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.resetworkflows;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
24+
@ActivityInterface
25+
public interface ResetActivities {
26+
void activityOne();
27+
28+
void activityTwo();
29+
30+
void activityThree();
31+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.resetworkflows;
21+
22+
import java.util.Random;
23+
import java.util.concurrent.TimeUnit;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class ResetActivitiesImpl implements ResetActivities {
28+
29+
private static final Logger log = LoggerFactory.getLogger(ResetActivitiesImpl.class);
30+
31+
@Override
32+
public void activityOne() {
33+
// simulate some actual work...
34+
sleepSeconds(1);
35+
thisMayOrMayNotThrowAnError("activityOne");
36+
}
37+
38+
@Override
39+
public void activityTwo() {
40+
// simulate some actual work...
41+
sleepSeconds(1);
42+
thisMayOrMayNotThrowAnError("activityTwo");
43+
}
44+
45+
@Override
46+
public void activityThree() {
47+
// simulate some actual work...
48+
sleepSeconds(1);
49+
thisMayOrMayNotThrowAnError("activityThree");
50+
}
51+
52+
private void sleepSeconds(int seconds) {
53+
try {
54+
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
55+
} catch (InterruptedException e) {
56+
// This is being swallowed on purpose
57+
Thread.currentThread().interrupt();
58+
log.error("Exception in thread sleep: ", e);
59+
}
60+
}
61+
62+
private void thisMayOrMayNotThrowAnError(String activityName) {
63+
Random random = new Random();
64+
double randomValue = random.nextDouble();
65+
log.info("**** Random value: {}", randomValue);
66+
if (randomValue < 0.10) { // 10% chance of failure
67+
log.info("Activity {} failed...", activityName);
68+
throw new IllegalArgumentException("Activity has illegal argument...failing");
69+
}
70+
}
71+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.resetworkflows;
21+
22+
import io.temporal.workflow.WorkflowInterface;
23+
import io.temporal.workflow.WorkflowMethod;
24+
25+
@WorkflowInterface
26+
public interface ResetWorkflow {
27+
@WorkflowMethod
28+
void runActivities(String message);
29+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.resetworkflows;
21+
22+
import io.temporal.activity.ActivityOptions;
23+
import io.temporal.common.RetryOptions;
24+
import io.temporal.failure.ActivityFailure;
25+
import io.temporal.failure.ApplicationFailure;
26+
import io.temporal.workflow.Workflow;
27+
import java.time.Duration;
28+
29+
public class ResetWorkflowImpl implements ResetWorkflow {
30+
// private boolean exit;
31+
private final ResetActivities resetActivities =
32+
Workflow.newActivityStub(
33+
ResetActivities.class,
34+
ActivityOptions.newBuilder()
35+
.setStartToCloseTimeout(Duration.ofSeconds(10))
36+
.setRetryOptions(
37+
RetryOptions.newBuilder()
38+
.setInitialInterval(Duration.ofSeconds(1))
39+
.setDoNotRetry(IllegalArgumentException.class.getName())
40+
.build())
41+
.build());
42+
// private final RetryOptions customerRetryOptions =
43+
// RetryOptions.newBuilder().setMaximumAttempts(5).build();
44+
// private final Duration expiration = Duration.ofMinutes(1);
45+
46+
@Override
47+
public void runActivities(String message) {
48+
49+
try {
50+
resetActivities.activityOne();
51+
resetActivities.activityTwo();
52+
resetActivities.activityThree();
53+
} catch (ActivityFailure e) {
54+
System.out.println("Activity failed: " + e.getMessage());
55+
throw ApplicationFailure.newNonRetryableFailure(
56+
"simulated failure", "some error", "ActivityFailure");
57+
}
58+
}
59+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.resetworkflows;
21+
22+
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
23+
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
24+
import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest;
25+
import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsResponse;
26+
import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionRequest;
27+
import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse;
28+
import io.temporal.client.WorkflowClient;
29+
import io.temporal.client.WorkflowOptions;
30+
import io.temporal.client.WorkflowStub;
31+
import io.temporal.serviceclient.WorkflowServiceStubs;
32+
import java.util.List;
33+
import java.util.concurrent.TimeUnit;
34+
35+
public class Starter {
36+
public static final String TASK_QUEUE = "resetTaskQueue";
37+
private static final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
38+
private static final WorkflowClient client = WorkflowClient.newInstance(service);
39+
// private static final WorkerFactory factory = WorkerFactory.newInstance(client);
40+
41+
public static void main(String[] args) {
42+
43+
// start customer workflows and define custom search attributes for each
44+
startWorkflow();
45+
46+
// small delay before we start querying executions
47+
try {
48+
Thread.sleep(2 * 1000);
49+
} catch (InterruptedException e) {
50+
throw new RuntimeException("Exception happened in thread sleep: ", e);
51+
}
52+
53+
// query "new" customers for all "CustomerWorkflow" workflows with status "Running" (1)
54+
ListWorkflowExecutionsResponse listFailedWorkflows =
55+
getExecutionsResponse(
56+
"WorkflowType='ResetWorkflow' and ExecutionStatus="
57+
+ WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE);
58+
59+
// sleep for 3 seconds before we shut down the worker
60+
sleep(5);
61+
62+
List<WorkflowExecutionInfo> newExecutionInfo = listFailedWorkflows.getExecutionsList();
63+
batchResetWorkflows(newExecutionInfo);
64+
// for (WorkflowExecutionInfo wei : newExecutionInfo) {
65+
// System.out.println("Resetting workflow: " + wei.getExecution().getWorkflowId());
66+
// System.out.println(wei.getStatusValue());
67+
// resetWorkflow(wei.getExecution().getWorkflowId());
68+
// }
69+
70+
System.exit(0);
71+
}
72+
73+
private static ListWorkflowExecutionsResponse getExecutionsResponse(String query) {
74+
ListWorkflowExecutionsRequest listWorkflowExecutionRequest =
75+
ListWorkflowExecutionsRequest.newBuilder()
76+
.setNamespace(client.getOptions().getNamespace())
77+
.setQuery(query)
78+
.build();
79+
ListWorkflowExecutionsResponse listWorkflowExecutionsResponse =
80+
service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest);
81+
return listWorkflowExecutionsResponse;
82+
}
83+
84+
// private static void resetWorkflow(String workflowId) {
85+
// long eventId = 3;
86+
// WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId);
87+
//
88+
// ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
89+
// ResetWorkflowExecutionRequest.newBuilder()
90+
// .setRequestId(UUID.randomUUID().toString())
91+
// .setNamespace("default")
92+
// .setWorkflowExecution(existingUntyped.getExecution())
93+
// .setWorkflowTaskFinishEventId(eventId)
94+
// .setReason("Doing a workflow reset...")
95+
// .build();
96+
//
97+
// try {
98+
// ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
99+
// service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest);
100+
// System.out.println(
101+
// "Workflow with ID "
102+
// + workflowId
103+
// + " has been reset. New RunId: "
104+
// + resetWorkflowExecutionResponse.getRunId());
105+
// } catch (Exception e) {
106+
// System.err.println("Failed to reset workflow with ID " + workflowId);
107+
// }
108+
//
109+
// // Cleanup
110+
// service.shutdown();
111+
// }
112+
113+
private static void batchResetWorkflows(List<WorkflowExecutionInfo> newExecutionInfo) {
114+
for (WorkflowExecutionInfo wei : newExecutionInfo) {
115+
String workflowId = wei.getExecution().getWorkflowId();
116+
long eventId = 3;
117+
WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId);
118+
119+
ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
120+
ResetWorkflowExecutionRequest.newBuilder()
121+
.setNamespace("default")
122+
.setWorkflowExecution(existingUntyped.getExecution())
123+
.setWorkflowTaskFinishEventId(eventId)
124+
.setRequestId(java.util.UUID.randomUUID().toString())
125+
.setReason("Doing a batch reset...")
126+
.build();
127+
128+
try {
129+
ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
130+
service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest);
131+
System.out.println(
132+
"Workflow with ID "
133+
+ workflowId
134+
+ " has been reset. New RunId: "
135+
+ resetWorkflowExecutionResponse.getRunId());
136+
} catch (Exception e) {
137+
System.err.println("Failed to reset workflow with ID " + workflowId);
138+
}
139+
}
140+
141+
// Cleanup
142+
service.shutdown();
143+
}
144+
145+
private static void startWorkflow() {
146+
// start a workflow for each customer that we need to add message to account
147+
148+
for (int i = 1; i < 6; i++) {
149+
String message = "Running reset workflow: " + i;
150+
151+
WorkflowOptions newResetWorkflowOptions =
152+
WorkflowOptions.newBuilder()
153+
.setWorkflowId("reset-workflow-" + i)
154+
.setTaskQueue(TASK_QUEUE)
155+
.build();
156+
ResetWorkflow newResetWorkflow =
157+
client.newWorkflowStub(ResetWorkflow.class, newResetWorkflowOptions);
158+
// start async
159+
WorkflowClient.start(newResetWorkflow::runActivities, message);
160+
}
161+
}
162+
163+
private static void sleep(int seconds) {
164+
try {
165+
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
166+
} catch (InterruptedException e) {
167+
System.out.println("Exception: " + e.getMessage());
168+
System.exit(0);
169+
}
170+
}
171+
}

0 commit comments

Comments
 (0)