Skip to content

Commit 97e7e82

Browse files
Pub/Sub: enable parallel writes to GCS in Pub/Sub Dataflow example (GoogleCloudPlatform#5547)
* feat: enable parallel writes * address david's comments * address david's 2nd round of comments
1 parent 8e154fb commit 97e7e82

3 files changed

Lines changed: 97 additions & 80 deletions

File tree

pubsub/streaming-analytics/PubSubToGCS.py

Lines changed: 59 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -14,119 +14,122 @@
1414

1515
# [START pubsub_to_gcs]
import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


26-
class GroupWindowsIntoBatches(beam.PTransform):
27-
"""A composite transform that groups Pub/Sub messages based on publish
28-
time and outputs a list of dictionaries, where each contains one message
29-
and its publish timestamp.
26+
class GroupMessagesByFixedWindows(PTransform):
27+
"""A composite transform that groups Pub/Sub messages based on publish time
28+
and outputs a list of tuples, each containing a message and its publish time.
3029
"""
3130

32-
def __init__(self, window_size):
33-
# Convert minutes into seconds.
31+
def __init__(self, window_size, num_shards=5):
32+
# Set window size to 60 seconds.
3433
self.window_size = int(window_size * 60)
34+
self.num_shards = num_shards
3535

3636
def expand(self, pcoll):
3737
return (
3838
pcoll
39-
# Assigns window info to each Pub/Sub message based on its
40-
# publish timestamp.
41-
| "Window into Fixed Intervals"
42-
>> beam.WindowInto(window.FixedWindows(self.window_size))
43-
| "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
44-
# Use a dummy key to group the elements in the same window.
45-
# Note that all the elements in one window must fit into memory
46-
# for this. If the windowed elements do not fit into memory,
47-
# please consider using `beam.util.BatchElements`.
48-
# https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
49-
| "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
50-
| "Groupby" >> beam.GroupByKey()
51-
| "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
39+
# Bind window info to each element using element timestamp (or publish time).
40+
| "Window into fixed intervals"
41+
>> WindowInto(FixedWindows(self.window_size))
42+
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
43+
# Assign a random key to each windowed element based on the number of shards.
44+
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
45+
# Group windowed elements by key. All the elements in the same window must fit
46+
# memory for this. If not, you need to use `beam.util.BatchElements`.
47+
| "Group by key" >> GroupByKey()
5248
)
5349

5450

55-
class AddTimestamps(beam.DoFn):
56-
def process(self, element, publish_time=beam.DoFn.TimestampParam):
57-
"""Processes each incoming windowed element by extracting the Pub/Sub
58-
message and its publish timestamp into a dictionary. `publish_time`
59-
defaults to the publish timestamp returned by the Pub/Sub server. It
60-
is bound to each element by Beam at runtime.
51+
class AddTimestamp(DoFn):
52+
def process(self, element, publish_time=DoFn.TimestampParam):
53+
"""Processes each windowed element by extracting the message body and its
54+
publish time into a tuple.
6155
"""
62-
63-
yield {
64-
"message_body": element.decode("utf-8"),
65-
"publish_time": datetime.datetime.utcfromtimestamp(
66-
float(publish_time)
67-
).strftime("%Y-%m-%d %H:%M:%S.%f"),
68-
}
56+
yield (
57+
element.decode("utf-8"),
58+
datetime.utcfromtimestamp(float(publish_time)).strftime(
59+
"%Y-%m-%d %H:%M:%S.%f"
60+
),
61+
)
6962

7063

71-
class WriteBatchesToGCS(beam.DoFn):
64+
class WriteToGCS(DoFn):
7265
def __init__(self, output_path):
7366
self.output_path = output_path
7467

75-
def process(self, batch, window=beam.DoFn.WindowParam):
76-
"""Write one batch per file to a Google Cloud Storage bucket. """
68+
def process(self, key_value, window=DoFn.WindowParam):
69+
"""Write messages in a batch to Google Cloud Storage."""
7770

7871
ts_format = "%H:%M"
7972
window_start = window.start.to_utc_datetime().strftime(ts_format)
8073
window_end = window.end.to_utc_datetime().strftime(ts_format)
81-
filename = "-".join([self.output_path, window_start, window_end])
74+
shard_id, batch = key_value
75+
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
8276

83-
with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
84-
for element in batch:
85-
f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
77+
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
78+
for message_body, publish_time in batch:
79+
f.write(f"{message_body},{publish_time}\n".encode("utf-8"))
8680

8781

88-
def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
89-
# `save_main_session` is set to true because some DoFn's rely on
90-
# globally imported modules.
82+
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
83+
# Set `save_main_session` to True so DoFns can access globally imported modules.
9184
pipeline_options = PipelineOptions(
9285
pipeline_args, streaming=True, save_main_session=True
9386
)
9487

95-
with beam.Pipeline(options=pipeline_options) as pipeline:
88+
with Pipeline(options=pipeline_options) as pipeline:
9689
(
9790
pipeline
98-
| "Read PubSub Messages"
99-
>> beam.io.ReadFromPubSub(topic=input_topic)
100-
| "Window into" >> GroupWindowsIntoBatches(window_size)
101-
| "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
91+
# Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
92+
# binds the publish time returned by the Pub/Sub server for each message
93+
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
94+
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
95+
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
96+
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
97+
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
10298
)
10399

104100

105-
if __name__ == "__main__": # noqa
101+
if __name__ == "__main__":
106102
logging.getLogger().setLevel(logging.INFO)
107103

108104
parser = argparse.ArgumentParser()
109105
parser.add_argument(
110106
"--input_topic",
111-
help="The Cloud Pub/Sub topic to read from.\n"
112-
'"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
107+
help="The Cloud Pub/Sub topic to read from."
108+
'"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
113109
)
114110
parser.add_argument(
115111
"--window_size",
116112
type=float,
117113
default=1.0,
118-
help="Output file's window size in number of minutes.",
114+
help="Output file's window size in minutes.",
119115
)
120116
parser.add_argument(
121117
"--output_path",
122-
help="GCS Path of the output file including filename prefix.",
118+
help="Path of the output GCS file including the prefix.",
119+
)
120+
parser.add_argument(
121+
"--num_shards",
122+
type=int,
123+
default=5,
124+
help="Number of shards to use when writing windowed elements to GCS.",
123125
)
124126
known_args, pipeline_args = parser.parse_known_args()
125127

126128
run(
127129
known_args.input_topic,
128130
known_args.output_path,
129131
known_args.window_size,
132+
known_args.num_shards,
130133
pipeline_args,
131134
)
132135
# [END pubsub_to_gcs]

pubsub/streaming-analytics/PubSubToGCS_test.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import os
1616
import uuid
1717

18-
import apache_beam as beam
18+
from apache_beam.io.gcp.gcsio import GcsIO
1919
from apache_beam.testing.test_pipeline import TestPipeline
2020
from apache_beam.testing.test_stream import TestStream
2121
from apache_beam.testing.test_utils import TempDir
@@ -47,8 +47,9 @@
4747
def test_pubsub_to_gcs():
4848
PubSubToGCS.run(
4949
input_topic="unused", # mocked by TestStream
50-
output_path="gs://{}/pubsub/{}/output".format(BUCKET, UUID),
50+
output_path=f"gs://{BUCKET}/pubsub/{UUID}/output",
5151
window_size=1, # 1 minute
52+
num_shards=1,
5253
pipeline_args=[
5354
"--project",
5455
PROJECT,
@@ -58,8 +59,8 @@ def test_pubsub_to_gcs():
5859
)
5960

6061
# Check for output files on GCS.
61-
gcs_client = beam.io.gcp.gcsio.GcsIO()
62-
files = gcs_client.list_prefix("gs://{}/pubsub/{}".format(BUCKET, UUID))
62+
gcs_client = GcsIO()
63+
files = gcs_client.list_prefix(f"gs://{BUCKET}/pubsub/{UUID}")
6364
assert len(files) > 0
6465

6566
# Clean up.

pubsub/streaming-analytics/README.md

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ Sample(s) showing how to use [Google Cloud Pub/Sub] with [Google Cloud Dataflow]
1414
or via the `gcloud` command line tool.
1515

1616
```sh
17-
export PROJECT_NAME=your-google-cloud-project-id
18-
gcloud projects create $PROJECT_NAME
17+
export PROJECT_ID=your-google-cloud-project-id
18+
gcloud projects create $PROJECT_ID
1919
```
2020

2121
1. [Enable billing].
@@ -41,15 +41,16 @@ Sample(s) showing how to use [Google Cloud Pub/Sub] with [Google Cloud Dataflow]
4141
Alternatively, you can use `gcloud` through the command line.
4242

4343
```sh
44-
export PROJECT_NAME=$(gcloud config get-value project)
45-
export SA_NAME=samples
46-
export IAM_ACCOUNT=$SA_NAME@$PROJECT_NAME.iam.gserviceaccount.com
44+
export PROJECT_ID=$(gcloud config get-value project)
45+
export SERVICE_ACCOUNT_NAME=samples
46+
export IAM_ACCOUNT=$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com
4747

4848
# Create the service account.
49-
gcloud iam service-accounts create $SA_NAME --display-name $SA_NAME
49+
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME \
50+
--display-name $SERVICE_ACCOUNT_NAME
5051

5152
# Set the role to Project Owner (*).
52-
gcloud projects add-iam-policy-binding $PROJECT_NAME \
53+
gcloud projects add-iam-policy-binding $PROJECT_ID \
5354
--member serviceAccount:$IAM_ACCOUNT \
5455
--role roles/owner
5556

@@ -77,16 +78,18 @@ Sample(s) showing how to use [Google Cloud Pub/Sub] with [Google Cloud Dataflow]
7778
1. Create a Cloud Storage bucket.
7879

7980
```bash
80-
export BUCKET_NAME=your-gcs-bucket
81+
export BUCKET_ID=your-gcs-bucket-id
8182

82-
gsutil mb gs://$BUCKET_NAME
83+
gsutil mb gs://$BUCKET_ID
8384
```
8485

8586
1. Start a [Google Cloud Scheduler] job that publishes one message to a [Google Cloud Pub/Sub] topic every minute. This will create an [App Engine] app if one has never been created on the project.
8687

8788
```bash
89+
export TOPIC_ID=your-topic-id
90+
8891
# Create a Pub/Sub topic.
89-
gcloud pubsub topics create cron-topic
92+
gcloud pubsub topics create $TOPIC_ID
9093

9194
# Create a Cloud Scheduler job
9295
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
@@ -134,33 +137,43 @@ The following instructions will help you prepare your development environment.
134137

135138
* [PubSubToGCS.py](PubSubToGCS.py)
136139

137-
The following example will run a streaming pipeline. It will read messages from a Pub/Sub topic, then window them into fixed-sized intervals, and write one file per window into a GCS location.
140+
The following example will run a streaming pipeline. The pipeline does the following:
141+
1. Reads messages from a Pub/Sub topic.
142+
1. Groups messages into batches, one batch per window.
143+
1. Adds window start and end time to each element/message.
144+
1. Adds publish timestamp to each element/message.
145+
1. Adds a random shard ID as key to each windowed element. *Sharding* lets you split the elements in the same window into multiple small batches. This way, multiple workers can each write a batch of elements into Cloud Storage. This results in one file per shard.
146+
1. Groups the elements by their shard ID for every window.
147+
1. Writes the grouped elements to a file on Cloud Storage.
138148

139149
+ `--project`: sets the Google Cloud project ID to run the pipeline on
140150
+ `--region`: sets the Dataflow [regional endpoint](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
141151
+ `--input_topic`: sets the input Pub/Sub topic to read messages from
142152
+ `--output_path`: sets the output GCS path prefix to write files to
143153
+ `--runner`: specifies the runner to run the pipeline, if not set to `DataflowRunner`, `DirectRunner` is used
144154
+ `--window_size [optional]`: specifies the window size in minutes, defaults to 1.0
155+
+ `--num_shards [optional]`: sets the number of shards when writing windowed elements to GCS, defaults to 5.
145156
+ `--temp_location`: needed for executing the pipeline
146157

147158
```bash
148159
python PubSubToGCS.py \
149-
--project=$PROJECT_NAME \
160+
--project=$PROJECT_ID \
150161
--region=us-central1 \
151-
--input_topic=projects/$PROJECT_NAME/topics/$TOPIC_NAME \
152-
--output_path=gs://$BUCKET_NAME/samples/output \
162+
--input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
163+
--output_path=gs://$BUCKET_ID/samples/output \
153164
--runner=DataflowRunner \
154-
--window_size=2 \
155-
--temp_location=gs://$BUCKET_NAME/temp
165+
--window_size=1 \
166+
# If set, you will write up to `num_shards` files per window to GCS.
167+
# --num_shards=2 \
168+
--temp_location=gs://$BUCKET_ID/temp
156169
```
157170

158171
After the job has been submitted, you can check its status in the [GCP Console Dataflow page].
159172

160173
You can also check the output to your GCS bucket using the command line below or in the [GCP Console Storage page]. You may need to wait a few minutes for the files to appear.
161174

162175
```bash
163-
gsutil ls gs://$BUCKET_NAME/samples/
176+
gsutil ls gs://$BUCKET_ID/samples/
164177
```
165178

166179
## Cleanup
@@ -178,17 +191,17 @@ gsutil ls gs://$BUCKET_NAME/samples/
178191
1. Delete the topic. [Google Cloud Dataflow] will automatically delete the subscription associated with the streaming pipeline when the job is canceled.
179192

180193
```bash
181-
gcloud pubsub topics delete cron-topic
194+
gcloud pubsub topics delete $TOPIC_ID
182195
```
183196

184197
1. Lastly, to avoid incurring charges to your GCP account for the resources created in this tutorial:
185198

186199
```bash
187200
# Delete only the files created by this sample.
188-
gsutil -m rm -rf "gs://$BUCKET_NAME/samples/output*"
201+
gsutil -m rm -rf "gs://$BUCKET_ID/samples/output*"
189202
190203
# [optional] Remove the Cloud Storage bucket.
191-
gsutil rb gs://$BUCKET_NAME
204+
gsutil rb gs://$BUCKET_ID
192205
```
193206

194207
[Apache Beam]: https://beam.apache.org/

0 commit comments

Comments
 (0)