diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/README.md b/core/src/main/java/io/temporal/samples/resetworkflows/README.md new file mode 100644 index 000000000..4eb329695 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/README.md @@ -0,0 +1,24 @@ +# Demo List Workflows + +The sample demonstrates: +1) Setting custom search attributes for a Workflow +2) Using ListWorkflowExecutionsRequest and custom Search Attribute query to list +Workflow Executions that match that query + +## Running + +1. Unlike the other examples, this one has to be started with Elasticsearch +capabilities enabled. If you are using docker you can do that with: + +```bash +git clone https://github.com/temporalio/docker-compose.git +cd docker-compose +docker-compose -f docker-compose-cas-es.yml up +``` + +2. +Run the following command to start the sample: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.listworkflows.Starter +``` diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java new file mode 100644 index 000000000..ec7f0e12b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivities.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface ResetActivities { + void activityOne(); + + void activityTwo(); + + void activityThree(); +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java new file mode 100644 index 000000000..8812a4358 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetActivitiesImpl.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResetActivitiesImpl implements ResetActivities { + + private static final Logger log = LoggerFactory.getLogger(ResetActivitiesImpl.class); + + @Override + public void activityOne() { + // simulate some actual work... + sleepSeconds(1); + thisMayOrMayNotThrowAnError("activityOne"); + } + + @Override + public void activityTwo() { + // simulate some actual work... + sleepSeconds(1); + thisMayOrMayNotThrowAnError("activityTwo"); + } + + @Override + public void activityThree() { + // simulate some actual work... + sleepSeconds(1); + thisMayOrMayNotThrowAnError("activityThree"); + } + + private void sleepSeconds(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException e) { + // This is being swallowed on purpose + Thread.currentThread().interrupt(); + log.error("Exception in thread sleep: ", e); + } + } + + private void thisMayOrMayNotThrowAnError(String activityName) { + Random random = new Random(); + double randomValue = random.nextDouble(); + log.info("**** Random value: {}", randomValue); + if (randomValue < 0.10) { // 10% chance of failure + log.info("Activity {} failed...", activityName); + throw new IllegalArgumentException("Activity has illegal argument...failing"); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java new file mode 100644 index 000000000..7eeff088e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflow.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface ResetWorkflow { + @WorkflowMethod + void runActivities(String message); +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java new file mode 100644 index 000000000..205bfe558 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/ResetWorkflowImpl.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.ApplicationFailure; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class ResetWorkflowImpl implements ResetWorkflow { + // private boolean exit; + private final ResetActivities resetActivities = + Workflow.newActivityStub( + ResetActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(1)) + .setDoNotRetry(IllegalArgumentException.class.getName()) + .build()) + .build()); + // private final RetryOptions customerRetryOptions = + // RetryOptions.newBuilder().setMaximumAttempts(5).build(); + // private final Duration expiration = Duration.ofMinutes(1); + + @Override + public void runActivities(String message) { + + try { + resetActivities.activityOne(); + resetActivities.activityTwo(); + resetActivities.activityThree(); + } catch (ActivityFailure e) { + System.out.println("Activity failed: " + e.getMessage()); + throw ApplicationFailure.newNonRetryableFailure( + "simulated failure", "some error", "ActivityFailure"); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java b/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java new file mode 100644 index 000000000..642b338e5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/Starter.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import com.google.protobuf.ByteString; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsResponse; +import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class Starter { + public static final String TASK_QUEUE = "resetTaskQueue"; + private static final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + private static final WorkflowClient client = WorkflowClient.newInstance(service); + // private static final WorkerFactory factory = WorkerFactory.newInstance(client); + + public static void main(String[] args) { + + // start customer workflows and define custom search attributes for each + startWorkflow(); + + // small delay before we start querying executions + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + throw new RuntimeException("Exception happened in thread sleep: ", e); + } + + String fiveMinutesAgo = dateOffset(20); + resetWorkflowExecutions( + "WorkflowType='ResetWorkflow' and ExecutionStatus=" + + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED_VALUE + + " and CloseTime>" + + "'" + + fiveMinutesAgo + + "'", + null); + + // sleep for 3 seconds before we shut down the worker + sleep(5); + + // Cleanup + service.shutdown(); + System.exit(0); + } + + private static void resetWorkflowExecutions(String query, ByteString token) { + + ListWorkflowExecutionsRequest request; + + if (token == null) { + request = + ListWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setQuery(query) + .build(); + } else { + request = + ListWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setQuery(query) + .setNextPageToken(token) + .build(); + } + + ListWorkflowExecutionsResponse response = + service.blockingStub().listWorkflowExecutions(request); + + for (WorkflowExecutionInfo workflowExecutionInfo : response.getExecutionsList()) { + System.out.println( + "Workflow ID: " + + workflowExecutionInfo.getExecution().getWorkflowId() + + " Run ID: " + + workflowExecutionInfo.getExecution().getRunId() + + " Status: " + + workflowExecutionInfo.getStatus()); + if (workflowExecutionInfo.getParentExecution() != null) { + System.out.println( + "****** PARENT: " + + workflowExecutionInfo.getExecution().getWorkflowId() + + " - " + + workflowExecutionInfo.getExecution().getRunId()); + resetWorkflow(workflowExecutionInfo.getExecution().getWorkflowId()); + } + } + + if (response.getNextPageToken() != null && response.getNextPageToken().size() > 0) { + resetWorkflowExecutions(query, response.getNextPageToken()); + } + } + + private static void resetWorkflow(String workflowId) { + long eventId = 3; + WorkflowStub existingUntyped = client.newUntypedWorkflowStub(workflowId); + + ResetWorkflowExecutionRequest resetWorkflowExecutionRequest = + ResetWorkflowExecutionRequest.newBuilder() + .setRequestId(UUID.randomUUID().toString()) + .setNamespace("default") + .setWorkflowExecution(existingUntyped.getExecution()) + .setWorkflowTaskFinishEventId(eventId) + .setReason("Doing a workflow reset...") + .build(); + + try { + ResetWorkflowExecutionResponse resetWorkflowExecutionResponse = + service.blockingStub().resetWorkflowExecution(resetWorkflowExecutionRequest); + System.out.println( + "Workflow with ID " + + workflowId + + " has been reset. New RunId: " + + resetWorkflowExecutionResponse.getRunId()); + } catch (Exception e) { + System.err.println("Failed to reset workflow with ID " + workflowId); + } + } + + private static void startWorkflow() { + // start a workflow for each customer that we need to add message to account + + for (int i = 1; i < 6; i++) { + String message = "Running reset workflow: " + i; + + WorkflowOptions newResetWorkflowOptions = + WorkflowOptions.newBuilder() + .setWorkflowId("reset-workflow-" + i) + .setTaskQueue(TASK_QUEUE) + .build(); + ResetWorkflow newResetWorkflow = + client.newWorkflowStub(ResetWorkflow.class, newResetWorkflowOptions); + // start async + WorkflowClient.start(newResetWorkflow::runActivities, message); + } + } + + private static void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException e) { + System.out.println("Exception: " + e.getMessage()); + System.exit(0); + } + } + + private static String dateOffset(long seconds) { + long currentTimeMillis = System.currentTimeMillis() - (seconds * 1000); + OffsetDateTime offsetDateTime = + OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(currentTimeMillis), ZoneOffset.UTC); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"); + String formattedDateTime = offsetDateTime.format(formatter); + + return formattedDateTime; + } +} diff --git a/core/src/main/java/io/temporal/samples/resetworkflows/Worker.java b/core/src/main/java/io/temporal/samples/resetworkflows/Worker.java new file mode 100644 index 000000000..a3802b5dc --- /dev/null +++ b/core/src/main/java/io/temporal/samples/resetworkflows/Worker.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.resetworkflows; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; + +public class Worker { + public static final String TASK_QUEUE = "resetTaskQueue"; + private static final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + private static final WorkflowClient client = WorkflowClient.newInstance(service); + private static final WorkerFactory factory = WorkerFactory.newInstance(client); + + public static void main(String[] args) { + // create the worker for workflow and activities + createWorker(); + } + + private static void createWorker() { + io.temporal.worker.Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(ResetWorkflowImpl.class); + worker.registerActivitiesImplementations(new ResetActivitiesImpl()); + + factory.start(); + } +}