Skip to content

Commit b61f8bc

Browse files
docs(samples): Add dataflow_batch_write_to_storage snippet (GoogleCloudPlatform#10658)
* docs(samples): Add dataflow_batch_write_to_storage snippet
* Bump Beam SDK version
* Fix linter error
* Remove a stray debugging line
* Remove __init__.py files
* Revert "Remove __init__.py files"
  This reverts commit 21ed33e.
* Update copyright headers
  - Update year
  - Add to __init__ files
* Incorporate PR feedback
  - Add a test condition
  - Add comments re: pipeline options
---------
Co-authored-by: Veronica Wasson <veronicawasson@google.com>
1 parent ddef2fb commit b61f8bc

File tree

6 files changed

+116
-0
lines changed

6 files changed

+116
-0
lines changed

dataflow/snippets/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#!/usr/bin/env python
2+
# Copyright 2023 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# [START dataflow_batch_write_to_storage]
17+
import apache_beam as beam
18+
from apache_beam.io.textio import WriteToText
19+
from apache_beam.options.pipeline_options import PipelineOptions
20+
21+
22+
def write_to_cloud_storage(argv=None):
    """Run a batch pipeline that writes a few text elements to Cloud Storage.

    Args:
        argv: Optional list of command-line flags to parse as pipeline
            options, for example ``["--output=gs://my-bucket/prefix"]``.
            When ``None``, the options are parsed from ``sys.argv``.
    """

    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        # Define a custom pipeline option that specifies the Cloud Storage
        # output location.
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument("--output", required=True)

    words = ["1", "2", "3", "4"]
    # Forward argv so callers can supply options programmatically. Passing
    # None preserves the previous behavior of reading sys.argv.
    options = MyOptions(argv)

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(words)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )
39+
# [END dataflow_batch_write_to_storage]
40+
41+
42+
# Run the pipeline when this file is executed directly as a script.
if __name__ == "__main__":
    write_to_cloud_storage()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pytest==7.0.1

dataflow/snippets/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
apache-beam[gcp]==2.50.0
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import sys
15+
import uuid
16+
17+
from google.cloud import storage
18+
19+
import pytest
20+
21+
from ..batch_write_storage import write_to_cloud_storage
22+
23+
24+
# Shared Cloud Storage client used by the fixture and the test below.
storage_client = storage.Client()
# A random UUID suffix gives every test run its own bucket name.
bucket_name = f'test-bucket-{uuid.uuid4()}'
26+
27+
28+
@pytest.fixture(scope="function")
def setup_and_teardown():
    """Create the test bucket before the test and delete it afterwards."""
    # Create the bucket outside the try block: if creation fails there is
    # nothing to clean up, and the real error must not be masked by an
    # UnboundLocalError raised from the finally clause.
    bucket = storage_client.create_bucket(bucket_name)
    try:
        yield
    finally:
        # force=True asks the client to remove the bucket's objects first,
        # so the files written by the pipeline do not block deletion.
        bucket.delete(force=True)
35+
36+
37+
def test_write_to_cloud_storage(setup_and_teardown):
    """Run the pipeline and check that it wrote ``.txt`` files to the bucket."""
    flags = ['', f'--output=gs://{bucket_name}/output/out-']

    # Patch sys.argv only for the duration of the pipeline run so the
    # override does not leak into other tests in the same session.
    with pytest.MonkeyPatch.context() as patcher:
        patcher.setattr(sys, "argv", flags)
        write_to_cloud_storage()

    blobs = list(storage_client.list_blobs(bucket_name))
    # Ensure the pipeline wrote files to Cloud Storage
    assert blobs
    for blob in blobs:
        assert blob.name.endswith(".txt")

0 commit comments

Comments
 (0)