forked from googleapis/python-bigquery-dataframes
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmetrics.py
More file actions
105 lines (85 loc) · 3.38 KB
/
metrics.py
File metadata and controls
105 lines (85 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import dataclasses
import os
from typing import Optional, Tuple
import google.cloud.bigquery as bigquery
import google.cloud.bigquery.job as bq_job
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
@dataclasses.dataclass
class ExecutionMetrics:
    """Running totals of BigQuery work performed by executed query jobs."""

    # All counters start at zero and only ever accumulate.
    execution_count: int = 0
    slot_millis: int = 0
    bytes_processed: int = 0
    execution_secs: float = 0

    def count_job_stats(self, query_job: bq_job.QueryJob):
        """Fold one finished query job's statistics into the totals.

        Jobs that did no real BigQuery work (dry runs, mocks, unfinished
        jobs) are ignored entirely.
        """
        stats = get_performance_stats(query_job)
        if stats is None:
            return
        job_bytes, job_slot_ms, job_secs = stats
        self.execution_count += 1
        self.bytes_processed += job_bytes
        self.slot_millis += job_slot_ms
        self.execution_secs += job_secs
        # When notebooks run under pytest nbmake, the env var names the
        # per-test log files used to build a performance report.
        if LOGGING_NAME_ENV_VAR in os.environ:
            write_stats_to_disk(job_bytes, job_slot_ms, job_secs)
def get_performance_stats(
    query_job: bigquery.QueryJob,
) -> Optional[Tuple[int, int, float]]:
    """Extract (bytes_processed, slot_millis, execution_secs) from a job.

    Returns None when the stats do not reflect real work done in
    BigQuery: dry runs, jobs that never started or finished, and mocked
    jobs whose counters are not real integers.
    """
    # Dry runs and unfinished jobs carry no meaningful statistics.
    if query_job.configuration.dry_run:
        return None
    created, ended = query_job.created, query_job.ended
    if created is None or ended is None:
        return None

    bytes_processed = query_job.total_bytes_processed
    slot_millis = query_job.slot_millis
    # Mock jobs report non-int placeholders here; filter them out.
    if not isinstance(bytes_processed, int) or not isinstance(slot_millis, int):
        return None

    return bytes_processed, slot_millis, (ended - created).total_seconds()
def write_stats_to_disk(
    bytes_processed: int, slot_millis: int, exec_seconds: Optional[float]
):
    """For pytest runs only: append one query job's stats to log files.

    Each metric goes into its own file (named after the test) in the
    current working directory, one value per line, so a later pass can
    aggregate them into a performance report.

    Raises:
        EnvironmentError: if the logging env var is not set.
    """
    if LOGGING_NAME_ENV_VAR not in os.environ:
        raise EnvironmentError(
            "Environment variable {env_var} is not set".format(
                env_var=LOGGING_NAME_ENV_VAR
            )
        )
    test_name = os.environ[LOGGING_NAME_ENV_VAR]
    base_path = os.path.join(os.getcwd(), test_name)

    def _append(suffix: str, value) -> None:
        # Append-mode keeps one value per executed query, one per line.
        with open(base_path + suffix, "a") as stream:
            stream.write(str(value) + "\n")

    _append(".bytesprocessed", bytes_processed)
    _append(".slotmillis", slot_millis)
    _append(".bq_exec_time_seconds", exec_seconds)