Skip to content

Commit 8a25bfb

Browse files
feat(dlp): Split inspect samples into multiple files so that each region tag gets its own file — part 2 (GoogleCloudPlatform#10781)
* Restructured inspect related samples - 2 * Removed unused import
1 parent 3ab2179 commit 8a25bfb

20 files changed

Lines changed: 2844 additions & 3840 deletions
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Sample app that uses the Data Loss Prevention API to inspect data stored
in a BigQuery table."""
17+
18+
19+
import argparse
20+
21+
# [START dlp_inspect_bigquery]
22+
import threading
23+
from typing import List, Optional
24+
25+
import google.cloud.dlp
26+
import google.cloud.pubsub
27+
28+
29+
def inspect_bigquery(
    project: str,
    bigquery_project: str,
    dataset_id: str,
    table_id: str,
    topic_id: str,
    subscription_id: str,
    info_types: List[str],
    custom_dictionaries: Optional[List[str]] = None,
    custom_regexes: Optional[List[str]] = None,
    min_likelihood: Optional[str] = None,
    max_findings: Optional[int] = None,
    timeout: int = 500,
) -> None:
    """Uses the Data Loss Prevention API to analyze BigQuery data.

    Creates an asynchronous DLP inspection job over the given BigQuery table,
    then blocks on a Pub/Sub subscription until the job-completion message
    arrives (or ``timeout`` elapses) and prints the findings.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        bigquery_project: The Google Cloud project id of the target table.
        dataset_id: The id of the target BigQuery dataset.
        table_id: The id of the target BigQuery table.
        topic_id: The id of the Cloud Pub/Sub topic to which the API will
            broadcast job completion. The topic must already exist.
        subscription_id: The id of the Cloud Pub/Sub subscription to listen on
            while waiting for job completion. The subscription must already
            exist and be subscribed to the topic.
        info_types: A list of strings representing info types to look for.
            A full list of info type categories can be fetched from the API.
        custom_dictionaries: A list of strings, each a comma-separated word
            list; each entry becomes one custom dictionary info type named
            CUSTOM_DICTIONARY_<i>.
        custom_regexes: A list of regex pattern strings; each entry becomes
            one custom regex info type named CUSTOM_REGEX_<i>.
        min_likelihood: A string representing the minimum likelihood threshold
            that constitutes a match. One of: 'LIKELIHOOD_UNSPECIFIED',
            'VERY_UNLIKELY', 'UNLIKELY', 'POSSIBLE', 'LIKELY', 'VERY_LIKELY'.
        max_findings: The maximum number of findings to report; 0 = no maximum.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Prepare info_types by converting the list of strings into a list of
    # dictionaries (protos are also accepted).
    if not info_types:
        info_types = ["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"]
    info_types = [{"name": info_type} for info_type in info_types]

    # Prepare custom_info_types by parsing the dictionary word lists and
    # regex patterns.
    if custom_dictionaries is None:
        custom_dictionaries = []
    dictionaries = [
        {
            "info_type": {"name": f"CUSTOM_DICTIONARY_{i}"},
            "dictionary": {"word_list": {"words": custom_dict.split(",")}},
        }
        for i, custom_dict in enumerate(custom_dictionaries)
    ]
    if custom_regexes is None:
        custom_regexes = []
    regexes = [
        {
            "info_type": {"name": f"CUSTOM_REGEX_{i}"},
            "regex": {"pattern": custom_regex},
        }
        for i, custom_regex in enumerate(custom_regexes)
    ]
    custom_info_types = dictionaries + regexes

    # Construct the configuration dictionary. Keys which are None may
    # optionally be omitted entirely.
    inspect_config = {
        "info_types": info_types,
        "custom_info_types": custom_info_types,
        "min_likelihood": min_likelihood,
        "limits": {"max_findings_per_request": max_findings},
    }

    # Construct a storage_config containing the target Bigquery info.
    storage_config = {
        "big_query_options": {
            "table_reference": {
                "project_id": bigquery_project,
                "dataset_id": dataset_id,
                "table_id": table_id,
            }
        }
    }

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Construct the inspect_job, which defines the entire inspect content task.
    inspect_job = {
        "inspect_config": inspect_config,
        "storage_config": storage_config,
        "actions": actions,
    }

    # Kick off the asynchronous inspection job; completion is reported on the
    # Pub/Sub topic configured in `actions` above.
    operation = dlp.create_dlp_job(
        request={"parent": parent, "inspect_job": inspect_job}
    )
    print(f"Inspection operation started: {operation.name}")

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)

    # Set up a callback to acknowledge a message. This closes around an event
    # so that it can signal that it is done and the main thread can continue.
    job_done = threading.Event()

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        # Runs on the Pub/Sub subscriber's background thread for every message
        # delivered on the subscription.
        try:
            if message.attributes["DlpJobName"] == operation.name:
                # This is the message we're looking for, so acknowledge it.
                message.ack()

                # Now that the job is done, fetch the results and print them.
                job = dlp.get_dlp_job(request={"name": operation.name})
                print(f"Job name: {job.name}")
                if job.inspect_details.result.info_type_stats:
                    for finding in job.inspect_details.result.info_type_stats:
                        print(
                            "Info type: {}; Count: {}".format(
                                finding.info_type.name, finding.count
                            )
                        )
                else:
                    print("No findings.")

                # Signal to the main thread that we can exit.
                job_done.set()
            else:
                # This is not the message we're looking for.
                message.drop()
        except Exception as e:
            # Because this is executing in a thread, an exception won't be
            # noted unless we print it manually.
            print(e)
            raise

    # Register the callback and wait on the event.
    subscriber.subscribe(subscription_path, callback=callback)
    finished = job_done.wait(timeout=timeout)
    if not finished:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
