Skip to content

Commit 614eed9

Browse files
author
Keith Tenzer
committed
re-worked starter to use pagination for handling resets
Signed-off-by: Keith Tenzer <ktenzer@keiths-mbp.lan>
1 parent fc6bf5b commit 614eed9

File tree

1 file changed

+89
-77
lines changed
  • core/src/main/java/io/temporal/samples/resetworkflows

1 file changed

+89
-77
lines changed

core/src/main/java/io/temporal/samples/resetworkflows/Starter.java

Lines changed: 89 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package io.temporal.samples.resetworkflows;
2121

22+
import com.google.protobuf.ByteString;
2223
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
2324
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
2425
import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest;
@@ -29,7 +30,10 @@
2930
import io.temporal.client.WorkflowOptions;
3031
import io.temporal.client.WorkflowStub;
3132
import io.temporal.serviceclient.WorkflowServiceStubs;
32-
import java.util.List;
33+
import java.time.OffsetDateTime;
34+
import java.time.ZoneOffset;
35+
import java.time.format.DateTimeFormatter;
36+
import java.util.UUID;
3337
import java.util.concurrent.TimeUnit;
3438

3539
public class Starter {
@@ -50,96 +54,93 @@ public static void main(String[] args) {
5054
throw new RuntimeException("Exception happened in thread sleep: ", e);
5155
}
5256

53-
// query "new" customers for all "CustomerWorkflow" workflows with status "Running" (1)
54-
ListWorkflowExecutionsResponse listFailedWorkflows =
55-
getExecutionsResponse(
56-
"WorkflowType='ResetWorkflow' and ExecutionStatus="
57-
+ WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE);
57+
String fiveMinutesAgo = dateOffset(20);
58+
resetWorkflowExecutions(
59+
"WorkflowType='ResetWorkflow' and ExecutionStatus="
60+
+ WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE
61+
+ " and CloseTime>"
62+
+ "'"
63+
+ fiveMinutesAgo
64+
+ "'",
65+
null);
5866

5967
// sleep for 3 seconds before we shut down the worker
6068
sleep(5);
6169

62-
List<WorkflowExecutionInfo> newExecutionInfo = listFailedWorkflows.getExecutionsList();
63-
batchResetWorkflows(newExecutionInfo);
64-
// for (WorkflowExecutionInfo wei : newExecutionInfo) {
65-
// System.out.println("Resetting workflow: " + wei.getExecution().getWorkflowId());
66-
// System.out.println(wei.getStatusValue());
67-
// resetWorkflow(wei.getExecution().getWorkflowId());
68-
// }
69-
70+
// Cleanup
71+
service.shutdown();
7072
System.exit(0);
7173
}
7274

