Skip to content

Commit 2d7e033

Browse files
Chronicle Teamcopybara-github
authored andcommitted
Add Python samples for the UDM Search API.
PiperOrigin-RevId: 488753595
1 parent 60e792f commit 2d7e033

File tree

2 files changed

+252
-0
lines changed

2 files changed

+252
-0
lines changed

search/udm_search.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
#!/usr/bin/env python3
2+
3+
# Copyright 2022 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
"""Executable and reusable sample for UDM Search.
18+
19+
API reference:
20+
https://cloud.google.com/chronicle/docs/reference/search-api#udmsearch
21+
"""
22+
23+
import argparse
24+
import datetime
25+
import json
26+
import sys
27+
from typing import Any, Mapping, Optional, Sequence
28+
29+
from google.auth.transport import requests
30+
31+
from common import chronicle_auth
32+
from common import datetime_converter
33+
from common import regions
34+
35+
CHRONICLE_API_BASE_URL = "https://backstory.googleapis.com"
36+
37+
38+
def initialize_command_line_args(
39+
args: Optional[Sequence[str]] = None) -> Optional[argparse.Namespace]:
40+
"""Initializes and checks all the command-line arguments."""
41+
parser = argparse.ArgumentParser()
42+
chronicle_auth.add_argument_credentials_file(parser)
43+
regions.add_argument_region(parser)
44+
parser.add_argument(
45+
"-q", "--query", type=str, required=True, help=("UDM Search query"))
46+
parser.add_argument(
47+
"-ts",
48+
"--start_time",
49+
type=datetime_converter.iso8601_datetime_utc,
50+
required=True,
51+
help=(
52+
"start of time range, as an ISO 8601 string ('yyyy-mm-ddThh:mm:ss')"))
53+
parser.add_argument(
54+
"-te",
55+
"--end_time",
56+
required=True,
57+
type=datetime_converter.iso8601_datetime_utc,
58+
help=("end of time range, as an ISO 8601 string ('yyyy-mm-ddThh:mm:ss')"))
59+
parser.add_argument(
60+
"-tl",
61+
"--local_time",
62+
action="store_true",
63+
help=("time is specified in the system's local timezone (default = UTC)"))
64+
parser.add_argument(
65+
"-l",
66+
"--limit",
67+
type=int,
68+
default=1000,
69+
help=("Limit on the maximum number of matches to return, up to 1,000" +
70+
"(default = 1,000)"))
71+
72+
# Sanity checks for the command-line arguments.
73+
parsed_args = parser.parse_args(args)
74+
s, e, limit = parsed_args.start_time, parsed_args.end_time, parsed_args.limit
75+
if parsed_args.local_time:
76+
s = s.replace(tzinfo=None).astimezone(datetime.timezone.utc)
77+
e = e.replace(tzinfo=None).astimezone(datetime.timezone.utc)
78+
if s > datetime.datetime.now().astimezone(datetime.timezone.utc):
79+
print("Error: start time should not be in the future")
80+
return None
81+
if e > datetime.datetime.now().astimezone(datetime.timezone.utc):
82+
print("Error: end time should not be in the future")
83+
return None
84+
if s >= e:
85+
print("Error: start time should not be same as or later than end time")
86+
return None
87+
if limit > 1000 or limit < 1:
88+
print("Error: limit can not be more than 1,000 or less than 1")
89+
return None
90+
91+
return parsed_args
92+
93+
94+
def udm_search(http_session: requests.AuthorizedSession,
95+
query: str,
96+
start_time: datetime.datetime,
97+
end_time: datetime.datetime,
98+
limit: Optional[int] = 1000) -> Mapping[str, Any]:
99+
"""Performs a UDM search across the specified time range.
100+
101+
Args:
102+
http_session: Authorized session for HTTP requests.
103+
query: UDM search query.
104+
start_time: Inclusive beginning of the time range to search, with any
105+
timezone (even a timezone-unaware datetime object, i.e. local time).
106+
end_time: Exclusive end of the time range to search, with any timezone (even
107+
a timezone-unaware datetime object, i.e. local time).
108+
limit: Maximum number of matched events to return, up to 1,000 (default =
109+
1,000).
110+
111+
Returns:
112+
{
113+
"events": [
114+
{
115+
"name": "...",
116+
"udm": {
117+
"metadata": { ... },
118+
"principal": { ... },
119+
"target": { ... },
120+
},
121+
},
122+
{
123+
"name": "...",
124+
"udm": {
125+
"metadata": { ... },
126+
"principal": { ... },
127+
"target": { ... },
128+
},
129+
},
130+
...More matched events...
131+
]
132+
}
133+
134+
Raises:
135+
requests.exceptions.HTTPError: HTTP request resulted in an error
136+
(response.status_code >= 400).
137+
"""
138+
url = f"{CHRONICLE_API_BASE_URL}/v1/events:udmSearch"
139+
s = datetime_converter.strftime(start_time)
140+
e = datetime_converter.strftime(end_time)
141+
params = {
142+
"query": query,
143+
"time_range.start_time": s,
144+
"time_range.end_time": e,
145+
"limit": limit
146+
}
147+
response = http_session.request("GET", url, params=params)
148+
149+
if response.status_code >= 400:
150+
print(response.text)
151+
response.raise_for_status()
152+
return response.json()
153+
154+
155+
if __name__ == "__main__":
156+
cli = initialize_command_line_args()
157+
if not cli:
158+
sys.exit(1) # A sanity check failed.
159+
160+
q, start, end, l = cli.query, cli.start_time, cli.end_time, cli.limit
161+
if cli.local_time:
162+
start = start.replace(tzinfo=None)
163+
164+
CHRONICLE_API_BASE_URL = regions.url(CHRONICLE_API_BASE_URL, cli.region)
165+
session = chronicle_auth.initialize_http_session(cli.credentials_file)
166+
print(json.dumps(udm_search(session, q, start, end, l), indent=2))

