
Commit 5a3675e

fix: call DLP API in the region of DataFlow job (GoogleCloudPlatform#9144)
* fix: call DLP API in the region of the DataFlow job

  Modify the final code to call the DLP API in the region where the DataFlow job is deployed. Add a placeholder for this modification to the boilerplate code. Populate the README with information about the folder and a link to the Neos tutorial.
1 parent 8322cad commit 5a3675e

File tree

3 files changed: +32 −12 lines

logging/redaction/README.md

Lines changed: 17 additions & 8 deletions

@@ -1,16 +1,25 @@
 # Logging redaction tutorial code samples
 
-> **Warning**
-> This section is still **W**ork **I**n **P**rogress.
-> Cloud Shell button now opens this README. It will open the tutorial _AFTER_ its official launch.
-> Tests to validate the code samples will be added.
+This is a sample [DataFlow] pipeline that detects and masks US Social Security Numbers in log payloads at ingestion time.
+The folder contains the following files:
 
-This section contains code that is used in the "Redact confidential information in logs" tutorial.
-You can open the tutorial in Cloud Shell:
+* [Dockerfile] for a customized DataFlow job container, used to save initialization time
+* [Boilerplate code][boilerplate] that implements a pipeline for streaming log entries from [PubSub] to a destination log bucket
+* [Final version][final] of the pipeline that includes all modifications to the [boilerplate] code required to implement log redaction
+* [Requirements] file that installs the missing component(s) into the DataFlow job's environment
+
+If you have a Google Cloud account and access to a GCP project, you can launch an interactive tutorial in the Cloud Console to see how the sample works.
+To run the tutorial, press the button below.
 
 [![Open in Cloud Shell][shell_img]][shell_link]
 
-_NOTE:_ You will need a valid Google Cloud credentials to open the tutorial.
+NOTE: To run this tutorial you need permissions to enable Google APIs, to provision PubSub, DataFlow, and Cloud Storage resources, and to call the DLP API.
 
+[dataflow]: https://cloud.google.com/dataflow
+[pubsub]: https://cloud.google.com/pubsub
+[dockerfile]: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/logging/redaction/Dockerfile
+[boilerplate]: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/logging/redaction/log_redaction.py
+[final]: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/logging/redaction/log_redaction_final.py
+[requirements]: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/logging/redaction/requirements.txt
 [shell_img]: http://gstatic.com/cloudssh/images/open-btn.png
-[shell_link]: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=logging/redaction/README.md
+[shell_link]: https://console.cloud.google.com/?walkthrough_id=cloud-ops-log-redacting-on-ingestion
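The README describes a pipeline that masks US Social Security Numbers via the DLP API. The shape of such a de-identify request can be sketched with plain dictionaries. This is an illustrative sketch only: `INSPECT_CFG` and `REDACTION_CFG` mirror names used in the sample, but the exact values here and the `build_deidentify_request` helper are assumptions, not the sample's code.

```python
# Sketch of a DLP de-identify request that masks US SSNs.
# The infoType and transformation field names follow the DLP v2 API;
# the concrete config values are illustrative assumptions.
INSPECT_CFG = {'info_types': [{'name': 'US_SOCIAL_SECURITY_NUMBER'}]}
REDACTION_CFG = {
    'info_type_transformations': {
        'transformations': [{
            # Replace each finding with its infoType name,
            # e.g. "[US_SOCIAL_SECURITY_NUMBER]".
            'primitive_transformation': {'replace_with_info_type_config': {}}
        }]
    }
}

def build_deidentify_request(project_id: str, region: str, table: dict) -> dict:
    """Assemble the request dict; note the regional 'parent' path."""
    return {
        'parent': f'projects/{project_id}/locations/{region}',
        'inspect_config': INSPECT_CFG,
        'deidentify_config': REDACTION_CFG,
        'item': table,
    }
```

Keeping the configs as module-level dicts makes the regional `parent` path the only per-job piece of the request.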

logging/redaction/log_redaction.py

Lines changed: 3 additions & 0 deletions

@@ -105,6 +105,9 @@ def run(
         streaming=True,
         save_main_session=True
     )
+
+    # TODO: Read job's deployment region
+
     pipeline = Pipeline(options=pipeline_options)
     _ = (
         pipeline
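The TODO above marks where the final version of the pipeline reads the DataFlow job's region and switches the DLP call to the regional endpoint. The parent-path logic behind that change can be sketched in isolation; the `dlp_parent` helper below is hypothetical and not part of the sample:

```python
from typing import Optional

def dlp_parent(project_id: str, region: Optional[str]) -> str:
    # With a known region, address the regional DLP endpoint
    # (projects/PROJECT/locations/REGION); without one, fall back
    # to the project-global parent used by the boilerplate code.
    if region:
        return f'projects/{project_id}/locations/{region}'
    return f'projects/{project_id}'
```

Calling the DLP API in the same region as the job keeps the inspected log data within that region and avoids cross-region latency.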

logging/redaction/log_redaction_final.py

Lines changed: 12 additions & 4 deletions

@@ -20,7 +20,7 @@
 
 from apache_beam import CombineFn, CombineGlobally, DoFn, io, ParDo, Pipeline, WindowInto
 from apache_beam.error import PipelineError
-from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
 from apache_beam.transforms.window import FixedWindows
 
 from google.cloud import dlp_v2, logging_v2
@@ -82,8 +82,9 @@ def extract_output(self, accumulator):
 class LogRedaction(DoFn):
     '''Apply inspection and redaction to textPayload field of log entries'''
 
-    def __init__(self, project_id: str):
+    def __init__(self, region, project_id: str):
         self.project_id = project_id
+        self.region = region
         self.dlp_client = None
 
     def _log_to_row(self, entry):
@@ -113,7 +114,7 @@ def process(self, logs):
 
         response = self.dlp_client.deidentify_content(
             request={
-                'parent': f'projects/{self.project_id}',
+                'parent': f'projects/{self.project_id}/locations/{self.region}',
                 'inspect_config': INSPECT_CFG,
                 'deidentify_config': REDACTION_CFG,
                 'item': table,
@@ -177,6 +178,13 @@ def run(
         streaming=True,
         save_main_session=True
     )
+
+    region = "us-central1"
+    try:
+        region = pipeline_options.view_as(GoogleCloudOptions).region
+    except AttributeError:
+        pass
+
     pipeline = Pipeline(options=pipeline_options)
     _ = (
         pipeline
@@ -186,7 +194,7 @@ def run(
         # Optimize Google API consumption and avoid possible throttling
         # by calling APIs for batched data and not per each element
         | 'Batch aggregated payloads' >> CombineGlobally(BatchPayloads()).without_defaults()
-        | 'Redact SSN info from logs' >> ParDo(LogRedaction(destination_log_name.split('/')[1]))
+        | 'Redact SSN info from logs' >> ParDo(LogRedaction(region, destination_log_name.split('/')[1]))
         | 'Ingest to output log' >> ParDo(IngestLogs(destination_log_name))
     )
     pipeline.run()
