1919
2020package io .temporal .samples .resetworkflows ;
2121
22+ import com .google .protobuf .ByteString ;
2223import io .temporal .api .enums .v1 .WorkflowExecutionStatus ;
2324import io .temporal .api .workflow .v1 .WorkflowExecutionInfo ;
2425import io .temporal .api .workflowservice .v1 .ListWorkflowExecutionsRequest ;
2930import io .temporal .client .WorkflowOptions ;
3031import io .temporal .client .WorkflowStub ;
3132import io .temporal .serviceclient .WorkflowServiceStubs ;
32- import java .util .List ;
33+ import java .time .OffsetDateTime ;
34+ import java .time .ZoneOffset ;
35+ import java .time .format .DateTimeFormatter ;
36+ import java .util .UUID ;
3337import java .util .concurrent .TimeUnit ;
3438
3539public class Starter {
@@ -50,96 +54,93 @@ public static void main(String[] args) {
5054 throw new RuntimeException ("Exception happened in thread sleep: " , e );
5155 }
5256
53- // query "new" customers for all "CustomerWorkflow" workflows with status "Running" (1)
54- ListWorkflowExecutionsResponse listFailedWorkflows =
55- getExecutionsResponse (
56- "WorkflowType='ResetWorkflow' and ExecutionStatus="
57- + WorkflowExecutionStatus .WORKFLOW_EXECUTION_STATUS_FAILED_VALUE );
57+ String fiveMinutesAgo = dateOffset (20 );
58+ resetWorkflowExecutions (
59+ "WorkflowType='ResetWorkflow' and ExecutionStatus="
60+ + WorkflowExecutionStatus .WORKFLOW_EXECUTION_STATUS_FAILED_VALUE
61+ + " and CloseTime>"
62+ + "'"
63+ + fiveMinutesAgo
64+ + "'" ,
65+ null );
5866
5967 // sleep for 3 seconds before we shut down the worker
6068 sleep (5 );
6169
62- List <WorkflowExecutionInfo > newExecutionInfo = listFailedWorkflows .getExecutionsList ();
63- batchResetWorkflows (newExecutionInfo );
64- // for (WorkflowExecutionInfo wei : newExecutionInfo) {
65- // System.out.println("Resetting workflow: " + wei.getExecution().getWorkflowId());
66- // System.out.println(wei.getStatusValue());
67- // resetWorkflow(wei.getExecution().getWorkflowId());
68- // }
69-
70+ // Cleanup
71+ service .shutdown ();
7072 System .exit (0 );
7173 }
7274
73- private static ListWorkflowExecutionsResponse getExecutionsResponse (String query ) {
74- ListWorkflowExecutionsRequest listWorkflowExecutionRequest =
75- ListWorkflowExecutionsRequest .newBuilder ()
76- .setNamespace (client .getOptions ().getNamespace ())
77- .setQuery (query )
78- .build ();
79- ListWorkflowExecutionsResponse listWorkflowExecutionsResponse =
80- service .blockingStub ().listWorkflowExecutions (listWorkflowExecutionRequest );
81- return listWorkflowExecutionsResponse ;
82- }
75+ private static void resetWorkflowExecutions (String query , ByteString token ) {
76+
77+ ListWorkflowExecutionsRequest request ;
8378
84- // private static void resetWorkflow(String workflowId) {
85- // long eventId = 3;
86- // WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId);
87- //
88- // ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
89- // ResetWorkflowExecutionRequest.newBuilder()
90- // .setRequestId(UUID.randomUUID().toString())
91- // .setNamespace("default")
92- // .setWorkflowExecution(existingUntyped.getExecution())
93- // .setWorkflowTaskFinishEventId(eventId)
94- // .setReason("Doing a workflow reset...")
95- // .build();
96- //
97- // try {
98- // ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
99- // service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest);
100- // System.out.println(
101- // "Workflow with ID "
102- // + workflowId
103- // + " has been reset. New RunId: "
104- // + resetWorkflowExecutionResponse.getRunId());
105- // } catch (Exception e) {
106- // System.err.println("Failed to reset workflow with ID " + workflowId);
107- // }
108- //
109- // // Cleanup
110- // service.shutdown();
111- // }
112-
113- private static void batchResetWorkflows (List <WorkflowExecutionInfo > newExecutionInfo ) {
114- for (WorkflowExecutionInfo wei : newExecutionInfo ) {
115- String workflowId = wei .getExecution ().getWorkflowId ();
116- long eventId = 3 ;
117- WorkflowStub existingUntyped = client .newUntypedWorkflowStub (workflowId );
118-
119- ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
120- ResetWorkflowExecutionRequest .newBuilder ()
121- .setNamespace ("default" )
122- .setWorkflowExecution (existingUntyped .getExecution ())
123- .setWorkflowTaskFinishEventId (eventId )
124- .setRequestId (java .util .UUID .randomUUID ().toString ())
125- .setReason ("Doing a batch reset..." )
79+ if (token == null ) {
80+ request =
81+ ListWorkflowExecutionsRequest .newBuilder ()
82+ .setNamespace (client .getOptions ().getNamespace ())
83+ .setQuery (query )
12684 .build ();
85+ } else {
86+ request =
87+ ListWorkflowExecutionsRequest .newBuilder ()
88+ .setNamespace (client .getOptions ().getNamespace ())
89+ .setQuery (query )
90+ .setNextPageToken (token )
91+ .build ();
92+ }
12793
128- try {
129- ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
130- service .blockingStub ().resetWorkflowExecution (resetWorkflowExecutionRequest );
94+ ListWorkflowExecutionsResponse response =
95+ service .blockingStub ().listWorkflowExecutions (request );
96+
97+ for (WorkflowExecutionInfo workflowExecutionInfo : response .getExecutionsList ()) {
98+ System .out .println (
99+ "Workflow ID: "
100+ + workflowExecutionInfo .getExecution ().getWorkflowId ()
101+ + " Run ID: "
102+ + workflowExecutionInfo .getExecution ().getRunId ()
103+ + " Status: "
104+ + workflowExecutionInfo .getStatus ());
105+ if (workflowExecutionInfo .getParentExecution () != null ) {
131106 System .out .println (
132- "Workflow with ID "
133- + workflowId
134- + " has been reset. New RunId: "
135- + resetWorkflowExecutionResponse .getRunId ());
136- } catch (Exception e ) {
137- System .err .println ("Failed to reset workflow with ID " + workflowId );
107+ "****** PARENT: "
108+ + workflowExecutionInfo .getExecution ().getWorkflowId ()
109+ + " - "
110+ + workflowExecutionInfo .getExecution ().getRunId ());
111+ resetWorkflow (workflowExecutionInfo .getExecution ().getWorkflowId ());
138112 }
139113 }
140114
141- // Cleanup
142- service .shutdown ();
115+ if (response .getNextPageToken () != null && response .getNextPageToken ().size () > 0 ) {
116+ resetWorkflowExecutions (query , response .getNextPageToken ());
117+ }
118+ }
119+
120+ private static void resetWorkflow (String workflowId ) {
121+ long eventId = 3 ;
122+ WorkflowStub existingUntyped = client .newUntypedWorkflowStub (workflowId );
123+
124+ ResetWorkflowExecutionRequest resetWorkflowExecutionRequest =
125+ ResetWorkflowExecutionRequest .newBuilder ()
126+ .setRequestId (UUID .randomUUID ().toString ())
127+ .setNamespace ("default" )
128+ .setWorkflowExecution (existingUntyped .getExecution ())
129+ .setWorkflowTaskFinishEventId (eventId )
130+ .setReason ("Doing a workflow reset..." )
131+ .build ();
132+
133+ try {
134+ ResetWorkflowExecutionResponse resetWorkflowExecutionResponse =
135+ service .blockingStub ().resetWorkflowExecution (resetWorkflowExecutionRequest );
136+ System .out .println (
137+ "Workflow with ID "
138+ + workflowId
139+ + " has been reset. New RunId: "
140+ + resetWorkflowExecutionResponse .getRunId ());
141+ } catch (Exception e ) {
142+ System .err .println ("Failed to reset workflow with ID " + workflowId );
143+ }
143144 }
144145
145146 private static void startWorkflow () {
@@ -168,4 +169,15 @@ private static void sleep(int seconds) {
168169 System .exit (0 );
169170 }
170171 }
172+
173+ private static String dateOffset (long seconds ) {
174+ long currentTimeMillis = System .currentTimeMillis () - (seconds * 1000 );
175+ OffsetDateTime offsetDateTime =
176+ OffsetDateTime .ofInstant (java .time .Instant .ofEpochMilli (currentTimeMillis ), ZoneOffset .UTC );
177+
178+ DateTimeFormatter formatter = DateTimeFormatter .ofPattern ("yyyy-MM-dd'T'HH:mm:ssXXX" );
179+ String formattedDateTime = offsetDateTime .format (formatter );
180+
181+ return formattedDateTime ;
182+ }
171183}
0 commit comments