Skip to content

Commit fa20a74

Browse files
feat: Add bigframes.execution_history API to track BigQuery jobs (#16588)
This PR promotes execution_history() to the top-level bigframes namespace and upgrades it to track rich metadata for every BigQuery job executed during your session. Key User Benefits: - Easier Access: Call bigframes.execution_history() directly instead of digging into sub-namespaces. - Rich Metadata Tracking: Captures structured statistics for both Query Jobs and Load Jobs including: - job_id and a direct Google Cloud Console URL for easy debugging. - Performance metrics: total_bytes_processed, duration_seconds, and slot_millis. - Query details (truncated preview of the SQL that was run). - Clean, Focused Logs: Automatically filters out internal library overhead (like schema validations and index uniqueness checks) so your history only shows the data processing steps you actually care about. Usage Example: ``` 1 import bigframes.pandas as bpd 2 import pandas as pd 3 import bigframes 4 5 # ... run some bigframes operations ... 6 df = bpd.read_gbq("SELECT 1") 7 8 # Upload some local data (triggers a Load Job) 9 bpd.read_pandas(pd.DataFrame({'a': [1, 2, 3]})) 10 11 # Get a DataFrame of all BQ jobs run in this session 12 history = bigframes.execution_history().to_dataframe() 13 14 # Inspect recent queries, their costs, and durations 15 print(history[['job_id', 'job_type', 'total_bytes_processed', 'duration_seconds', 'query']]) ``` verified at: 1. VS Code notebook: screen/8u2yhaRV9iHbDbF 2. Colab notebook: screen/9L8VrP5y9DXhnZz More test cases and notebook updates will be checked in using separate PRs for easier review. Fixes #<481840739> 🦕 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent dec9813 commit fa20a74

File tree

13 files changed

+420
-68
lines changed

13 files changed

+420
-68
lines changed

packages/bigframes/bigframes/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
4646
from bigframes.core.global_session import ( # noqa: E402
4747
close_session,
48+
execution_history,
4849
get_global_session,
4950
)
5051
from bigframes.session import Session, connect # noqa: E402
@@ -69,6 +70,7 @@ def load_ipython_extension(ipython):
6970
"BigQueryOptions",
7071
"get_global_session",
7172
"close_session",
73+
"execution_history",
7274
"enums",
7375
"exceptions",
7476
"connect",

packages/bigframes/bigframes/core/global_session.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
124124
return func_(get_global_session(), *args, **kwargs)
125125

126126

127+
def execution_history() -> "bigframes.session._ExecutionHistory":
128+
import pandas # noqa: F401
129+
130+
import bigframes.session
131+
132+
return with_default_session(bigframes.session.Session.execution_history)
133+
134+
127135
class _GlobalSessionContext:
128136
"""
129137
Context manager for testing that sets global session.

packages/bigframes/bigframes/core/sql/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
from __future__ import annotations
1514

1615
"""
1716
Utility functions for SQL construction.
1817
"""
1918

19+
from __future__ import annotations
20+
2021
import json
2122
from typing import (
2223
TYPE_CHECKING,

packages/bigframes/bigframes/session/__init__.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,46 @@
109109
logger = logging.getLogger(__name__)
110110

111111

112+
class _ExecutionHistory:
113+
def __init__(self, jobs: list[dict]):
114+
self._df = pandas.DataFrame(jobs)
115+
116+
def to_dataframe(self) -> pandas.DataFrame:
117+
"""Returns the execution history as a pandas DataFrame."""
118+
return self._df
119+
120+
def _repr_html_(self) -> str | None:
121+
import bigframes.formatting_helpers as formatter
122+
123+
if self._df.empty:
124+
return "<div>No executions found.</div>"
125+
126+
cols = ["job_type", "job_id", "status", "total_bytes_processed", "job_url"]
127+
128+
# Filter columns to only those that exist in the dataframe
129+
available_cols = [c for c in cols if c in self._df.columns]
130+
131+
def format_url(url):
132+
return f'<a target="_blank" href="{url}">Open Job</a>' if url else ""
133+
134+
try:
135+
df_display = self._df[available_cols].copy()
136+
if "total_bytes_processed" in df_display.columns:
137+
df_display["total_bytes_processed"] = df_display[
138+
"total_bytes_processed"
139+
].apply(formatter.get_formatted_bytes)
140+
if "job_url" in df_display.columns:
141+
df_display["job_url"] = df_display["job_url"].apply(format_url)
142+
143+
# Rename job_id to query_id to match user expectations
144+
if "job_id" in df_display.columns:
145+
df_display = df_display.rename(columns={"job_id": "query_id"})
146+
147+
return df_display.to_html(escape=False, index=False)
148+
except Exception:
149+
return self._df.to_html()
150+
151+
112152
@log_adapter.class_logger
113153
class Session(
114154
third_party_pandas_gbq.GBQIOMixin,
@@ -233,6 +273,7 @@ def __init__(
233273
)
234274

235275
self._metrics = metrics.ExecutionMetrics()
276+
self._publisher.subscribe(self._metrics.on_event)
236277
self._function_session = bff_session.FunctionSession()
237278
self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager(
238279
self._clients_provider.bqclient,
@@ -371,6 +412,13 @@ def slot_millis_sum(self):
371412
"""The sum of all slot time used by bigquery jobs in this session."""
372413
return self._metrics.slot_millis
373414

415+
def execution_history(self) -> _ExecutionHistory:
416+
"""Returns the history of executions initiated by BigFrames in the current session.
417+
418+
Use `.to_dataframe()` on the result to get a pandas DataFrame.
419+
"""
420+
return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs])
421+
374422
@property
375423
def _allows_ambiguity(self) -> bool:
376424
return self._allow_ambiguity

packages/bigframes/bigframes/session/loader.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import pandas
5353
import pyarrow as pa
5454
from google.cloud import bigquery_storage_v1
55+
from google.cloud.bigquery.job.load import LoadJob
56+
from google.cloud.bigquery.job.query import QueryJob
5557
from google.cloud.bigquery_storage_v1 import (
5658
types as bq_storage_types,
5759
)
@@ -623,6 +625,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
623625
else:
624626
job.result()
625627

628+
if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
629+
self._metrics.count_job_stats(query_job=job)
630+
626631
@overload
627632
def read_gbq_table( # type: ignore[overload-overlap]
628633
self,

packages/bigframes/bigframes/session/metrics.py

Lines changed: 186 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,143 @@
1515
from __future__ import annotations
1616

1717
import dataclasses
18+
import datetime
1819
import os
19-
from typing import Optional, Tuple
20+
from typing import Any, Mapping, Optional, Tuple, Union
2021

2122
import google.cloud.bigquery as bigquery
22-
import google.cloud.bigquery.job as bq_job
2323
import google.cloud.bigquery.table as bq_table
24+
from google.cloud.bigquery.job.load import LoadJob
25+
from google.cloud.bigquery.job.query import QueryJob
2426

2527
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2628

2729

30+
@dataclasses.dataclass
31+
class JobMetadata:
32+
job_id: Optional[str] = None
33+
query_id: Optional[str] = None
34+
location: Optional[str] = None
35+
project: Optional[str] = None
36+
creation_time: Optional[datetime.datetime] = None
37+
start_time: Optional[datetime.datetime] = None
38+
end_time: Optional[datetime.datetime] = None
39+
duration_seconds: Optional[float] = None
40+
status: Optional[str] = None
41+
total_bytes_processed: Optional[int] = None
42+
total_slot_ms: Optional[int] = None
43+
job_type: Optional[str] = None
44+
error_result: Optional[Mapping[str, Any]] = None
45+
cached: Optional[bool] = None
46+
job_url: Optional[str] = None
47+
query: Optional[str] = None
48+
destination_table: Optional[str] = None
49+
source_uris: Optional[list[str]] = None
50+
input_files: Optional[int] = None
51+
input_bytes: Optional[int] = None
52+
output_rows: Optional[int] = None
53+
source_format: Optional[str] = None
54+
55+
@classmethod
56+
def from_job(
57+
cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
58+
) -> "JobMetadata":
59+
query_text = getattr(query_job, "query", None)
60+
if query_text and len(query_text) > 1024:
61+
query_text = query_text[:1021] + "..."
62+
63+
job_id = getattr(query_job, "job_id", None)
64+
job_url = None
65+
if job_id:
66+
job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults"
67+
68+
metadata = cls(
69+
job_id=query_job.job_id,
70+
location=query_job.location,
71+
project=query_job.project,
72+
creation_time=query_job.created,
73+
start_time=query_job.started,
74+
end_time=query_job.ended,
75+
duration_seconds=exec_seconds,
76+
status=query_job.state,
77+
job_type=query_job.job_type,
78+
error_result=query_job.error_result,
79+
query=query_text,
80+
job_url=job_url,
81+
)
82+
if isinstance(query_job, QueryJob):
83+
metadata.cached = getattr(query_job, "cache_hit", None)
84+
metadata.destination_table = (
85+
str(query_job.destination) if query_job.destination else None
86+
)
87+
metadata.total_bytes_processed = getattr(
88+
query_job, "total_bytes_processed", None
89+
)
90+
metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
91+
elif isinstance(query_job, LoadJob):
92+
metadata.output_rows = getattr(query_job, "output_rows", None)
93+
metadata.input_files = getattr(query_job, "input_files", None)
94+
metadata.input_bytes = getattr(query_job, "input_bytes", None)
95+
metadata.destination_table = (
96+
str(query_job.destination)
97+
if getattr(query_job, "destination", None)
98+
else None
99+
)
100+
if getattr(query_job, "source_uris", None):
101+
metadata.source_uris = list(query_job.source_uris)
102+
if query_job.configuration and hasattr(
103+
query_job.configuration, "source_format"
104+
):
105+
metadata.source_format = query_job.configuration.source_format
106+
107+
return metadata
108+
109+
@classmethod
110+
def from_row_iterator(
111+
cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
112+
) -> "JobMetadata":
113+
query_text = getattr(row_iterator, "query", None)
114+
if query_text and len(query_text) > 1024:
115+
query_text = query_text[:1021] + "..."
116+
117+
job_id = getattr(row_iterator, "job_id", None)
118+
job_url = None
119+
if job_id:
120+
project = getattr(row_iterator, "project", "")
121+
location = getattr(row_iterator, "location", "")
122+
job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults"
123+
124+
return cls(
125+
job_id=job_id,
126+
query_id=getattr(row_iterator, "query_id", None),
127+
location=getattr(row_iterator, "location", None),
128+
project=getattr(row_iterator, "project", None),
129+
creation_time=getattr(row_iterator, "created", None),
130+
start_time=getattr(row_iterator, "started", None),
131+
end_time=getattr(row_iterator, "ended", None),
132+
duration_seconds=exec_seconds,
133+
status="DONE",
134+
total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
135+
total_slot_ms=getattr(row_iterator, "slot_millis", None),
136+
job_type="query",
137+
cached=getattr(row_iterator, "cache_hit", None),
138+
query=query_text,
139+
job_url=job_url,
140+
)
141+
142+
28143
@dataclasses.dataclass
29144
class ExecutionMetrics:
30145
execution_count: int = 0
31146
slot_millis: int = 0
32147
bytes_processed: int = 0
33148
execution_secs: float = 0
34149
query_char_count: int = 0
150+
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
35151

36152
def count_job_stats(
37153
self,
38-
query_job: Optional[bq_job.QueryJob] = None,
154+
query_job: Optional[Union[QueryJob, LoadJob]] = None,
39155
row_iterator: Optional[bq_table.RowIterator] = None,
40156
):
41157
if query_job is None:
@@ -57,41 +173,88 @@ def count_job_stats(
57173
self.slot_millis += slot_millis
58174
self.execution_secs += exec_seconds
59175

60-
elif query_job.configuration.dry_run:
61-
query_char_count = len(query_job.query)
176+
self.jobs.append(
177+
JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
178+
)
179+
180+
elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
181+
query_char_count = len(getattr(query_job, "query", ""))
62182

63183
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
64184
bytes_processed = 0
65185
slot_millis = 0
66186
exec_seconds = 0.0
67187

68-
elif (stats := get_performance_stats(query_job)) is not None:
69-
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
188+
elif isinstance(query_job, bigquery.QueryJob):
189+
if (stats := get_performance_stats(query_job)) is not None:
190+
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
191+
self.execution_count += 1
192+
self.query_char_count += query_char_count or 0
193+
self.bytes_processed += bytes_processed or 0
194+
self.slot_millis += slot_millis or 0
195+
self.execution_secs += exec_seconds or 0
196+
197+
metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
198+
self.jobs.append(metadata)
199+
200+
else:
70201
self.execution_count += 1
71-
self.query_char_count += query_char_count or 0
72-
self.bytes_processed += bytes_processed or 0
73-
self.slot_millis += slot_millis or 0
74-
self.execution_secs += exec_seconds or 0
202+
duration = (
203+
(query_job.ended - query_job.created).total_seconds()
204+
if query_job.ended and query_job.created
205+
else None
206+
)
207+
self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))
208+
209+
# For pytest runs only, log information about the query job
210+
# to a file in order to create a performance report.
211+
if (
212+
isinstance(query_job, bigquery.QueryJob)
213+
and not query_job.configuration.dry_run
214+
):
215+
stats = get_performance_stats(query_job)
216+
if stats:
217+
write_stats_to_disk(
218+
query_char_count=stats[0],
219+
bytes_processed=stats[1],
220+
slot_millis=stats[2],
221+
exec_seconds=stats[3],
222+
)
223+
elif row_iterator is not None:
224+
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
225+
query_char_count = len(getattr(row_iterator, "query", "") or "")
226+
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
227+
created = getattr(row_iterator, "created", None)
228+
ended = getattr(row_iterator, "ended", None)
229+
exec_seconds = (
230+
(ended - created).total_seconds() if created and ended else 0.0
231+
)
75232
write_stats_to_disk(
76233
query_char_count=query_char_count,
77234
bytes_processed=bytes_processed,
78235
slot_millis=slot_millis,
79236
exec_seconds=exec_seconds,
80237
)
81238

82-
else:
83-
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
84-
bytes_processed = 0
85-
query_char_count = 0
86-
slot_millis = 0
87-
exec_seconds = 0
239+
def on_event(self, event: Any):
240+
try:
241+
import bigframes.core.events
242+
from bigframes.session.executor import LocalExecuteResult
243+
except ImportError:
244+
return
88245

89-
write_stats_to_disk(
90-
query_char_count=query_char_count,
91-
bytes_processed=bytes_processed,
92-
slot_millis=slot_millis,
93-
exec_seconds=exec_seconds,
94-
)
246+
if isinstance(event, bigframes.core.events.ExecutionFinished):
247+
if event.result and isinstance(event.result, LocalExecuteResult):
248+
self.execution_count += 1
249+
bytes_processed = event.result.total_bytes_processed or 0
250+
self.bytes_processed += bytes_processed
251+
252+
metadata = JobMetadata(
253+
job_type="polars",
254+
status="DONE",
255+
total_bytes_processed=bytes_processed,
256+
)
257+
self.jobs.append(metadata)
95258

96259

97260
def get_performance_stats(

0 commit comments

Comments (0)