Skip to content

Commit c170ee8

Browse files
authored
Add timeline and top-level slot-millis to query statistics. (googleapis#5312)
* Add timeline and top-level slot-millis to query statistics.
* Address reviewer comment: add _int_or_none guards.
* Add a system test that examines query statistics.
* Remove ratio evaluation from the query statistics system test.
1 parent 38c3e9a commit c170ee8

3 files changed

Lines changed: 286 additions & 48 deletions

File tree

bigquery/google/cloud/bigquery/job.py

Lines changed: 93 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -2200,6 +2200,14 @@ def query_plan(self):
22002200
plan_entries = self._job_statistics().get('queryPlan', ())
22012201
return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]
22022202

2203+
@property
def timeline(self):
    """List(TimelineEntry): Timeline samples recorded for this query.

    Built from the ``timeline`` entry of the job statistics; an empty
    list is returned when the statistics carry no such entry.
    """
    samples = self._job_statistics().get('timeline', ())
    return list(map(TimelineEntry.from_api_repr, samples))
2210+
22032211
@property
22042212
def total_bytes_processed(self):
22052213
"""Return total bytes processed from job statistics, if present.
@@ -2274,6 +2282,11 @@ def num_dml_affected_rows(self):
22742282
result = int(result)
22752283
return result
22762284

2285+
@property
def slot_millis(self):
    """Union[int, None]: Slot-milliseconds consumed by this query job.

    Read from the ``totalSlotMs`` field of the job statistics and passed
    through ``_int_or_none``.
    """
    statistics = self._job_statistics()
    return _int_or_none(statistics.get('totalSlotMs'))
2289+
22772290
@property
22782291
def statement_type(self):
22792292
"""Return statement type from job statistics, if present.
@@ -2518,46 +2531,49 @@ def start(self):
25182531
if self._properties.get('startMs') is None:
25192532
return None
25202533
return _datetime_from_microseconds(
2521-
self._properties.get('startMs') * 1000.0)
2534+
int(self._properties.get('startMs')) * 1000.0)
25222535

25232536
@property
25242537
def end(self):
25252538
"""Union[Datetime, None]: Datetime when the stage ended."""
25262539
if self._properties.get('endMs') is None:
25272540
return None
25282541
return _datetime_from_microseconds(
2529-
self._properties.get('endMs') * 1000.0)
2542+
int(self._properties.get('endMs')) * 1000.0)
25302543

25312544
@property
25322545
def input_stages(self):
25332546
"""List(int): Entry IDs for stages that were inputs for this stage."""
2534-
return self._properties.get('inputStages', [])
2547+
if self._properties.get('inputStages') is None:
2548+
return []
2549+
return [_int_or_none(entry)
2550+
for entry in self._properties.get('inputStages')]
25352551

25362552
@property
25372553
def parallel_inputs(self):
25382554
"""Union[int, None]: Number of parallel input segments within
25392555
the stage.
25402556
"""
2541-
return self._properties.get('parallelInputs')
2557+
return _int_or_none(self._properties.get('parallelInputs'))
25422558

25432559
@property
25442560
def completed_parallel_inputs(self):
25452561
"""Union[int, None]: Number of parallel input segments completed."""
2546-
return self._properties.get('completedParallelInputs')
2562+
return _int_or_none(self._properties.get('completedParallelInputs'))
25472563

25482564
@property
25492565
def wait_ms_avg(self):
25502566
"""Union[int, None]: Milliseconds the average worker spent waiting to
25512567
be scheduled.
25522568
"""
2553-
return self._properties.get('waitMsAvg')
2569+
return _int_or_none(self._properties.get('waitMsAvg'))
25542570

25552571
@property
25562572
def wait_ms_max(self):
25572573
"""Union[int, None]: Milliseconds the slowest worker spent waiting to
25582574
be scheduled.
25592575
"""
2560-
return self._properties.get('waitMsMax')
2576+
return _int_or_none(self._properties.get('waitMsMax'))
25612577

