diff --git a/packages/bigframes/bigframes/_config/experiment_options.py b/packages/bigframes/bigframes/_config/experiment_options.py
index e8183d2b2228..f6bdd5f6ac56 100644
--- a/packages/bigframes/bigframes/_config/experiment_options.py
+++ b/packages/bigframes/bigframes/_config/experiment_options.py
@@ -28,6 +28,7 @@ def __init__(self):
         self._semantic_operators: bool = False
         self._ai_operators: bool = False
         self._sql_compiler: Literal["legacy", "stable", "experimental"] = "stable"
+        self._enable_python_transpiler: bool = False
 
     @property
     def semantic_operators(self) -> bool:
@@ -166,3 +167,19 @@ def blob_display_height(self, value: Optional[int]):
         warnings.warn(msg, category=bfe.ApiDeprecationWarning)
 
         bigframes.options.display.blob_display_height = value
+
+    @property
+    def enable_python_transpiler(self) -> bool:
+        """Whether plain Python callables may be transpiled into expressions."""
+        return self._enable_python_transpiler
+
+    @enable_python_transpiler.setter
+    def enable_python_transpiler(self, value: bool):
+        # Only warn when the feature is being switched on; turning it off is silent.
+        if value:
+            msg = bfe.format_message(
+                "Python transpiler is an unstable, experimental feature, and not yet fully "
+                "validated, use at your own risk."
+            )
+            warnings.warn(msg, category=bfe.PreviewWarning)
+        self._enable_python_transpiler = value
diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py
index 6b7922fe9753..568a6f076d6a 100644
--- a/packages/bigframes/bigframes/dataframe.py
+++ b/packages/bigframes/bigframes/dataframe.py
@@ -4692,13 +4692,17 @@ def _prepare_export(
         return array_value, id_overrides
 
     def map(self, func, na_action: Optional[str] = None) -> DataFrame:
-        if not isinstance(func, bigframes.functions.Udf):
+        from bigframes._config import options
+
+        if not isinstance(func, bigframes.functions.Udf) and not (
+            options.experiments.enable_python_transpiler and callable(func)
+        ):
             raise TypeError("the first argument must be callable")
 
         if na_action not in {None, "ignore"}:
             raise ValueError(f"na_action={na_action} not supported")
 
-        expr = ops.func_to_op(func).as_expr(ex.free_var("input"))
+        expr = ops.func_to_expr(func).apply(ex.free_var("input"))
         if na_action == "ignore":
             # True case, predicate, False case
             expr = ops.where_op.as_expr(
@@ -4718,11 +4722,81 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
             )
             warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)
 
-            if not isinstance(func, bigframes.functions.Udf):
+            from bigframes._config import options
+
+            if not isinstance(func, bigframes.functions.Udf) and not (
+                options.experiments.enable_python_transpiler and callable(func)
+            ):
                 raise ValueError(
                     "For axis=1 a BigFrames BigQuery function must be used."
                 )
 
+            if not isinstance(func, bigframes.functions.Udf):
+                # The guard above already rejected everything except Udfs and
+                # (with the transpiler enabled) plain callables, so this is the
+                # transpiler path.
+                from bigframes.operations.to_op import CallableExpression
+
+                callable_expr = CallableExpression.from_callable(
+                    func, unpack_mode=False
+                )
+
+                # The first parameter receives the row; ``args``/``kwargs`` can
+                # only target the parameters after it.
+                extra_specs = callable_expr.arg_specs[1:]
+                if len(args) > len(extra_specs):
+                    raise TypeError(
+                        f"too many positional arguments: expected at most "
+                        f"{len(extra_specs)}, got {len(args)}"
+                    )
+                extra_names = {spec.name for spec in extra_specs}
+                unexpected = [key for key in kwargs if key not in extra_names]
+                if unexpected:
+                    raise TypeError(f"unexpected keyword arguments: {unexpected}")
+
+                # Positional arguments first, then keyword arguments.
+                bindings = {spec.name: val for spec, val in zip(extra_specs, args)}
+                bindings.update(kwargs)
+
+                # Fall back to the declared defaults for anything still unbound.
+                for spec in extra_specs:
+                    if (
+                        spec.name not in bindings
+                        and spec.default_value is not inspect.Parameter.empty
+                    ):
+                        bindings[spec.name] = spec.default_value
+
+                # Wrap plain Python values as constant expressions.
+                def to_expr(val):
+                    if isinstance(val, ex.Expression):
+                        return val
+                    return ex.const(val)
+
+                bindings = {k: to_expr(v) for k, v in bindings.items()}
+
+                # Partial binding: the variables that stand for column values
+                # are still free at this point.
+                expr = callable_expr.expr.bind_variables(
+                    bindings, allow_partial_bindings=True
+                )
+
+                # Bind the remaining free variables to the DataFrame columns.
+                col_bindings = {}
+                block = self._get_block()
+                for col in self.columns:
+                    if col in expr.free_variables:
+                        col_id = block.resolve_label_exact(col)
+                        if col_id is not None:
+                            col_bindings[col] = ex.deref(col_id)
+
+                expr = expr.bind_variables(col_bindings)
+
+                # Project the expression onto the block to get the result Series.
+                block, result_id = block.project_expr(expr)
+                from bigframes.series import Series
+
+                return Series(block.select_column(result_id))
+
             if func.udf_def.signature.is_row_processor:
                 # Early check whether the dataframe dtypes are currently supported
                 # in the bigquery function