73-
private static ListWorkflowExecutionsResponse getExecutionsResponse(String query) {
74-
ListWorkflowExecutionsRequest listWorkflowExecutionRequest =
75-
ListWorkflowExecutionsRequest.newBuilder()
76-
.setNamespace(client.getOptions().getNamespace())
77-
.setQuery(query)
78-
.build();
79-
ListWorkflowExecutionsResponse listWorkflowExecutionsResponse =
80-
service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest);
81-
return listWorkflowExecutionsResponse;
82-
}
75+
private static void resetWorkflowExecutions(String query, ByteString token) {
76+
77+
ListWorkflowExecutionsRequest request;
8378

84-
// private static void resetWorkflow(String workflowId) {
85-
// long eventId = 3;
86-
// WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId);
87-
//
88-
// ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
89-
// ResetWorkflowExecutionRequest.newBuilder()
90-
// .setRequestId(UUID.randomUUID().toString())
91-
// .setNamespace("default")
92-
// .setWorkflowExecution(existingUntyped.getExecution())
93-
// .setWorkflowTaskFinishEventId(eventId)
94-
// .setReason("Doing a workflow reset...")
95-
// .build();
96-
//
97-
// try {
98-
// ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
99-
// service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest);
100-
// System.out.println(
101-
// "Workflow with ID "
102-
// + workflowId
103-
// + " has been reset. New RunId: "
104-
// + resetWorkflowExecutionResponse.getRunId());
105-
// } catch (Exception e) {
106-
// System.err.println("Failed to reset workflow with ID " + workflowId);
107-
// }
108-
//
109-
// // Cleanup
110-
// service.shutdown();
111-
// }
112-
113-
private static void batchResetWorkflows(List<WorkflowExecutionInfo> newExecutionInfo) {
114-
for (WorkflowExecutionInfo wei : newExecutionInfo) {
115-
String workflowId = wei.getExecution().getWorkflowId();
116-
long eventId = 3;
117-
WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId);
118-
119-
ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
120-
ResetWorkflowExecutionRequest.newBuilder()
121-
.setNamespace("default")
122-
.setWorkflowExecution(existingUntyped.getExecution())
123-
.setWorkflowTaskFinishEventId(eventId)
124-
.setRequestId(java.util.UUID.randomUUID().toString())
125-
.setReason("Doing a batch reset...")
79+
if (token == null) {
80+
request =
81+
ListWorkflowExecutionsRequest.newBuilder()
82+
.setNamespace(client.getOptions().getNamespace())
83+
.setQuery(query)
12684
.build();
85+
} else {
86+
request =
87+
ListWorkflowExecutionsRequest.newBuilder()
88+
.setNamespace(client.getOptions().getNamespace())
89+
.setQuery(query)
90+
.setNextPageToken(token)
91+
.build();
92+
}
12793

128-
try {
129-
ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
130-
service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest);
94+
ListWorkflowExecutionsResponse response =
95+
service.blockingStub().listWorkflowExecutions(request);
96+
97+
for (WorkflowExecutionInfo workflowExecutionInfo : response.getExecutionsList()) {
98+
System.out.println(
99+
"Workflow ID: "
100+
+ workflowExecutionInfo.getExecution().getWorkflowId()
101+
+ " Run ID: "
102+
+ workflowExecutionInfo.getExecution().getRunId()
103+
+ " Status: "
104+
+ workflowExecutionInfo.getStatus());
105+
if (workflowExecutionInfo.getParentExecution() != null) {
131106
System.out.println(
132-
"Workflow with ID "
133-
+ workflowId
134-
+ " has been reset. New RunId: "
135-
+ resetWorkflowExecutionResponse.getRunId());
136-
} catch (Exception e) {
137-
System.err.println("Failed to reset workflow with ID " + workflowId);
107+
"****** PARENT: "
108+
+ workflowExecutionInfo.getExecution().getWorkflowId()
109+
+ " - "
110+
+ workflowExecutionInfo.getExecution().getRunId());
111+
resetWorkflow(workflowExecutionInfo.getExecution().getWorkflowId());
138112
}
139113
}
140114

141-
// Cleanup
142-
service.shutdown();
115+
if (response.getNextPageToken() != null && response.getNextPageToken().size() > 0) {
116+
resetWorkflowExecutions(query, response.getNextPageToken());
117+
}
118+
}
119+
120+
private static void resetWorkflow(String workflowId) {
121+
long eventId = 3;
122+
WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId);
123+
124+
ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
125+
ResetWorkflowExecutionRequest.newBuilder()
126+
.setRequestId(UUID.randomUUID().toString())
127+
.setNamespace("default")
128+
.setWorkflowExecution(existingUntyped.getExecution())
129+
.setWorkflowTaskFinishEventId(eventId)
130+
.setReason("Doing a workflow reset...")
131+
.build();
132+
133+
try {
134+
ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
135+
service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest);
136+
System.out.println(
137+
"Workflow with ID "
138+
+ workflowId
139+
+ " has been reset. New RunId: "
140+
+ resetWorkflowExecutionResponse.getRunId());
141+
} catch (Exception e) {
142+
System.err.println("Failed to reset workflow with ID " + workflowId);
143+
}
143144
}
144145

145146
private static void startWorkflow() {
@@ -168,4 +169,15 @@ private static void sleep(int seconds) {
168169
System.exit(0);
169170
}
170171
}
172+
173+
private static String dateOffset(long seconds) {
174+
long currentTimeMillis = System.currentTimeMillis() - (seconds * 1000);
175+
OffsetDateTime offsetDateTime =
176+
OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(currentTimeMillis), ZoneOffset.UTC);
177+
178+
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX");
179+
String formattedDateTime = offsetDateTime.format(formatter);
180+
181+
return formattedDateTime;
182+
}
171183
}

0 commit comments

Comments
 (0)