Commit 09aa10c

Python examples to use IObase IOs sinks in streaming (and batch) (#35811)
* Examples to use IObase IO sinks (TextIO, ParquetIO, AvroIO, TFRecordsIO) in batch and streaming
* Add `__init__.py` for module import
* Fix lint (unused imports)
* Move the samples into `sinks` (as `iobased_sinks` might be too obscure for users); add folder README; add more comments per damccorm's review comments
* Fix imports, formatting, spacing, and pylint line-length issues
1 parent 7cedc0d commit 09aa10c

6 files changed

Lines changed: 551 additions & 0 deletions

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
# Examples of writing to Sinks

This module contains example pipelines that use the [Beam IO connectors](https://beam.apache.org/documentation/io/connectors/), also known as sinks, to write output in both streaming and batch mode.
## Batch

`test_write_bounded.py` - a simple pipeline that takes a bounded PCollection
as input, created with the [Create](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Create)
transform (useful for testing), and writes it to files using multiple IOs.

### Running the pipeline

To run the pipeline locally:

```sh
python -m apache_beam.examples.sinks.test_write_bounded
```
## Streaming

Two example pipelines that use two different approaches for creating the input.

`test_write_unbounded.py` uses [TestStream](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/testing/TestStream.html),
a transform that lets you control when data arrives and how the watermark advances.
This is especially useful in unit tests.

`test_periodicimpulse.py` uses [PeriodicImpulse](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.periodicsequence.html#apache_beam.transforms.periodicsequence.PeriodicImpulse),
a transform useful for testing pipelines in real time. You can also run it on Dataflow.

### Running the pipeline

To run the pipelines locally:

```sh
python -m apache_beam.examples.sinks.test_write_unbounded
```

```sh
python -m apache_beam.examples.sinks.test_periodicimpulse
```
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Lines changed: 144 additions & 0 deletions
@@ -0,0 +1,144 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import pytz

import apache_beam as beam
from apache_beam.testing.test_stream import TestStream


class GenerateEvent(beam.PTransform):
  # pylint: disable=line-too-long
  """This class simulates streaming data.

  It leverages [TestStream](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/testing/TestStream.html),
  a transform that lets you control when data arrives and how the watermark
  advances. This is especially useful in unit tests."""  # noqa

  @staticmethod
  def sample_data():
    return GenerateEvent()

  def expand(self, input):
    # These are the elements that arrive in the simulated TestStream
    # at multiple timestamps.
    elem = [{'age': 10}, {'age': 20}, {'age': 30}]

    def at(second):
      """Event time `second` seconds past midnight on 2021-03-01 UTC."""
      return datetime(
          2021, 3, 1, 0, 0, second, 0, tzinfo=pytz.UTC).timestamp()

    # The simulated TestStream adds elements at specific timestamps using
    # add_elements and advances the watermark after one or more elements
    # arrive using advance_watermark_to: elements arrive at seconds 1-20,
    # with the watermark advanced to 5, 10, 15 and 20 along the way.
    stream = TestStream()
    for second in range(1, 21):
      if second % 5 == 0:
        stream = stream.advance_watermark_to(at(second))
      stream = stream.add_elements(elements=elem, event_timestamp=at(second))
    stream = stream.advance_watermark_to(at(25)).advance_watermark_to_infinity()

    return input | stream
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# To run the pipeline locally:
# python -m apache_beam.examples.sinks.test_periodicimpulse

# This file contains an example of writing an unbounded PCollection,
# created with PeriodicImpulse, to files.

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.runner import PipelineResult
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import FixedWindows


def run(argv=None, save_main_session=True) -> PipelineResult:
  """Main entry point; defines and runs the PeriodicImpulse pipeline."""
  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  p = beam.Pipeline(options=pipeline_options)

  _ = (
      p
      | 'Create elements' >> PeriodicImpulse(
          start_timestamp=1,
          stop_timestamp=100,
          fire_interval=10,
          apply_windowing=False)
      | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(20))
      | beam.io.WriteToText(
          file_path_prefix="__output__/output_WriteToText",
          file_name_suffix=".txt"))

  # Execute the pipeline and return the result.
  result = p.run()
  result.wait_until_finish()
  return result


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
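With the parameters above (`start_timestamp=1`, `stop_timestamp=100`, `fire_interval=10`), PeriodicImpulse fires once per interval between the start and stop timestamps. A plain-Python sketch of the expected firing schedule (an approximation of the transform's behavior, not its implementation):

```python
start_timestamp, stop_timestamp, fire_interval = 1, 100, 10

# Impulses fire at start, start + interval, ... while before stop.
firing_times = list(range(start_timestamp, stop_timestamp, fire_interval))

print(firing_times)       # [1, 11, 21, 31, 41, 51, 61, 71, 81, 91]
print(len(firing_times))  # 10 elements, falling into 5 fixed 20-second windows
```

These ten event timestamps land in five of the 20-second fixed windows applied by `ApplyWindowing`, so WriteToText emits one pane per window as the watermark passes each boundary.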
Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# To run the pipeline locally:
# python -m apache_beam.examples.sinks.test_write_bounded

# This file contains multiple examples of writing a bounded PCollection
# to files.

import argparse
import json
import logging

import pyarrow

import apache_beam as beam
from apache_beam.io.fileio import WriteToFiles
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.runner import PipelineResult
from apache_beam.transforms.util import LogElements


def run(argv=None, save_main_session=True) -> PipelineResult:
  """Main entry point; defines and runs the bounded-write pipeline."""
  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  p = beam.Pipeline(options=pipeline_options)

  output = (
      p | beam.Create([{'age': 10}, {'age': 20}, {'age': 30}])
      | LogElements(
          prefix='before write ', with_window=False, level=logging.INFO))

  # TextIO
  output2 = output | 'Write to text' >> WriteToText(
      file_path_prefix="__output_batch__/output_WriteToText",
      file_name_suffix=".txt",
      shard_name_template='-U-SSSSS-of-NNNNN')
  _ = output2 | 'LogElements after WriteToText' >> LogElements(
      prefix='after WriteToText ', with_window=False, level=logging.INFO)

  # FileIO
  output3 = (
      output | 'Serialize' >> beam.Map(json.dumps)
      | 'Write to files' >>
      WriteToFiles(path="__output_batch__/output_WriteToFiles"))
  _ = output3 | 'LogElements after WriteToFiles' >> LogElements(
      prefix='after WriteToFiles ', with_window=False, level=logging.INFO)

  # ParquetIO
  output4 = output | 'Write' >> beam.io.WriteToParquet(
      file_path_prefix="__output_batch__/output_parquet",
      schema=pyarrow.schema([('age', pyarrow.int64())]))
  _ = output4 | 'LogElements after WriteToParquet' >> LogElements(
      prefix='after WriteToParquet ', with_window=False, level=logging.INFO)
  _ = output | 'Write parquet' >> beam.io.WriteToParquet(
      file_path_prefix="__output_batch__/output_WriteToParquet",
      schema=pyarrow.schema([('age', pyarrow.int64())]),
      record_batch_size=10,
      num_shards=0)

  # Execute the pipeline and return the result.
  result = p.run()
  result.wait_until_finish()
  return result


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
