Skip to content

Commit 2788ee1

Browse files
docs(samples): Add Dataflow snippet for Pub/Sub write (GoogleCloudPlatform#8866)
* docs(samples): Add Dataflow snippet for Pub/Sub write
  - Add a model class for the source data.
  - Improve code comments.
  - Fix the test to handle the case where a message is received twice.
1 parent eb4a795 commit 2788ee1

3 files changed

Lines changed: 228 additions & 1 deletion

File tree

dataflow/snippets/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@
146146
<artifactId>junit</artifactId>
147147
<version>4.13.2</version>
148148
<scope>test</scope>
149-
</dependency>
149+
</dependency>
150+
<dependency>
151+
<artifactId>truth</artifactId>
152+
<groupId>com.google.truth</groupId>
153+
<scope>test</scope>
154+
<version>1.1.5</version>
155+
</dependency>
150156
</dependencies>
151157
</project>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_pubsub_write_with_attributes]
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.Arrays;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import org.apache.beam.sdk.Pipeline;
25+
import org.apache.beam.sdk.coders.DefaultCoder;
26+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
27+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
28+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
29+
import org.apache.beam.sdk.options.Description;
30+
import org.apache.beam.sdk.options.PipelineOptions;
31+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
32+
import org.apache.beam.sdk.transforms.Create;
33+
import org.apache.beam.sdk.transforms.MapElements;
34+
import org.apache.beam.sdk.values.TypeDescriptor;
35+
36+
37+
38+
public class PubSubWriteWithAttributes {
39+
public interface Options extends PipelineOptions {
40+
@Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
41+
String getTopic();
42+
43+
void setTopic(String value);
44+
}
45+
46+
// A custom datatype for the source data.
47+
@DefaultCoder(AvroCoder.class)
48+
static class ExampleData {
49+
public String name;
50+
public String product;
51+
public Long timestamp; // Epoch time in milliseconds
52+
53+
public ExampleData() {}
54+
55+
public ExampleData(String name, String product, Long timestamp) {
56+
this.name = name;
57+
this.product = product;
58+
this.timestamp = timestamp;
59+
}
60+
}
61+
62+
// Write messages to a Pub/Sub topic.
63+
public static void main(String[] args) {
64+
// Example source data.
65+
final List<ExampleData> messages = Arrays.asList(
66+
new ExampleData("Robert", "TV", 1613141590000L),
67+
new ExampleData("Maria", "Phone", 1612718280000L),
68+
new ExampleData("Juan", "Laptop", 1611618000000L),
69+
new ExampleData("Rebeca", "Videogame", 1610000000000L)
70+
);
71+
72+
// Parse the pipeline options passed into the application. Example:
73+
// ----runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
74+
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
75+
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
76+
var pipeline = Pipeline.create(options);
77+
pipeline
78+
// Create some data to write to Pub/Sub.
79+
.apply(Create.of(messages))
80+
// Convert the data to Pub/Sub messages.
81+
.apply(MapElements
82+
.into(TypeDescriptor.of(PubsubMessage.class))
83+
.via((message -> {
84+
byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
85+
// Create attributes for each message.
86+
HashMap<String, String> attributes = new HashMap<String, String>();
87+
attributes.put("buyer", message.name);
88+
attributes.put("timestamp", Long.toString(message.timestamp));
89+
return new PubsubMessage(payload, attributes);
90+
})))
91+
// Write the messages to Pub/Sub.
92+
.apply(PubsubIO.writeMessages().to(options.getTopic()));
93+
pipeline.run().waitUntilFinish();
94+
}
95+
}
96+
// [END dataflow_pubsub_write_with_attributes]
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
import static com.google.common.truth.Truth.assertWithMessage;
20+
import static org.junit.Assert.assertEquals;
21+
22+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
23+
import com.google.cloud.pubsub.v1.MessageReceiver;
24+
import com.google.cloud.pubsub.v1.Subscriber;
25+
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
26+
import com.google.cloud.pubsub.v1.TopicAdminClient;
27+
import com.google.pubsub.v1.ProjectSubscriptionName;
28+
import com.google.pubsub.v1.PubsubMessage;
29+
import com.google.pubsub.v1.PushConfig;
30+
import com.google.pubsub.v1.SubscriptionName;
31+
import com.google.pubsub.v1.TopicName;
32+
import java.io.ByteArrayOutputStream;
33+
import java.io.PrintStream;
34+
import java.util.Map;
35+
import java.util.UUID;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
39+
import org.junit.After;
40+
import org.junit.Before;
41+
import org.junit.Test;
42+
import org.junit.runner.RunWith;
43+
import org.junit.runners.JUnit4;
44+
45+
@RunWith(JUnit4.class)
46+
public class PubSubWriteIT {
47+
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
48+
49+
private ByteArrayOutputStream bout;
50+
private PrintStream out;
51+
private String topicId;
52+
private String subscriptionId;
53+
TopicAdminClient topicAdminClient;
54+
SubscriptionAdminClient subscriptionAdminClient;
55+
56+
// Check if the required environment variables are set.
57+
public static void requireEnvVar(String envVarName) {
58+
assertWithMessage(String.format("Missing environment variable '%s' ", envVarName))
59+
.that(System.getenv(envVarName)).isNotEmpty();
60+
}
61+
62+
@Before
63+
public void setUp() throws Exception {
64+
requireEnvVar("GOOGLE_CLOUD_PROJECT");
65+
66+
bout = new ByteArrayOutputStream();
67+
out = new PrintStream(bout);
68+
System.setOut(out);
69+
70+
topicId = "test_topic_" + UUID.randomUUID().toString().substring(0, 8);
71+
subscriptionId = topicId + "-sub";
72+
73+
TopicName topicName = TopicName.of(PROJECT_ID, topicId);
74+
topicAdminClient = TopicAdminClient.create();
75+
topicAdminClient.createTopic(topicName);
76+
77+
SubscriptionName subscriptionName = SubscriptionName.of(PROJECT_ID, subscriptionId);
78+
subscriptionAdminClient = SubscriptionAdminClient.create();
79+
subscriptionAdminClient.createSubscription(subscriptionName, topicName,
80+
PushConfig.getDefaultInstance(), 120);
81+
}
82+
83+
@After
84+
public void tearDown() {
85+
subscriptionAdminClient.deleteSubscription(SubscriptionName.of(PROJECT_ID, subscriptionId));
86+
topicAdminClient.deleteTopic(TopicName.of(PROJECT_ID, topicId));
87+
System.setOut(null);
88+
}
89+
90+
@Test
91+
public void testPubSubWriteWithAttributes() throws Exception {
92+
93+
Map<String, PubsubMessage> messages = new ConcurrentHashMap<>();
94+
95+
PubSubWriteWithAttributes.main(
96+
new String[] {
97+
"--runner=DirectRunner",
98+
"--topic=" + String.format("projects/%s/topics/%s", PROJECT_ID, topicId)
99+
});
100+
101+
MessageReceiver receiver =
102+
(PubsubMessage message, AckReplyConsumer consumer) -> {
103+
// Store in a map by message ID, which are guaranteed to be unique within a topic.
104+
messages.put(message.getMessageId(), message);
105+
consumer.ack();
106+
};
107+
108+
// Verify that the pipeline wrote messages to Pub/Sub
109+
Subscriber subscriber = null;
110+
try {
111+
ProjectSubscriptionName subscriptionName =
112+
ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
113+
114+
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
115+
subscriber.startAsync().awaitRunning();
116+
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
117+
} catch (TimeoutException timeoutException) {
118+
subscriber.stopAsync();
119+
}
120+
assertEquals(4, messages.size());
121+
for (Map.Entry<String, PubsubMessage> item : messages.entrySet()) {
122+
assertEquals(2, item.getValue().getAttributesCount());
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)