25622578
@property
25632579
def wait_ratio_avg(self):
@@ -2580,14 +2596,14 @@ def read_ms_avg(self):
25802596
"""Union[int, None]: Milliseconds the average worker spent reading
25812597
input.
25822598
"""
2583-
return self._properties.get('readMsAvg')
2599+
return _int_or_none(self._properties.get('readMsAvg'))
25842600

25852601
@property
25862602
def read_ms_max(self):
25872603
"""Union[int, None]: Milliseconds the slowest worker spent reading
25882604
input.
25892605
"""
2590-
return self._properties.get('readMsMax')
2606+
return _int_or_none(self._properties.get('readMsMax'))
25912607

25922608
@property
25932609
def read_ratio_avg(self):
@@ -2610,14 +2626,14 @@ def compute_ms_avg(self):
26102626
"""Union[int, None]: Milliseconds the average worker spent on CPU-bound
26112627
processing.
26122628
"""
2613-
return self._properties.get('computeMsAvg')
2629+
return _int_or_none(self._properties.get('computeMsAvg'))
26142630

26152631
@property
26162632
def compute_ms_max(self):
26172633
"""Union[int, None]: Milliseconds the slowest worker spent on CPU-bound
26182634
processing.
26192635
"""
2620-
return self._properties.get('computeMsMax')
2636+
return _int_or_none(self._properties.get('computeMsMax'))
26212637

26222638
@property
26232639
def compute_ratio_avg(self):
@@ -2640,14 +2656,14 @@ def write_ms_avg(self):
26402656
"""Union[int, None]: Milliseconds the average worker spent writing
26412657
output data.
26422658
"""
2643-
return self._properties.get('writeMsAvg')
2659+
return _int_or_none(self._properties.get('writeMsAvg'))
26442660

26452661
@property
26462662
def write_ms_max(self):
26472663
"""Union[int, None]: Milliseconds the slowest worker spent writing
26482664
output data.
26492665
"""
2650-
return self._properties.get('writeMsMax')
2666+
return _int_or_none(self._properties.get('writeMsMax'))
26512667

26522668
@property
26532669
def write_ratio_avg(self):
@@ -2668,12 +2684,12 @@ def write_ratio_max(self):
26682684
@property
26692685
def records_read(self):
26702686
"""Union[int, None]: Number of records read by this stage."""
2671-
return self._properties.get('recordsRead')
2687+
return _int_or_none(self._properties.get('recordsRead'))
26722688

26732689
@property
26742690
def records_written(self):
26752691
"""Union[int, None]: Number of records written by this stage."""
2676-
return self._properties.get('recordsWritten')
2692+
return _int_or_none(self._properties.get('recordsWritten'))
26772693

26782694
@property
26792695
def status(self):
@@ -2685,14 +2701,14 @@ def shuffle_output_bytes(self):
26852701
"""Union[int, None]: Number of bytes written by this stage to
26862702
intermediate shuffle.
26872703
"""
2688-
return self._properties.get('shuffleOutputBytes')
2704+
return _int_or_none(self._properties.get('shuffleOutputBytes'))
26892705

26902706
@property
26912707
def shuffle_output_bytes_spilled(self):
26922708
"""Union[int, None]: Number of bytes written by this stage to
26932709
intermediate shuffle and spilled to disk.
26942710
"""
2695-
return self._properties.get('shuffleOutputBytesSpilled')
2711+
return _int_or_none(self._properties.get('shuffleOutputBytesSpilled'))
26962712

26972713
@property
26982714
def steps(self):
@@ -2703,6 +2719,66 @@ def steps(self):
27032719
for step in self._properties.get('steps', [])]
27042720

27052721