search/udm_search_test.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
"""Tests for the "udm_search" module."""
16+
17+
import datetime
18+
import unittest
19+
from unittest import mock
20+
21+
from google.auth.transport import requests
22+
23+
from . import udm_search
24+
25+
26+
class UDMSearchTest(unittest.TestCase):
27+
28+
def test_initialize_command_line_args_utc(self):
29+
actual = udm_search.initialize_command_line_args([
30+
"--query=metadata.event_type=\"NETWORK_CONNECTION\"",
31+
"--start_time=2022-08-01T00:00:00", "--end_time=2022-08-01T01:00:00"
32+
])
33+
self.assertIsNotNone(actual)
34+
35+
def test_initialize_command_line_args_local_time(self):
36+
actual = udm_search.initialize_command_line_args([
37+
"--query=metadata.event_type=\"NETWORK_CONNECTION\"",
38+
"--start_time=2022-08-01T00:00:00", "--end_time=2022-08-01T01:00:00",
39+
"--local_time"
40+
])
41+
self.assertIsNotNone(actual)
42+
43+
def test_initialize_command_line_args_limit(self):
44+
actual = udm_search.initialize_command_line_args([
45+
"--query=metadata.event_type=\"NETWORK_CONNECTION\"",
46+
"--start_time=2022-08-01T00:00:00", "--end_time=2022-08-01T01:00:00",
47+
"--limit=100"
48+
])
49+
self.assertIsNotNone(actual)
50+
51+
def test_initialize_command_line_args_invalid_limit(self):
52+
actual = udm_search.initialize_command_line_args([
53+
"--query=metadata.event_type=\"NETWORK_CONNECTION\"",
54+
"--start_time=2022-08-01T00:00:00", "--end_time=2022-08-01T01:00:00",
55+
"--limit=100000"
56+
])
57+
self.assertIsNone(actual)
58+
59+
def test_initialize_command_line_args_invalid_start_time(self):
60+
actual = udm_search.initialize_command_line_args([
61+
"--query=metadata.event_type=\"NETWORK_CONNECTION\"",
62+
"--start_time=2100-08-01T00:00:00", "--end_time=2022-08-01T01:00:00"
63+
])
64+
self.assertIsNone(actual)
65+
66+
def test_initialize_command_line_args_invalid_end_time(self):
67+
actual = udm_search.initialize_command_line_args([
68+
"--query=metadata.event_type=\"NETWORK_CONNECTION\"",
69+
"--start_time=2022-08-01T00:00:00", "--end_time=2022-07-01T01:00:00"
70+
])
71+
self.assertIsNone(actual)
72+
73+
@mock.patch.object(requests, "AuthorizedSession", autospec=True)
74+
@mock.patch.object(requests.requests, "Response", autospec=True)
75+
def test_udm_search(self, mock_response, mock_session):
76+
mock_session.request.return_value = mock_response
77+
type(mock_response).status_code = mock.PropertyMock(return_value=200)
78+
mock_response.json.return_value = {"mock": "json"}
79+
actual = udm_search.udm_search(mock_session, "principal.ip=\"10.1.2.3\"",
80+
datetime.datetime(2022, 8, 1, 00, 00, 00),
81+
datetime.datetime(2022, 8, 2, 0, 0, 0))
82+
self.assertEqual(actual, {"mock": "json"})
83+
84+
85+
if __name__ == "__main__":
86+
unittest.main()

0 commit comments

Comments
 (0)