From 2bdb2e43e91e97aa77af62836f3fd935aed86b83 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Tue, 22 Apr 2025 23:55:48 -0700 Subject: [PATCH 1/2] lean example of UwS --- .../samples/updatewithstart/README.md | 24 +++++ .../updatewithstart/StartWorkflowRequest.java | 13 +++ .../UpdateWithStartClient.java | 99 +++++++++++++++++++ .../UpdateWithStartWorker.java | 43 ++++++++ .../UpdateWithStartWorkflow.java | 33 +++++++ .../UpdateWithStartWorkflowImpl.java | 46 +++++++++ .../UpdateWithStartWorkflowState.java | 31 ++++++ 7 files changed, 289 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/README.md create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/README.md b/core/src/main/java/io/temporal/samples/updatewithstart/README.md new file mode 100644 index 000000000..f0c486608 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/README.md @@ -0,0 +1,24 @@ +### Early-Return Sample + +This sample demonstrates an early-return from a workflow. + +By utilizing Update-with-Start, a client can start a new workflow and synchronously receive +a response mid-workflow, while the workflow continues to run to completion. + +To run the sample, start the worker: +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.earlyreturn.EarlyReturnWorker +``` + +Then, start the client: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.earlyreturn.EarlyReturnClient +``` + +* The client will start a workflow using Update-With-Start. +* Update-With-Start will trigger an initialization step. +* If the initialization step succeeds (default), intialization will return to the client with a transaction ID and the workflow will continue. The workflow will then complete and return the final result. +* If the intitialization step fails (amount <= 0), the workflow will return to the client with an error message and the workflow will run an activity to cancel the transaction. + +To trigger a failed initialization, set the amount to <= 0 in the `EarlyReturnClient` class's `runWorkflowWithUpdateWithStart` method and re-run the client. \ No newline at end of file diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java b/core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java new file mode 100644 index 000000000..36ca9f42c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java @@ -0,0 +1,13 @@ +package io.temporal.samples.updatewithstart; + +public class StartWorkflowRequest { + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + private String value; +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java new file mode 100644 index 000000000..4f082ad58 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; +import io.temporal.api.enums.v1.WorkflowIdReusePolicy; +import io.temporal.client.*; +import io.temporal.serviceclient.WorkflowServiceStubs; + +public class UpdateWithStartClient { + private static final String TASK_QUEUE = "UpdateWithStartTQ"; + private static final String WORKFLOW_ID_PREFIX = "update-with-start-"; + + public static void main(String[] args) { + WorkflowClient client = setupWorkflowClient(); + var opts = buildWorkflowOptions(); + runWorkflowWithUpdateWithStart(client, opts); + runWorkflowWithUpdateWithStart(client, opts); + } + + // Set up the WorkflowClient + public static WorkflowClient setupWorkflowClient() { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + return WorkflowClient.newInstance(service); + } + + // Run workflow using 'updateWithStart' + private static void runWorkflowWithUpdateWithStart( + WorkflowClient client, WorkflowOptions options) { + + var args = new StartWorkflowRequest(); + + UpdateWithStartWorkflow workflow = + client.newWorkflowStub(UpdateWithStartWorkflow.class, options); + + try { + // var result = + // WorkflowClient.executeUpdateWithStart( + // workflow::putApplication, + // args, + // UpdateOptions.newBuilder().build(), + // new WithStartWorkflowOperation<>(workflow::execute, args)); + + var handle = + WorkflowClient.startUpdateWithStart( + workflow::putApplication, + args, + UpdateOptions.newBuilder() + .setWaitForStage(WorkflowUpdateStage.ACCEPTED) + .build(), + new WithStartWorkflowOperation<>(workflow::execute, args)); + var result = handle.getResult(); + + System.out.println( + "Workflow UwS with value: " + + result.getArgs().getValue() + + ", with updates count:" + + result.getUpdates().size()); + + } catch (WorkflowExecutionAlreadyStarted e) { + System.err.println("WorkflowAlreadyStarted" + e); + } catch (WorkflowServiceException e) { + System.err.println("WorkflowServiceException" + e.getCause()); + } catch (Exception e) { + System.err.println( + "UpdateWithStart failed: " + e.getMessage() + "/" + e.getClass().getCanonicalName()); + } + } + + // https://docs.temporal.io/develop/java/message-passing + // Build WorkflowOptions with task queue and unique ID + private static WorkflowOptions buildWorkflowOptions() { + return WorkflowOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setWorkflowIdReusePolicy( + WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY) + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) + .setWorkflowId(WORKFLOW_ID_PREFIX + System.currentTimeMillis()) + .build(); + } +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java new file mode 100644 index 000000000..3bf6819c3 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +public class UpdateWithStartWorker { + private static final String TASK_QUEUE = "UpdateWithStartTQ"; + + public static void main(String[] args) { + WorkflowClient client = UpdateWithStartClient.setupWorkflowClient(); + startWorker(client); + } + + private static void startWorker(WorkflowClient client) { + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(UpdateWithStartWorkflowImpl.class); + + factory.start(); + System.out.println("Worker started"); + } +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java new file mode 100644 index 000000000..f24e44cf4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface UpdateWithStartWorkflow { + @WorkflowMethod + void execute(StartWorkflowRequest args); + + @UpdateMethod + UpdateWithStartWorkflowState putApplication(StartWorkflowRequest args); +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java new file mode 100644 index 000000000..e5f2e17a7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.workflow.WorkflowInit; + +public class UpdateWithStartWorkflowImpl implements UpdateWithStartWorkflow { + // private static final Logger log = LoggerFactory.getLogger(UpdateWithStartWorkflowImpl.class); + private UpdateWithStartWorkflowState state; + + @WorkflowInit + public UpdateWithStartWorkflowImpl(StartWorkflowRequest args) { + this.state = new UpdateWithStartWorkflowState(args); + } + + @Override + public void execute(StartWorkflowRequest args) { + System.out.println("execute called"); + // log.info("Workflow started {}", args); + } + + @Override + public UpdateWithStartWorkflowState putApplication(StartWorkflowRequest args) { + System.out.println("putApplication called"); + this.state.getUpdates().add(args); + + return this.state; + } +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java new file mode 100644 index 000000000..c8a5dc4e7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java @@ -0,0 +1,31 @@ +package io.temporal.samples.updatewithstart; + +import java.util.ArrayList; +import java.util.List; + +public class UpdateWithStartWorkflowState { + private StartWorkflowRequest args; + private List updates = new ArrayList<>(); + + public UpdateWithStartWorkflowState() {} + + public UpdateWithStartWorkflowState(StartWorkflowRequest args) { + this.args = args; + } + + public StartWorkflowRequest getArgs() { + return args; + } + + public void setArgs(StartWorkflowRequest args) { + this.args = args; + } + + public List getUpdates() { + return updates; + } + + public void setUpdates(List updates) { + this.updates = updates; + } +} From d43fbf251ff9929924478003db99d4680f0cab89 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Wed, 23 Apr 2025 00:22:28 -0700 Subject: [PATCH 2/2] query to verify --- .../UpdateWithStartClient.java | 15 ++++++++++-- .../UpdateWithStartWorkflow.java | 4 ++++ .../UpdateWithStartWorkflowImpl.java | 23 +++++++++++++++---- .../UpdateWithStartWorkflowState.java | 23 +++++++++++-------- 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java index 4f082ad58..f05d8a254 100644 --- a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java @@ -23,6 +23,7 @@ import io.temporal.api.enums.v1.WorkflowIdReusePolicy; import io.temporal.client.*; import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.UUID; public class UpdateWithStartClient { private static final String TASK_QUEUE = "UpdateWithStartTQ"; @@ -46,11 +47,13 @@ private static void runWorkflowWithUpdateWithStart( WorkflowClient client, WorkflowOptions options) { var args = new StartWorkflowRequest(); + args.setValue(UUID.randomUUID().toString()); UpdateWithStartWorkflow workflow = client.newWorkflowStub(UpdateWithStartWorkflow.class, options); try { + // // First I tried this to updateWithStart // var result = // WorkflowClient.executeUpdateWithStart( // workflow::putApplication, @@ -63,17 +66,25 @@ private static void runWorkflowWithUpdateWithStart( workflow::putApplication, args, UpdateOptions.newBuilder() - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) .build(), new WithStartWorkflowOperation<>(workflow::execute, args)); var result = handle.getResult(); System.out.println( "Workflow UwS with value: " - + result.getArgs().getValue() + + result.getInitArgs().getValue() + ", with updates count:" + result.getUpdates().size()); + System.out.println( + "Workflow QUERY with initArgs: " + + workflow.getState().getInitArgs().getValue() + + "\n with updates: " + + workflow.getState().getUpdates().size() + + "\nwith execute args " + + workflow.getState().getExecuteArgs().getValue()); + } catch (WorkflowExecutionAlreadyStarted e) { System.err.println("WorkflowAlreadyStarted" + e); } catch (WorkflowServiceException e) { diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java index f24e44cf4..f9086a919 100644 --- a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java @@ -19,6 +19,7 @@ package io.temporal.samples.updatewithstart; +import io.temporal.workflow.QueryMethod; import io.temporal.workflow.UpdateMethod; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; @@ -30,4 +31,7 @@ public interface UpdateWithStartWorkflow { @UpdateMethod UpdateWithStartWorkflowState putApplication(StartWorkflowRequest args); + + @QueryMethod + UpdateWithStartWorkflowState getState(); } diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java index e5f2e17a7..14cc22187 100644 --- a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java @@ -19,28 +19,43 @@ package io.temporal.samples.updatewithstart; +import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class UpdateWithStartWorkflowImpl implements UpdateWithStartWorkflow { - // private static final Logger log = LoggerFactory.getLogger(UpdateWithStartWorkflowImpl.class); + private static final Logger log = LoggerFactory.getLogger(UpdateWithStartWorkflowImpl.class); private UpdateWithStartWorkflowState state; @WorkflowInit public UpdateWithStartWorkflowImpl(StartWorkflowRequest args) { - this.state = new UpdateWithStartWorkflowState(args); + this.state = new UpdateWithStartWorkflowState(); + this.state.setInitArgs(args); + System.out.println("WorkflowInit args = " + args); } @Override public void execute(StartWorkflowRequest args) { + log.info("Workflow started {}", args); + + this.state.setExecuteArgs(args); System.out.println("execute called"); - // log.info("Workflow started {}", args); + + Workflow.await(() -> this.state.getUpdates().size() == 2); } @Override public UpdateWithStartWorkflowState putApplication(StartWorkflowRequest args) { - System.out.println("putApplication called"); + this.state.getUpdates().add(args); + System.out.println("put application called " + this.state.getUpdates().size()); + return this.state; + } + @Override + public UpdateWithStartWorkflowState getState() { + System.out.println("getState called " + this.state.getInitArgs().getValue()); return this.state; } } diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java index c8a5dc4e7..1c4aba04f 100644 --- a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java @@ -4,21 +4,18 @@ import java.util.List; public class UpdateWithStartWorkflowState { - private StartWorkflowRequest args; + private StartWorkflowRequest initArgs; + private StartWorkflowRequest executeArgs; private List updates = new ArrayList<>(); public UpdateWithStartWorkflowState() {} - public UpdateWithStartWorkflowState(StartWorkflowRequest args) { - this.args = args; + public StartWorkflowRequest getInitArgs() { + return initArgs; } - public StartWorkflowRequest getArgs() { - return args; - } - - public void setArgs(StartWorkflowRequest args) { - this.args = args; + public void setInitArgs(StartWorkflowRequest initArgs) { + this.initArgs = initArgs; } public List getUpdates() { @@ -28,4 +25,12 @@ public List getUpdates() { public void setUpdates(List updates) { this.updates = updates; } + + public StartWorkflowRequest getExecuteArgs() { + return executeArgs; + } + + public void setExecuteArgs(StartWorkflowRequest executeArgs) { + this.executeArgs = executeArgs; + } }