2722+
class TimelineEntry(object):
    """A single sample of a query job's progress at one point in time.

    The underlying API representation (within query statistics) is
    described at
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs
    """

    def __init__(self):
        # Raw API resource backing every property below.
        self._properties = {}

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: build an entry around a JSON resource.

        Args:
            resource(Dict[str: object]):
                QueryTimelineSample representation returned from API

        Returns:
            google.cloud.bigquery.TimelineEntry:
                Timeline sample wrapping ``resource``
        """
        sample = cls()
        # The resource is stored as-is (not copied).
        sample._properties = resource
        return sample

    @property
    def elapsed_ms(self):
        """Union[int, None]: Milliseconds elapsed since the start of
        query execution."""
        value = self._properties.get('elapsedMs')
        return _int_or_none(value)

    @property
    def active_units(self):
        """Union[int, None]: Number of input units currently being
        processed by workers, reported as the largest value since the
        last sample."""
        value = self._properties.get('activeUnits')
        return _int_or_none(value)

    @property
    def pending_units(self):
        """Union[int, None]: Number of input units remaining for the
        query stages active at this sample time."""
        value = self._properties.get('pendingUnits')
        return _int_or_none(value)

    @property
    def completed_units(self):
        """Union[int, None]: Number of input units completed by this
        query so far."""
        value = self._properties.get('completedUnits')
        return _int_or_none(value)

    @property
    def slot_millis(self):
        """Union[int, None]: Cumulative slot-milliseconds consumed by
        this query."""
        value = self._properties.get('totalSlotMs')
        return _int_or_none(value)
2780+
2781+
27062782
class UnknownJob(_AsyncJob):
27072783
"""A job whose type cannot be determined."""
27082784

bigquery/tests/system.py

Lines changed: 76 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1087,6 +1087,82 @@ def test_query_w_timeout(self):
10871087
# 1 second is much too short for this query.
10881088
query_job.result(timeout=1)
10891089

1090+
def test_query_statistics(self):
    """
    A system test to exercise some of the extended query statistics.

    Note: We construct a query that should need at least three stages by
    specifying a JOIN query.  Exact plan and stats are effectively
    non-deterministic, so we're largely interested in confirming values
    are present.
    """

    job_config = bigquery.QueryJobConfig()
    # Bypass the query cache; ``cache_hit`` is asserted to be false below.
    job_config.use_query_cache = False

    query_job = Config.CLIENT.query(
        """
        SELECT
          COUNT(1)
        FROM
        (
          SELECT
            year,
            wban_number
          FROM `bigquery-public-data.samples.gsod`
          LIMIT 1000
        ) lside
        INNER JOIN
        (
          SELECT
            year,
            state
          FROM `bigquery-public-data.samples.natality`
          LIMIT 1000
        ) rside
        ON
          lside.year = rside.year
        """,
        location='US',
        job_config=job_config)

    # Run the job to completion.
    query_job.result()

    # Assert top-level stats.
    self.assertFalse(query_job.cache_hit)
    self.assertIsNotNone(query_job.destination)
    # ``done`` is a method: asserting on the bound method object itself
    # is always truthy, so it must be called for the check to mean
    # anything.
    self.assertTrue(query_job.done())
    self.assertFalse(query_job.dry_run)
    self.assertIsNone(query_job.num_dml_affected_rows)
    self.assertEqual(query_job.priority, 'INTERACTIVE')
    self.assertGreater(query_job.total_bytes_billed, 1)
    self.assertGreater(query_job.total_bytes_processed, 1)
    self.assertEqual(query_job.statement_type, 'SELECT')
    self.assertGreater(query_job.slot_millis, 1)

    # Make assertions on the shape of the query plan.
    plan = query_job.query_plan
    self.assertGreaterEqual(len(plan), 3)
    first_stage = plan[0]
    self.assertIsNotNone(first_stage.start)
    self.assertIsNotNone(first_stage.end)
    self.assertIsNotNone(first_stage.entry_id)
    self.assertIsNotNone(first_stage.name)
    self.assertGreater(first_stage.parallel_inputs, 0)
    self.assertGreater(first_stage.completed_parallel_inputs, 0)
    self.assertGreater(first_stage.shuffle_output_bytes, 0)
    self.assertEqual(first_stage.status, 'COMPLETE')

    # Query plan is a digraph.  Ensure it has inter-stage links,
    # but not every stage has inputs.
    stages_with_inputs = sum(1 for entry in plan if entry.input_stages)
    self.assertGreater(stages_with_inputs, 0)
    self.assertGreater(len(plan), stages_with_inputs)
1165+
10901166
def test_dbapi_w_standard_sql_types(self):
10911167
examples = self._generate_standard_sql_types_examples()
10921168
for example in examples:

0 commit comments

Comments
 (0)