1515from __future__ import annotations
1616
1717import dataclasses
18+ import datetime
1819import os
19- from typing import Optional , Tuple
20+ from typing import Any , Mapping , Optional , Tuple , Union
2021
2122import google .cloud .bigquery as bigquery
22- import google .cloud .bigquery .job as bq_job
2323import google .cloud .bigquery .table as bq_table
24+ from google .cloud .bigquery .job .load import LoadJob
25+ from google .cloud .bigquery .job .query import QueryJob
2426
2527LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2628
2729
@dataclasses.dataclass
class JobMetadata:
    """Snapshot of metadata for a single BigQuery job (query or load).

    Captures identifiers, timing, resource usage, and result details for one
    execution so that per-job statistics can be accumulated alongside the
    aggregate counters. All fields are optional because the available
    attributes differ between QueryJob, LoadJob, and jobless RowIterator
    results.
    """

    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    job_url: Optional[str] = None
    query: Optional[str] = None
    destination_table: Optional[str] = None
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None

    @staticmethod
    def _truncate_query(query_text: Optional[str]) -> Optional[str]:
        """Cap stored query text at 1024 characters (1021 + "...") to bound memory."""
        if query_text and len(query_text) > 1024:
            return query_text[:1021] + "..."
        return query_text

    @staticmethod
    def _console_url(
        project: Any, location: Any, job_id: Optional[str]
    ) -> Optional[str]:
        """Build the BigQuery console URL for a job, or None when there is no job_id."""
        if not job_id:
            return None
        return (
            "https://console.cloud.google.com/bigquery"
            f"?project={project}&j=bq:{location}:{job_id}&page=queryresults"
        )

    @classmethod
    def from_job(
        cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Create metadata from a finished QueryJob or LoadJob.

        Args:
            query_job: The BigQuery job to summarize.
            exec_seconds: Wall-clock duration measured by the caller, if known.
        """
        job_id = getattr(query_job, "job_id", None)
        metadata = cls(
            job_id=job_id,
            location=query_job.location,
            project=query_job.project,
            creation_time=query_job.created,
            start_time=query_job.started,
            end_time=query_job.ended,
            duration_seconds=exec_seconds,
            status=query_job.state,
            job_type=query_job.job_type,
            error_result=query_job.error_result,
            query=cls._truncate_query(getattr(query_job, "query", None)),
            job_url=cls._console_url(query_job.project, query_job.location, job_id),
        )
        if isinstance(query_job, QueryJob):
            # Query-specific stats: cache hit, bytes scanned, slot time.
            metadata.cached = getattr(query_job, "cache_hit", None)
            metadata.destination_table = (
                str(query_job.destination) if query_job.destination else None
            )
            metadata.total_bytes_processed = getattr(
                query_job, "total_bytes_processed", None
            )
            metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
        elif isinstance(query_job, LoadJob):
            # Load-specific stats: rows/files/bytes loaded plus source info.
            metadata.output_rows = getattr(query_job, "output_rows", None)
            metadata.input_files = getattr(query_job, "input_files", None)
            metadata.input_bytes = getattr(query_job, "input_bytes", None)
            metadata.destination_table = (
                str(query_job.destination)
                if getattr(query_job, "destination", None)
                else None
            )
            if getattr(query_job, "source_uris", None):
                metadata.source_uris = list(query_job.source_uris)
            if query_job.configuration and hasattr(
                query_job.configuration, "source_format"
            ):
                metadata.source_format = query_job.configuration.source_format

        return metadata

    @classmethod
    def from_row_iterator(
        cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Create metadata from a RowIterator (jobless query or cached result).

        RowIterator attributes vary by client version and query path, so every
        read goes through getattr with a None fallback.

        Args:
            row_iterator: The result iterator to summarize.
            exec_seconds: Wall-clock duration measured by the caller, if known.
        """
        job_id = getattr(row_iterator, "job_id", None)
        return cls(
            job_id=job_id,
            query_id=getattr(row_iterator, "query_id", None),
            location=getattr(row_iterator, "location", None),
            project=getattr(row_iterator, "project", None),
            creation_time=getattr(row_iterator, "created", None),
            start_time=getattr(row_iterator, "started", None),
            end_time=getattr(row_iterator, "ended", None),
            duration_seconds=exec_seconds,
            # Results are only iterable once the work is done.
            status="DONE",
            total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
            total_slot_ms=getattr(row_iterator, "slot_millis", None),
            job_type="query",
            cached=getattr(row_iterator, "cache_hit", None),
            query=cls._truncate_query(getattr(row_iterator, "query", None)),
            job_url=cls._console_url(
                getattr(row_iterator, "project", ""),
                getattr(row_iterator, "location", ""),
                job_id,
            ),
        )
141+
142+
@dataclasses.dataclass
class ExecutionMetrics:
    """Accumulates aggregate statistics across the BigQuery jobs of a session."""

    # Number of executions counted so far.
    execution_count: int = 0
    # Total slot-milliseconds consumed across counted jobs.
    slot_millis: int = 0
    # Total bytes processed across counted jobs.
    bytes_processed: int = 0
    # Total execution time across counted jobs, in seconds.
    execution_secs: float = 0
    # Total number of characters across executed query texts.
    query_char_count: int = 0
    # Per-job metadata snapshots, appended in execution order.
    jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
35151
36152 def count_job_stats (
37153 self ,
38- query_job : Optional [bq_job . QueryJob ] = None ,
154+ query_job : Optional [Union [ QueryJob , LoadJob ] ] = None ,
39155 row_iterator : Optional [bq_table .RowIterator ] = None ,
40156 ):
41157 if query_job is None :
@@ -57,41 +173,88 @@ def count_job_stats(
57173 self .slot_millis += slot_millis
58174 self .execution_secs += exec_seconds
59175
60- elif query_job .configuration .dry_run :
61- query_char_count = len (query_job .query )
176+ self .jobs .append (
177+ JobMetadata .from_row_iterator (row_iterator , exec_seconds = exec_seconds )
178+ )
179+
180+ elif isinstance (query_job , QueryJob ) and query_job .configuration .dry_run :
181+ query_char_count = len (getattr (query_job , "query" , "" ))
62182
63183 # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
64184 bytes_processed = 0
65185 slot_millis = 0
66186 exec_seconds = 0.0
67187
68- elif (stats := get_performance_stats (query_job )) is not None :
69- query_char_count , bytes_processed , slot_millis , exec_seconds = stats
188+ elif isinstance (query_job , bigquery .QueryJob ):
189+ if (stats := get_performance_stats (query_job )) is not None :
190+ query_char_count , bytes_processed , slot_millis , exec_seconds = stats
191+ self .execution_count += 1
192+ self .query_char_count += query_char_count or 0
193+ self .bytes_processed += bytes_processed or 0
194+ self .slot_millis += slot_millis or 0
195+ self .execution_secs += exec_seconds or 0
196+
197+ metadata = JobMetadata .from_job (query_job , exec_seconds = exec_seconds )
198+ self .jobs .append (metadata )
199+
200+ else :
70201 self .execution_count += 1
71- self .query_char_count += query_char_count or 0
72- self .bytes_processed += bytes_processed or 0
73- self .slot_millis += slot_millis or 0
74- self .execution_secs += exec_seconds or 0
202+ duration = (
203+ (query_job .ended - query_job .created ).total_seconds ()
204+ if query_job .ended and query_job .created
205+ else None
206+ )
207+ self .jobs .append (JobMetadata .from_job (query_job , exec_seconds = duration ))
208+
209+ # For pytest runs only, log information about the query job
210+ # to a file in order to create a performance report.
211+ if (
212+ isinstance (query_job , bigquery .QueryJob )
213+ and not query_job .configuration .dry_run
214+ ):
215+ stats = get_performance_stats (query_job )
216+ if stats :
217+ write_stats_to_disk (
218+ query_char_count = stats [0 ],
219+ bytes_processed = stats [1 ],
220+ slot_millis = stats [2 ],
221+ exec_seconds = stats [3 ],
222+ )
223+ elif row_iterator is not None :
224+ bytes_processed = getattr (row_iterator , "total_bytes_processed" , 0 ) or 0
225+ query_char_count = len (getattr (row_iterator , "query" , "" ) or "" )
226+ slot_millis = getattr (row_iterator , "slot_millis" , 0 ) or 0
227+ created = getattr (row_iterator , "created" , None )
228+ ended = getattr (row_iterator , "ended" , None )
229+ exec_seconds = (
230+ (ended - created ).total_seconds () if created and ended else 0.0
231+ )
75232 write_stats_to_disk (
76233 query_char_count = query_char_count ,
77234 bytes_processed = bytes_processed ,
78235 slot_millis = slot_millis ,
79236 exec_seconds = exec_seconds ,
80237 )
81238
82- else :
83- # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
84- bytes_processed = 0
85- query_char_count = 0
86- slot_millis = 0
87- exec_seconds = 0
    def on_event(self, event: Any):
        """Record metrics for a session event, if it is a finished local execution.

        Only `ExecutionFinished` events whose result is a `LocalExecuteResult`
        (e.g. the polars-based local engine) are counted; everything else is
        ignored. No-op when bigframes is not importable.
        """
        try:
            # Imported lazily so this module stays usable without bigframes
            # installed; missing imports simply disable event handling.
            import bigframes.core.events
            from bigframes.session.executor import LocalExecuteResult
        except ImportError:
            return

        if isinstance(event, bigframes.core.events.ExecutionFinished):
            if event.result and isinstance(event.result, LocalExecuteResult):
                self.execution_count += 1
                bytes_processed = event.result.total_bytes_processed or 0
                self.bytes_processed += bytes_processed

                # Local executions have no BigQuery job, so synthesize a
                # minimal metadata record tagged as a "polars" job.
                metadata = JobMetadata(
                    job_type="polars",
                    status="DONE",
                    total_bytes_processed=bytes_processed,
                )
                self.jobs.append(metadata)
95258
96259
97260def get_performance_stats (
0 commit comments