@@ -4777,7 +4851,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
 
                 # Apply the function
                 result_series = rows_as_json_series._apply_nary_op(
-                    ops.func_to_op(func),
+                    ops.func_to_expr(func).expr.op,
                     list(args),
                 )
 
@@ -4837,8 +4911,8 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
                 series_list = [self[col] for col in self.columns]
                 op_list = series_list[1:] + list(args)
 
-                result_series = series_list[0]._apply_nary_op(
-                    ops.func_to_op(func), op_list
+                result_series = series_list[0]._apply_callable_expr(
+                    ops.func_to_expr(func), op_list
                 )
                 result_series.name = None
 
diff --git a/packages/bigframes/bigframes/operations/__init__.py b/packages/bigframes/bigframes/operations/__init__.py
index b8d860029a0f..2b4d1ca31d21 100644
--- a/packages/bigframes/bigframes/operations/__init__.py
+++ b/packages/bigframes/bigframes/operations/__init__.py
@@ -229,7 +229,7 @@
     timestamp_add_op,
     timestamp_sub_op,
 )
-from bigframes.operations.to_op import func_to_op
+from bigframes.operations.to_op import func_to_expr
 
 __all__ = [
     # Base ops
@@ -437,7 +437,7 @@
     "AIScore",
     "AISimilarity",
     # Helper functions
-    "func_to_op",
+    "func_to_expr",
     # Numpy ops mapping
     "NUMPY_TO_BINOP",
     "NUMPY_TO_OP",
diff --git a/packages/bigframes/bigframes/operations/to_op.py b/packages/bigframes/bigframes/operations/to_op.py
index 7fd44d957e40..68664aa19a67 100644
--- a/packages/bigframes/bigframes/operations/to_op.py
+++ b/packages/bigframes/bigframes/operations/to_op.py
@@ -11,31 +11,215 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from __future__ import annotations
 
+import dataclasses
+import inspect
+import typing
+
+import bigframes.core.expression as ex
+import bigframes.core.identifiers as ids
+import bigframes.dtypes as dtypes
+from bigframes._config import options
 from bigframes.functions import Udf
 from bigframes.functions.udf_def import BigqueryUdf, PythonUdf
-from bigframes.operations import base_ops, remote_function_ops
+from bigframes.operations import remote_function_ops
+
+
+@dataclasses.dataclass(frozen=True)
+class ArgumentSpec:
+    """
+    Information about a single argument to a function
+    """
+
+    # Parameter name as it appears in the function signature.
+    name: str
+    # Declared default, or ``inspect.Parameter.empty`` when there is none.
+    default_value: typing.Any
+    # True for a ``*args``-style (VAR_POSITIONAL) parameter.
+    is_varargs: bool
 
 
-def func_to_op(op) -> base_ops.NaryOp:
+@dataclasses.dataclass(frozen=True)
+class CallableExpression(ex.Expression):
     """
-    Convert various bigframes, python functions into bigframes operations.
+    Encodes a calling convention and an expression to bind arguments to.
+
+    ``expr`` refers to each parameter as a free variable named after it;
+    ``apply`` substitutes the call arguments for those variables.
+    """
+
+    expr: ex.Expression
+    arg_specs: typing.Sequence[ArgumentSpec]
+
+    @classmethod
+    def from_callable(
+        cls, func: typing.Callable, unpack_mode: bool = False
+    ) -> CallableExpression:
+        """Wrap ``func``: its signature plus the ``dis_to_expr`` expression."""
+        sig = inspect.signature(func)
+        # A tuple, unlike a list, keeps this frozen dataclass hashable.
+        arg_specs = tuple(
+            ArgumentSpec(
+                name=name,
+                default_value=param.default,
+                is_varargs=param.kind == inspect.Parameter.VAR_POSITIONAL,
+            )
+            for name, param in sig.parameters.items()
+        )
+
+        from bigframes.core.bytecode import dis_to_expr
+
+        expr = dis_to_expr(func, unpack_mode=unpack_mode)
+        return cls(expr=expr, arg_specs=arg_specs)
+
+    def apply(self, *args, **kwargs) -> ex.Expression:
+        """
+        Apply the arguments to the expression.
+
+        All args are expected to be column references, or scalars.
+
+        Raises:
+            NotImplementedError: if the signature has a ``*args`` parameter.
+            TypeError: if the arguments do not fit the signature.
+        """
+        bindings = {}
+        pos_idx = 0
+
+        def to_expr(val):
+            if isinstance(val, ex.Expression):
+                return val
+            return ex.const(val)
+
+        for spec in self.arg_specs:
+            if spec.is_varargs:
+                raise NotImplementedError(
+                    "varargs in compiled python functions is not supported"
+                )
 
-    This should handle anything that might be passed to eg map, combine, other pandas methods that take a function.
+            if pos_idx < len(args):
+                if spec.name in kwargs:
+                    raise TypeError(f"got multiple values for argument '{spec.name}'")
+                bindings[spec.name] = to_expr(args[pos_idx])
+                pos_idx += 1
+            elif spec.name in kwargs:
+                bindings[spec.name] = to_expr(kwargs[spec.name])
+            elif spec.default_value is not inspect.Parameter.empty:
+                bindings[spec.name] = to_expr(spec.default_value)
+            else:
+                raise TypeError(f"missing required argument: '{spec.name}'")
 
-    It should raise a TypeError if the object is not a supported type.
+        if pos_idx < len(args):
+            raise TypeError(
+                f"too many positional arguments: expected {len(self.arg_specs)}, got {len(args)}"
+            )
 
-    Args:
-        op: The object to convert.
+        # Mirror Python call semantics: a keyword that matches no parameter is
+        # an error rather than being silently dropped.
+        unexpected = [key for key in kwargs if key not in bindings]
+        if unexpected:
+            raise TypeError(f"unexpected keyword arguments: {unexpected}")
 
-    Returns:
-        A bigframes operations.
+        return self.expr.bind_variables(bindings)
+
+    @property
+    def column_references(self) -> typing.Tuple[ids.ColumnId, ...]:
+        return self.expr.column_references
+
+    @property
+    def free_variables(self) -> typing.Tuple[typing.Hashable, ...]:
+        return self.expr.free_variables
+
+    @property
+    def is_const(self) -> bool:
+        return self.expr.is_const
+
+    @property
+    def is_resolved(self) -> bool:
+        return False
+
+    @property
+    def output_type(self) -> dtypes.ExpressionType:
+        raise ValueError(
+            "CallableExpression does not have a fixed output type until arguments are applied."
+        )
+
+    def bind_refs(
+        self,
+        bindings: typing.Mapping[ids.ColumnId, ex.Expression],
+        allow_partial_bindings: bool = False,
+    ) -> CallableExpression:
+        return dataclasses.replace(
+            self,
+            expr=self.expr.bind_refs(
+                bindings, allow_partial_bindings=allow_partial_bindings
+            ),
+        )
+
+    def bind_variables(
+        self,
+        bindings: typing.Mapping[typing.Hashable, ex.Expression],
+        allow_partial_bindings: bool = False,
+    ) -> CallableExpression:
+        # The callable's own parameter names are never bound here; only
+        # ``apply`` substitutes them.
+        arg_names = {spec.name for spec in self.arg_specs}
+        filtered_bindings = {k: v for k, v in bindings.items() if k not in arg_names}
+        return dataclasses.replace(
+            self,
+            expr=self.expr.bind_variables(
+                filtered_bindings, allow_partial_bindings=allow_partial_bindings
+            ),
+        )
+
+    def transform_children(
+        self, t: typing.Callable[[ex.Expression], ex.Expression]
+    ) -> ex.Expression:
+        new_expr = t(self.expr)
+        if new_expr != self.expr:
+            return dataclasses.replace(self, expr=new_expr)
+        return self
+
+
+def func_to_expr(op, unpack_mode: bool = False) -> CallableExpression:
+    """
+    Convert various bigframes, python functions into bigframes CallableExpression.
+
+    Args:
+        op: A bigframes ``Udf``, or (only when
+            ``options.experiments.enable_python_transpiler`` is set) any callable.
+        unpack_mode: Forwarded to ``CallableExpression.from_callable``; not used
+            for ``Udf`` inputs.
+
+    Raises:
+        TypeError: if ``op`` is not a supported type.
     """
-    # TODO(b/517578802): Handle numpy ufuncs, builtin functions, etc.
     if isinstance(op, Udf):
         if isinstance(op.udf_def, BigqueryUdf):
-            return remote_function_ops.RemoteFunctionOp(function_def=op.udf_def)
+            bq_op = remote_function_ops.RemoteFunctionOp(function_def=op.udf_def)
         elif isinstance(op.udf_def, PythonUdf):
-            return remote_function_ops.PythonUdfOp(function_def=op.udf_def)
+            bq_op = remote_function_ops.PythonUdfOp(function_def=op.udf_def)
+        else:
+            raise TypeError(f"Unsupported UDF definition: {op.udf_def}")
+
+        inputs_expr = tuple(
+            ex.free_var(arg.name) for arg in op.udf_def.signature.inputs
+        )
+        expr = ex.OpExpression(bq_op, inputs_expr)
+
+        # A tuple, unlike a list, keeps the frozen dataclass hashable.
+        arg_specs = tuple(
+            ArgumentSpec(
+                name=arg.name,
+                default_value=inspect.Parameter.empty,
+                is_varargs=False,
+            )
+            for arg in op.udf_def.signature.inputs
+        )
+        return CallableExpression(expr=expr, arg_specs=arg_specs)
+
+    elif options.experiments.enable_python_transpiler and callable(op):
+        return CallableExpression.from_callable(op, unpack_mode=unpack_mode)
+
     else:
         raise TypeError(f"Unsupported function type: {op}")
diff --git a/packages/bigframes/bigframes/series.py b/packages/bigframes/bigframes/series.py
index 181bc4f63b2f..54c1eed0582f 100644
--- a/packages/bigframes/bigframes/series.py
+++ b/packages/bigframes/bigframes/series.py
@@ -2065,9 +2065,13 @@ def apply(
                 " are supported."
             )
 
-        if isinstance(func, bigframes.functions.Udf):
+        from bigframes._config import options
+
+        if isinstance(func, bigframes.functions.Udf) or (
+            options.experiments.enable_python_transpiler and callable(func)
+        ):
             # We are working with bigquery function at this point
-            result_series = self._apply_nary_op(ops.func_to_op(func), args)
+            result_series = self._apply_callable_expr(ops.func_to_expr(func), args)
             # TODO(jialuo): Investigate why `_apply_nary_op` drops the series
             # `name`. Manually reassigning it here as a temporary fix.
             result_series.name = self.name
@@ -2119,8 +2123,12 @@ def combine(
                 " are supported."
             )
 
-        if isinstance(func, bigframes.functions.Udf):
-            result_series = self._apply_nary_op(ops.func_to_op(func), (other,))
+        from bigframes._config import options
+
+        if isinstance(func, bigframes.functions.Udf) or (
+            options.experiments.enable_python_transpiler and callable(func)
+        ):
+            result_series = self._apply_callable_expr(ops.func_to_expr(func), (other,))
             if hasattr(other, "name") and other.name != self._name:  # type: ignore
                 result_series.name = None
             else:
@@ -2727,6 +2735,23 @@ def _apply_nary_op(
         block, result_id = block.project_expr(op.as_expr(*values))
         return Series(block.select_column(result_id).with_column_labels([None]))
 
+    def _apply_callable_expr(
+        self,
+        callable_expr: bigframes.operations.to_op.CallableExpression,
+        others: Sequence[typing.Union[Series, scalars.Scalar]],
+        ignore_self=False,
+    ):
+        """Applies a CallableExpression to the series and others.
+
+        The aligned values are passed positionally to ``callable_expr.apply``.
+        The returned Series is unnamed (its column label is reset to ``None``).
+        """
+        values, block = self._align_n(
+            others, ignore_self=ignore_self, cast_scalars=False
+        )
+        block, result_id = block.project_expr(callable_expr.apply(*values))
+        return Series(block.select_column(result_id).with_column_labels([None]))
+
     def _apply_binary_aggregation(
         self, other: Series, stat: agg_ops.BinaryAggregateOp
     ) -> float:
diff --git a/packages/bigframes/tests/unit/test_py_udf.py b/packages/bigframes/tests/unit/test_py_udf.py
new file mode 100644
index 000000000000..c44e1e946a13
--- /dev/null
+++ b/packages/bigframes/tests/unit/test_py_udf.py
@@ -0,0 +1,129 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pathlib
+from typing import Generator
+
+import pandas as pd
+import pandas.testing
+import pytest
+
+import bigframes
+import bigframes.pandas as bpd
+from bigframes.testing.utils import (
+    assert_frame_equal,
+    assert_series_equal,
+    convert_pandas_dtypes,
+)
+
+pytest.importorskip("polars")
+pytest.importorskip("pandas", minversion="2.0.0")
+
+CURRENT_DIR = pathlib.Path(__file__).parent
+DATA_DIR = CURRENT_DIR.parent / "data"
+
+
+@pytest.fixture(scope="module", autouse=True)
+def session() -> Generator[bigframes.Session, None, None]:
+    import bigframes.core.global_session
+    from bigframes.testing import polars_session
+
+    with bpd.option_context("experiments.enable_python_transpiler", True):
+        session = polars_session.TestSession()
+        with bigframes.core.global_session._GlobalSessionContext(session):
+            yield session
+
+
+@pytest.fixture(scope="module")
+def scalars_pandas_df_index() -> pd.DataFrame:
+    """pd.DataFrame pointing at test data."""
+
+    df = pd.read_json(
+        DATA_DIR / "scalars.jsonl",
+        lines=True,
+    )
+    convert_pandas_dtypes(df, bytes_col=True)
+
+    df = df.set_index("rowindex", drop=False)
+    df.index.name = None
+    return df.set_index("rowindex").sort_index()
+
+
+@pytest.fixture(scope="module")
+def scalars_df_index(
+    session: bigframes.Session, scalars_pandas_df_index
+) -> bpd.DataFrame:
+    return session.read_pandas(scalars_pandas_df_index)
+
+
+@pytest.fixture(scope="module")
+def scalars_dfs(
+    scalars_df_index,
+    scalars_pandas_df_index,
+):
+    return scalars_df_index, scalars_pandas_df_index
+
+
+def test_dataframe_map_transpile(
+    scalars_df_index,
+    scalars_pandas_df_index,
+):
+    columns = ["int64_too", "int64_col"]
+
+    def foo(input):
+        return input * 3 + 12
+
+    bf_result = scalars_df_index[columns].map(foo, na_action="ignore").to_pandas()
+
+    pd_result = (
+        scalars_pandas_df_index[columns].map(foo, na_action="ignore").astype("Int64")
+    )
+
+    assert_frame_equal(bf_result, pd_result)
+
+
+def test_dataframe_apply_axis_1_transpile(
+    scalars_df_index,
+    scalars_pandas_df_index,
+):
+    columns = ["int64_too", "int64_col"]
+
+    def foo(input):
+        return input.int64_too + input.int64_col
+
+    bf_result = scalars_df_index[columns].apply(foo, axis=1).to_pandas()
+
+    pd_result = scalars_pandas_df_index[columns].apply(foo, axis=1).astype("Int64")
+
+    assert_series_equal(bf_result, pd_result)
+
+
+def test_series_combine_transpile(
+    scalars_df_index,
+    scalars_pandas_df_index,
+):
+    def product_plus_three(left, right):
+        return (left * right) + 3
+
+    bf_result = (
+        scalars_df_index["int64_too"]
+        .combine(scalars_df_index["int64_col"], product_plus_three)
+        .to_pandas()
+    )
+
+    pd_result = scalars_pandas_df_index["int64_too"].combine(
+        scalars_pandas_df_index["int64_col"], product_plus_three
+    )
+
+    assert_series_equal(bf_result, pd_result)