182+
183+
184+
# [END dlp_inspect_bigquery]
185+
186+
if __name__ == "__main__":
    # Command-line entry point: collect the arguments and run the sample.
    cli = argparse.ArgumentParser()

    # Required positional arguments (their order matters on the command line
    # and must match the usage string).
    cli.add_argument(
        "bigquery_project",
        help="The Google Cloud project id of the target table.",
    )
    cli.add_argument("dataset_id", help="The ID of the target BigQuery dataset.")
    cli.add_argument("table_id", help="The ID of the target BigQuery table.")
    cli.add_argument(
        "topic_id",
        help=(
            "The id of the Cloud Pub/Sub topic to use to report that the job "
            'is complete, e.g. "dlp-sample-topic".'
        ),
    )
    cli.add_argument(
        "subscription_id",
        help=(
            "The id of the Cloud Pub/Sub subscription to monitor for job "
            'completion, e.g. "dlp-sample-subscription". The subscription must '
            "already be subscribed to the topic. See the test files or the Cloud "
            "Pub/Sub sample files for examples on how to create the subscription."
        ),
    )

    # Optional flags.
    cli.add_argument(
        "--project",
        help="The Google Cloud project id to use as a parent resource.",
    )
    cli.add_argument(
        "--info_types",
        nargs="+",
        default=["FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS"],
        help=(
            "Strings representing info types to look for. A full list of "
            "info categories and types is available from the API. Examples "
            'include "FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS". '
            "If unspecified, the three above examples will be used."
        ),
    )
    cli.add_argument(
        "--custom_dictionaries",
        action="append",
        default=None,
        help=(
            "Strings representing comma-delimited lists of dictionary words"
            " to search for as custom info types. Each string is a comma "
            "delimited list of words representing a distinct dictionary."
        ),
    )
    cli.add_argument(
        "--custom_regexes",
        action="append",
        default=None,
        help=(
            "Strings representing regex patterns to search for as custom "
            " info types."
        ),
    )
    likelihoods = [
        "LIKELIHOOD_UNSPECIFIED",
        "VERY_UNLIKELY",
        "UNLIKELY",
        "POSSIBLE",
        "LIKELY",
        "VERY_LIKELY",
    ]
    cli.add_argument(
        "--min_likelihood",
        choices=likelihoods,
        help=(
            "A string representing the minimum likelihood threshold that "
            "constitutes a match."
        ),
    )
    cli.add_argument(
        "--max_findings",
        type=int,
        help="The maximum number of findings to report; 0 = no maximum.",
    )
    cli.add_argument(
        "--timeout",
        type=int,
        default=300,
        help=(
            "The maximum number of seconds to wait for a response from the "
            "API. The default is 300 seconds."
        ),
    )

    ns = cli.parse_args()

    # Hand everything over to the sample function defined above.
    inspect_bigquery(
        ns.project,
        ns.bigquery_project,
        ns.dataset_id,
        ns.table_id,
        ns.topic_id,
        ns.subscription_id,
        ns.info_types,
        custom_dictionaries=ns.custom_dictionaries,
        custom_regexes=ns.custom_regexes,
        min_likelihood=ns.min_likelihood,
        max_findings=ns.max_findings,
        timeout=ns.timeout,
    )

0 commit comments

Comments
 (0)