Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import functools
import typing
from typing import cast, Any
from typing import Any, cast

import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
Expand Down
16 changes: 13 additions & 3 deletions packages/bigframes/bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import threading
import traceback
import warnings
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
from typing import TYPE_CHECKING, Callable, Iterable, Optional, TypeVar

import google.auth.exceptions

Expand Down Expand Up @@ -124,12 +124,22 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


def execution_history(
    *,
    events: "Optional[Iterable[bigframes.core.events.Event]]" = None,
    job_ids: Optional[Iterable[str]] = None,
    filter_by_cell: bool = True,
) -> "bigframes.session._ExecutionHistory":
    """Return the execution history of the default (global) session.

    Thin wrapper that forwards every argument, unchanged, to
    ``bigframes.session.Session.execution_history`` through
    ``with_default_session``.

    Args:
        events:
            Forwarded as ``events``; restricts the history to jobs
            associated with the given events.
        job_ids:
            Forwarded as ``job_ids``; restricts the history to jobs matching
            the given job IDs.
        filter_by_cell:
            Forwarded as ``filter_by_cell``.

    Returns:
        The ``_ExecutionHistory`` object produced by the session method.
    """
    # NOTE: the ``events`` annotation is a string, like the return annotation,
    # so that ``bigframes.core.events`` does not have to be importable when
    # this module is first loaded.
    import pandas  # noqa: F401

    # Imported lazily, inside the function, rather than at module import time.
    import bigframes.session

    return with_default_session(
        bigframes.session.Session.execution_history,
        events=events,
        job_ids=job_ids,
        filter_by_cell=filter_by_cell,
    )


