Skip to content

Commit 0a6a218

Browse files
authored
Adding custom payload converter sample (temporalio#132)
Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 16982c4 commit 0a6a218

File tree

8 files changed

+374
-0
lines changed

8 files changed

+374
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ All tests are available under [src/test/java](https://github.com/temporalio/samp
115115

116116
- [**List Workflows**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/listworkflows): Demonstrates the use of custom search attributes and ListWorkflowExecutionsRequest with custom queries.
117117

118+
- [**Payload Converter**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/payloadconverter): Demonstrates the use of a custom payload converter.
119+
118120
<!-- @@@SNIPEND -->
119121

120122
## IDE Integration

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ dependencies {
3333
implementation group: 'io.temporal', name: 'temporal-sdk', version: '1.1.0'
3434
implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.5'
3535
implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10'
36+
implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '2.2.0'
37+
implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '2.2.0'
38+
implementation group: 'io.cloudevents', name: 'cloudevents-json-jackson', version: '2.2.0'
3639

3740
testImplementation group: 'io.temporal', name: 'temporal-testing', version: '1.1.0'
3841
testImplementation group: 'io.temporal', name: 'temporal-testing-junit4', version: '1.1.0'
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.payloadconverter;
21+
22+
import io.cloudevents.CloudEvent;
23+
import io.temporal.workflow.QueryMethod;
24+
import io.temporal.workflow.SignalMethod;
25+
import io.temporal.workflow.WorkflowInterface;
26+
import io.temporal.workflow.WorkflowMethod;
27+
28+
@WorkflowInterface
29+
public interface CEWorkflow {
30+
@WorkflowMethod
31+
void exec(CloudEvent cloudEvent);
32+
33+
@SignalMethod
34+
void addEvent(CloudEvent cloudEvent);
35+
36+
@QueryMethod
37+
CloudEvent getLastEvent();
38+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.payloadconverter;
21+
22+
import io.cloudevents.CloudEvent;
23+
import io.temporal.workflow.Workflow;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
27+
public class CEWorkflowImpl implements CEWorkflow {
28+
29+
private List<CloudEvent> eventList = new ArrayList<>();
30+
31+
@Override
32+
public void exec(CloudEvent cloudEvent) {
33+
34+
eventList.add(cloudEvent);
35+
36+
Workflow.await(() -> eventList.size() == 10);
37+
}
38+
39+
@Override
40+
public void addEvent(CloudEvent cloudEvent) {
41+
eventList.add(cloudEvent);
42+
}
43+
44+
@Override
45+
public CloudEvent getLastEvent() {
46+
return eventList.get(eventList.size() - 1);
47+
}
48+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.payloadconverter;
21+
22+
import com.google.protobuf.ByteString;
23+
import io.cloudevents.CloudEvent;
24+
import io.cloudevents.core.format.EventFormat;
25+
import io.cloudevents.core.format.EventSerializationException;
26+
import io.cloudevents.core.provider.EventFormatProvider;
27+
import io.cloudevents.jackson.JsonFormat;
28+
import io.temporal.api.common.v1.Payload;
29+
import io.temporal.common.converter.DataConverterException;
30+
import io.temporal.common.converter.PayloadConverter;
31+
import java.lang.reflect.Type;
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.Optional;
34+
35+
/** Payload converter specific to CloudEvents format */
36+
public class CloudEventsPayloadConverter implements PayloadConverter {
37+
38+
private EventFormat CEFormat =
39+
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
40+
41+
@Override
42+
public String getEncodingType() {
43+
return "json/plain";
44+
}
45+
46+
@Override
47+
public Optional<Payload> toData(Object value) throws DataConverterException {
48+
49+
try {
50+
CloudEvent cloudEvent = (CloudEvent) value;
51+
byte[] serialized = CEFormat.serialize(cloudEvent);
52+
53+
return Optional.of(
54+
Payload.newBuilder()
55+
.putMetadata(
56+
"encoding", ByteString.copyFrom(getEncodingType(), StandardCharsets.UTF_8))
57+
.setData(ByteString.copyFrom(serialized))
58+
.build());
59+
60+
} catch (EventSerializationException | ClassCastException e) {
61+
throw new DataConverterException(e);
62+
}
63+
}
64+
65+
@Override
66+
public <T> T fromData(Payload content, Class<T> valueClass, Type valueType)
67+
throws DataConverterException {
68+
try {
69+
return (T) CEFormat.deserialize(content.getData().toByteArray());
70+
} catch (ClassCastException e) {
71+
throw new DataConverterException(e);
72+
}
73+
}
74+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Custom Payload Converter
2+
3+
The sample demonstrates creating and setting a custom Payload Converter.
4+
5+
## Running
6+
7+
1. Start Temporal Server with "default" namespace enabled.
8+
For example using local Docker:
9+
10+
```bash
11+
git clone https://github.com/temporalio/docker-compose.git
12+
cd docker-compose
13+
docker-compose up
14+
```
15+
16+
2.
17+
Run the following command to start the sample:
18+
19+
```bash
20+
./gradlew -q execute -PmainClass=io.temporal.samples.payloadconverter.Starter
21+
```
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.payloadconverter;
21+
22+
import io.cloudevents.CloudEvent;
23+
import io.cloudevents.core.builder.CloudEventBuilder;
24+
import io.cloudevents.jackson.JsonCloudEventData;
25+
import io.temporal.client.WorkflowClient;
26+
import io.temporal.client.WorkflowClientOptions;
27+
import io.temporal.client.WorkflowOptions;
28+
import io.temporal.common.converter.DefaultDataConverter;
29+
import io.temporal.serviceclient.WorkflowServiceStubs;
30+
import io.temporal.worker.Worker;
31+
import io.temporal.worker.WorkerFactory;
32+
import java.net.URI;
33+
import java.nio.charset.Charset;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
37+
public class Starter {
38+
39+
private static final String TASK_QUEUE = "TestQueue";
40+
41+
public static void main(String[] args) {
42+
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
43+
44+
// Add CloudEventsPayloadConverter
45+
// It has the same encoding type as JacksonJsonPayloadConverter
46+
DefaultDataConverter ddc =
47+
DefaultDataConverter.newDefaultInstance()
48+
.withPayloadConverterOverrides(new CloudEventsPayloadConverter());
49+
50+
WorkflowClientOptions workflowClientOptions =
51+
WorkflowClientOptions.newBuilder().setDataConverter(ddc).build();
52+
53+
WorkflowClient client = WorkflowClient.newInstance(service, workflowClientOptions);
54+
WorkerFactory factory = WorkerFactory.newInstance(client);
55+
Worker worker = factory.newWorker(TASK_QUEUE);
56+
57+
worker.registerWorkflowImplementationTypes(CEWorkflowImpl.class);
58+
59+
factory.start();
60+
61+
WorkflowOptions newCustomerWorkflowOptions =
62+
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
63+
64+
CEWorkflow workflow = client.newWorkflowStub(CEWorkflow.class, newCustomerWorkflowOptions);
65+
66+
// Create 10 cloud events
67+
List<CloudEvent> cloudEventList = new ArrayList<>();
68+
69+
for (int i = 0; i < 10; i++) {
70+
cloudEventList.add(
71+
CloudEventBuilder.v1()
72+
.withId(String.valueOf(100 + i))
73+
.withType("example.demo")
74+
.withSource(URI.create("http://temporal.io"))
75+
.withData(
76+
"application/json",
77+
("{\n" + "\"greeting\": \"hello " + i + "\"\n" + "}")
78+
.getBytes(Charset.defaultCharset()))
79+
.build());
80+
}
81+
82+
WorkflowClient.start(workflow::exec, cloudEventList.get(0));
83+
84+
// Send signals (cloud event data)
85+
for (int j = 1; j < 10; j++) {
86+
workflow.addEvent(cloudEventList.get(j));
87+
}
88+
89+
// Get the CE result and get its data (JSON)
90+
String result =
91+
((JsonCloudEventData) workflow.getLastEvent().getData()).getNode().get("greeting").asText();
92+
93+
System.out.println("Last event body: " + result);
94+
95+
System.exit(0);
96+
}
97+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.payloadconverter;
21+
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertNotNull;
24+
25+
import io.cloudevents.CloudEvent;
26+
import io.cloudevents.core.builder.CloudEventBuilder;
27+
import io.cloudevents.jackson.JsonCloudEventData;
28+
import io.temporal.client.WorkflowClient;
29+
import io.temporal.client.WorkflowClientOptions;
30+
import io.temporal.client.WorkflowOptions;
31+
import io.temporal.common.converter.DefaultDataConverter;
32+
import io.temporal.testing.TestWorkflowRule;
33+
import java.net.URI;
34+
import java.nio.charset.Charset;
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
import org.junit.Rule;
38+
import org.junit.Test;
39+
40+
public class PayloadConverterTest {
41+
42+
private DefaultDataConverter ddc =
43+
DefaultDataConverter.newDefaultInstance()
44+
.withPayloadConverterOverrides(new CloudEventsPayloadConverter());
45+
46+
private WorkflowClientOptions workflowClientOptions =
47+
WorkflowClientOptions.newBuilder().setDataConverter(ddc).build();
48+
49+
@Rule
50+
public TestWorkflowRule testWorkflowRule =
51+
TestWorkflowRule.newBuilder()
52+
.setWorkflowClientOptions(workflowClientOptions)
53+
.setWorkflowTypes(CEWorkflowImpl.class)
54+
.build();
55+
56+
@Test
57+
public void testActivityImpl() {
58+
List<CloudEvent> cloudEventList = new ArrayList<>();
59+
60+
for (int i = 0; i < 10; i++) {
61+
cloudEventList.add(
62+
CloudEventBuilder.v1()
63+
.withId(String.valueOf(100 + i))
64+
.withType("example.demo")
65+
.withSource(URI.create("http://temporal.io"))
66+
.withData(
67+
"application/json",
68+
("{\n" + "\"greeting\": \"hello " + i + "\"\n" + "}")
69+
.getBytes(Charset.defaultCharset()))
70+
.build());
71+
}
72+
73+
WorkflowOptions workflowOptions =
74+
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
75+
CEWorkflow workflow =
76+
testWorkflowRule.getWorkflowClient().newWorkflowStub(CEWorkflow.class, workflowOptions);
77+
// start async
78+
WorkflowClient.start(workflow::exec, cloudEventList.get(0));
79+
80+
for (int j = 1; j < 10; j++) {
81+
workflow.addEvent(cloudEventList.get(j));
82+
}
83+
84+
// Get the CE result and get its data (JSON)
85+
String result =
86+
((JsonCloudEventData) workflow.getLastEvent().getData()).getNode().get("greeting").asText();
87+
88+
assertNotNull(result);
89+
assertEquals("hello 9", result);
90+
}
91+
}

0 commit comments

Comments
 (0)