Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ pub enum EmitTo {
/// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
/// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
First(usize),
/// Emit one complete block of groups.
///
/// Implementations may handle this the same way as [`EmitTo::First`].
/// Block-aware implementations can use it to avoid arbitrary prefix
/// shifting when no later update or merge needs contiguous group indexes.
FirstBlock(usize),
}

impl EmitTo {
Expand All @@ -45,7 +51,7 @@ impl EmitTo {
// Take the entire vector, leave new (empty) vector
std::mem::take(v)
}
Self::First(n) => split_vec_min_alloc(v, *n),
Self::First(n) | Self::FirstBlock(n) => split_vec_min_alloc(v, *n),
}
}
}
Expand Down Expand Up @@ -152,6 +158,12 @@ pub trait GroupsAccumulator: Send + std::any::Any {
/// future use. The group_indices on subsequent calls to
/// `update_batch` or `merge_batch` will be shifted down by
/// `n`. See [`EmitTo::First`] for more details.
///
/// If `emit_to` is [`EmitTo::FirstBlock`], the same first `n`
/// groups should be emitted. Implementations may either use the same
/// behavior as [`EmitTo::First`] or retain block-aligned storage and an
/// internal cursor. Any retained state must still preserve group-index
/// order for future emits.
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;

/// Returns the intermediate aggregate state for this accumulator,
Expand Down
13 changes: 12 additions & 1 deletion datafusion/ffi/src/udaf/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ impl From<EmitTo> for FFI_EmitTo {
fn from(value: EmitTo) -> Self {
match value {
EmitTo::All => Self::All,
EmitTo::First(v) => Self::First(v),
EmitTo::First(v) | EmitTo::FirstBlock(v) => Self::First(v),
}
}
}
Expand Down Expand Up @@ -537,6 +537,17 @@ mod tests {
Ok(())
}

#[test]
fn test_first_block_falls_back_to_first_for_ffi() -> Result<()> {
let ffi_value: FFI_EmitTo = EmitTo::FirstBlock(10).into();

let FFI_EmitTo::First(n) = ffi_value else {
panic!("FirstBlock should use the existing FFI First variant");
};
assert_eq!(n, 10);
Ok(())
}

#[test]
fn test_ffi_groups_accumulator_local_bypass_inner() -> Result<()> {
let original_accum = StddevGroupsAccumulator::new(StatsType::Population);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
EmitTo::All => {
self.seen.clear();
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
let mut remaining = HashSet::default();
for (group_idx, value) in self.seen.drain() {
if group_idx >= n {
Expand All @@ -105,7 +105,7 @@ where
fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
let num_emitted = match emit_to {
EmitTo::All => self.counts.len(),
EmitTo::First(n) => n,
EmitTo::First(n) | EmitTo::FirstBlock(n) => n,
};

// Prefix-sum counts[..num_emitted] into offsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl NullState {
}
}
}
EmitTo::First(n) => match &mut self.seen_values {
EmitTo::First(n) | EmitTo::FirstBlock(n) => match &mut self.seen_values {
SeenValues::All { num_values } => {
*num_values = num_values.saturating_sub(n);
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ where

let values = match emit_to {
EmitTo::All => values,
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
let first_n: BooleanBuffer = values.iter().take(n).collect();
// put n+1 back into self.values
for v in values.iter().skip(n) {
Expand Down
6 changes: 4 additions & 2 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator {
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let emit_groups = match emit_to {
EmitTo::All => self.num_groups,
EmitTo::First(n) => n,
EmitTo::First(n) | EmitTo::FirstBlock(n) => n,
};

// Step 1: Count entries per group. For EmitTo::First(n), only groups
Expand Down Expand Up @@ -714,7 +714,9 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator {
// Step 4: Release state for emitted groups.
match emit_to {
EmitTo::All => self.clear_state(),
EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
EmitTo::First(_) | EmitTo::FirstBlock(_) => {
self.compact_retained_state(emit_groups)?
}
}

let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> {

match emit_to {
EmitTo::All => self.size_of_orderings = 0,
EmitTo::First(_) => {
EmitTo::First(_) | EmitTo::FirstBlock(_) => {
self.size_of_orderings -=
result.iter().map(ScalarValue::size_of_vec).sum::<usize>()
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/first_last/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub(crate) fn take_need(
let bool_buf = bool_buf_builder.finish();
match emit_to {
EmitTo::All => bool_buf,
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
// split off the first N values in seen_values
//
let first_n: BooleanBuffer = bool_buf.slice(0, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl MinMaxBytesState {
std::mem::take(&mut self.min_max),
)
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
let first_data_capacity: usize = first_min_maxes
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl MinMaxStructState {
std::mem::take(&mut self.min_max),
)
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
let first_data_capacity: usize = first_min_maxes
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
.map(|v| v.build())
.collect::<Vec<_>>()
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
let output = self
.group_values
.iter_mut()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl GroupValues for GroupValuesRows {
self.map.clear();
output
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
let groups_rows = group_values.iter().take(n);
let output = self.row_converter.convert_rows(groups_rows)?;
// Clear out first n group keys by copying them to a new Rows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl GroupValues for GroupValuesBoolean {
let mut builder = BooleanBufferBuilder::new(len);
let emit_count = match emit_to {
EmitTo::All => len,
EmitTo::First(n) => n,
EmitTo::First(n) | EmitTo::FirstBlock(n) => n,
};
builder.append_n(emit_count, false);
if let Some(idx) = self.true_group.as_mut() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) if n == self.len() => {
EmitTo::First(n) | EmitTo::FirstBlock(n) if n == self.len() => {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
// if we only wanted to take the first n, insert the rest back
// into the map we could potentially avoid this reallocation, at
// the expense of much more complex code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ impl GroupValues for GroupValuesBytesView {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) if n == self.len() => {
EmitTo::First(n) | EmitTo::FirstBlock(n) if n == self.len() => {
self.num_groups -= map_contents.len();
map_contents
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
// if we only wanted to take the first n, insert the rest back
// into the map we could potentially avoid this reallocation, at
// the expense of much more complex code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ where
self.map.clear();
build_primitive(std::mem::take(&mut self.values), self.null_group.take())
}
EmitTo::First(n) => {
EmitTo::First(n) | EmitTo::FirstBlock(n) => {
self.map.retain(|entry| {
// Decrement group index by n
let group_idx = entry.0;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6419,7 +6419,7 @@ mod tests {
let counts = std::mem::take(&mut self.counts);
Ok(Arc::new(Int64Array::from(counts)))
}
EmitTo::First(_) => internal_err!(
EmitTo::First(_) | EmitTo::FirstBlock(_) => internal_err!(
"partial grouped aggregate output must materialize with EmitTo::All before slicing"
),
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl GroupOrdering {
GroupOrdering::Partial(_) | GroupOrdering::Full(_) => {
self.emit_to().map(|emit_to| match emit_to {
EmitTo::First(max) => EmitTo::First(n.min(max)),
EmitTo::FirstBlock(max) => EmitTo::FirstBlock(n.min(max)),
EmitTo::All => EmitTo::First(n),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,12 @@ LOCATION 'test_files/scratch/push_down_filter_regression/agg_dyn/';
statement ok
set datafusion.execution.collect_statistics = true;

# Suppress metrics: pruning counts are nondeterministic under parallel
# execution (the order in which Partial aggregates publish dynamic filter
# updates races against when the scan reads each partition). The original
# Rust test only asserted matched < 4; the important invariant here is
# that the DynamicFilter text is correct.
# Suppress metrics and dynamic filter thresholds: pruning counts and the
# threshold value are nondeterministic under parallel execution (the order in
# which Partial aggregates publish dynamic filter updates races against when
# the scan reads each partition). The original Rust test only asserted matched
# < 4; the important invariant here is that the DynamicFilter is present and
# contributes to the pruning predicate.
statement ok
set datafusion.explain.analyze_level = summary;

Expand All @@ -236,7 +237,7 @@ Plan with Metrics
01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[]
02)--CoalescePartitionsExec, metrics=[]
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_e2e.column1)], metrics=[]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > 4 ], dynamic_rg_pruning=eligible, pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > 4, required_guarantees=[], metrics=[]
04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_regression/agg_dyn/file_3.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 > 1 AND DynamicFilter [ column1@0 > <slt:ignore> ], dynamic_rg_pruning=eligible, pruning_predicate=column1_null_count@1 != row_count@2 AND column1_max@0 > 1 AND column1_null_count@1 != row_count@2 AND column1_max@0 > <slt:ignore>, required_guarantees=[], metrics=[]

statement ok
reset datafusion.explain.analyze_categories;
Expand Down
Loading