class _GlobalSessionContext:
Expand Down
6 changes: 6 additions & 0 deletions packages/bigframes/bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run(
def _read_gbq_colab( # type: ignore[overload-overlap]
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[False] = ...,
) -> bigframes.dataframe.DataFrame: ...
Expand All @@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap]
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -317,6 +319,7 @@ def _read_gbq_colab(
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> bigframes.dataframe.DataFrame | pandas.Series:
Expand All @@ -328,6 +331,8 @@ def _read_gbq_colab(
Args:
query_or_table (str):
SQL query or table ID (table ID not yet supported).
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (Optional[Dict[str, Any]]):
Parameters to format into the query string.
dry_run (bool):
Expand Down Expand Up @@ -379,6 +384,7 @@ def _read_gbq_colab(
return global_session.with_default_session(
bigframes.session.Session._read_gbq_colab,
query_or_table,
callback=callback,
pyformat_args=pyformat_args,
dry_run=dry_run,
)
Expand Down
152 changes: 128 additions & 24 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@
class _ExecutionHistory:
def __init__(self, jobs: list[dict]):
self._df = pandas.DataFrame(jobs)
if self._df.empty:
self._df = pandas.DataFrame(
columns=[
"job_id",
"query_id",
"job_type",
"status",
"query",
"total_bytes_processed",
"job_url",
]
)

def to_dataframe(self) -> pandas.DataFrame:
"""Returns the execution history as a pandas DataFrame."""
Expand Down Expand Up @@ -199,9 +211,10 @@ def __init__(
self._clients_provider = clients_provider
self._location = context.location or "US"
else:
credentials, project = (
bigframes._config.auth.resolve_credentials_and_project(context)
)
(
credentials,
project,
) = bigframes._config.auth.resolve_credentials_and_project(context)
if context.location is None:
with bigquery.Client(
project=project,
Expand Down Expand Up @@ -430,12 +443,82 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def execution_history(
    self,
    *,
    events: Optional[Iterable[bigframes.core.events.Event]] = None,
    job_ids: Optional[Iterable[str]] = None,
    filter_by_cell: bool = True,
) -> _ExecutionHistory:
    """Returns the history of executions initiated by BigFrames in the current session.

    Use `.to_dataframe()` on the result to get a pandas DataFrame.

    At most one filter is applied, chosen in this order of precedence:
    ``events``, then ``job_ids``, then ``filter_by_cell``. A filter that is
    given but outranked by another is ignored.

    Args:
        events (Iterable[Event], optional):
            Filter execution history to only include jobs associated with the given events.
            A job matches when its ``job_id`` or ``query_id`` equals the
            corresponding attribute of any event. Any iterable is accepted,
            including one-shot iterators such as generators.
        job_ids (Iterable[str], optional):
            Filter execution history to only include jobs matching the given job IDs.
            Each ID is compared against both the ``job_id`` and the
            ``query_id`` of every job.
        filter_by_cell (bool, optional):
            If True and running in Colab/Jupyter, automatically filter history to only include
            jobs executed within the current cell. Defaults to True.

    Returns:
        _ExecutionHistory: Wrapper around the (possibly filtered) job records.
    """
    jobs = [job.__dict__ for job in self._metrics.jobs]

    if events is not None:
        # Walk ``events`` exactly once. It is only promised to be an
        # Iterable, so a generator would be exhausted by a first pass and a
        # second pass would silently collect nothing.
        event_job_ids = set()
        event_query_ids = set()
        for event in events:
            job_id = getattr(event, "job_id", None)
            if job_id is not None:
                event_job_ids.add(job_id)
            query_id = getattr(event, "query_id", None)
            if query_id is not None:
                event_query_ids.add(query_id)

        # Neither set can contain None, so a job lacking an id never matches.
        jobs = [
            job
            for job in jobs
            if job.get("job_id") in event_job_ids
            or job.get("query_id") in event_query_ids
        ]

    elif job_ids is not None:
        target_job_ids = set(job_ids)
        # A stray None among the requested ids must not match jobs whose
        # id fields are unset.
        target_job_ids.discard(None)
        jobs = [
            job
            for job in jobs
            if job.get("job_id") in target_job_ids
            or job.get("query_id") in target_job_ids
        ]

    elif filter_by_cell:
        # Best effort: without IPython, or outside an interactive shell,
        # the history is returned unfiltered.
        try:
            import IPython

            ipy = IPython.get_ipython()
        except (ImportError, NameError):
            ipy = None

        if ipy is not None and hasattr(ipy, "execution_count"):
            current_count = ipy.execution_count
            jobs = [
                job
                for job in jobs
                if job.get("cell_execution_count") == current_count
            ]

    return _ExecutionHistory(jobs)

@property
def _allows_ambiguity(self) -> bool:
Expand Down Expand Up @@ -498,7 +581,8 @@ def read_gbq( # type: ignore[overload-overlap]
col_order: Iterable[str] = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def read_gbq(
Expand All @@ -514,7 +598,8 @@ def read_gbq(
col_order: Iterable[str] = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

def read_gbq(
self,
Expand Down Expand Up @@ -584,25 +669,29 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

@log_adapter.log_name_override("read_gbq_colab")
def _read_gbq_colab(
self,
query: str,
# TODO: Add a callback parameter that takes some kind of Event object.
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> Union[dataframe.DataFrame, pandas.Series]:
Expand All @@ -615,6 +704,8 @@ def _read_gbq_colab(
query (str):
A SQL query string to execute. Results (if any) are turned into
a DataFrame.
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (dict):
A dictionary of potential variables to replace in ``query``.
Note: strings are _not_ escaped. Use query parameters for these,
Expand All @@ -634,13 +725,19 @@ def _read_gbq_colab(
dry_run=dry_run,
)

return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)
def _run_query():
return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)

if callback is not None:
with self._publisher.subscribe(callback):
return _run_query()
return _run_query()

@overload
def read_gbq_query( # type: ignore[overload-overlap]
Expand All @@ -656,7 +753,8 @@ def read_gbq_query( # type: ignore[overload-overlap]
filters: third_party_pandas_gbq.FiltersType = ...,
dry_run: Literal[False] = ...,
allow_large_results: Optional[bool] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def read_gbq_query(
Expand All @@ -672,7 +770,8 @@ def read_gbq_query(
filters: third_party_pandas_gbq.FiltersType = ...,
dry_run: Literal[True] = ...,
allow_large_results: Optional[bool] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

def read_gbq_query(
self,
Expand Down Expand Up @@ -819,7 +918,8 @@ def read_gbq_table( # type: ignore[overload-overlap]
use_cache: bool = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

@overload
def read_gbq_table(
Expand All @@ -833,7 +933,8 @@ def read_gbq_table(
use_cache: bool = ...,
col_order: Iterable[str] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
) -> pandas.Series:
...

def read_gbq_table(
self,
Expand Down Expand Up @@ -984,23 +1085,26 @@ def read_pandas(
pandas_dataframe: pandas.Index,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.core.indexes.Index: ...
) -> bigframes.core.indexes.Index:
...

@typing.overload
def read_pandas(
self,
pandas_dataframe: pandas.Series,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.series.Series: ...
) -> bigframes.series.Series:
...

@typing.overload
def read_pandas(
self,
pandas_dataframe: pandas.DataFrame,
*,
write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame: ...
) -> dataframe.DataFrame:
...

def read_pandas(
self,
Expand Down
Loading
Loading