diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index 197e0a83b5..6c9c04dca7 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -22,9 +22,13 @@ import typing +import bigframes.constants as constants +import bigframes.core.groupby as groupby import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops if typing.TYPE_CHECKING: + import bigframes.dataframe as dataframe import bigframes.series as series @@ -52,9 +56,66 @@ def array_length(series: series.Series) -> series.Series: 2 2 dtype: Int64 + Args: + series (bigframes.series.Series): + A Series with array columns. + Returns: bigframes.series.Series: A Series of integer values indicating the length of each element in the Series. """ return series._apply_unary_op(ops.len_op) + + +def array_agg( + obj: groupby.SeriesGroupBy | groupby.DataFrameGroupBy, +) -> series.Series | dataframe.DataFrame: + """Group data and create arrays from selected columns, omitting NULLs to avoid + BigQuery errors (NULLs not allowed in arrays). + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> import bigframes.bigquery as bbq + >>> import numpy as np + >>> bpd.options.display.progress_bar = None + + For a SeriesGroupBy object: + + >>> lst = ['a', 'a', 'b', 'b', 'a'] + >>> s = bpd.Series([1, 2, 3, 4, np.nan], index=lst) + >>> bbq.array_agg(s.groupby(level=0)) + a [1. 2.] + b [3. 4.] + dtype: list[pyarrow] + + For a DataFrameGroupBy object: + + >>> l = [[1, 2, 3], [1, None, 4], [2, 1, 3], [1, 2, 2]] + >>> df = bpd.DataFrame(l, columns=["a", "b", "c"]) + >>> bbq.array_agg(df.groupby(by=["b"])) + a c + b + 1.0 [2] [3] + 2.0 [1 1] [3 2] + + [2 rows x 2 columns] + + Args: + obj (groupby.SeriesGroupBy | groupby.DataFrameGroupBy): + A GroupBy object to be applied the function. 
+ + Returns: + bigframes.series.Series | bigframes.dataframe.DataFrame: A Series or + DataFrame containing aggregated array columns, and indexed by the + original group columns. + """ + if isinstance(obj, groupby.SeriesGroupBy): + return obj._aggregate(agg_ops.ArrayAggOp()) + elif isinstance(obj, groupby.DataFrameGroupBy): + return obj._aggregate_all(agg_ops.ArrayAggOp(), numeric_only=False) + else: + raise ValueError( + f"Unsupported type {type(obj)} to apply `array_agg` function. {constants.FEEDBACK_LINK}" + ) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 98d296c779..c0b0562a54 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -34,13 +34,14 @@ def compile_aggregate( aggregate: ex.Aggregation, bindings: typing.Dict[str, ibis_types.Value], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: if isinstance(aggregate, ex.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) - return compile_unary_agg( - aggregate.op, - input, - ) + if aggregate.op.can_order_by: + return compile_ordered_unary_agg(aggregate.op, input, order_by=order_by) + else: + return compile_unary_agg(aggregate.op, input) elif isinstance(aggregate, ex.BinaryAggregation): left = scalar_compiler.compile_expression(aggregate.left, bindings=bindings) right = scalar_compiler.compile_expression(aggregate.right, bindings=bindings) @@ -66,7 +67,8 @@ def compile_analytic( @functools.singledispatch def compile_binary_agg( op: agg_ops.WindowOp, - input: ibis_types.Column, + left: ibis_types.Column, + right: ibis_types.Column, window: Optional[window_spec.WindowSpec] = None, ) -> ibis_types.Value: raise ValueError(f"Can't compile unrecognized operation: {op}") @@ -81,9 +83,24 @@ def compile_unary_agg( raise ValueError(f"Can't compile unrecognized operation: {op}") +@functools.singledispatch +def compile_ordered_unary_agg( + 
op: agg_ops.WindowOp, + input: ibis_types.Column, + window: Optional[window_spec.WindowSpec] = None, + order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: + raise ValueError(f"Can't compile unrecognized operation: {op}") + + def numeric_op(operation): @functools.wraps(operation) - def constrained_op(op, column: ibis_types.Column, window=None): + def constrained_op( + op, + column: ibis_types.Column, + window=None, + order_by: typing.Sequence[ibis_types.Value] = [], + ): if column.type().is_boolean(): column = typing.cast( ibis_types.NumericColumn, column.cast(ibis_dtypes.int64) @@ -104,7 +121,9 @@ def constrained_op(op, column: ibis_types.Column, window=None): @compile_unary_agg.register @numeric_op def _( - op: agg_ops.SumOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.SumOp, + column: ibis_types.NumericColumn, + window=None, ) -> ibis_types.NumericValue: # Will be null if all inputs are null. Pandas defaults to zero sum though. bq_sum = _apply_window_if_present(column.sum(), window) @@ -116,7 +135,9 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.MedianOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.MedianOp, + column: ibis_types.NumericColumn, + window=None, ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". @@ -134,7 +155,9 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.ApproxQuartilesOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.ApproxQuartilesOp, + column: ibis_types.NumericColumn, + window=None, ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". 
@@ -151,7 +174,9 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.QuantileOp, + column: ibis_types.NumericColumn, + window=None, ) -> ibis_types.NumericValue: return _apply_window_if_present(column.quantile(op.q), window) @@ -159,7 +184,10 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.MeanOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.MeanOp, + column: ibis_types.NumericColumn, + window=None, + # order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.mean(), window) @@ -167,7 +195,9 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.ProductOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.ProductOp, + column: ibis_types.NumericColumn, + window=None, ) -> ibis_types.NumericValue: # Need to short-circuit as log with zeroes is illegal sql is_zero = cast(ibis_types.BooleanColumn, (column == 0)) @@ -202,30 +232,50 @@ def _( @compile_unary_agg.register -def _(op: agg_ops.MaxOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.MaxOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.Value: return _apply_window_if_present(column.max(), window) @compile_unary_agg.register -def _(op: agg_ops.MinOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.MinOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.Value: return _apply_window_if_present(column.min(), window) @compile_unary_agg.register @numeric_op -def _(op: agg_ops.StdOp, x: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.StdOp, + x: ibis_types.Column, + window=None, +) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).std(), window) @compile_unary_agg.register @numeric_op -def _(op: agg_ops.VarOp, x: ibis_types.Column, window=None) -> 
ibis_types.Value: +def _( + op: agg_ops.VarOp, + x: ibis_types.Column, + window=None, +) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).var(), window) @compile_unary_agg.register @numeric_op -def _(op: agg_ops.PopVarOp, x: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.PopVarOp, + x: ibis_types.Column, + window=None, +) -> ibis_types.Value: return _apply_window_if_present( cast(ibis_types.NumericColumn, x).var(how="pop"), window ) @@ -233,13 +283,19 @@ def _(op: agg_ops.PopVarOp, x: ibis_types.Column, window=None) -> ibis_types.Val @compile_unary_agg.register def _( - op: agg_ops.CountOp, column: ibis_types.Column, window=None + op: agg_ops.CountOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.count(), window) @compile_unary_agg.register -def _(op: agg_ops.CutOp, x: ibis_types.Column, window=None): +def _( + op: agg_ops.CutOp, + x: ibis_types.Column, + window=None, +): out = ibis.case() if isinstance(op.bins, int): col_min = _apply_window_if_present(x.min(), window) @@ -292,7 +348,9 @@ def _(op: agg_ops.CutOp, x: ibis_types.Column, window=None): @compile_unary_agg.register @numeric_op def _( - self: agg_ops.QcutOp, column: ibis_types.Column, window=None + self: agg_ops.QcutOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.IntegerValue: if isinstance(self.quantiles, int): quantiles_ibis = dtypes.literal_to_ibis_scalar(self.quantiles) @@ -322,21 +380,27 @@ def _( @compile_unary_agg.register def _( - op: agg_ops.NuniqueOp, column: ibis_types.Column, window=None + op: agg_ops.NuniqueOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.nunique(), window) @compile_unary_agg.register def _( - op: agg_ops.AnyValueOp, column: ibis_types.Column, window=None + op: agg_ops.AnyValueOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.IntegerValue: return 
_apply_window_if_present(column.arbitrary(), window) @compile_unary_agg.register def _( - op: agg_ops.RankOp, column: ibis_types.Column, window=None + op: agg_ops.RankOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(ibis.rank(), window) + 1 @@ -344,7 +408,9 @@ def _( @compile_unary_agg.register def _( - op: agg_ops.DenseRankOp, column: ibis_types.Column, window=None + op: agg_ops.DenseRankOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(column.dense_rank(), window) + 1 @@ -357,7 +423,9 @@ def _(op: agg_ops.FirstOp, column: ibis_types.Column, window=None) -> ibis_types @compile_unary_agg.register def _( - op: agg_ops.FirstNonNullOp, column: ibis_types.Column, window=None + op: agg_ops.FirstNonNullOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.FirstNonNullValue(column).to_expr(), window # type: ignore @@ -365,13 +433,19 @@ def _( @compile_unary_agg.register -def _(op: agg_ops.LastOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.LastOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.Value: return _apply_window_if_present(column.last(), window) @compile_unary_agg.register def _( - op: agg_ops.LastNonNullOp, column: ibis_types.Column, window=None + op: agg_ops.LastNonNullOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.LastNonNullValue(column).to_expr(), window # type: ignore @@ -379,7 +453,11 @@ def _( @compile_unary_agg.register -def _(op: agg_ops.ShiftOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.ShiftOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.Value: if 
op.periods == 0: # No-op return column if op.periods > 0: @@ -388,7 +466,11 @@ def _(op: agg_ops.ShiftOp, column: ibis_types.Column, window=None) -> ibis_types @compile_unary_agg.register -def _(op: agg_ops.DiffOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.DiffOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.Value: shifted = compile_unary_agg(agg_ops.ShiftOp(op.periods), column, window) if column.type().is_boolean(): return cast(ibis_types.BooleanColumn, column) != cast( @@ -404,7 +486,9 @@ def _(op: agg_ops.DiffOp, column: ibis_types.Column, window=None) -> ibis_types. @compile_unary_agg.register def _( - op: agg_ops.AllOp, column: ibis_types.Column, window=None + op: agg_ops.AllOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be true in pandas. result = _is_true(column).all() @@ -416,7 +500,9 @@ def _( @compile_unary_agg.register def _( - op: agg_ops.AnyOp, column: ibis_types.Column, window=None + op: agg_ops.AnyOp, + column: ibis_types.Column, + window=None, ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be false in pandas. result = _is_true(column).any() @@ -426,6 +512,31 @@ def _( ) +@compile_ordered_unary_agg.register +def _( + op: agg_ops.ArrayAggOp, + column: ibis_types.Column, + window=None, + order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.ArrayValue: + # BigQuery doesn't currently support using ARRAY_AGG with both window and aggregate + # functions simultaneously. Some aggregate functions (or their equivalent syntax) + # are more important, such as: + # - `IGNORE NULLS` is required to avoid an error being raised if the final result + # contains a NULL element. + # - `ORDER BY` is required for the default ordering mode. + # To keep things simpler, windowing support is skipped for now.
+ if window is not None: + raise NotImplementedError( + f"ArrayAgg with windowing is not supported. {constants.FEEDBACK_LINK}" + ) + + return vendored_ibis_ops.ArrayAggregate( + column, + order_by=order_by, + ).to_expr() + + @compile_binary_agg.register def _( op: agg_ops.CorrOp, left: ibis_types.Column, right: ibis_types.Column, window=None diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index b57e0c4d35..1c2217c25a 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -102,6 +102,12 @@ def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: def _ibis_bindings(self) -> dict[str, ibis_types.Value]: return {col: self._get_ibis_column(col) for col in self.column_ids} + @property + @abc.abstractmethod + def is_ordered_ir(self: T) -> bool: + """Whether this IR is an OrderedIR (True) or an UnorderedIR (False).""" + ... + @abc.abstractmethod def filter(self: T, predicate: ex.Expression) -> T: """Filter the table on a given expression, the predicate must be a boolean expression.""" @@ -163,6 +169,53 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type), ) + def _aggregate_base( + self, + table: ibis_types.Table, + order_by: typing.Sequence[ibis_types.Value] = [], + aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]] = [], + by_column_ids: typing.Sequence[str] = (), + dropna: bool = True, + ) -> OrderedIR: + assert not self.is_ordered_ir or len(order_by) > 0 + + bindings = {col: table[col] for col in self.column_ids} + stats = { + col_out: agg_compiler.compile_aggregate( + aggregate, bindings, order_by=order_by + ) + for aggregate, col_out in aggregations + } + if by_column_ids: + result = table.group_by(by_column_ids).aggregate(**stats) + # Must have deterministic ordering, so order by the unique "by" column + ordering = ExpressionOrdering( + tuple([ascending_over(column_id) for column_id in by_column_ids]), +
total_ordering_columns=frozenset(by_column_ids), + ) + columns = tuple(result[key] for key in result.columns) + expr = OrderedIR(result, columns=columns, ordering=ordering) + if dropna: + for column_id in by_column_ids: + expr = expr._filter(expr._get_ibis_column(column_id).notnull()) + return expr + else: + aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} + result = table.aggregate(**aggregates) + # Ordering is irrelevant for single-row output, but set ordering id regardless + # as other ops(join etc.) expect it. + # TODO: Maybe can make completely empty + ordering = ExpressionOrdering( + ordering_value_columns=tuple([]), + total_ordering_columns=frozenset([]), + ) + return OrderedIR( + result, + columns=[result[col_id] for col_id in [*stats.keys()]], + hidden_ordering_columns=[result[ORDER_ID_COLUMN]], + ordering=ordering, + ) + # Ibis Implementations class UnorderedIR(BaseIbisIR): @@ -174,6 +227,10 @@ def __init__( ): super().__init__(table, columns, predicates) + @property + def is_ordered_ir(self) -> bool: + return False + def builder(self): """Creates a mutable builder for expressions.""" # Since ArrayValue is intended to be immutable (immutability offers @@ -310,44 +367,17 @@ def aggregate( Apply aggregations to the expression. Arguments: aggregations: input_column_id, operation, output_column_id tuples - by_column_id: column id of the aggregation key, this is preserved through the transform + by_column_ids: column ids of the aggregation key, this is preserved through + the transform dropna: whether null keys should be dropped + Returns: + OrderedIR: the grouping key is a unique-valued column and has ordering + information. 
""" table = self._to_ibis_expr() - bindings = {col: table[col] for col in self.column_ids} - stats = { - col_out: agg_compiler.compile_aggregate(aggregate, bindings) - for aggregate, col_out in aggregations - } - if by_column_ids: - result = table.group_by(by_column_ids).aggregate(**stats) - # Must have deterministic ordering, so order by the unique "by" column - ordering = ExpressionOrdering( - tuple([ascending_over(column_id) for column_id in by_column_ids]), - total_ordering_columns=frozenset(by_column_ids), - ) - columns = tuple(result[key] for key in result.columns) - expr = OrderedIR(result, columns=columns, ordering=ordering) - if dropna: - for column_id in by_column_ids: - expr = expr._filter(expr._get_ibis_column(column_id).notnull()) - # Can maybe remove this as Ordering id is redundant as by_column is unique after aggregation - return expr._project_offsets() - else: - aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} - result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id regardless as other ops(join etc.) expect it. - # TODO: Maybe can make completely empty - ordering = ExpressionOrdering( - ordering_value_columns=tuple([]), - total_ordering_columns=frozenset([]), - ) - return OrderedIR( - result, - columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, - ) + return self._aggregate_base( + table, aggregations=aggregations, by_column_ids=by_column_ids, dropna=dropna + ) def _uniform_sampling(self, fraction: float) -> UnorderedIR: """Sampling the table on given fraction. @@ -526,6 +556,10 @@ def __init__( if not ordering_valid: raise ValueError(f"Illegal ordering keys: {ordering.all_ordering_columns}") + @property + def is_ordered_ir(self) -> bool: + return True + @classmethod def from_pandas( cls, @@ -535,7 +569,8 @@ def from_pandas( """ Builds an in-memory only (SQL only) expr from a pandas dataframe. 
- Assumed that the dataframe has unique string column names and bigframes-suppported dtypes. + Assumed that the dataframe has unique string column names and bigframes-suppported + dtypes. """ # ibis memtable cannot handle NA, must convert to None @@ -572,7 +607,8 @@ def _hidden_column_ids(self) -> typing.Sequence[str]: @property def _ibis_order(self) -> Sequence[ibis_types.Value]: - """Returns a sequence of ibis values which can be directly used to order a table expression. Has direction modifiers applied.""" + """Returns a sequence of ibis values which can be directly used to order a + table expression. Has direction modifiers applied.""" return _convert_ordering_to_table_values( {**self._column_names, **self._hidden_ordering_column_names}, self._ordering.all_ordering_columns, @@ -604,6 +640,44 @@ def reversed(self) -> OrderedIR: expr_builder.ordering = self._ordering.with_reverse() return expr_builder.build() + def aggregate( + self, + aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], + by_column_ids: typing.Sequence[str] = (), + dropna: bool = True, + ) -> OrderedIR: + """ + Apply aggregations to the expression. + Arguments: + aggregations: input_column_id, operation, output_column_id tuples + by_column_ids: column ids of the aggregation key, this is preserved through + the transform + dropna: whether null keys should be dropped + Returns: + OrderedIR + """ + table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) + + all_columns = { + column_name: table[column_name] + for column_name in { + **self._column_names, + **self._hidden_ordering_column_names, + } + } + order_by = _convert_ordering_to_table_values( + all_columns, + self._ordering.all_ordering_columns, + ) + + return self._aggregate_base( + table, + order_by=order_by, + aggregations=aggregations, + by_column_ids=by_column_ids, + dropna=dropna, + ) + def _uniform_sampling(self, fraction: float) -> OrderedIR: """Sampling the table on given fraction. 
@@ -1069,7 +1143,8 @@ def _bake_ordering(self) -> OrderedIR: ) def _project_offsets(self) -> OrderedIR: - """Create a new expression that contains offsets. Should only be executed when offsets are needed for an operations. Has no effect on expression semantics.""" + """Create a new expression that contains offsets. Should only be executed when + offsets are needed for an operation. Has no effect on expression semantics.""" if self._ordering.is_sequential: return self table = self._to_ibis_expr( diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index a68023d13d..a9908192f3 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -155,10 +155,18 @@ def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True): @_compile_node.register def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): - result = compile_unordered_ir(node.child).aggregate( - node.aggregations, node.by_column_ids, node.dropna + has_ordered_aggregation_ops = any( + aggregate.op.can_order_by for aggregate, _ in node.aggregations ) - return result if ordered else result.to_unordered() + if ordered and has_ordered_aggregation_ops: + return compile_ordered_ir(node.child).aggregate( + node.aggregations, node.by_column_ids, node.dropna + ) + else: + result = compile_unordered_ir(node.child).aggregate( + node.aggregations, node.by_column_ids, node.dropna + ) + return result if ordered else result.to_unordered() @_compile_node.register @@ -180,10 +188,10 @@ def compile_reproject(node: nodes.ReprojectOpNode, ordered: bool = True): @_compile_node.register -def compiler_explode(node: nodes.ExplodeNode, ordered: bool = True): +def compile_explode(node: nodes.ExplodeNode, ordered: bool = True): return compile_node(node.child, ordered).explode(node.column_ids) @_compile_node.register -def compiler_random_sample(node: nodes.RandomSampleNode, ordered: bool = True): +def compile_random_sample(node: nodes.RandomSampleNode,
ordered: bool = True): return compile_node(node.child, ordered)._uniform_sampling(node.fraction) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index c57fac4112..3b5310554b 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -38,6 +38,10 @@ def uses_total_row_ordering(self): """Whether the operator needs total row ordering. (eg. lead, lag, array_agg)""" return False + @property + def can_order_by(self): + return False + @abc.abstractmethod def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: ... @@ -115,7 +119,7 @@ class QuantileOp(UnaryAggregateOp): @property def name(self): - return f"{int(self.q*100)}%" + return f"{int(self.q * 100)}%" def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: return signatures.UNARY_REAL_NUMERIC.output_type(input_types[0]) @@ -127,7 +131,7 @@ class ApproxQuartilesOp(UnaryAggregateOp): @property def name(self): - return f"{self.quartile*25}%" + return f"{self.quartile * 25}%" def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: if not dtypes.is_orderable(input_types[0]): @@ -222,6 +226,24 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ).output_type(input_types[0]) +@dataclasses.dataclass(frozen=True) +class ArrayAggOp(UnaryAggregateOp): + name: ClassVar[str] = "arrayagg" + + @property + def can_order_by(self): + return True + + @property + def skips_nulls(self): + return True + + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + return pd.ArrowDtype( + pa.list_(dtypes.bigframes_dtype_to_arrow_dtype(input_types[0])) + ) + + @dataclasses.dataclass(frozen=True) class CutOp(UnaryWindowOp): # TODO: Unintuitive, refactor into multiple ops? 
diff --git a/tests/system/small/bigquery/test_array.py b/tests/system/small/bigquery/test_array.py index a91669cd88..0664c31a3c 100644 --- a/tests/system/small/bigquery/test_array.py +++ b/tests/system/small/bigquery/test_array.py @@ -14,6 +14,7 @@ import numpy as np import pandas as pd +import pytest import bigframes.bigquery as bbq import bigframes.pandas as bpd @@ -23,10 +24,118 @@ def test_array_length(): series = bpd.Series([["A", "AA", "AAA"], ["BB", "B"], np.nan, [], ["C"]]) # TODO(b/336880368): Allow for NULL values to be input for ARRAY columns. # Once we actually store NULL values, this will be NULL where the input is NULL. - expected = pd.Series([3, 2, 0, 0, 1]) + expected = bpd.Series([3, 2, 0, 0, 1]) pd.testing.assert_series_equal( bbq.array_length(series).to_pandas(), - expected, - check_dtype=False, - check_index_type=False, + expected.to_pandas(), + ) + + +@pytest.mark.parametrize( + ("input_data", "output_data"), + [ + pytest.param([1, 2, 3, 4, 5], [[1, 2], [3, 4], [5]], id="ints"), + pytest.param( + ["e", "d", "c", "b", "a"], + [["e", "d"], ["c", "b"], ["a"]], + id="reverse_strings", + ), + pytest.param( + [1.0, 2.0, np.nan, np.nan, np.nan], [[1.0, 2.0], [], []], id="nans" + ), + pytest.param( + [{"A": {"x": 1.0}}, {"A": {"z": 4.0}}, {}, {"B": "b"}, np.nan], + [[{"A": {"x": 1.0}}, {"A": {"z": 4.0}}], [{}, {"B": "b"}], []], + id="structs", + ), + ], +) +def test_array_agg_w_series_groupby(input_data, output_data): + input_index = ["a", "a", "b", "b", "c"] + series = bpd.Series(input_data, index=input_index) + result = bbq.array_agg(series.groupby(level=0)) + + expected = bpd.Series(output_data, index=["a", "b", "c"]) + pd.testing.assert_series_equal( + result.to_pandas(), # type: ignore + expected.to_pandas(), + ) + + +def test_array_agg_w_dataframe_groupby(): + data = { + "a": [1, 1, 2, 1], + "b": [2, None, 1, 2], + "c": [3, 4, 3, 2], + } + df = bpd.DataFrame(data) + result = bbq.array_agg(df.groupby(by=["b"])) + + expected_data = { + "b": [1.0, 
2.0], + "a": [[2], [1, 1]], + "c": [[3], [3, 2]], + } + expected = bpd.DataFrame(expected_data).set_index("b") + + pd.testing.assert_frame_equal( + result.to_pandas(), # type: ignore + expected.to_pandas(), + ) + + +def test_array_agg_w_series(): + series = bpd.Series([1, 2, 3, 4, 5], index=["a", "a", "b", "b", "c"]) + # Mypy error expected: array_agg currently incompatible with Series. + # Test for coverage. + with pytest.raises(ValueError): + bbq.array_agg(series) # type: ignore + + +@pytest.mark.parametrize( + ("ascending", "expected_b", "expected_c"), + [ + pytest.param( + True, [["a", "b"], ["e", "d", "c"]], [[4, 5], [1, 2, 3]], id="asc" + ), + pytest.param( + False, [["b", "a"], ["c", "d", "e"]], [[5, 4], [3, 2, 1]], id="des" + ), + ], +) +def test_array_agg_reserve_order(ascending, expected_b, expected_c): + data = { + "a": [1, 1, 2, 2, 2], + "b": ["a", "b", "c", "d", "e"], + "c": [4, 5, 3, 2, 1], + } + df = bpd.DataFrame(data) + + result = bbq.array_agg(df.sort_values("c", ascending=ascending).groupby(by=["a"])) + expected_data = { + "a": [1, 2], + "b": expected_b, + "c": expected_c, + } + expected = bpd.DataFrame(expected_data).set_index("a") + + pd.testing.assert_frame_equal( + result.to_pandas(), # type: ignore + expected.to_pandas(), + ) + + +def test_array_agg_matches_after_explode(): + data = { + "index": np.arange(10), + "a": [np.random.randint(0, 10, 10) for _ in range(10)], + "b": [np.random.randint(0, 10, 10) for _ in range(10)], + } + df = bpd.DataFrame(data).set_index("index") + result = bbq.array_agg(df.explode(["a", "b"]).groupby(level=0)) + result.index.name = "index" + + pd.testing.assert_frame_equal( + result.to_pandas(), # type: ignore + df.to_pandas(), ) diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py index 64ef05366d..ecef2115e5 100644 --- a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py +++ 
b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py @@ -43,6 +43,21 @@ def _quantile(translator, op: ibis_reductions.Quantile): return f"PERCENTILE_CONT({arg}, {quantile})" +def _array_aggregate(translator, op: vendored_ibis_ops.ArrayAggregate): + """This method provides the same functionality as the collect() method in Ibis, with + the added capability of ordering the results using order_by. + https://github.com/ibis-project/ibis/issues/9170 + """ + arg = translator.translate(op.arg) + + order_by_sql = "" + if len(op.order_by) > 0: + order_by = ", ".join([translator.translate(column) for column in op.order_by]) + order_by_sql = f"ORDER BY {order_by}" + + return f"ARRAY_AGG({arg} IGNORE NULLS {order_by_sql})" + + patched_ops = { vendored_ibis_ops.ApproximateMultiQuantile: _approx_quantiles, # type:ignore vendored_ibis_ops.FirstNonNullValue: _first_non_null_value, # type:ignore @@ -51,6 +66,7 @@ def _quantile(translator, op: ibis_reductions.Quantile): vendored_ibis_ops.GenerateArray: _generate_array, # type:ignore vendored_ibis_ops.SafeCastToDatetime: _safe_cast_to_datetime, # type:ignore ibis_reductions.Quantile: _quantile, # type:ignore + vendored_ibis_ops.ArrayAggregate: _array_aggregate, # type:ignore } OPERATION_REGISTRY.update(patched_ops) diff --git a/third_party/bigframes_vendored/ibis/expr/operations/__init__.py b/third_party/bigframes_vendored/ibis/expr/operations/__init__.py index 3d5a5a7fa0..3ae5fc10e4 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/__init__.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/__init__.py @@ -2,6 +2,6 @@ from __future__ import annotations from bigframes_vendored.ibis.expr.operations.analytic import * # noqa: F401 F403 -from bigframes_vendored.ibis.expr.operations.generic import * # noqa: F401 F403 +from bigframes_vendored.ibis.expr.operations.arrays import * # noqa: F401 F403 from bigframes_vendored.ibis.expr.operations.json import * # noqa: F401 F403 from 
bigframes_vendored.ibis.expr.operations.reductions import * # noqa: F401 F403 diff --git a/third_party/bigframes_vendored/ibis/expr/operations/generic.py b/third_party/bigframes_vendored/ibis/expr/operations/arrays.py similarity index 64% rename from third_party/bigframes_vendored/ibis/expr/operations/generic.py rename to third_party/bigframes_vendored/ibis/expr/operations/arrays.py index 98acaacfbd..a0ad915a9b 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/generic.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/arrays.py @@ -1,4 +1,4 @@ -# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/generic.py +# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/arrays.py from __future__ import annotations import ibis.expr.datatypes as dt @@ -6,6 +6,11 @@ class GenerateArray(Unary): + """ + Generates an array of values, similar to ibis.range(), but with simpler and + more efficient SQL generation. + """ + dtype = dt.Array(dt.int64) diff --git a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py index e6644f477a..bd971e408a 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py @@ -2,6 +2,8 @@ from __future__ import annotations +import ibis.common.annotations as ibis_annotations +from ibis.common.typing import VarTuple import ibis.expr.datatypes as dt import ibis.expr.operations.core as ibis_ops_core from ibis.expr.operations.reductions import Filterable, Reduction @@ -18,6 +20,18 @@ class ApproximateMultiQuantile(Filterable, Reduction): dtype = dt.Array(dt.float64) -__all__ = [ - "ApproximateMultiQuantile", -] +class ArrayAggregate(Filterable, Reduction): + """ + Collects the elements of this expression into an ordered array. 
Similar to + the ibis `ArrayCollect`, but adds an `order_by` parameter. + """ + + arg: ibis_ops_core.Column + order_by: VarTuple[ibis_ops_core.Value] = () + + @ibis_annotations.attribute + def dtype(self): + return dt.Array(self.arg.dtype) + + +__all__ = ["ApproximateMultiQuantile", "ArrayAggregate"]