From a58279aacea8e95f39ea2550f657b38f73dba0c0 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 20 May 2026 22:41:35 +0000 Subject: [PATCH 01/10] feat(bigframes): Defer unnamed @udf deployment until needed --- .../bigframes/functions/_function_session.py | 127 +++++++++++++++--- .../bigframes/bigframes/functions/function.py | 8 +- .../bigframes/bigframes/functions/udf_def.py | 56 +++++++- .../bigframes/bigframes/session/__init__.py | 2 +- .../bigframes/session/bq_caching_executor.py | 75 +++++++++++ .../large/functions/test_managed_function.py | 63 +++++++++ .../unit/functions/test_remote_function.py | 63 +++++++++ 7 files changed, 372 insertions(+), 22 deletions(-) diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 186ecbcf3e1f..27b0a250db65 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -64,6 +64,9 @@ def __init__(self): # Lock to synchronize the update of the session artifacts self._artifacts_lock = threading.Lock() + self._deployed_routines: set[bytes] = set() + self._deploying_routines: set[bytes] = set() + def _resolve_session(self, session: Optional[Session]) -> Session: """Resolves the BigFrames session.""" import bigframes.pandas as bpd @@ -191,6 +194,83 @@ def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str): with self._artifacts_lock: self._temp_artifacts[bqrf_routine] = gcf_path + def deploy_undeployed_udf( + self, + session: Session, + bq_udf: udf_def.PythonUdf, + ) -> udf_def.BigqueryUdf: + """Deploys a UDF to BigQuery if not already deployed.""" + udf_hash = bq_udf.stable_hash() + import time + + bigquery_client = self._resolve_bigquery_client(session, None) + bq_connection_manager = session.bqconnectionmanager + dataset_ref = self._resolve_dataset_reference(session, bigquery_client, None) + bq_location, _ = _utils.get_remote_function_locations( + bigquery_client.location + ) + + managed_function_client = _function_client.FunctionClient( + dataset_ref.project, + bq_location, + dataset_ref.dataset_id, + bigquery_client, + bq_connection_manager, + session=session, + ) + + config = bq_udf.to_managed_function_config() + bq_function_name = _function_client.get_managed_function_name( + config, session.session_id + ) + full_rf_name = ( + managed_function_client.get_remote_function_fully_qualilfied_name( + bq_function_name + ) + ) + routine_ref = bigquery.RoutineReference.from_string(full_rf_name) + + with self._artifacts_lock: + if udf_hash in self._deployed_routines: + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + + while True: + with self._artifacts_lock: + if udf_hash in self._deployed_routines: + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + + if udf_hash not in self._deploying_routines: + self._deploying_routines.add(udf_hash) + break + + time.sleep(0.2) + + try: + managed_function_client.provision_bq_managed_function( + name=bq_function_name, + config=config, + ) + except Exception: + with self._artifacts_lock: + self._deploying_routines.discard(udf_hash) + raise + + with self._artifacts_lock: + self._deploying_routines.discard(udf_hash) + self._deployed_routines.add(udf_hash) + self._temp_artifacts[full_rf_name] = "" + + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + def clean_up( self, bqclient: bigquery.Client, @@ -679,6 +759,8 @@ def udf( max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, container_memory: Optional[str] = None, + *, + _force_deploy: bool = False, ): """Decorator to turn a Python user defined function (udf) into a BigQuery managed function. @@ -835,27 +917,42 @@ def wrapper(func): capture_references=False, ) - bq_function_name = managed_function_client.provision_bq_managed_function( - name=name, - config=config, - ) - full_rf_name = ( - managed_function_client.get_remote_function_fully_qualilfied_name( - bq_function_name - ) + requirements = udf_def.RuntimeRequirements( + container_cpu=container_cpu, + container_memory=container_memory, + bq_connection_id=bq_connection_id, + max_batching_rows=max_batching_rows, + packages=tuple(packages) if packages else (), ) - udf_definition = udf_def.BigqueryUdf( - routine_ref=bigquery.RoutineReference.from_string(full_rf_name), - signature=udf_sig, - ) + if not name and not _force_deploy: # session-owned resource - deferred deployment + udf_definition = udf_def.PythonUdf( + signature=udf_sig, + code=code_def, + requirements=requirements, + ) + else: + bq_function_name = managed_function_client.provision_bq_managed_function( + name=name, + config=config, + ) + full_rf_name = ( + managed_function_client.get_remote_function_fully_qualilfied_name( + bq_function_name + ) + ) + udf_definition = udf_def.BigqueryUdf( + routine_ref=bigquery.RoutineReference.from_string(full_rf_name), + signature=udf_sig, + ) if udf_sig.is_row_processor: msg = bfe.format_message("input_types=Series is in preview.") warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) if not name: # session-owned resource - will be cleaned up automatically - self._update_temp_artifacts(full_rf_name, "") + if _force_deploy: + self._update_temp_artifacts(full_rf_name, "") return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) # user-managed permanent resource - will not be cleaned up automatically @@ -888,9 +985,7 @@ def deploy_udf( A wrapped Python user defined function, usable in :meth:`~bigframes.series.Series.apply`. """ - # TODO(tswast): If we update udf to defer deployment, update this method - # to deploy immediately. - return self.udf(**kwargs)(func) + return self.udf(_force_deploy=True, **kwargs)(func) def _resolve_signature( diff --git a/packages/bigframes/bigframes/functions/function.py b/packages/bigframes/bigframes/functions/function.py index dc0fa55c8e7b..49b59f4360b7 100644 --- a/packages/bigframes/bigframes/functions/function.py +++ b/packages/bigframes/bigframes/functions/function.py @@ -16,7 +16,7 @@ import dataclasses import logging -from typing import TYPE_CHECKING, Callable, Optional, Protocol, runtime_checkable +from typing import TYPE_CHECKING, Callable, Optional, Protocol, runtime_checkable, Union import google.api_core.exceptions from google.cloud import bigquery @@ -162,7 +162,7 @@ class Udf(Protocol): """ @property - def udf_def(self) -> udf_def.BigqueryUdf: ... + def udf_def(self) -> Union[udf_def.BigqueryUdf, udf_def.PythonUdf]: ... class BigqueryCallableRoutine: @@ -242,11 +242,11 @@ class UdfRoutine: func: Callable # Try not to depend on this, bq managed function creation will be deferred later # And this ref will be replaced with requirements rather to support lazy creation - _udf_def: udf_def.BigqueryUdf + _udf_def: Union[udf_def.BigqueryUdf, udf_def.PythonUdf] def __call__(self, *args, **kwargs): return self.func(*args, **kwargs) @property - def udf_def(self) -> udf_def.BigqueryUdf: + def udf_def(self) -> Union[udf_def.BigqueryUdf, udf_def.PythonUdf]: return self._udf_def diff --git a/packages/bigframes/bigframes/functions/udf_def.py b/packages/bigframes/bigframes/functions/udf_def.py index fbe000f608fd..f85b04edd066 100644 --- a/packages/bigframes/bigframes/functions/udf_def.py +++ b/packages/bigframes/bigframes/functions/udf_def.py @@ -371,12 +371,35 @@ def stable_hash(self) -> bytes: return hash_val.digest() +@dataclasses.dataclass(frozen=True) +class RuntimeRequirements: + container_cpu: Optional[float] = None + container_memory: Optional[str] = None + bq_connection_id: Optional[str] = None + max_batching_rows: Optional[int] = None + packages: tuple[str, ...] = () + + def stable_hash(self) -> bytes: + hash_val = google_crc32c.Checksum() + if self.container_cpu is not None: + hash_val.update(str(self.container_cpu).encode()) + if self.container_memory is not None: + hash_val.update(str(self.container_memory).encode()) + if self.bq_connection_id is not None: + hash_val.update(str(self.bq_connection_id).encode()) + if self.max_batching_rows is not None: + hash_val.update(str(self.max_batching_rows).encode()) + if self.packages: + for p in sorted(self.packages): + hash_val.update(p.encode()) + return hash_val.digest() + + @dataclasses.dataclass(frozen=True) class BigqueryUdf: """ Represents the information needed to call a BigQuery remote function - not a full spec. """ - routine_ref: bigquery.RoutineReference = dataclasses.field() signature: UdfSignature @@ -398,6 +421,37 @@ def from_routine( return cls(routine.reference, signature=signature) +@dataclasses.dataclass(frozen=True) +class PythonUdf: + """ + Represents user-requested Python UDF semantics, including the code and runtime requirements. + """ + signature: UdfSignature + code: CodeDef + requirements: RuntimeRequirements = dataclasses.field( + default_factory=RuntimeRequirements + ) + + def stable_hash(self) -> bytes: + hash_val = google_crc32c.Checksum() + hash_val.update(self.code.stable_hash()) + hash_val.update(self.signature.stable_hash()) + hash_val.update(self.requirements.stable_hash()) + return hash_val.digest() + + def to_managed_function_config(self) -> ManagedFunctionConfig: + return ManagedFunctionConfig( + code=self.code, + signature=self.signature, + max_batching_rows=self.requirements.max_batching_rows, + container_cpu=self.requirements.container_cpu, + container_memory=self.requirements.container_memory, + bq_connection_id=self.requirements.bq_connection_id, + capture_references=False, + ) + + + @dataclasses.dataclass(frozen=True) class CodeDef: # Produced by cloudpickle, not compatible across python versions diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 38e92a60321b..92e032bc31e4 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -1958,7 +1958,7 @@ def udf( output_type: Optional[type] = None, dataset: str, bigquery_connection: Optional[str] = None, - name: str, + name: Optional[str] = None, packages: Optional[Sequence[str]] = None, max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 9948480d5cac..541cb0cbed02 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -511,12 +511,87 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode plan = plan.top_down(rewrite.fold_row_counts) return plan + async def _deploy_undeployed_udfs(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode: + import dataclasses + import bigframes.core.expression as expression + import bigframes.functions.udf_def as udf_def + import bigframes.operations as ops + + undeployed_udfs: list[udf_def.PythonUdf] = [] + for node in plan.unique_nodes(): + for expr in node._node_expressions: + for sub_expr in expr.walk(): + if isinstance(sub_expr, expression.OpExpression): + op = sub_expr.op + if isinstance( + op, + ( + ops.RemoteFunctionOp, + ops.BinaryRemoteFunctionOp, + ops.NaryRemoteFunctionOp, + ), + ): + func_def = op.function_def + if isinstance(func_def, udf_def.PythonUdf): + undeployed_udfs.append(func_def) + + if not undeployed_udfs: + return plan + + # Deduplicate while preserving order + seen = set() + unique_undeployed_udfs = [] + for udf in undeployed_udfs: + if udf not in seen: + seen.add(udf) + unique_undeployed_udfs.append(udf) + + session = self.loader._session + deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf] = {} + for udf in unique_undeployed_udfs: + deployed_udf = await asyncio.to_thread( + session._function_session.deploy_undeployed_udf, + session, + udf, + ) + deployed_mapping[udf] = deployed_udf + + # Now rewrite the plan using bottom_up to substitute the UDF definitions! + def replace_in_expr(expr: expression.Expression) -> expression.Expression: + def replace_step(e: expression.Expression) -> expression.Expression: + if isinstance(e, expression.OpExpression): + op = e.op + if isinstance( + op, + ( + ops.RemoteFunctionOp, + ops.BinaryRemoteFunctionOp, + ops.NaryRemoteFunctionOp, + ), + ): + func_def = op.function_def + if func_def in deployed_mapping: + new_func_def = deployed_mapping[func_def] + new_op = dataclasses.replace(op, function_def=new_func_def) + return dataclasses.replace(e, op=new_op) + return e + + return expr.bottom_up(replace_step) + + def replace_in_node(node: nodes.BigFrameNode) -> nodes.BigFrameNode: + if hasattr(node, "transform_exprs"): + return node.transform_exprs(replace_in_expr) + return node + + return plan.bottom_up(replace_in_node) + async def _prepare_plan_bq_execution( self, plan: nodes.BigFrameNode, compute_options: Optional[ex_spec.BqComputeOptions] = None, ) -> nodes.BigFrameNode: """Prepare the plan for BigQuery execution by caching subtrees and uploading large local sources.""" + plan = await self._deploy_undeployed_udfs(plan) if compute_options is not None and compute_options.enable_multi_query_execution: await self._simplify_with_caching(plan, compute_options=compute_options) plan = self._prepare_plan_simplify(plan) diff --git a/packages/bigframes/tests/system/large/functions/test_managed_function.py b/packages/bigframes/tests/system/large/functions/test_managed_function.py index 0c2e3d8fe895..d735cc407b9b 100644 --- a/packages/bigframes/tests/system/large/functions/test_managed_function.py +++ b/packages/bigframes/tests/system/large/functions/test_managed_function.py @@ -1128,3 +1128,66 @@ def foo_list(x: int, y0: float, y1: bytes, y2: bool) -> list[str]: # Ignore any dtype difference. pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + +def test_deferred_unnamed_udf_execution(session, dataset_id, scalars_dfs): + import bigframes.functions.udf_def as udf_def + + # Create an unnamed UDF (name=None) + @session.udf(dataset=dataset_id) + def unnamed_multiplier(x: int) -> int: + return x * 3 + + # 1. Assert it is represented as a PythonUdf (not deployed yet) + assert isinstance(unnamed_multiplier.udf_def, udf_def.PythonUdf) + + scalars_df, scalars_pandas_df = scalars_dfs + bf_series = scalars_df["int64_too"] + pd_series = scalars_pandas_df["int64_too"] + + # 2. Applying it triggers deployment behind the scenes! + bf_result = bf_series.apply(unnamed_multiplier).to_pandas() + pd_result = pd_series.apply(lambda x: x * 3) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # 3. Verify that the deployed routine name matches our stable hash and exists in BigQuery + import bigframes.functions._function_client as bff_client + config = unnamed_multiplier.udf_def.to_managed_function_config() + expected_routine_name = bff_client.get_managed_function_name(config, session.session_id) + routine = session.bqclient.get_routine(f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}") + assert routine is not None + + +def test_deferred_udf_with_runtime_requirements(session, dataset_id, scalars_dfs): + import bigframes.functions.udf_def as udf_def + + # Create an unnamed UDF with custom options + @session.udf( + dataset=dataset_id, + container_cpu=1, + container_memory="2Gi", + max_batching_rows=25, + ) + def heavy_unnamed_udf(x: int) -> int: + return x + 100 + + assert isinstance(heavy_unnamed_udf.udf_def, udf_def.PythonUdf) + + scalars_df, scalars_pandas_df = scalars_dfs + bf_series = scalars_df["int64_too"] + pd_series = scalars_pandas_df["int64_too"] + + bf_result = bf_series.apply(heavy_unnamed_udf).to_pandas() + pd_result = pd_series.apply(lambda x: x + 100) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # Verify it was deployed with the correct runtime options + import bigframes.functions._function_client as bff_client + config = heavy_unnamed_udf.udf_def.to_managed_function_config() + expected_routine_name = bff_client.get_managed_function_name(config, session.session_id) + routine = session.bqclient.get_routine(f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}") + assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 1 + assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" + assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "25" diff --git a/packages/bigframes/tests/unit/functions/test_remote_function.py b/packages/bigframes/tests/unit/functions/test_remote_function.py index 17c04b338385..a6bd0cab3cee 100644 --- a/packages/bigframes/tests/unit/functions/test_remote_function.py +++ b/packages/bigframes/tests/unit/functions/test_remote_function.py @@ -75,3 +75,66 @@ def my_remote_func(x: int) -> int: # Test that the function would have been deployed somewhere. assert "my_custom_name" in deployed.bigframes_bigquery_function + + +def test_deferred_udf_execution(): + import bigframes.functions.udf_def as udf_def + import google.cloud.bigquery + + session = mocks.create_bigquery_session() + @session._function_session.udf(session=session) + def my_unnamed_udf(x: int) -> int: + return x * 2 + + # 1. Verify that no BQ query was executed to deploy the UDF during registration! + session._queries.clear() + assert len(session._queries) == 0 + + # 2. Verify that it created a PythonUdf + assert isinstance(my_unnamed_udf.udf_def, udf_def.PythonUdf) + + # 3. Verify that when calling the UDF via a query, it triggers the UDF deployment query! + import bigframes.core.nodes as nodes + import bigframes.core.expression as ex + import bigframes.operations as ops + + # Let's construct an expression using our UDF + udf_op = ops.RemoteFunctionOp(function_def=my_unnamed_udf.udf_def, apply_on_null=False) + expr = ex.OpExpression(op=udf_op, inputs=(ex.const(5),)) + + class MockNode: + def __init__(self, exprs): + self._node_expressions = exprs + self.child_nodes = [] + + def unique_nodes(self): + yield self + + def bottom_up(self, transform): + return transform(self) + + def transform_exprs(self, fn): + return MockNode([fn(e) for e in self._node_expressions]) + + mock_node = MockNode([expr]) + + import asyncio + # Deploy and replace definition in the plan + new_plan = asyncio.run(session._executor._ibis_executor._deploy_undeployed_udfs(mock_node)) + + # Verify that the DDL to create the function was executed! + assert len(session._queries) > 0 + assert any("CREATE OR REPLACE FUNCTION" in q for q in session._queries) + + # 4. Verify that the definition in the plan has been replaced with BigqueryUdf + new_expr = new_plan._node_expressions[0] + new_op = new_expr.op + assert isinstance(new_op.function_def, udf_def.BigqueryUdf) + assert new_op.function_def.routine_ref is not None + + # 5. Verify memoization: Deploying the new plan again executes ZERO additional DDL queries! + session._queries.clear() + new_plan_2 = asyncio.run(session._executor._ibis_executor._deploy_undeployed_udfs(new_plan)) + assert len(session._queries) == 0 + assert new_plan_2 == new_plan + From 50370dc5dd191845208240b9a125bb8665e9497a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 20 May 2026 23:10:30 +0000 Subject: [PATCH 02/10] fixes --- .../bigframes/functions/_function_session.py | 16 ++++++++------- .../bigframes/bigframes/functions/function.py | 2 +- .../bigframes/bigframes/functions/udf_def.py | 3 ++- .../bigframes/bigframes/pandas/__init__.py | 2 +- .../bigframes/session/bq_caching_executor.py | 5 ++++- .../large/functions/test_managed_function.py | 18 +++++++++++++---- .../unit/functions/test_remote_function.py | 20 +++++++++++++------ 7 files changed, 45 insertions(+), 21 deletions(-) diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 27b0a250db65..b02fb665c3f7 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -206,9 +206,7 @@ def deploy_undeployed_udf( bigquery_client = self._resolve_bigquery_client(session, None) bq_connection_manager = session.bqconnectionmanager dataset_ref = self._resolve_dataset_reference(session, bigquery_client, None) - bq_location, _ = _utils.get_remote_function_locations( - bigquery_client.location - ) + bq_location, _ = _utils.get_remote_function_locations(bigquery_client.location) managed_function_client = _function_client.FunctionClient( dataset_ref.project, @@ -925,16 +923,20 @@ def wrapper(func): packages=tuple(packages) if packages else (), ) - if not name and not _force_deploy: # session-owned resource - deferred deployment + if ( + not name and not _force_deploy + ): # session-owned resource - deferred deployment udf_definition = udf_def.PythonUdf( signature=udf_sig, code=code_def, requirements=requirements, ) else: - bq_function_name = managed_function_client.provision_bq_managed_function( - name=name, - config=config, + bq_function_name = ( + managed_function_client.provision_bq_managed_function( + name=name, + config=config, + ) ) full_rf_name = ( managed_function_client.get_remote_function_fully_qualilfied_name( diff --git a/packages/bigframes/bigframes/functions/function.py b/packages/bigframes/bigframes/functions/function.py index 49b59f4360b7..e9a40f415324 100644 --- a/packages/bigframes/bigframes/functions/function.py +++ b/packages/bigframes/bigframes/functions/function.py @@ -16,7 +16,7 @@ import dataclasses import logging -from typing import TYPE_CHECKING, Callable, Optional, Protocol, runtime_checkable, Union +from typing import TYPE_CHECKING, Callable, Optional, Protocol, Union, runtime_checkable import google.api_core.exceptions from google.cloud import bigquery diff --git a/packages/bigframes/bigframes/functions/udf_def.py b/packages/bigframes/bigframes/functions/udf_def.py index f85b04edd066..b95dafc4253b 100644 --- a/packages/bigframes/bigframes/functions/udf_def.py +++ b/packages/bigframes/bigframes/functions/udf_def.py @@ -400,6 +400,7 @@ class BigqueryUdf: """ Represents the information needed to call a BigQuery remote function - not a full spec. """ + routine_ref: bigquery.RoutineReference = dataclasses.field() signature: UdfSignature @@ -426,6 +427,7 @@ class PythonUdf: """ Represents user-requested Python UDF semantics, including the code and runtime requirements. """ + signature: UdfSignature code: CodeDef requirements: RuntimeRequirements = dataclasses.field( @@ -451,7 +453,6 @@ def to_managed_function_config(self) -> ManagedFunctionConfig: ) - @dataclasses.dataclass(frozen=True) class CodeDef: # Produced by cloudpickle, not compatible across python versions diff --git a/packages/bigframes/bigframes/pandas/__init__.py b/packages/bigframes/bigframes/pandas/__init__.py index 34ec3037e92f..082a00438f42 100644 --- a/packages/bigframes/bigframes/pandas/__init__.py +++ b/packages/bigframes/bigframes/pandas/__init__.py @@ -202,7 +202,7 @@ def udf( output_type: Optional[type] = None, dataset: str, bigquery_connection: Optional[str] = None, - name: str, + name: Optional[str] = None, packages: Optional[Sequence[str]] = None, max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 541cb0cbed02..e50cd18a3c34 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -511,8 +511,11 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode plan = plan.top_down(rewrite.fold_row_counts) return plan - async def _deploy_undeployed_udfs(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode: + async def _deploy_undeployed_udfs( + self, plan: nodes.BigFrameNode + ) -> nodes.BigFrameNode: import dataclasses + import bigframes.core.expression as expression import bigframes.functions.udf_def as udf_def import bigframes.operations as ops diff --git a/packages/bigframes/tests/system/large/functions/test_managed_function.py b/packages/bigframes/tests/system/large/functions/test_managed_function.py index d735cc407b9b..e93d2bb068be 100644 --- a/packages/bigframes/tests/system/large/functions/test_managed_function.py +++ b/packages/bigframes/tests/system/large/functions/test_managed_function.py @@ -1153,9 +1153,14 @@ def unnamed_multiplier(x: int) -> int: # 3. Verify that the deployed routine name matches our stable hash and exists in BigQuery import bigframes.functions._function_client as bff_client + config = unnamed_multiplier.udf_def.to_managed_function_config() - expected_routine_name = bff_client.get_managed_function_name(config, session.session_id) - routine = session.bqclient.get_routine(f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}") + expected_routine_name = bff_client.get_managed_function_name( + config, session.session_id + ) + routine = session.bqclient.get_routine( + f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}" + ) assert routine is not None @@ -1185,9 +1190,14 @@ def heavy_unnamed_udf(x: int) -> int: # Verify it was deployed with the correct runtime options import bigframes.functions._function_client as bff_client + config = heavy_unnamed_udf.udf_def.to_managed_function_config() - expected_routine_name = bff_client.get_managed_function_name(config, session.session_id) - routine = session.bqclient.get_routine(f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}") + expected_routine_name = bff_client.get_managed_function_name( + config, session.session_id + ) + routine = session.bqclient.get_routine( + f"{session._anonymous_dataset.project}.{session._anonymous_dataset.dataset_id}.{expected_routine_name}" + ) assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 1 assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "25" diff --git a/packages/bigframes/tests/unit/functions/test_remote_function.py b/packages/bigframes/tests/unit/functions/test_remote_function.py index a6bd0cab3cee..4490bf36caae 100644 --- a/packages/bigframes/tests/unit/functions/test_remote_function.py +++ b/packages/bigframes/tests/unit/functions/test_remote_function.py @@ -78,10 +78,12 @@ def my_remote_func(x: int) -> int: def test_deferred_udf_execution(): - import bigframes.functions.udf_def as udf_def import google.cloud.bigquery + import bigframes.functions.udf_def as udf_def + session = mocks.create_bigquery_session() + @session._function_session.udf(session=session) def my_unnamed_udf(x: int) -> int: return x * 2 @@ -94,12 +96,14 @@ def my_unnamed_udf(x: int) -> int: assert isinstance(my_unnamed_udf.udf_def, udf_def.PythonUdf) # 3. Verify that when calling the UDF via a query, it triggers the UDF deployment query! - import bigframes.core.nodes as nodes import bigframes.core.expression as ex + import bigframes.core.nodes as nodes import bigframes.operations as ops # Let's construct an expression using our UDF - udf_op = ops.RemoteFunctionOp(function_def=my_unnamed_udf.udf_def, apply_on_null=False) + udf_op = ops.RemoteFunctionOp( + function_def=my_unnamed_udf.udf_def, apply_on_null=False + ) expr = ex.OpExpression(op=udf_op, inputs=(ex.const(5),)) class MockNode: @@ -119,8 +123,11 @@ def transform_exprs(self, fn): mock_node = MockNode([expr]) import asyncio + # Deploy and replace definition in the plan - new_plan = asyncio.run(session._executor._ibis_executor._deploy_undeployed_udfs(mock_node)) + new_plan = asyncio.run( + session._executor._ibis_executor._deploy_undeployed_udfs(mock_node) + ) # Verify that the DDL to create the function was executed! assert len(session._queries) > 0 @@ -134,7 +141,8 @@ def transform_exprs(self, fn): # 5. Verify memoization: Deploying the new plan again executes ZERO additional DDL queries! session._queries.clear() - new_plan_2 = asyncio.run(session._executor._ibis_executor._deploy_undeployed_udfs(new_plan)) + new_plan_2 = asyncio.run( + session._executor._ibis_executor._deploy_undeployed_udfs(new_plan) + ) assert len(session._queries) == 0 assert new_plan_2 == new_plan - From ea0d7df6ce82d4a13186edffab7afe258bf3d3a9 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 21 May 2026 20:53:55 +0000 Subject: [PATCH 03/10] simplify remote function op defs --- .../ibis_compiler/scalar_op_registry.py | 36 ---------- .../sqlglot/expressions/generic_ops.py | 23 ------ .../bigframes/bigframes/core/rewrite/udfs.py | 43 +---------- packages/bigframes/bigframes/dataframe.py | 26 +++---- .../bigframes/functions/_function_session.py | 4 +- .../bigframes/operations/__init__.py | 9 +-- .../operations/remote_function_ops.py | 27 ++----- .../bigframes/bigframes/operations/to_op.py | 39 ++++++++++ packages/bigframes/bigframes/series.py | 19 ++--- .../bigframes/session/bq_caching_executor.py | 63 ++++++++-------- .../sqlglot/expressions/test_generic_ops.py | 57 +-------------- .../unit/functions/test_remote_function.py | 71 ------------------- 12 files changed, 97 insertions(+), 320 deletions(-) create mode 100644 packages/bigframes/bigframes/operations/to_op.py diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index e9e8435ea1a1..b5b1e515cefa 100644 --- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1039,42 +1039,6 @@ def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): udf_sig = op.function_def.signature assert not udf_sig.is_virtual # should have been devirtualized in lowering pass ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) - - @ibis_udf.scalar.builtin( - name=str(op.function_def.routine_ref), signature=ibis_py_sig - ) - def udf(input): ... - - x_transformed = udf(x) - if not op.apply_on_null: - return ibis_api.case().when(x.isnull(), x).else_(x_transformed).end() - return x_transformed - - -@scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True) -def binary_remote_function_op_impl( - x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp -): - udf_sig = op.function_def.signature - assert not udf_sig.is_virtual # should have been devirtualized in lowering pass - ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) - - @ibis_udf.scalar.builtin( - name=str(op.function_def.routine_ref), signature=ibis_py_sig - ) - def udf(input1, input2): ... - - x_transformed = udf(x, y) - return x_transformed - - -@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True) -def nary_remote_function_op_impl( - *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp -): - udf_sig = op.function_def.signature - assert not udf_sig.is_virtual # should have been devirtualized in lowering pass - ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) arg_names = tuple(arg.name for arg in udf_sig.inputs) @ibis_udf.scalar.builtin( diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py b/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py index 644e7bc365f7..1e9cc7c8aede 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py @@ -188,29 +188,6 @@ def _get_remote_function_name(op): @register_unary_op(ops.RemoteFunctionOp, pass_op=True) def _(expr: TypedExpr, op: ops.RemoteFunctionOp) -> sge.Expression: - func_name = _get_remote_function_name(op) - func = sge.func(func_name, expr.expr) - - if not op.apply_on_null: - return sge.If( - this=sge.Is(this=expr.expr, expression=sge.Null()), - true=expr.expr, - false=func, - ) - - return func - - -@register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True) -def _( - left: TypedExpr, right: TypedExpr, op: ops.BinaryRemoteFunctionOp -) -> sge.Expression: - func_name = _get_remote_function_name(op) - return sge.func(func_name, left.expr, right.expr) - - -@register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True) -def _(*operands: TypedExpr, op: ops.NaryRemoteFunctionOp) -> sge.Expression: func_name = _get_remote_function_name(op) return sge.func(func_name, *(operand.expr for operand in operands)) diff --git a/packages/bigframes/bigframes/core/rewrite/udfs.py b/packages/bigframes/bigframes/core/rewrite/udfs.py index 284ac4217c09..286a9d9d9401 100644 --- a/packages/bigframes/bigframes/core/rewrite/udfs.py +++ b/packages/bigframes/bigframes/core/rewrite/udfs.py @@ -32,7 +32,6 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression: func_def = expr.op.function_def devirtualized_expr = ops.RemoteFunctionOp( func_def.with_devirtualize(), - apply_on_null=expr.op.apply_on_null, ).as_expr(*expr.children) if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): return func_def.signature.output.out_expr(devirtualized_expr) @@ -40,47 +39,7 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression: return devirtualized_expr -@dataclasses.dataclass -class LowerBinaryRemoteFunctionRule(op_lowering.OpLoweringRule): - @property - def op(self) -> type[ops.ScalarOp]: - return ops.BinaryRemoteFunctionOp - - def lower(self, expr: expression.OpExpression) -> expression.Expression: - assert isinstance(expr.op, ops.BinaryRemoteFunctionOp) - func_def = expr.op.function_def - devirtualized_expr = ops.BinaryRemoteFunctionOp( - func_def.with_devirtualize(), - ).as_expr(*expr.children) - if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): - return func_def.signature.output.out_expr(devirtualized_expr) - else: - return devirtualized_expr - - -@dataclasses.dataclass -class LowerNaryRemoteFunctionRule(op_lowering.OpLoweringRule): - @property - def op(self) -> type[ops.ScalarOp]: - return ops.NaryRemoteFunctionOp - - def lower(self, expr: expression.OpExpression) -> expression.Expression: - assert isinstance(expr.op, ops.NaryRemoteFunctionOp) - func_def = expr.op.function_def - devirtualized_expr = ops.NaryRemoteFunctionOp( - func_def.with_devirtualize(), - ).as_expr(*expr.children) - if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): - return func_def.signature.output.out_expr(devirtualized_expr) - else: - return devirtualized_expr - - -UDF_LOWERING_RULES = ( - LowerRemoteFunctionRule(), - LowerBinaryRemoteFunctionRule(), - LowerNaryRemoteFunctionRule(), -) +UDF_LOWERING_RULES = (LowerRemoteFunctionRule(),) def lower_udfs(root: bigframe_node.BigFrameNode) -> bigframe_node.BigFrameNode: diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index 33ca3b0a4ce9..67566884565a 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -4678,12 +4678,10 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: if na_action not in {None, "ignore"}: raise ValueError(f"na_action={na_action} not supported") + op = ops.func_to_op(func) + # TODO(shobs): Support **kwargs - return self._apply_unary_op( - ops.RemoteFunctionOp( - function_def=func.udf_def, apply_on_null=(na_action is None) - ) - ) + return self._apply_nary_op(op) def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): # In Bigframes BigQuery function, DataFrame '.apply' method is specifically @@ -4754,17 +4752,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) # Apply the function - if args: - result_series = rows_as_json_series._apply_nary_op( - ops.NaryRemoteFunctionOp(function_def=func.udf_def), - list(args), - ) - else: - result_series = rows_as_json_series._apply_unary_op( - ops.RemoteFunctionOp( - function_def=func.udf_def, apply_on_null=True - ) - ) + result_series = rows_as_json_series._apply_nary_op( + ops.func_to_op(func), + list(args), + ) + else: # This is a special case where we are providing not-pandas-like # extension. If the bigquery function can take one or more @@ -4822,7 +4814,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): series_list = [self[col] for col in self.columns] op_list = series_list[1:] + list(args) result_series = series_list[0]._apply_nary_op( - ops.NaryRemoteFunctionOp(function_def=func.udf_def), op_list + ops.func_to_op(func), op_list ) result_series.name = None diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index b02fb665c3f7..89b52e42a319 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -20,6 +20,7 @@ import inspect import sys import threading +import time import warnings from typing import ( TYPE_CHECKING, @@ -194,14 +195,13 @@ def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str): with self._artifacts_lock: self._temp_artifacts[bqrf_routine] = gcf_path - def deploy_undeployed_udf( + def _deploy_udf( self, session: Session, bq_udf: udf_def.PythonUdf, ) -> udf_def.BigqueryUdf: """Deploys a UDF to BigQuery if not already deployed.""" udf_hash = bq_udf.stable_hash() - import time bigquery_client = self._resolve_bigquery_client(session, None) bq_connection_manager = session.bqconnectionmanager diff --git a/packages/bigframes/bigframes/operations/__init__.py b/packages/bigframes/bigframes/operations/__init__.py index dd036bec5a26..e85bafcbad29 100644 --- a/packages/bigframes/bigframes/operations/__init__.py +++ b/packages/bigframes/bigframes/operations/__init__.py @@ -183,9 +183,9 @@ ) from bigframes.operations.numpy_op_maps import NUMPY_TO_BINOP, NUMPY_TO_OP from bigframes.operations.remote_function_ops import ( - BinaryRemoteFunctionOp, - NaryRemoteFunctionOp, + PythonUdfOp, RemoteFunctionOp, + func_to_op, ) from bigframes.operations.string_ops import ( EndsWithOp, @@ -375,9 +375,8 @@ "StructFieldOp", "StructOp", # Remote Functions ops - "BinaryRemoteFunctionOp", - "NaryRemoteFunctionOp", "RemoteFunctionOp", + "PythonUdfOp", # Frequency ops "DatetimeToIntegerLabelOp", "FloorDtOp", @@ -436,6 +435,8 @@ "AIIf", "AIScore", "AISimilarity", + # Helper functions + "func_to_op", # Numpy ops mapping "NUMPY_TO_BINOP", "NUMPY_TO_OP", diff --git a/packages/bigframes/bigframes/operations/remote_function_ops.py b/packages/bigframes/bigframes/operations/remote_function_ops.py index 9c51210df0e7..a19cd43007a0 100644 --- a/packages/bigframes/bigframes/operations/remote_function_ops.py +++ b/packages/bigframes/bigframes/operations/remote_function_ops.py @@ -19,12 +19,10 @@ from bigframes.operations import base_ops -# TODO: Enforce input type constraints from function def -@dataclasses.dataclass(frozen=True) -class RemoteFunctionOp(base_ops.UnaryOp): - name: typing.ClassVar[str] = "remote_function" - function_def: udf_def.BigqueryUdf - apply_on_null: bool +@dataclasses.dataclass +class PythonUdfOp(base_ops.NaryOp): + name: typing.ClassVar[str] = "python_udf" + function_def: udf_def.PythonUdf @property def expensive(self) -> bool: @@ -35,21 +33,8 @@ def output_type(self, *input_types): @dataclasses.dataclass(frozen=True) -class BinaryRemoteFunctionOp(base_ops.BinaryOp): - name: typing.ClassVar[str] = "binary_remote_function" - function_def: udf_def.BigqueryUdf - - @property - def expensive(self) -> bool: - return True - - def output_type(self, *input_types): - return self.function_def.signature.output.bf_type - - -@dataclasses.dataclass(frozen=True) -class NaryRemoteFunctionOp(base_ops.NaryOp): - name: typing.ClassVar[str] = "nary_remote_function" +class RemoteFunctionOp(base_ops.NaryOp): + name: typing.ClassVar[str] = "remote_function" function_def: udf_def.BigqueryUdf @property diff --git a/packages/bigframes/bigframes/operations/to_op.py b/packages/bigframes/bigframes/operations/to_op.py new file mode 100644 index 000000000000..a177ea7b423e --- /dev/null +++ b/packages/bigframes/bigframes/operations/to_op.py @@ -0,0 +1,39 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bigframes.functions.udf_def import BigqueryUdf, PythonUdf +from bigframes.operations import base_ops, remote_function_ops + + +def func_to_op(op) -> base_ops.NaryOp: + """ + Convert various bigframes, python objects into a bigframes operations. + + This should handle anything that might be passed to eg map, combine, other pandas methods that take a function. + + It should raise a TypeError if the object is not a supported type. + + Args: + op: The object to convert. + + Returns: + A bigframes operations. + """ + # TODO: Handle numpy ufuncs, builtin functions, etc. + if isinstance(op, BigqueryUdf): + return remote_function_ops.RemoteFunctionOp(function_def=op.udf_def) + elif isinstance(op, PythonUdf): + return remote_function_ops.PythonUdfOp(function_def=op.udf_def) + else: + raise TypeError(f"Unsupported function type: {op}") diff --git a/packages/bigframes/bigframes/series.py b/packages/bigframes/bigframes/series.py index 87c03395c753..5f462536e312 100644 --- a/packages/bigframes/bigframes/series.py +++ b/packages/bigframes/bigframes/series.py @@ -2039,17 +2039,10 @@ def apply( if isinstance(func, bigframes.functions.Udf): # We are working with bigquery function at this point - if args: - result_series = self._apply_nary_op( - ops.NaryRemoteFunctionOp(function_def=func.udf_def), args - ) - # TODO(jialuo): Investigate why `_apply_nary_op` drops the series - # `name`. Manually reassigning it here as a temporary fix. - result_series.name = self.name - else: - result_series = self._apply_unary_op( - ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True) - ) + result_series = self._apply_nary_op(ops.func_to_op(func), args) + # TODO(jialuo): Investigate why `_apply_nary_op` drops the series + # `name`. Manually reassigning it here as a temporary fix. + result_series.name = self.name return result_series @@ -2099,9 +2092,7 @@ def combine( ) if isinstance(func, bigframes.functions.Udf): - result_series = self._apply_binary_op( - other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) - ) + result_series = self._apply_binary_op(other, ops.func_to_op(func)) return result_series bf_op = python_ops.python_callable_to_op(func) diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index e50cd18a3c34..43426fcdee47 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -514,34 +514,10 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode async def _deploy_undeployed_udfs( self, plan: nodes.BigFrameNode ) -> nodes.BigFrameNode: - import dataclasses - - import bigframes.core.expression as expression - import bigframes.functions.udf_def as udf_def - import bigframes.operations as ops - - undeployed_udfs: list[udf_def.PythonUdf] = [] - for node in plan.unique_nodes(): - for expr in node._node_expressions: - for sub_expr in expr.walk(): - if isinstance(sub_expr, expression.OpExpression): - op = sub_expr.op - if isinstance( - op, - ( - ops.RemoteFunctionOp, - ops.BinaryRemoteFunctionOp, - ops.NaryRemoteFunctionOp, - ), - ): - func_def = op.function_def - if isinstance(func_def, udf_def.PythonUdf): - undeployed_udfs.append(func_def) - + undeployed_udfs = self._collect_udf_defs(plan) if not undeployed_udfs: return plan - # Deduplicate while preserving order seen = set() unique_undeployed_udfs = [] for udf in undeployed_udfs: @@ -553,30 +529,49 @@ async def _deploy_undeployed_udfs( deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf] = {} for udf in unique_undeployed_udfs: deployed_udf = await asyncio.to_thread( - session._function_session.deploy_undeployed_udf, + session._function_session._deploy_udf, session, udf, ) deployed_mapping[udf] = deployed_udf + return self._subsitute_temporary_functions(plan, deployed_mapping) + + def _collect_udf_defs(self, plan: nodes.BigFrameNode) -> list[udf_def.PythonUdf]: + udf_defs: list[udf_def.PythonUdf] = [] + for node in plan.unique_nodes(): + for expr in node._node_expressions: + for sub_expr in expr.walk(): + if isinstance(sub_expr, expression.OpExpression): + op = sub_expr.op + if isinstance( + op, + (ops.PythonUdfOp,), + ): + func_def = op.function_def + if isinstance(func_def, udf_def.PythonUdf): + udf_defs.append(func_def) + return udf_defs + + def _subsitute_temporary_functions( + self, + plan: nodes.BigFrameNode, + deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf], + ) -> nodes.BigFrameNode: # Now rewrite the plan using bottom_up to substitute the UDF definitions! def replace_in_expr(expr: expression.Expression) -> expression.Expression: def replace_step(e: expression.Expression) -> expression.Expression: if isinstance(e, expression.OpExpression): op = e.op - if isinstance( - op, - ( - ops.RemoteFunctionOp, - ops.BinaryRemoteFunctionOp, - ops.NaryRemoteFunctionOp, - ), - ): + if isinstance(op, ops.PythonUdfOp): func_def = op.function_def if func_def in deployed_mapping: new_func_def = deployed_mapping[func_def] new_op = dataclasses.replace(op, function_def=new_func_def) return dataclasses.replace(e, op=new_op) + raise ValueError( + f"UDF definition {func_def} not found in deployed mapping" + ) return e return expr.bottom_up(replace_step) diff --git a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py index a986ca270de0..fb5a9fd7ce84 100644 --- a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py +++ b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py @@ -172,63 +172,8 @@ def test_astype_json_invalid( def test_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): - bf_df = scalar_types_df[["int64_col"]] - function_def = udf_def.BigqueryUdf( - routine_ref=bigquery.RoutineReference.from_string( - "my_project.my_dataset.my_routine" - ), - signature=udf_def.UdfSignature( - inputs=( - udf_def.UdfArg( - "x", - udf_def.DirectScalarType(int), - ), - ), - output=udf_def.DirectScalarType(float), - ), - ) - ops_map = { - "apply_on_null_true": ops.RemoteFunctionOp( - function_def=function_def, apply_on_null=True - ).as_expr("int64_col"), - "apply_on_null_false": ops.RemoteFunctionOp( - function_def=function_def, apply_on_null=False - ).as_expr("int64_col"), - } - sql = utils._apply_ops_to_sql(bf_df, list(ops_map.values()), list(ops_map.keys())) - snapshot.assert_match(sql, "out.sql") - - -def test_binary_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): - bf_df = scalar_types_df[["int64_col", "float64_col"]] - op = ops.BinaryRemoteFunctionOp( - function_def=udf_def.BigqueryUdf( - routine_ref=bigquery.RoutineReference.from_string( - "my_project.my_dataset.my_routine" - ), - signature=udf_def.UdfSignature( - inputs=( - udf_def.UdfArg( - "x", - udf_def.DirectScalarType(int), - ), - udf_def.UdfArg( - "y", - udf_def.DirectScalarType(float), - ), - ), - output=udf_def.DirectScalarType(float), - ), - ) - ) - sql = utils._apply_binary_op(bf_df, op, "int64_col", "float64_col") - - snapshot.assert_match(sql, "out.sql") - - -def test_nary_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): bf_df = scalar_types_df[["int64_col", "float64_col", "string_col"]] - op = ops.NaryRemoteFunctionOp( + op = ops.RemoteFunctionOp( function_def=udf_def.BigqueryUdf( routine_ref=bigquery.RoutineReference.from_string( "my_project.my_dataset.my_routine" diff --git a/packages/bigframes/tests/unit/functions/test_remote_function.py b/packages/bigframes/tests/unit/functions/test_remote_function.py index 4490bf36caae..17c04b338385 100644 --- a/packages/bigframes/tests/unit/functions/test_remote_function.py +++ b/packages/bigframes/tests/unit/functions/test_remote_function.py @@ -75,74 +75,3 @@ def my_remote_func(x: int) -> int: # Test that the function would have been deployed somewhere. assert "my_custom_name" in deployed.bigframes_bigquery_function - - -def test_deferred_udf_execution(): - import google.cloud.bigquery - - import bigframes.functions.udf_def as udf_def - - session = mocks.create_bigquery_session() - - @session._function_session.udf(session=session) - def my_unnamed_udf(x: int) -> int: - return x * 2 - - # 1. Verify that no BQ query was executed to deploy the UDF during registration! - session._queries.clear() - assert len(session._queries) == 0 - - # 2. Verify that it created a PythonUdf - assert isinstance(my_unnamed_udf.udf_def, udf_def.PythonUdf) - - # 3. Verify that when calling the UDF via a query, it triggers the UDF deployment query! - import bigframes.core.expression as ex - import bigframes.core.nodes as nodes - import bigframes.operations as ops - - # Let's construct an expression using our UDF - udf_op = ops.RemoteFunctionOp( - function_def=my_unnamed_udf.udf_def, apply_on_null=False - ) - expr = ex.OpExpression(op=udf_op, inputs=(ex.const(5),)) - - class MockNode: - def __init__(self, exprs): - self._node_expressions = exprs - self.child_nodes = [] - - def unique_nodes(self): - yield self - - def bottom_up(self, transform): - return transform(self) - - def transform_exprs(self, fn): - return MockNode([fn(e) for e in self._node_expressions]) - - mock_node = MockNode([expr]) - - import asyncio - - # Deploy and replace definition in the plan - new_plan = asyncio.run( - session._executor._ibis_executor._deploy_undeployed_udfs(mock_node) - ) - - # Verify that the DDL to create the function was executed! - assert len(session._queries) > 0 - assert any("CREATE OR REPLACE FUNCTION" in q for q in session._queries) - - # 4. Verify that the definition in the plan has been replaced with BigqueryUdf - new_expr = new_plan._node_expressions[0] - new_op = new_expr.op - assert isinstance(new_op.function_def, udf_def.BigqueryUdf) - assert new_op.function_def.routine_ref is not None - - # 5. Verify memoization: Deploying the new plan again executes ZERO additional DDL queries! - session._queries.clear() - new_plan_2 = asyncio.run( - session._executor._ibis_executor._deploy_undeployed_udfs(new_plan) - ) - assert len(session._queries) == 0 - assert new_plan_2 == new_plan From 0bd172eaa948479e73a16ae549a05255431713f5 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 21 May 2026 22:31:50 +0000 Subject: [PATCH 04/10] fixes --- packages/bigframes/bigframes/core/blocks.py | 4 ++-- .../core/compile/ibis_compiler/scalar_op_registry.py | 7 +++---- .../core/compile/sqlglot/expressions/generic_ops.py | 7 +++---- packages/bigframes/bigframes/dataframe.py | 2 +- packages/bigframes/bigframes/operations/__init__.py | 2 +- .../bigframes/operations/remote_function_ops.py | 2 +- packages/bigframes/bigframes/operations/to_op.py | 10 ++++++---- .../bigframes/session/bq_caching_executor.py | 11 ++++++++++- 8 files changed, 27 insertions(+), 18 deletions(-) diff --git a/packages/bigframes/bigframes/core/blocks.py b/packages/bigframes/bigframes/core/blocks.py index b9a246fc0360..6506b56a8bf8 100644 --- a/packages/bigframes/bigframes/core/blocks.py +++ b/packages/bigframes/bigframes/core/blocks.py @@ -1091,9 +1091,9 @@ def multi_apply_window_op( def multi_apply_unary_op( self, - op: Union[ops.UnaryOp, ex.Expression], + op: Union[ops.UnaryOp, ops.NaryOp, ex.Expression], ) -> Block: - if isinstance(op, ops.UnaryOp): + if isinstance(op, (ops.UnaryOp, ops.NaryOp)): input_varname = guid.generate_guid() expr = op.as_expr(ex.free_var(input_varname)) else: diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index b5b1e515cefa..5172d1e7c602 100644 --- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1034,8 +1034,8 @@ def timedelta_floor_op_impl(x: ibis_types.NumericValue): return ibis_api.case().when(x > ibis.literal(0), x.floor()).else_(x.ceil()).end() -@scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) -def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): +@scalar_op_compiler.register_nary_op(ops.RemoteFunctionOp, pass_op=True) +def remote_function_op_impl(*values: ibis_types.Value, op: ops.RemoteFunctionOp): udf_sig = op.function_def.signature assert not udf_sig.is_virtual # should have been devirtualized in lowering pass ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @@ -1048,8 +1048,7 @@ def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): ) def udf(*inputs): ... - result = udf(*operands) - return result + return udf(*values) @scalar_op_compiler.register_unary_op(ops.MapOp, pass_op=True) diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py b/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py index 1e9cc7c8aede..d78c18f01499 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/expressions/generic_ops.py @@ -186,10 +186,9 @@ def _get_remote_function_name(op): ) -@register_unary_op(ops.RemoteFunctionOp, pass_op=True) -def _(expr: TypedExpr, op: ops.RemoteFunctionOp) -> sge.Expression: - func_name = _get_remote_function_name(op) - return sge.func(func_name, *(operand.expr for operand in operands)) +@register_nary_op(ops.RemoteFunctionOp, pass_op=True) +def _(*values: TypedExpr, op: ops.RemoteFunctionOp) -> sge.Expression: + return sge.func(_get_remote_function_name(op), *(value.expr for value in values)) @register_nary_op(ops.case_when_op) diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index 67566884565a..bef26b5d5f30 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -4681,7 +4681,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: op = ops.func_to_op(func) # TODO(shobs): Support **kwargs - return self._apply_nary_op(op) + return self._apply_unary_op(op) def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): # In Bigframes BigQuery function, DataFrame '.apply' method is specifically diff --git a/packages/bigframes/bigframes/operations/__init__.py b/packages/bigframes/bigframes/operations/__init__.py index e85bafcbad29..11d0ef28f075 100644 --- a/packages/bigframes/bigframes/operations/__init__.py +++ b/packages/bigframes/bigframes/operations/__init__.py @@ -185,7 +185,6 @@ from bigframes.operations.remote_function_ops import ( PythonUdfOp, RemoteFunctionOp, - func_to_op, ) from bigframes.operations.string_ops import ( EndsWithOp, @@ -230,6 +229,7 @@ timestamp_add_op, timestamp_sub_op, ) +from bigframes.operations.to_op import func_to_op __all__ = [ # Base ops diff --git a/packages/bigframes/bigframes/operations/remote_function_ops.py b/packages/bigframes/bigframes/operations/remote_function_ops.py index a19cd43007a0..3ce77d51c615 100644 --- a/packages/bigframes/bigframes/operations/remote_function_ops.py +++ b/packages/bigframes/bigframes/operations/remote_function_ops.py @@ -19,7 +19,7 @@ from bigframes.operations import base_ops -@dataclasses.dataclass +@dataclasses.dataclass(frozen=True) class PythonUdfOp(base_ops.NaryOp): name: typing.ClassVar[str] = "python_udf" function_def: udf_def.PythonUdf diff --git a/packages/bigframes/bigframes/operations/to_op.py b/packages/bigframes/bigframes/operations/to_op.py index a177ea7b423e..c139541470d1 100644 --- a/packages/bigframes/bigframes/operations/to_op.py +++ b/packages/bigframes/bigframes/operations/to_op.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from bigframes.functions import Udf from bigframes.functions.udf_def import BigqueryUdf, PythonUdf from bigframes.operations import base_ops, remote_function_ops @@ -31,9 +32,10 @@ def func_to_op(op) -> base_ops.NaryOp: A bigframes operations. """ # TODO: Handle numpy ufuncs, builtin functions, etc. - if isinstance(op, BigqueryUdf): - return remote_function_ops.RemoteFunctionOp(function_def=op.udf_def) - elif isinstance(op, PythonUdf): - return remote_function_ops.PythonUdfOp(function_def=op.udf_def) + if isinstance(op, Udf): + if isinstance(op.udf_def, BigqueryUdf): + return remote_function_ops.RemoteFunctionOp(function_def=op.udf_def) + elif isinstance(op.udf_def, PythonUdf): + return remote_function_ops.PythonUdfOp(function_def=op.udf_def) else: raise TypeError(f"Unsupported function type: {op}") diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 43426fcdee47..fb2113bc3cdd 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -35,6 +35,7 @@ import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties import bigframes.dtypes +import bigframes.operations as ops import bigframes.session._io.bigquery as bq_io import bigframes.session.execution_cache as execution_cache import bigframes.session.execution_spec as ex_spec @@ -42,7 +43,15 @@ import bigframes.session.planner import bigframes.session.temporary_storage from bigframes._config import ComputeOptions -from bigframes.core import bq_data, compile, guid, identifiers, local_data, rewrite +from bigframes.core import ( + bq_data, + compile, + expression, + guid, + identifiers, + local_data, + rewrite, +) from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir from bigframes.session import ( From e2b0fd0bc96cb3cf21d0a167b0f3e8c3f82dfc5e Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 May 2026 01:48:49 +0000 Subject: [PATCH 05/10] fixes --- packages/bigframes/bigframes/core/blocks.py | 2 +- packages/bigframes/bigframes/dataframe.py | 10 ++++-- .../bigframes/functions/_function_session.py | 32 +++++++++---------- packages/bigframes/bigframes/series.py | 6 +++- .../bigframes/session/bq_caching_executor.py | 27 ++++------------ .../test_remote_function_op/out.sql | 7 +--- 6 files changed, 36 insertions(+), 48 deletions(-) diff --git a/packages/bigframes/bigframes/core/blocks.py b/packages/bigframes/bigframes/core/blocks.py index 6506b56a8bf8..33f5aaab5c7d 100644 --- a/packages/bigframes/bigframes/core/blocks.py +++ b/packages/bigframes/bigframes/core/blocks.py @@ -1098,7 +1098,7 @@ def multi_apply_unary_op( expr = op.as_expr(ex.free_var(input_varname)) else: input_varnames = op.free_variables - assert len(input_varnames) == 1 + assert len(set(input_varnames)) == 1 expr = op input_varname = input_varnames[0] diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index bef26b5d5f30..960aab25c355 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -4678,10 +4678,14 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: if na_action not in {None, "ignore"}: raise ValueError(f"na_action={na_action} not supported") - op = ops.func_to_op(func) + expr = ops.func_to_op(func).as_expr(ex.free_var("input")) + if na_action == "ignore": + # True case, predicate, False case + expr = ops.where_op.as_expr( + expr, ops.notnull_op.as_expr(ex.free_var("input")), ex.const(None) + ) - # TODO(shobs): Support **kwargs - return self._apply_unary_op(op) + return DataFrame(self._block.multi_apply_unary_op(expr)) def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): # In Bigframes BigQuery function, DataFrame '.apply' method is specifically diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 89b52e42a319..87838246b0d2 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -757,8 +757,6 @@ def udf( max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, container_memory: Optional[str] = None, - *, - _force_deploy: bool = False, ): """Decorator to turn a Python user defined function (udf) into a BigQuery managed function. @@ -922,6 +920,9 @@ def wrapper(func): max_batching_rows=max_batching_rows, packages=tuple(packages) if packages else (), ) + if udf_sig.is_row_processor: + msg = bfe.format_message("input_types=Series is in preview.") + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) if ( not name and not _force_deploy @@ -931,7 +932,8 @@ def wrapper(func): code=code_def, requirements=requirements, ) - else: + return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) + else: # deploy immediately bq_function_name = ( managed_function_client.provision_bq_managed_function( name=name, @@ -947,21 +949,17 @@ def wrapper(func): routine_ref=bigquery.RoutineReference.from_string(full_rf_name), signature=udf_sig, ) - - if udf_sig.is_row_processor: - msg = bfe.format_message("input_types=Series is in preview.") - warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) - - if not name: # session-owned resource - will be cleaned up automatically - if _force_deploy: + if name is None: + # Null name means anonymous, session-owned resource with force deploy. + # Unnamed resources are owned by the session and will be cleaned up automatically. self._update_temp_artifacts(full_rf_name, "") - return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) - - # user-managed permanent resource - will not be cleaned up automatically - else: - return bq_functions.BigqueryCallableRoutine( - udf_definition, session, local_func=func, is_managed=True - ) + return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) + else: + # user-managed permanent resource - will not be cleaned up automatically + # provide richer handle for backwards compatibility + return bq_functions.BigqueryCallableRoutine( + udf_definition, session, local_func=func, is_managed=True + ) return wrapper diff --git a/packages/bigframes/bigframes/series.py b/packages/bigframes/bigframes/series.py index 5f462536e312..469ddb8ac276 100644 --- a/packages/bigframes/bigframes/series.py +++ b/packages/bigframes/bigframes/series.py @@ -2092,7 +2092,11 @@ def combine( ) if isinstance(func, bigframes.functions.Udf): - result_series = self._apply_binary_op(other, ops.func_to_op(func)) + result_series = self._apply_nary_op(ops.func_to_op(func), (other,)) + if hasattr(other, "name") and other.name != self._name: # type: ignore + result_series.name = None + else: + result_series.name = self.name return result_series bf_op = python_ops.python_callable_to_op(func) diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index fb2113bc3cdd..60f91a015e9d 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -54,6 +54,7 @@ ) from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir +from bigframes.functions import udf_def from bigframes.session import ( direct_gbq_execution, executor, @@ -523,20 +524,10 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode async def _deploy_undeployed_udfs( self, plan: nodes.BigFrameNode ) -> nodes.BigFrameNode: - undeployed_udfs = self._collect_udf_defs(plan) - if not undeployed_udfs: - return plan - - seen = set() - unique_undeployed_udfs = [] - for udf in undeployed_udfs: - if udf not in seen: - seen.add(udf) - unique_undeployed_udfs.append(udf) - + referenced_udfs = self._collect_udf_defs(plan) session = self.loader._session deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf] = {} - for udf in unique_undeployed_udfs: + for udf in set(referenced_udfs): deployed_udf = await asyncio.to_thread( session._function_session._deploy_udf, session, @@ -553,10 +544,7 @@ def _collect_udf_defs(self, plan: nodes.BigFrameNode) -> list[udf_def.PythonUdf] for sub_expr in expr.walk(): if isinstance(sub_expr, expression.OpExpression): op = sub_expr.op - if isinstance( - op, - (ops.PythonUdfOp,), - ): + if isinstance(op, ops.PythonUdfOp): func_def = op.function_def if isinstance(func_def, udf_def.PythonUdf): udf_defs.append(func_def) @@ -567,7 +555,6 @@ def _subsitute_temporary_functions( plan: nodes.BigFrameNode, deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf], ) -> nodes.BigFrameNode: - # Now rewrite the plan using bottom_up to substitute the UDF definitions! def replace_in_expr(expr: expression.Expression) -> expression.Expression: def replace_step(e: expression.Expression) -> expression.Expression: if isinstance(e, expression.OpExpression): @@ -575,9 +562,9 @@ def replace_step(e: expression.Expression) -> expression.Expression: if isinstance(op, ops.PythonUdfOp): func_def = op.function_def if func_def in deployed_mapping: - new_func_def = deployed_mapping[func_def] - new_op = dataclasses.replace(op, function_def=new_func_def) - return dataclasses.replace(e, op=new_op) + deployed_func = deployed_mapping[func_def] + rf_op = ops.RemoteFunctionOp(function_def=deployed_func) + return dataclasses.replace(e, op=rf_op) raise ValueError( f"UDF definition {func_def} not found in deployed mapping" ) diff --git a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql index 1854c0258825..a1977d809f70 100644 --- a/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql +++ b/packages/bigframes/tests/unit/core/compile/sqlglot/expressions/snapshots/test_generic_ops/test_remote_function_op/out.sql @@ -1,8 +1,3 @@ SELECT - `my_project`.`my_dataset`.`my_routine`(`int64_col`) AS `apply_on_null_true`, - IF( - `int64_col` IS NULL, - `int64_col`, - `my_project`.`my_dataset`.`my_routine`(`int64_col`) - ) AS `apply_on_null_false` + `my_project`.`my_dataset`.`my_routine`(`int64_col`, `float64_col`, `string_col`) AS `int64_col` FROM `bigframes-dev`.`sqlglot_test`.`scalar_types` AS `bft_0` \ No newline at end of file From b1583cbb722c0295a190366b8b5b1a42d9a93ebc Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 May 2026 01:57:21 +0000 Subject: [PATCH 06/10] fix _force_deploy flag --- packages/bigframes/bigframes/functions/_function_session.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 87838246b0d2..2b241ad266c5 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -757,6 +757,8 @@ def udf( max_batching_rows: Optional[int] = None, container_cpu: Optional[float] = None, container_memory: Optional[str] = None, + *, + _force_deploy: bool = False, ): """Decorator to turn a Python user defined function (udf) into a BigQuery managed function. From d248c3def658567ed171e9848c3560ccd080089a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 May 2026 02:05:46 +0000 Subject: [PATCH 07/10] executor owns function manager --- .../bigframes/session/bq_caching_executor.py | 17 ++++++++++++----- .../bigframes/session/proxy_executor.py | 6 ++++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 60f91a015e9d..ee005c4cd084 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -63,6 +63,8 @@ read_api_execution, semi_executor, ) +import bigframes.functions._function_session as bff_session + # Max complexity that should be executed as a single query QUERY_COMPLEXITY_LIMIT = 1e7 @@ -131,6 +133,7 @@ def __init__( labels: tuple[tuple[str, str], ...] = (), compiler_name: Literal["ibis", "sqlglot"] = "sqlglot", cache: Optional[execution_cache.ExecutionCache] = None, + function_manager: bff_session.FunctionSession, ): self.bqclient = bqclient self.storage_manager = storage_manager @@ -166,6 +169,7 @@ def __init__( publisher=self._publisher, labels=dict(labels), ) + self._function_manager = function_manager def to_sql( self, @@ -524,16 +528,19 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode async def _deploy_undeployed_udfs( self, plan: nodes.BigFrameNode ) -> nodes.BigFrameNode: - referenced_udfs = self._collect_udf_defs(plan) + referenced_udfs = list(set(self._collect_udf_defs(plan))) session = self.loader._session deployed_mapping: dict[udf_def.PythonUdf, udf_def.BigqueryUdf] = {} - for udf in set(referenced_udfs): - deployed_udf = await asyncio.to_thread( - session._function_session._deploy_udf, + tasks = [ + asyncio.to_thread( + self._function_manager._deploy_udf, session, udf, ) - deployed_mapping[udf] = deployed_udf + for udf in referenced_udfs + ] + results = await asyncio.gather(*tasks) + deployed_mapping = dict(zip(referenced_udfs, results)) return self._subsitute_temporary_functions(plan, deployed_mapping) diff --git a/packages/bigframes/bigframes/session/proxy_executor.py b/packages/bigframes/bigframes/session/proxy_executor.py index c4fe6584bd27..5645095db878 100644 --- a/packages/bigframes/bigframes/session/proxy_executor.py +++ b/packages/bigframes/bigframes/session/proxy_executor.py @@ -33,6 +33,8 @@ metrics, temporary_storage, ) +import bigframes.functions._function_session as bff_session + _COMPILER_LABEL_KEY = "bigframes-compiler" @@ -52,6 +54,8 @@ def __init__( metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, enable_polars_execution: bool = False, publisher: bigframes.core.events.Publisher, + function_manager: bff_session.FunctionSession, + labels: tuple[tuple[str, str], ...] = (), ): self._enable_polars_execution = enable_polars_execution @@ -67,6 +71,7 @@ def __init__( labels=labels, cache=shared_cache, compiler_name="ibis", + function_manager=function_manager, ) self._sqlglot_executor = bq_caching_executor.BigQueryCachingExecutor( bqclient, @@ -79,6 +84,7 @@ def __init__( labels=labels, cache=shared_cache, compiler_name="sqlglot", + function_manager=function_manager, ) def to_sql( From ffed98f7fd81d63dd6516e6966018ab55f4199f8 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 May 2026 02:06:01 +0000 Subject: [PATCH 08/10] lint --- packages/bigframes/bigframes/session/bq_caching_executor.py | 3 +-- packages/bigframes/bigframes/session/proxy_executor.py | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index ee005c4cd084..77723c1bbd2a 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -35,6 +35,7 @@ import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties import bigframes.dtypes +import bigframes.functions._function_session as bff_session import bigframes.operations as ops import bigframes.session._io.bigquery as bq_io import bigframes.session.execution_cache as execution_cache @@ -63,8 +64,6 @@ read_api_execution, semi_executor, ) -import bigframes.functions._function_session as bff_session - # Max complexity that should be executed as a single query QUERY_COMPLEXITY_LIMIT = 1e7 diff --git a/packages/bigframes/bigframes/session/proxy_executor.py b/packages/bigframes/bigframes/session/proxy_executor.py index 5645095db878..615823822c71 100644 --- a/packages/bigframes/bigframes/session/proxy_executor.py +++ b/packages/bigframes/bigframes/session/proxy_executor.py @@ -23,6 +23,7 @@ import google.cloud.exceptions import bigframes.core +import bigframes.functions._function_session as bff_session from bigframes import exceptions as bfe from bigframes.session import ( bq_caching_executor, @@ -33,8 +34,6 @@ metrics, temporary_storage, ) -import bigframes.functions._function_session as bff_session - _COMPILER_LABEL_KEY = "bigframes-compiler" @@ -55,7 +54,6 @@ def __init__( enable_polars_execution: bool = False, publisher: bigframes.core.events.Publisher, function_manager: bff_session.FunctionSession, - labels: tuple[tuple[str, str], ...] = (), ): self._enable_polars_execution = enable_polars_execution From b0a219d55cbf7356e7bc4db063d73e0bdc232697 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 May 2026 02:08:42 +0000 Subject: [PATCH 09/10] always give rich ref for deployed funcs --- .../bigframes/functions/_function_session.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 2b241ad266c5..1c6aa4b5e24a 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -955,13 +955,10 @@ def wrapper(func): # Null name means anonymous, session-owned resource with force deploy. # Unnamed resources are owned by the session and will be cleaned up automatically. self._update_temp_artifacts(full_rf_name, "") - return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) - else: - # user-managed permanent resource - will not be cleaned up automatically - # provide richer handle for backwards compatibility - return bq_functions.BigqueryCallableRoutine( - udf_definition, session, local_func=func, is_managed=True - ) + + return bq_functions.BigqueryCallableRoutine( + udf_definition, session, local_func=func, is_managed=True + ) return wrapper From f1dc42d60e4f324618818f112d95e7a0ebccd793 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 May 2026 17:54:41 +0000 Subject: [PATCH 10/10] fixes --- .../bigframes/bigframes/functions/_function_session.py | 4 ++-- packages/bigframes/bigframes/session/__init__.py | 1 + .../bigframes/tests/unit/session/test_proxy_executor.py | 8 +++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 1c6aa4b5e24a..281785fbea67 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -947,7 +947,7 @@ def wrapper(func): bq_function_name ) ) - udf_definition = udf_def.BigqueryUdf( + rf_def = udf_def.BigqueryUdf( routine_ref=bigquery.RoutineReference.from_string(full_rf_name), signature=udf_sig, ) @@ -957,7 +957,7 @@ def wrapper(func): self._update_temp_artifacts(full_rf_name, "") return bq_functions.BigqueryCallableRoutine( - udf_definition, session, local_func=func, is_managed=True + rf_def, session, local_func=func, is_managed=True ) return wrapper diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 92e032bc31e4..d1bdc3854e46 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -338,6 +338,7 @@ def __init__( enable_polars_execution=context.enable_polars_execution, publisher=self._publisher, labels=tuple(labels.items()), + function_manager=self._function_session, ) def __del__(self): diff --git a/packages/bigframes/tests/unit/session/test_proxy_executor.py b/packages/bigframes/tests/unit/session/test_proxy_executor.py index a1a8f168995b..7aa549934446 100644 --- a/packages/bigframes/tests/unit/session/test_proxy_executor.py +++ b/packages/bigframes/tests/unit/session/test_proxy_executor.py @@ -33,8 +33,14 @@ def mock_executor(): bqstoragereadclient = mock.Mock() loader = mock.Mock() publisher = mock.Mock() + function_manager = mock.Mock() return DualCompilerProxyExecutor( - bqclient, storage_manager, bqstoragereadclient, loader, publisher=publisher + bqclient, + storage_manager, + bqstoragereadclient, + loader, + publisher=publisher, + function_manager=function_manager, )