From 01d3ed10880a089f92cb47b33d1deeae46ad523c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 21 May 2026 16:02:48 +0000 Subject: [PATCH 1/2] feat: support pandas inputs in more bigframes.bigquery functions --- .../bigframes/bigframes/core/googlesql.py | 34 +++- packages/bigframes/docs/reference/index.rst | 1 + .../tests/unit/core/test_googlesql.py | 171 ++++++++++++++++++ 3 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 packages/bigframes/tests/unit/core/test_googlesql.py diff --git a/packages/bigframes/bigframes/core/googlesql.py b/packages/bigframes/bigframes/core/googlesql.py index a37c9790ff56..5282b4da7046 100644 --- a/packages/bigframes/bigframes/core/googlesql.py +++ b/packages/bigframes/bigframes/core/googlesql.py @@ -16,14 +16,38 @@ from __future__ import annotations -from typing import Any, Union +from typing import TYPE_CHECKING, Any, Optional, Union + +import pandas as pd import bigframes.core.col import bigframes.core.expression as ex +import bigframes.core.global_session as global_session import bigframes.core.sentinels as sentinels import bigframes.series as series from bigframes.operations import googlesql +if TYPE_CHECKING: + import bigframes.session + + +def _is_pandas_series(arg: Any) -> bool: + return isinstance(arg, pd.Series) + + +def _find_session(*args: Any) -> Optional[bigframes.session.Session]: + for arg in args: + if isinstance(arg, series.Series): + return arg._session + return None + + +def _get_session(*args: Any) -> bigframes.session.Session: + session = _find_session(*args) + if session is not None: + return session + return global_session.get_global_session() + def apply_googlesql_scalar_op( op: googlesql.GoogleSqlScalarOp, @@ -44,6 +68,14 @@ def apply_googlesql_scalar_op( The result of the operation. If any of ``args`` is a Series, returns a Series. Otherwise, returns an Expression. """ + has_pandas_series = any(_is_pandas_series(arg) for arg in args) + + if has_pandas_series: + session = _get_session(*args) + args = tuple( + session.read_pandas(arg) if _is_pandas_series(arg) else arg for arg in args + ) + # Find the first Series to use for alignment first_series = None for arg in args: diff --git a/packages/bigframes/docs/reference/index.rst b/packages/bigframes/docs/reference/index.rst index 0ddfa5f0e3f0..60934582e969 100644 --- a/packages/bigframes/docs/reference/index.rst +++ b/packages/bigframes/docs/reference/index.rst @@ -9,6 +9,7 @@ packages. bigframes._config bigframes.bigquery + bigframes.bigquery.aead bigframes.bigquery.ai bigframes.bigquery.ml bigframes.bigquery.obj diff --git a/packages/bigframes/tests/unit/core/test_googlesql.py b/packages/bigframes/tests/unit/core/test_googlesql.py new file mode 100644 index 000000000000..b29978ba2220 --- /dev/null +++ b/packages/bigframes/tests/unit/core/test_googlesql.py @@ -0,0 +1,171 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest.mock as mock + +import pandas as pd + +import bigframes.core.col as col +import bigframes.core.expression as ex +import bigframes.core.global_session +import bigframes.core.googlesql as core_googlesql +import bigframes.series as series +from bigframes.operations import googlesql +from bigframes.testing import mocks + +# Define a test op +_TEST_OP = googlesql.GoogleSqlScalarOp( + "TEST_OP", + args=(googlesql.ArgSpec(), googlesql.ArgSpec()), + signature=lambda *args: None, +) + + +def test_apply_googlesql_scalar_op_expressions(): + # Only expressions + result = core_googlesql.apply_googlesql_scalar_op( + _TEST_OP, + col.col("a"), + col.col("b"), + ) + assert isinstance(result, col.Expression) + + +def test_apply_googlesql_scalar_op_pandas_series_global_session(monkeypatch): + # Setup mock session + session = mocks.create_bigquery_session() + monkeypatch.setattr(bigframes.core.global_session, "_global_session", session) + bigframes.options.bigquery._session_started = True + + # Create a real-ish Series to return from read_pandas + df = mocks.create_dataframe(monkeypatch, session=session, data={"col": [1, 2, 3]}) + bf_series = df["col"] + + # Mock read_pandas on the session + mock_read_pandas = mock.MagicMock(return_value=bf_series) + session.read_pandas = mock_read_pandas + + # Mock _apply_nary_op on Series class to avoid real compilation/execution + mock_apply_nary_op = mock.MagicMock(return_value=bf_series) + monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op) + + pd_series = pd.Series([1, 2, 3]) + + # Call the function with a pandas Series and a literal + result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, 42) + + # Verify read_pandas was called on the global session + mock_read_pandas.assert_called_once_with(pd_series) + + # Verify _apply_nary_op was called on the converted series + mock_apply_nary_op.assert_called_once() + # First arg to _apply_nary_op is the op, second is the processed_args + assert mock_apply_nary_op.call_args[0][0] == _TEST_OP + # processed_args should contain the converted bf_series and the literal 42 + processed_args = mock_apply_nary_op.call_args[0][1] + assert processed_args[0] is bf_series + assert processed_args[1] == 42 + + # Verify result is a Series + assert isinstance(result, series.Series) + + +def test_apply_googlesql_scalar_op_pandas_series_with_bf_series(monkeypatch): + # Setup mock session 1 (global) and session 2 (associated with bf_series) + global_session = mocks.create_bigquery_session(session_id="global") + monkeypatch.setattr( + bigframes.core.global_session, "_global_session", global_session + ) + bigframes.options.bigquery._session_started = True + + bf_session = mocks.create_bigquery_session(session_id="bf_session") + + # Create a bf_series associated with bf_session + df = mocks.create_dataframe( + monkeypatch, session=bf_session, data={"col": [1, 2, 3]} + ) + bf_series = df["col"] + + assert bf_series._session == bf_session + + # Mock read_pandas on both sessions + mock_global_read_pandas = mock.MagicMock() + global_session.read_pandas = mock_global_read_pandas + + mock_bf_read_pandas = mock.MagicMock(return_value=bf_series) + bf_session.read_pandas = mock_bf_read_pandas + + # Mock _apply_nary_op + mock_apply_nary_op = mock.MagicMock(return_value=bf_series) + monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op) + + pd_series = pd.Series([1, 2, 3]) + + # Call with both pandas Series and BigFrames Series + result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, bf_series) + + # Verify read_pandas was called on bf_session, NOT global_session + mock_bf_read_pandas.assert_called_once_with(pd_series) + mock_global_read_pandas.assert_not_called() + + # Verify _apply_nary_op was called + mock_apply_nary_op.assert_called_once() + processed_args = mock_apply_nary_op.call_args[0][1] + # Both arguments to the op should now be BigFrames Series + assert processed_args[0] is bf_series + assert processed_args[1] is bf_series + + assert isinstance(result, series.Series) + + +def test_apply_googlesql_scalar_op_mixed_args(monkeypatch): + session = mocks.create_bigquery_session() + monkeypatch.setattr(bigframes.core.global_session, "_global_session", session) + bigframes.options.bigquery._session_started = True + + df = mocks.create_dataframe(monkeypatch, session=session, data={"col": [1, 2, 3]}) + bf_series = df["col"] + + mock_read_pandas = mock.MagicMock(return_value=bf_series) + session.read_pandas = mock_read_pandas + + mock_apply_nary_op = mock.MagicMock(return_value=bf_series) + monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op) + + pd_series = pd.Series([1, 2, 3]) + expr = col.Expression(ex.const(10)) + + # Call with pandas Series, Expression, and Literal + result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, expr, 42) + + # Verify pandas Series was converted + mock_read_pandas.assert_called_once_with(pd_series) + + # Verify _apply_nary_op was called + mock_apply_nary_op.assert_called_once() + processed_args = mock_apply_nary_op.call_args[0][1] + + # Processed args should be: + # 1. bf_series (converted from pd_series) + # 2. A new Series (projected from the expression onto bf_series' block) + # 3. Literal 42 + assert isinstance(processed_args[0], series.Series) + assert processed_args[0] is bf_series + + assert isinstance(processed_args[1], series.Series) + assert processed_args[1] is not bf_series + + assert processed_args[2] == 42 + + assert isinstance(result, series.Series) From 60751e965c11f2700c8c5e73ab14c3c4e262f53c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 21 May 2026 17:41:03 +0000 Subject: [PATCH 2/2] support more bigframes types in session discovery --- .../bigframes/bigframes/core/googlesql.py | 5 +- .../tests/unit/core/test_googlesql.py | 97 +++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/packages/bigframes/bigframes/core/googlesql.py b/packages/bigframes/bigframes/core/googlesql.py index 5282b4da7046..8869fbff3ef9 100644 --- a/packages/bigframes/bigframes/core/googlesql.py +++ b/packages/bigframes/bigframes/core/googlesql.py @@ -36,8 +36,11 @@ def _is_pandas_series(arg: Any) -> bool: def _find_session(*args: Any) -> Optional[bigframes.session.Session]: + import bigframes.core.indexes as indexes + import bigframes.dataframe as dataframe + for arg in args: - if isinstance(arg, series.Series): + if isinstance(arg, (series.Series, dataframe.DataFrame, indexes.Index)): return arg._session return None diff --git a/packages/bigframes/tests/unit/core/test_googlesql.py b/packages/bigframes/tests/unit/core/test_googlesql.py index b29978ba2220..849128f94456 100644 --- a/packages/bigframes/tests/unit/core/test_googlesql.py +++ b/packages/bigframes/tests/unit/core/test_googlesql.py @@ -169,3 +169,100 @@ def test_apply_googlesql_scalar_op_mixed_args(monkeypatch): assert processed_args[2] == 42 assert isinstance(result, series.Series) + + +def test_apply_googlesql_scalar_op_pandas_series_with_bf_dataframe(monkeypatch): + # Setup mock session 2 (associated with bf_dataframe) + bf_session = mocks.create_bigquery_session(session_id="bf_session") + + # Create a bf_dataframe associated with bf_session + bf_dataframe = mocks.create_dataframe( + monkeypatch, session=bf_session, data={"col": [1, 2, 3]} + ) + bf_series = bf_dataframe["col"] + + # Setup mock session 1 (global) AFTER creating the dataframe + global_session = mocks.create_bigquery_session(session_id="global") + monkeypatch.setattr( + bigframes.core.global_session, "_global_session", global_session + ) + bigframes.options.bigquery._session_started = True + + assert bf_dataframe._session == bf_session + + # Mock read_pandas on both sessions + mock_global_read_pandas = mock.MagicMock() + global_session.read_pandas = mock_global_read_pandas + + mock_bf_read_pandas = mock.MagicMock(return_value=bf_series) + bf_session.read_pandas = mock_bf_read_pandas + + # Mock _apply_nary_op + mock_apply_nary_op = mock.MagicMock(return_value=bf_series) + monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op) + + pd_series = pd.Series([1, 2, 3]) + + # Call with pandas Series and BigFrames DataFrame + result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, bf_dataframe) + + # Verify read_pandas was called on bf_session, NOT global_session + mock_bf_read_pandas.assert_called_once_with(pd_series) + mock_global_read_pandas.assert_not_called() + + # Verify _apply_nary_op was called + mock_apply_nary_op.assert_called_once() + processed_args = mock_apply_nary_op.call_args[0][1] + assert processed_args[0] is bf_series + assert processed_args[1] is bf_dataframe + + assert isinstance(result, series.Series) + + +def test_apply_googlesql_scalar_op_pandas_series_with_bf_index(monkeypatch): + # Setup mock session 2 (associated with bf_index) + bf_session = mocks.create_bigquery_session(session_id="bf_session") + + # Create a bf_dataframe associated with bf_session to get an index + bf_dataframe = mocks.create_dataframe( + monkeypatch, session=bf_session, data={"col": [1, 2, 3]} + ) + bf_index = bf_dataframe.index + bf_series = bf_dataframe["col"] + + # Setup mock session 1 (global) AFTER creating the dataframe + global_session = mocks.create_bigquery_session(session_id="global") + monkeypatch.setattr( + bigframes.core.global_session, "_global_session", global_session + ) + bigframes.options.bigquery._session_started = True + + assert bf_index._session == bf_session + + # Mock read_pandas on both sessions + mock_global_read_pandas = mock.MagicMock() + global_session.read_pandas = mock_global_read_pandas + + mock_bf_read_pandas = mock.MagicMock(return_value=bf_series) + bf_session.read_pandas = mock_bf_read_pandas + + # Mock _apply_nary_op + mock_apply_nary_op = mock.MagicMock(return_value=bf_series) + monkeypatch.setattr(series.Series, "_apply_nary_op", mock_apply_nary_op) + + pd_series = pd.Series([1, 2, 3]) + + # Call with pandas Series and BigFrames Index + result = core_googlesql.apply_googlesql_scalar_op(_TEST_OP, pd_series, bf_index) + + # Verify read_pandas was called on bf_session, NOT global_session + mock_bf_read_pandas.assert_called_once_with(pd_series) + mock_global_read_pandas.assert_not_called() + + # Verify _apply_nary_op was called + mock_apply_nary_op.assert_called_once() + processed_args = mock_apply_nary_op.call_args[0][1] + assert processed_args[0] is bf_series + assert processed_args[1] is bf_index + + assert isinstance(result, series.Series)