|
14 | 14 |
|
15 | 15 | # [START pubsub_to_gcs] |
16 | 16 | import argparse |
17 | | -import datetime |
18 | | -import json |
| 17 | +from datetime import datetime |
19 | 18 | import logging |
| 19 | +import random |
20 | 20 |
|
21 | | -import apache_beam as beam |
| 21 | +from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys |
22 | 22 | from apache_beam.options.pipeline_options import PipelineOptions |
23 | | -import apache_beam.transforms.window as window |
| 23 | +from apache_beam.transforms.window import FixedWindows |
24 | 24 |
|
25 | 25 |
|
class GroupMessagesByFixedWindows(PTransform):
    """Composite transform: window Pub/Sub messages by publish time, then
    group them into per-window, per-shard batches of
    (message body, publish time) tuples.
    """

    def __init__(self, window_size, num_shards=5):
        # `window_size` arrives in minutes; Beam's FixedWindows wants seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        # Bind window info to each element using its timestamp (the publish
        # time, for elements read from Pub/Sub without a timestamp attribute).
        windowed = pcoll | "Window into fixed intervals" >> WindowInto(
            FixedWindows(self.window_size)
        )
        stamped = windowed | "Add timestamp to windowed elements" >> ParDo(
            AddTimestamp()
        )
        # Assign each windowed element a random shard key. All elements that
        # share a (window, key) pair must fit in memory when grouped; if they
        # may not, use `beam.util.BatchElements` instead.
        keyed = stamped | "Add key" >> WithKeys(
            lambda _: random.randint(0, self.num_shards - 1)
        )
        return keyed | "Group by key" >> GroupByKey()
53 | 49 |
|
54 | 50 |
|
class AddTimestamp(DoFn):
    """Pairs each Pub/Sub message body with its publish time.

    Beam binds `publish_time` to each element's timestamp at runtime; because
    the pipeline reads from Pub/Sub with no `timestamp_attribute`, that is the
    publish time reported by the Pub/Sub server.
    """

    def process(self, element, publish_time=DoFn.TimestampParam):
        """Yield a (decoded message body, formatted publish time) tuple.

        Args:
            element: Raw message payload as UTF-8 bytes.
            publish_time: Beam timestamp of the element, in seconds since epoch.
        """
        # Local import keeps this fix independent of module-level imports.
        from datetime import timezone

        # `datetime.utcfromtimestamp` is deprecated since Python 3.12; build a
        # timezone-aware UTC datetime instead. The formatted output string is
        # unchanged ("%Y-%m-%d %H:%M:%S.%f" renders no tz component).
        publish_dt = datetime.fromtimestamp(float(publish_time), tz=timezone.utc)
        yield (
            element.decode("utf-8"),
            publish_dt.strftime("%Y-%m-%d %H:%M:%S.%f"),
        )
69 | 62 |
|
70 | 63 |
|
class WriteToGCS(DoFn):
    """Writes each (window, shard) batch of messages to its own GCS file."""

    def __init__(self, output_path):
        # GCS path prefix; window bounds and shard id are appended per file.
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write one batch of messages to a Google Cloud Storage file."""

        time_fmt = "%H:%M"
        start = window.start.to_utc_datetime().strftime(time_fmt)
        end = window.end.to_utc_datetime().strftime(time_fmt)
        shard_id, batch = key_value

        # e.g. <prefix>-14:00-14:01-3
        filename = f"{self.output_path}-{start}-{end}-{shard_id}"

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))
86 | 80 |
|
87 | 81 |
|
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    """Build and run the streaming Pub/Sub-to-GCS pipeline.

    Args:
        input_topic: Fully qualified Pub/Sub topic to read from.
        output_path: GCS path prefix for the output files.
        window_size: Window length in minutes.
        num_shards: Number of output shards per window.
        pipeline_args: Extra arguments forwarded to PipelineOptions.
    """
    # `save_main_session=True` so DoFns can resolve names imported at module
    # scope when they run on workers.
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)

    with Pipeline(options=options) as p:
        # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`,
        # Beam stamps each element with the publish time returned by the
        # Pub/Sub server, later accessible via `DoFn.TimestampParam`.
        # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
        messages = p | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
        batches = messages | "Window into" >> GroupMessagesByFixedWindows(
            window_size, num_shards
        )
        batches | "Write to GCS" >> ParDo(WriteToGCS(output_path))
103 | 99 |
|
104 | 100 |
|
if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        # Trailing space restores the separator between the two sentences:
        # without it the adjacent string literals concatenate into
        # '...read from."projects/...' in the --help output.
        help="The Cloud Pub/Sub topic to read from. "
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )
# [END pubsub_to_gcs]
0 commit comments