log_redaction_final.py (forked from GoogleCloudPlatform/python-docs-samples)
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import argparse
import json
import logging

from apache_beam import (
CombineFn,
CombineGlobally,
DoFn,
io,
ParDo,
Pipeline,
WindowInto,
)
from apache_beam.error import PipelineError
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from apache_beam.transforms.window import FixedWindows
from google.cloud import dlp_v2, logging_v2

# For more details about info types format, please see
# https://cloud.google.com/dlp/docs/reference/rest/v2/InspectConfig
INSPECT_CFG = {"info_types": [{"name": "US_SOCIAL_SECURITY_NUMBER"}]}
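# As an aside (not part of the original sample), inspection could be widened to
# other built-in info types by extending the list above, for example:
#
#   INSPECT_CFG = {
#       "info_types": [
#           {"name": "US_SOCIAL_SECURITY_NUMBER"},
#           {"name": "EMAIL_ADDRESS"},
#       ]
#   }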
# For more details about transformation format, please see
# https://cloud.google.com/dlp/docs/reference/rest/v2/projects.deidentifyTemplates#DeidentifyTemplate.InfoTypeTransformations
REDACTION_CFG = {
"info_type_transformations": {
"transformations": [
{
"primitive_transformation": {
"character_mask_config": {"masking_character": "#"}
}
}
]
}
}
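# Illustrative effect of the two configs above (values are made up): a payload
# such as "User SSN is 123-45-6789" comes back from the DLP API as
# "User SSN is ###########", because character_mask_config with
# masking_character "#" masks every character of each detected
# US_SOCIAL_SECURITY_NUMBER finding.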
class PayloadAsJson(DoFn):
"""Convert PubSub message payload to UTF-8 and return as JSON"""
def process(self, element):
yield json.loads(element.decode("utf-8"))
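# For reference, log entries exported to Pub/Sub by a log sink arrive as
# JSON-encoded LogEntry objects, so the dictionary yielded above typically
# resembles (fields abridged, values illustrative):
#
#   {
#       "insertId": "abc-123",
#       "logName": "projects/my-project/logs/my-log",
#       "textPayload": "User SSN is 123-45-6789",
#       "timestamp": "2024-01-01T00:00:00Z",
#       "resource": {"type": "gce_instance"}
#   }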
class BatchPayloads(CombineFn):
"""Collect all items in the windowed collection into single batch"""
def create_accumulator(self):
return []
def add_input(self, accumulator, input):
accumulator.append(input)
return accumulator
def merge_accumulators(self, accumulators):
merged = [item for accumulator in accumulators for item in accumulator]
return merged
def extract_output(self, accumulator):
return accumulator
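# Note: Beam drives the CombineFn above through its usual lifecycle
# (create_accumulator, add_input per element, merge_accumulators across
# bundles, extract_output), so together with the fixed windowing configured in
# run() it produces one list of log entries per window.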
class LogRedaction(DoFn):
"""Apply inspection and redaction to textPayload field of log entries"""
def __init__(self, region, project_id: str):
self.project_id = project_id
self.region = region
self.dlp_client = None
def _log_to_row(self, entry):
# Make `Row` from `textPayload`. For more details on the row, please see
# https://cloud.google.com/dlp/docs/reference/rest/v2/ContentItem#Row
payload = entry.get("textPayload", "")
return {"values": [{"string_value": payload}]}
def setup(self):
"""Initialize DLP client"""
if self.dlp_client:
return
self.dlp_client = dlp_v2.DlpServiceClient()
if not self.dlp_client:
logging.error("Cannot create Google DLP Client")
raise PipelineError("Cannot create Google DLP Client")
def process(self, logs):
# Construct the `table`. For more details on the table schema, please see
# https://cloud.google.com/dlp/docs/reference/rest/v2/ContentItem#Table
table = {
"table": {
"headers": [{"name": "textPayload"}],
"rows": map(self._log_to_row, logs),
}
}
response = self.dlp_client.deidentify_content(
request={
"parent": f"projects/{self.project_id}/locations/{self.region}",
"inspect_config": INSPECT_CFG,
"deidentify_config": REDACTION_CFG,
"item": table,
}
)
# replace payload with redacted version
modified_logs = []
for index, log in enumerate(logs):
log["textPayload"] = response.item.table.rows[index].values[0].string_value
# you may consider changing insert ID if the project already has a copy
# of this log (e.g. log['insertId'] = 'deid-' + log['insertId'])
# For more details about insert ID, please see:
# https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id
modified_logs.append(log)
yield modified_logs
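# Note: the indexing above assumes the DLP API returns the de-identified table
# with rows in the same order as the request, so each redacted textPayload can
# be matched back to its originating log entry.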
class IngestLogs(DoFn):
"""Ingest payloads into destination log"""
def __init__(self, destination_log_name):
self.destination_log_name = destination_log_name
self.logger = None
def _replace_log_name(self, entry):
# updates log name in the entry to logger name
entry["logName"] = self.logger.name
return entry
def setup(self):
# initialize logging client
if self.logger:
return
logging_client = logging_v2.Client()
if not logging_client:
logging.error("Cannot create Google Logging Client")
raise PipelineError("Cannot create Google Logging Client")
self.logger = logging_client.logger(self.destination_log_name)
if not self.logger:
logging.error("Google client library cannot create Logger object")
raise PipelineError("Google client library cannot create Logger object")
def process(self, element):
if self.logger:
logs = list(map(self._replace_log_name, element))
self.logger.client.logging_api.write_entries(logs)
yield logs


def run(
pubsub_subscription: str,
destination_log_name: str,
window_size: float,
    pipeline_args: list[str] | None = None,
) -> None:
"""Runs Dataflow pipeline"""
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
    # Default to us-central1, but prefer the region passed via --region.
    region = "us-central1"
    try:
        specified_region = pipeline_options.view_as(GoogleCloudOptions).region
        if specified_region:
            region = specified_region
    except AttributeError:
        pass
pipeline = Pipeline(options=pipeline_options)
_ = (
pipeline
| "Read log entries from Pub/Sub"
>> io.ReadFromPubSub(subscription=pubsub_subscription)
| "Convert log entry payload to Json" >> ParDo(PayloadAsJson())
| "Aggregate payloads in fixed time intervals"
>> WindowInto(FixedWindows(window_size))
# Optimize Google API consumption and avoid possible throttling
# by calling APIs for batched data and not per each element
| "Batch aggregated payloads"
>> CombineGlobally(BatchPayloads()).without_defaults()
| "Redact SSN info from logs"
        # The project ID is parsed from the destination log name, which has the
        # form "projects/<PROJECT_ID>/logs/<LOG_ID>".
        >> ParDo(LogRedaction(region, destination_log_name.split("/")[1]))
| "Ingest to output log" >> ParDo(IngestLogs(destination_log_name))
)
pipeline.run()


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--pubsub_subscription",
help="The Cloud Pub/Sub subscription to read from in the format "
'"projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>".',
)
parser.add_argument(
"--destination_log_name",
help="The log name to ingest log entries in the format "
'"projects/<PROJECT_ID>/logs/<LOG_ID>".',
)
parser.add_argument(
"--window_size",
type=float,
default=60.0,
help="Output file's window size in seconds.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.pubsub_subscription,
known_args.destination_log_name,
known_args.window_size,
pipeline_args,
)
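# Example invocation (illustrative values; --runner, --project, --region and
# --temp_location are standard Beam/Dataflow pipeline options that are passed
# through via pipeline_args):
#
#   python log_redaction_final.py \
#       --pubsub_subscription="projects/my-project/subscriptions/my-sub" \
#       --destination_log_name="projects/my-project/logs/redacted-log" \
#       --window_size=60 \
#       --runner=DataflowRunner \
#       --project=my-project \
#       --region=us-central1 \
#       --temp_location=gs://my-bucket/tmp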