From fc6bf5b358e15a1ff9bddd35c7e7da154b6f26d9 Mon Sep 17 00:00:00 2001 From: Keith Tenzer Date: Fri, 19 Apr 2024 10:00:04 -0700 Subject: [PATCH 1/2] Initial checkin for reset workflows sample Signed-off-by: Keith Tenzer --- .../temporal/samples/resetworkflows/README.md | 24 +++ .../resetworkflows/ResetActivities.java | 31 ++++ .../resetworkflows/ResetActivitiesImpl.java | 71 ++++++++ .../samples/resetworkflows/ResetWorkflow.java | 29 +++ .../resetworkflows/ResetWorkflowImpl.java | 59 ++++++ .../samples/resetworkflows/Starter.java | 171 ++++++++++++++++++ .../samples/resetworkflows/Worker.java | 44 +++++ 7 files changed, 429 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/README.md create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/Starter.java create mode 100644 core/src/main/java/io/temporal/samples/resetworkflows/Worker.java diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/README.md b/core/src/main/java/io/temporal/samples/resetworkflows/README.md new file mode 100644 index 000000000..4eb329695 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/README.md @@ -0,0 +1,24 @@ +# Demo List Workflows + +The sample demonstrates: +1) Setting custom search attributes for a Workflow +2) Using ListWorkflowExecutionsRequest and custom Search Attribute query to list +Workflow Executions that match that query + +## Running + +1. Unlike the other examples, this one has to be started with Elasticsearch +capabilities enabled. If you are using docker you can do that with: + +```bash +git clone https://github.com/temporalio/docker-compose.git +cd docker-compose +docker-compose -f docker-compose-cas-es.yml up +``` + +2. +Run the following command to start the sample: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.listworkflows.Starter +``` diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java new file mode 100644 index 000000000..ec7f0e12b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface ResetActivities { + void activityOne(); + + void activityTwo(); + + void activityThree(); +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java new file mode 100644 index 000000000..8812a4358 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResetActivitiesImpl implements ResetActivities { + + private static final Logger log = LoggerFactory.getLogger(ResetActivitiesImpl.class); + + @Override + public void activityOne() { + // simulate some actual work... + sleepSeconds(1); + thisMayOrMayNotThrowAnError("activityOne"); + } + + @Override + public void activityTwo() { + // simulate some actual work... + sleepSeconds(1); + thisMayOrMayNotThrowAnError("activityTwo"); + } + + @Override + public void activityThree() { + // simulate some actual work... + sleepSeconds(1); + thisMayOrMayNotThrowAnError("activityThree"); + } + + private void sleepSeconds(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException e) { + // This is being swallowed on purpose + Thread.currentThread().interrupt(); + log.error("Exception in thread sleep: ", e); + } + } + + private void thisMayOrMayNotThrowAnError(String activityName) { + Random random = new Random(); + double randomValue = random.nextDouble(); + log.info("**** Random value: {}", randomValue); + if (randomValue < 0.10) { // 10% chance of failure + log.info("Activity {} failed...", activityName); + throw new IllegalArgumentException("Activity has illegal argument...failing"); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java new file mode 100644 index 000000000..7eeff088e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface ResetWorkflow { + @WorkflowMethod + void runActivities(String message); +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java new file mode 100644 index 000000000..205bfe558 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.ApplicationFailure; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class ResetWorkflowImpl implements ResetWorkflow { + // private boolean exit; + private final ResetActivities resetActivities = + Workflow.newActivityStub( + ResetActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(1)) + .setDoNotRetry(IllegalArgumentException.class.getName()) + .build()) + .build()); + // private final RetryOptions customerRetryOptions = + // RetryOptions.newBuilder().setMaximumAttempts(5).build(); + // private final Duration expiration = Duration.ofMinutes(1); + + @Override + public void runActivities(String message) { + + try { + resetActivities.activityOne(); + resetActivities.activityTwo(); + resetActivities.activityThree(); + } catch (ActivityFailure e) { + System.out.println("Activity failed: " + e.getMessage()); + throw ApplicationFailure.newNonRetryableFailure( + "simulated failure", "some error", "ActivityFailure"); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java b/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java new file mode 100644 index 000000000..d32c19c66 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsResponse; +import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class Starter { + public static final String TASK_QUEUE = "resetTaskQueue"; + private static final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + private static final WorkflowClient client = WorkflowClient.newInstance(service); + // private static final WorkerFactory factory = WorkerFactory.newInstance(client); + + public static void main(String[] args) { + + // start customer workflows and define custom search attributes for each + startWorkflow(); + + // small delay before we start querying executions + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + throw new RuntimeException("Exception happened in thread sleep: ", e); + } + + // query "new" customers for all "CustomerWorkflow" workflows with status "Running" (1) + ListWorkflowExecutionsResponse listFailedWorkflows = + getExecutionsResponse( + "WorkflowType='ResetWorkflow' and ExecutionStatus=" + + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE); + + // sleep for 3 seconds before we shut down the worker + sleep(5); + + List newExecutionInfo = listFailedWorkflows.getExecutionsList(); + batchResetWorkflows(newExecutionInfo); + // for (WorkflowExecutionInfo wei : newExecutionInfo) { + // System.out.println("Resetting workflow: " + wei.getExecution().getWorkflowId()); + // System.out.println(wei.getStatusValue()); + // resetWorkflow(wei.getExecution().getWorkflowId()); + // } + + System.exit(0); + } + + private static ListWorkflowExecutionsResponse getExecutionsResponse(String query) { + ListWorkflowExecutionsRequest listWorkflowExecutionRequest = + ListWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setQuery(query) + .build(); + ListWorkflowExecutionsResponse listWorkflowExecutionsResponse = + service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest); + return listWorkflowExecutionsResponse; + } + + // private static void resetWorkflow(String workflowId) { + // long eventId = 3; + // WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId); + // + // ResetWorkflowExecutionRequest resetWorkflowExecutionRequest = + // ResetWorkflowExecutionRequest.newBuilder() + // .setRequestId(UUID.randomUUID().toString()) + // .setNamespace("default") + // .setWorkflowExecution(existingUntyped.getExecution()) + // .setWorkflowTaskFinishEventId(eventId) + // .setReason("Doing a workflow reset...") + // .build(); + // + // try { + // ResetWorkflowExecutionResponse resetWorkflowExecutionResponse = + // service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest); + // System.out.println( + // "Workflow with ID " + // + workflowId + // + " has been reset. New RunId: " + // + resetWorkflowExecutionResponse.getRunId()); + // } catch (Exception e) { + // System.err.println("Failed to reset workflow with ID " + workflowId); + // } + // + // // Cleanup + // service.shutdown(); + // } + + private static void batchResetWorkflows(List newExecutionInfo) { + for (WorkflowExecutionInfo wei : newExecutionInfo) { + String workflowId = wei.getExecution().getWorkflowId(); + long eventId = 3; + WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId); + + ResetWorkflowExecutionRequest resetWorkflowExecutionRequest = + ResetWorkflowExecutionRequest.newBuilder() + .setNamespace("default") + .setWorkflowExecution(existingUntyped.getExecution()) + .setWorkflowTaskFinishEventId(eventId) + .setRequestId(java.util.UUID.randomUUID().toString()) + .setReason("Doing a batch reset...") + .build(); + + try { + ResetWorkflowExecutionResponse resetWorkflowExecutionResponse = + service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest); + System.out.println( + "Workflow with ID " + + workflowId + + " has been reset. New RunId: " + + resetWorkflowExecutionResponse.getRunId()); + } catch (Exception e) { + System.err.println("Failed to reset workflow with ID " + workflowId); + } + } + + // Cleanup + service.shutdown(); + } + + private static void startWorkflow() { + // start a workflow for each customer that we need to add message to account + + for (int i = 1; i < 6; i++) { + String message = "Running reset workflow: " + i; + + WorkflowOptions newResetWorkflowOptions = + WorkflowOptions.newBuilder() + .setWorkflowId("reset-workflow-" + i) + .setTaskQueue(TASK_QUEUE) + .build(); + ResetWorkflow newResetWorkflow = + client.newWorkflowStub(ResetWorkflow.class, newResetWorkflowOptions); + // start async + WorkflowClient.start(newResetWorkflow::runActivities, message); + } + } + + private static void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException e) { + System.out.println("Exception: " + e.getMessage()); + System.exit(0); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/Worker.java b/core/src/main/java/io/temporal/samples/resetworkflows/Worker.java new file mode 100644 index 000000000..a3802b5dc --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/Worker.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; + +public class Worker { + public static final String TASK_QUEUE = "resetTaskQueue"; + private static final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + private static final WorkflowClient client = WorkflowClient.newInstance(service); + private static final WorkerFactory factory = WorkerFactory.newInstance(client); + + public static void main(String[] args) { + // create the worker for workflow and activities + createWorker(); + } + + private static void createWorker() { + io.temporal.worker.Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(ResetWorkflowImpl.class); + worker.registerActivitiesImplementations(new ResetActivitiesImpl()); + + factory.start(); + } +} From 614eed98e43433bc1305ba3b3f3ee3b08ac958aa Mon Sep 17 00:00:00 2001 From: Keith Tenzer Date: Mon, 22 Apr 2024 16:00:19 -0700 Subject: [PATCH 2/2] re-worked starter to use pagination for handling resets Signed-off-by: Keith Tenzer --- .../samples/resetworkflows/Starter.java | 166 ++++++++++-------- 1 file changed, 89 insertions(+), 77 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java b/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java index d32c19c66..642b338e5 100644 --- a/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java +++ b/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java @@ -19,6 +19,7 @@ package io.temporal.samples.resetworkflows; +import com.google.protobuf.ByteString; import io.temporal.api.enums.v1.WorkflowExecutionStatus; import io.temporal.api.workflow.v1.WorkflowExecutionInfo; import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest; @@ -29,7 +30,10 @@ import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs; -import java.util.List; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.UUID; import java.util.concurrent.TimeUnit; public class Starter { @@ -50,96 +54,93 @@ public static void main(String[] args) { throw new RuntimeException("Exception happened in thread sleep: ", e); } - // query "new" customers for all "CustomerWorkflow" workflows with status "Running" (1) - ListWorkflowExecutionsResponse listFailedWorkflows = - getExecutionsResponse( - "WorkflowType='ResetWorkflow' and ExecutionStatus=" - + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE); + String fiveMinutesAgo = dateOffset(20); + resetWorkflowExecutions( + "WorkflowType='ResetWorkflow' and ExecutionStatus=" + + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE + + " and CloseTime>" + + "'" + + fiveMinutesAgo + + "'", + null); // sleep for 3 seconds before we shut down the worker sleep(5); - List newExecutionInfo = listFailedWorkflows.getExecutionsList(); - batchResetWorkflows(newExecutionInfo); - // for (WorkflowExecutionInfo wei : newExecutionInfo) { - // System.out.println("Resetting workflow: " + wei.getExecution().getWorkflowId()); - // System.out.println(wei.getStatusValue()); - // resetWorkflow(wei.getExecution().getWorkflowId()); - // } - + // Cleanup + service.shutdown(); System.exit(0); } - private static ListWorkflowExecutionsResponse getExecutionsResponse(String query) { - ListWorkflowExecutionsRequest listWorkflowExecutionRequest = - ListWorkflowExecutionsRequest.newBuilder() - .setNamespace(client.getOptions().getNamespace()) - .setQuery(query) - .build(); - ListWorkflowExecutionsResponse listWorkflowExecutionsResponse = - service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest); - return listWorkflowExecutionsResponse; - } + private static void resetWorkflowExecutions(String query, ByteString token) { + + ListWorkflowExecutionsRequest request; - // private static void resetWorkflow(String workflowId) { - // long eventId = 3; - // WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId); - // - // ResetWorkflowExecutionRequest resetWorkflowExecutionRequest = - // ResetWorkflowExecutionRequest.newBuilder() - // .setRequestId(UUID.randomUUID().toString()) - // .setNamespace("default") - // .setWorkflowExecution(existingUntyped.getExecution()) - // .setWorkflowTaskFinishEventId(eventId) - // .setReason("Doing a workflow reset...") - // .build(); - // - // try { - // ResetWorkflowExecutionResponse resetWorkflowExecutionResponse = - // service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest); - // System.out.println( - // "Workflow with ID " - // + workflowId - // + " has been reset. New RunId: " - // + resetWorkflowExecutionResponse.getRunId()); - // } catch (Exception e) { - // System.err.println("Failed to reset workflow with ID " + workflowId); - // } - // - // // Cleanup - // service.shutdown(); - // } - - private static void batchResetWorkflows(List newExecutionInfo) { - for (WorkflowExecutionInfo wei : newExecutionInfo) { - String workflowId = wei.getExecution().getWorkflowId(); - long eventId = 3; - WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId); - - ResetWorkflowExecutionRequest resetWorkflowExecutionRequest = - ResetWorkflowExecutionRequest.newBuilder() - .setNamespace("default") - .setWorkflowExecution(existingUntyped.getExecution()) - .setWorkflowTaskFinishEventId(eventId) - .setRequestId(java.util.UUID.randomUUID().toString()) - .setReason("Doing a batch reset...") + if (token == null) { + request = + ListWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setQuery(query) .build(); + } else { + request = + ListWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setQuery(query) + .setNextPageToken(token) + .build(); + } - try { - ResetWorkflowExecutionResponse resetWorkflowExecutionResponse = - service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest); + ListWorkflowExecutionsResponse response = + service.blockingStub().listWorkflowExecutions(request); + + for (WorkflowExecutionInfo workflowExecutionInfo : response.getExecutionsList()) { + System.out.println( + "Workflow ID: " + + workflowExecutionInfo.getExecution().getWorkflowId() + + " Run ID: " + + workflowExecutionInfo.getExecution().getRunId() + + " Status: " + + workflowExecutionInfo.getStatus()); + if (workflowExecutionInfo.getParentExecution() != null) { System.out.println( - "Workflow with ID " - + workflowId - + " has been reset. New RunId: " - + resetWorkflowExecutionResponse.getRunId()); - } catch (Exception e) { - System.err.println("Failed to reset workflow with ID " + workflowId); + "****** PARENT: " + + workflowExecutionInfo.getExecution().getWorkflowId() + + " - " + + workflowExecutionInfo.getExecution().getRunId()); + resetWorkflow(workflowExecutionInfo.getExecution().getWorkflowId()); } } - // Cleanup - service.shutdown(); + if (response.getNextPageToken() != null && response.getNextPageToken().size() > 0) { + resetWorkflowExecutions(query, response.getNextPageToken()); + } + } + + private static void resetWorkflow(String workflowId) { + long eventId = 3; + WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId); + + ResetWorkflowExecutionRequest resetWorkflowExecutionRequest = + ResetWorkflowExecutionRequest.newBuilder() + .setRequestId(UUID.randomUUID().toString()) + .setNamespace("default") + .setWorkflowExecution(existingUntyped.getExecution()) + .setWorkflowTaskFinishEventId(eventId) + .setReason("Doing a workflow reset...") + .build(); + + try { + ResetWorkflowExecutionResponse resetWorkflowExecutionResponse = + service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest); + System.out.println( + "Workflow with ID " + + workflowId + + " has been reset. New RunId: " + + resetWorkflowExecutionResponse.getRunId()); + } catch (Exception e) { + System.err.println("Failed to reset workflow with ID " + workflowId); + } } private static void startWorkflow() { @@ -168,4 +169,15 @@ private static void sleep(int seconds) { System.exit(0); } } + + private static String dateOffset(long seconds) { + long currentTimeMillis = System.currentTimeMillis() - (seconds * 1000); + OffsetDateTime offsetDateTime = + OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(currentTimeMillis), ZoneOffset.UTC); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"); + String formattedDateTime = offsetDateTime.format(formatter); + + return formattedDateTime; + } }