Skip to content

Commit 55d5810

Browse files
authored
DSL Sample support for child workflow invocations (temporalio#317)
* add support for subflows/child workflows to dsl sample Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * update readme Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 1b5ba5e commit 55d5810

8 files changed

Lines changed: 221 additions & 34 deletions

File tree

src/main/java/io/temporal/samples/dsl/DslWorkflowCache.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ private static class WorkflowHolder {
4141
Workflow.fromSource(getFileAsString("dsl/customerapproval/applicantworkflow.json"));
4242
Workflow approvalWorkflow =
4343
Workflow.fromSource(getFileAsString("dsl/customerapproval/approvalworkflow.json"));
44+
Workflow bankingParentWorkflow =
45+
Workflow.fromSource(
46+
getFileAsString("dsl/bankingtransactionssubflow/parentworkflow.json"));
47+
Workflow bankingChildWorkflow =
48+
Workflow.fromSource(
49+
getFileAsString("dsl/bankingtransactionssubflow/childworkflow.json"));
50+
4451
dslWorkflowMap.put(
4552
customerApplicationWorkflow.getId() + "-" + customerApplicationWorkflow.getVersion(),
4653
customerApplicationWorkflow);
@@ -51,6 +58,12 @@ private static class WorkflowHolder {
5158
applicantWorkflow.getId() + "-" + applicantWorkflow.getVersion(), applicantWorkflow);
5259
dslWorkflowMap.put(
5360
approvalWorkflow.getId() + "-" + approvalWorkflow.getVersion(), approvalWorkflow);
61+
dslWorkflowMap.put(
62+
bankingParentWorkflow.getId() + "-" + bankingParentWorkflow.getVersion(),
63+
bankingParentWorkflow);
64+
dslWorkflowMap.put(
65+
bankingChildWorkflow.getId() + "-" + bankingChildWorkflow.getVersion(),
66+
bankingChildWorkflow);
5467
} catch (Exception e) {
5568
System.out.println("Exception: " + e.getMessage());
5669
}

src/main/java/io/temporal/samples/dsl/DynamicDslWorkflow.java

Lines changed: 78 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import io.serverlessworkflow.api.branches.Branch;
2626
import io.serverlessworkflow.api.events.OnEvents;
2727
import io.serverlessworkflow.api.functions.FunctionDefinition;
28+
import io.serverlessworkflow.api.functions.SubFlowRef;
2829
import io.serverlessworkflow.api.interfaces.State;
2930
import io.serverlessworkflow.api.states.*;
3031
import io.serverlessworkflow.api.switchconditions.DataCondition;
3132
import io.serverlessworkflow.utils.WorkflowUtils;
3233
import io.temporal.activity.ActivityOptions;
34+
import io.temporal.api.common.v1.WorkflowExecution;
35+
import io.temporal.api.enums.v1.ParentClosePolicy;
3336
import io.temporal.common.converter.EncodedValues;
3437
import io.temporal.samples.dsl.model.ActResult;
3538
import io.temporal.samples.dsl.model.WorkflowData;
@@ -151,7 +154,6 @@ private State executeStateAndReturnNext(State dslWorkflowState) {
151154
() -> signalMap.containsKey(eventState.getOnEvents().get(0).getEventRefs().get(0)));
152155
workflowData = signalMap.get(eventState.getOnEvents().get(0).getEventRefs().get(0));
153156
} else {
154-
155157
List<Action> eventStateActions = eventState.getOnEvents().get(0).getActions();
156158
if (eventState.getOnEvents().get(0).getActionMode() != null
157159
&& eventState
@@ -221,29 +223,82 @@ private State executeStateAndReturnNext(State dslWorkflowState) {
221223
}
222224
} else {
223225
for (Action action : operationState.getActions()) {
224-
225-
// check if its a custom function
226-
FunctionDefinition functionDefinition =
227-
WorkflowUtils.getFunctionDefinitionsForAction(dslWorkflow, action.getName());
228-
if (functionDefinition.getType().equals(FunctionDefinition.Type.CUSTOM)) {
229-
// for this example custom function is assumed sending signal via external stub
230-
String[] operationParts = functionDefinition.getOperation().split("#");
231-
ExternalWorkflowStub externalWorkflowStub =
232-
Workflow.newUntypedExternalWorkflowStub(operationParts[0]);
233-
externalWorkflowStub.signal(operationParts[1], workflowData.getValue());
234-
} else {
235-
if (action.getSleep() != null && action.getSleep().getBefore() != null) {
236-
Workflow.sleep(Duration.parse(action.getSleep().getBefore()));
226+
// added support for subflow (child workflow)
227+
if (action.getSubFlowRef() != null) {
228+
229+
if (action.getSubFlowRef().getInvoke() != null
230+
&& action.getSubFlowRef().getInvoke().equals(SubFlowRef.Invoke.ASYNC)) {
231+
ChildWorkflowOptions childWorkflowOptions;
232+
233+
if (action
234+
.getSubFlowRef()
235+
.getOnParentComplete()
236+
.equals(SubFlowRef.OnParentComplete.CONTINUE)) {
237+
childWorkflowOptions =
238+
ChildWorkflowOptions.newBuilder()
239+
.setWorkflowId(action.getSubFlowRef().getWorkflowId())
240+
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
241+
.build();
242+
} else {
243+
childWorkflowOptions =
244+
ChildWorkflowOptions.newBuilder()
245+
.setWorkflowId(action.getSubFlowRef().getWorkflowId())
246+
.build();
247+
}
248+
ChildWorkflowStub childWorkflow =
249+
Workflow.newUntypedChildWorkflowStub(
250+
action.getSubFlowRef().getWorkflowId(), childWorkflowOptions);
251+
Promise<Object> promise =
252+
childWorkflow.executeAsync(
253+
Object.class,
254+
action.getSubFlowRef().getWorkflowId(),
255+
action.getSubFlowRef().getVersion(),
256+
workflowData.getValue());
257+
// for async we do not care about result in sample
258+
// wait until child starts
259+
Promise<WorkflowExecution> childExecution =
260+
Workflow.getWorkflowExecution(childWorkflow);
261+
childExecution.get();
262+
263+
} else {
264+
ChildWorkflowStub childWorkflow =
265+
Workflow.newUntypedChildWorkflowStub(
266+
action.getSubFlowRef().getWorkflowId(),
267+
ChildWorkflowOptions.newBuilder()
268+
.setWorkflowId(action.getSubFlowRef().getWorkflowId())
269+
.build());
270+
271+
workflowData.addResults(
272+
childWorkflow.execute(
273+
Object.class,
274+
action.getSubFlowRef().getWorkflowId(),
275+
action.getSubFlowRef().getVersion(),
276+
workflowData.getValue()));
237277
}
238-
// execute the action as an activity and assign its results to workflowData
239-
workflowData.addResults(
240-
activities.execute(
241-
action.getFunctionRef().getRefName(),
242-
ActResult.class,
243-
workflowData.getCustomer()));
244-
245-
if (action.getSleep() != null && action.getSleep().getAfter() != null) {
246-
Workflow.sleep(Duration.parse(action.getSleep().getAfter()));
278+
} else {
279+
// check if its a custom function
280+
FunctionDefinition functionDefinition =
281+
WorkflowUtils.getFunctionDefinitionsForAction(dslWorkflow, action.getName());
282+
if (functionDefinition.getType().equals(FunctionDefinition.Type.CUSTOM)) {
283+
// for this example custom function is assumed sending signal via external stub
284+
String[] operationParts = functionDefinition.getOperation().split("#");
285+
ExternalWorkflowStub externalWorkflowStub =
286+
Workflow.newUntypedExternalWorkflowStub(operationParts[0]);
287+
externalWorkflowStub.signal(operationParts[1], workflowData.getValue());
288+
} else {
289+
if (action.getSleep() != null && action.getSleep().getBefore() != null) {
290+
Workflow.sleep(Duration.parse(action.getSleep().getBefore()));
291+
}
292+
// execute the action as an activity and assign its results to workflowData
293+
workflowData.addResults(
294+
activities.execute(
295+
action.getFunctionRef().getRefName(),
296+
ActResult.class,
297+
workflowData.getCustomer()));
298+
299+
if (action.getSleep() != null && action.getSleep().getAfter() != null) {
300+
Workflow.sleep(Duration.parse(action.getSleep().getAfter()));
301+
}
247302
}
248303
}
249304
}

src/main/java/io/temporal/samples/dsl/README.md

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ which helps us parse the DSL into an object model as well as provides DSL valida
88
Since this is just a sample, this sample provides only partial implementation of the
99
entire Serverless Workflow DSL features.
1010

11-
The sample runs four DSL workflows, `customerapplication/workflow.yml`,
11+
This sample runs the following DSL workflows, `customerapplication/workflow.yml`,
1212
`bankingtransactions/workflow.yml`, `customerapproval/applicantworkflow.yml`,
13-
`customerapproval/approvalworkflow.yml`.
13+
`customerapproval/approvalworkflow.yml`, `bankingtransactionssubflow/parentworkflow.json`,
14+
`bankingtransactionssubflow/childworkflow.yml`.
1415

1516
Note that most DSLs, including Serverless Workflow DSL used in this sample represent
1617
their workflow data as JSON. As such manipulation of this data is done via expression languages
@@ -92,6 +93,33 @@ Workflow results:
9293
"result" : "APPROVED"
9394
} ]
9495
}
96+
Validating workflow: bankingparentworkflow
97+
Starting workflow with id: bankingparentworkflow and version: 1.0
98+
Workflow results:
99+
{
100+
"customer" : {
101+
"name" : "John",
102+
"age" : 22,
103+
"transactions" : [ 100, -50, 20 ]
104+
},
105+
"results" : [ {
106+
"customer" : {
107+
"name" : "John",
108+
"age" : 22,
109+
"transactions" : [ 100, -50, 20 ]
110+
},
111+
"results" : [ {
112+
"type" : "InvokeBankingService",
113+
"result" : "invoked"
114+
}, {
115+
"type" : "InvokeBankingService",
116+
"result" : "invoked"
117+
}, {
118+
"type" : "InvokeBankingService",
119+
"result" : "invoked"
120+
} ]
121+
} ]
122+
}
95123
```
96124

97125

src/main/java/io/temporal/samples/dsl/Starter.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,27 @@ public static void main(String[] args) {
4747
runBankingTransactionWorkflow();
4848
// Customer approval workflow
4949
runCustomerApprovalWorkflow();
50+
// Banking transaction workflow with parent/child relationship
51+
runBankingTransactionWithChildWorkflow();
52+
5053
System.exit(0);
5154
}
5255

5356
private static void runCustomerApplicationWorkflow() {
54-
runWorkflow("customerapplication", "1.0", "dsl/customerapplication/datainput.json");
57+
runWorkflow("customerapplication", "1.0", "dsl/customerapplication/datainput.json", true);
5558
}
5659

5760
private static void runBankingTransactionWorkflow() {
58-
runWorkflow("bankingtransactions", "1.0", "dsl/bankingtransactions/datainput.json");
61+
runWorkflow("bankingtransactions", "1.0", "dsl/bankingtransactions/datainput.json", true);
62+
}
63+
64+
private static void runBankingTransactionWithChildWorkflow() {
65+
runWorkflow(
66+
"bankingparentworkflow", "1.0", "dsl/bankingtransactionssubflow/datainput.json", false);
5967
}
6068

6169
private static void runWorkflow(
62-
String workflowId, String workflowVersion, String dataInputFileName) {
70+
String workflowId, String workflowVersion, String dataInputFileName, boolean doQuery) {
6371
try {
6472
// Get the workflow dsl from cache
6573
Workflow dslWorkflow = DslWorkflowCache.getWorkflow(workflowId, workflowVersion);
@@ -79,12 +87,14 @@ private static void runWorkflow(
7987
// Wait for workflow to finish
8088
JsonNode result = workflowStub.getResult(JsonNode.class);
8189

82-
// Query the customer name and age
83-
String customerName = workflowStub.query("QueryCustomerName", String.class);
84-
int customerAge = workflowStub.query("QueryCustomerAge", Integer.class);
90+
if (doQuery) {
91+
// Query the customer name and age
92+
String customerName = workflowStub.query("QueryCustomerName", String.class);
93+
int customerAge = workflowStub.query("QueryCustomerAge", Integer.class);
8594

86-
System.out.println("Query result for customer name: " + customerName);
87-
System.out.println("Query result for customer age: " + customerAge);
95+
System.out.println("Query result for customer name: " + customerName);
96+
System.out.println("Query result for customer age: " + customerAge);
97+
}
8898

8999
// Print workflow results
90100
System.out.println("Workflow results: \n" + result.toPrettyString());

src/main/java/io/temporal/samples/dsl/model/WorkflowData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void setValue(ObjectNode value) {
6363
this.value = value;
6464
}
6565

66-
public void addResults(ActResult result) {
66+
public void addResults(Object result) {
6767
((ArrayNode) this.value.get("results")).add(mapper.valueToTree(result));
6868
}
6969

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"id": "bankingchildworkflow",
3+
"name": "Customer Banking Transactions Child Workflow",
4+
"version": "1.0",
5+
"specVersion": "0.8",
6+
"timeouts": {
7+
"actionExecTimeout": "PT10S"
8+
},
9+
"autoRetries": true,
10+
"start": "ProcessTransactions",
11+
"states": [
12+
{
13+
"name": "ProcessTransactions",
14+
"type": "foreach",
15+
"inputCollection": "${ .customer.transactions }",
16+
"iterationParam": "${ .tx }",
17+
"mode": "parallel",
18+
"actions": [
19+
{
20+
"name": "Processing Action",
21+
"functionRef": "InvokeBankingService"
22+
}
23+
],
24+
"end": true
25+
}
26+
],
27+
"functions": [
28+
{
29+
"name": "InvokeBankingService",
30+
"type": "rest"
31+
},
32+
{
33+
"name": "QueryCustomerName",
34+
"type": "expression",
35+
"operation": "${ .customer.name }"
36+
},
37+
{
38+
"name": "QueryCustomerAge",
39+
"type": "expression",
40+
"operation": "${ .customer.age }"
41+
}
42+
]
43+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"customer": {
3+
"name": "John",
4+
"age": 22,
5+
"transactions": [100, -50, 20]
6+
},
7+
"results": []
8+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"id": "bankingparentworkflow",
3+
"name": "Customer Banking Transactions Parent Workflow",
4+
"version": "1.0",
5+
"specVersion": "0.8",
6+
"timeouts": {
7+
"workflowExecTimeout": {
8+
"duration": "PT1M"
9+
},
10+
"actionExecTimeout": "PT10S"
11+
},
12+
"autoRetries": true,
13+
"start": "InvokeBankingChild",
14+
"states": [
15+
{
16+
"name": "InvokeBankingChild",
17+
"type": "operation",
18+
"actionMode": "sequential",
19+
"actions": [
20+
{
21+
"subFlowRef": {
22+
"workflowId": "bankingchildworkflow",
23+
"version": "1.0"
24+
}
25+
}
26+
],
27+
"end": true
28+
}
29+
]
30+
}

0 commit comments

Comments
 (0)