Describe the bug
Problem
After #23055, hash aggregate output uses EmitTo::First(batch_size) to emit result batches incrementally.
This can regress high-cardinality aggregate output. The reason is that EmitTo::First(n) currently means:
- emit the first n groups,
- remove them,
- shift all remaining group indexes down by n.
That is useful when aggregation may continue after emitting. But in the final AggregateHashTableState::Outputting phase, no more intern calls are expected.
Despite that, GroupValuesColumn::emit(EmitTo::First(n)) still maintains lookup state for future interning. It calls HashTable::retain and rewrites remaining group indexes on every output batch.
So terminal output can spend roughly:
O(groups × output_batches)
maintaining lookup structures that are no longer needed.
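A rough cost model behind this estimate, under simplifying assumptions: one partition holds G groups, the batch size is n, there are B = G/n output batches (G divisible by n), and the k-th emit (k = 0, ..., B-1) visits every entry still in the lookup table before removing n of them. The second line is the cost if lookup state were not maintained during terminal output, where each group is touched once:

```latex
W_{\text{retain}} = \sum_{k=0}^{B-1} (G - k n) = B G - n \frac{B(B-1)}{2} = \frac{G (B + 1)}{2} \approx \frac{G^2}{2n}
W_{\text{no lookup}} = \sum_{k=0}^{B-1} n = B n = G
```

For a fixed batch size, the first grows quadratically in the number of groups and the second linearly.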
Reproduction
One way to reproduce is with TPC-DS SF10 query 23, using the same Arrow/DataFusion dependency setup and the same runtime options.
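Compare:
- parent 322f6862e0744207ac24b5fedda3fb6716e654c3
- 681ba9bc7a45b5d3de31438a0505b0ce1e4854cb (#23055)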
Example command shape:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=24 \
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS=true \
DATAFUSION_EXECUTION_PARQUET_PRUNING=true \
DATAFUSION_OPTIMIZER_ENABLE_DYNAMIC_FILTER_PUSHDOWN=true \
DATAFUSION_OPTIMIZER_ENABLE_JOIN_DYNAMIC_FILTER_PUSHDOWN=true \
DATAFUSION_OPTIMIZER_ENABLE_TOPK_DYNAMIC_FILTER_PUSHDOWN=true \
DATAFUSION_OPTIMIZER_ENABLE_AGGREGATE_DYNAMIC_FILTER_PUSHDOWN=true \
./target/release/dfbench tpcds \
--query 23 \
--iterations 3 \
--path /path/to/tpcds_sf10 \
--query_path datafusion/core/tests/tpc-ds \
--prefer_hash_join true
For attribution, run once with --debug and inspect the AggregateExec metrics.
Observed TPC-DS performance
On SF10 q23 with 24 target partitions:
- parent 322f6862e0: ~2.43s mean
- #23055 681ba9bc7a: ~8.04s mean
Diagnostic run details:
- parent perf iterations: 2466.594 ms, 2414.212 ms, 2415.373 ms
- 681ba9bc7a perf iterations: 8091.431 ms, 7987.224 ms, 8026.777 ms
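As a quick arithmetic check, the reported means follow from the three iterations of each run. The helper below is only a calculator for these numbers, not part of dfbench:

```rust
/// Arithmetic mean of per-iteration timings, in milliseconds.
fn mean_ms(samples: &[f64]) -> f64 {
    samples.iter().sum::<f64>() / samples.len() as f64
}

fn main() {
    // Per-iteration timings copied from the diagnostic run.
    let parent = [2466.594, 2414.212, 2415.373];
    let regressed = [8091.431, 7987.224, 8026.777];
    let (p, r) = (mean_ms(&parent), mean_ms(&regressed));
    println!("parent: {:.2} ms, 681ba9bc7a: {:.2} ms, slowdown: {:.2}x", p, r, r / p);
}
```

This gives about 2432 ms for the parent and about 8035 ms for 681ba9bc7a, a slowdown of roughly 3.3x.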
The regression is isolated to aggregate output. The problematic node is a final partitioned aggregate:
AggregateExec: mode=FinalPartitioned,
gby=[i_item_sk, d_date],
aggr=[count(*)],
output_rows ~= 18.36M,
output_batches ~= 2.26K
At the regressed commit, debug metrics showed this node spending about 32s (summed) in emitting_time, while aggregation time stayed around 3s.
A profile showed the hot path as:
AggregateHashTable::next_output_batch
-> GroupValuesColumn::emit(EmitTo::First(batch_size))
-> hashbrown::HashTable::retain
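A toy model may make this hot path easier to see. The sketch below is hypothetical: ToyGroupValues, intern and emit_first are illustrative names, not DataFusion APIs, and std's HashMap::retain stands in for hashbrown::HashTable::retain. It mimics the destructive EmitTo::First(n) semantics described in the Problem section: drop the first n groups, then walk every remaining lookup entry to delete or renumber it.

```rust
use std::collections::HashMap;

/// Toy stand-in for a group-values store (hypothetical; not DataFusion's
/// GroupValuesColumn). `map` plays the role of the lookup hash table.
struct ToyGroupValues {
    map: HashMap<u64, usize>, // group key -> group index
    values: Vec<u64>,         // group keys in group-index order
}

impl ToyGroupValues {
    fn new() -> Self {
        Self { map: HashMap::new(), values: Vec::new() }
    }

    /// Return the group index for `key`, creating a new group if needed.
    fn intern(&mut self, key: u64) -> usize {
        let next = self.values.len();
        let idx = *self.map.entry(key).or_insert(next);
        if idx == next {
            self.values.push(key);
        }
        idx
    }

    /// Destructive "emit first n": remove the first n groups and shift the
    /// remaining indexes down by n so that later `intern` calls stay valid.
    /// Returns the emitted keys and how many map entries `retain` visited.
    fn emit_first(&mut self, n: usize) -> (Vec<u64>, usize) {
        let n = n.min(self.values.len());
        let emitted: Vec<u64> = self.values.drain(..n).collect();
        let mut visited = 0;
        // `retain` walks every live entry, not just the n being removed.
        self.map.retain(|_, idx| {
            visited += 1;
            if *idx < n {
                false
            } else {
                *idx -= n;
                true
            }
        });
        (emitted, visited)
    }
}

fn main() {
    let mut gv = ToyGroupValues::new();
    for key in [10, 20, 30, 40, 50, 60] {
        gv.intern(key);
    }
    let mut total_visited = 0;
    while !gv.values.is_empty() {
        let (batch, visited) = gv.emit_first(2);
        total_visited += visited;
        println!("emitted {:?}, retain visited {} entries", batch, visited);
    }
    println!("total entries visited: {}", total_visited);
}
```

With six groups and a batch size of 2, the three emits visit 6, 4 and 2 lookup entries, so the renumbering work grows with both the number of groups and the number of batches.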
Possible fix
Could you help take a look, @2010YOUY01? I can contribute the code.
Add an explicit terminal-output transition, for example:
GroupValues::release_interning_state()
called from AggregateHashTable::start_outputting().
For GroupValuesColumn, this could clear lookup-only state such as:
- hash map,
- collision lists,
- temporary intern buffers.
After that, terminal output would no longer pay to maintain lookup state that will not be used again.
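A minimal sketch of the proposed transition on a toy store, assuming lookup state can simply be dropped once outputting starts. Only the name release_interning_state comes from this proposal; ToyGroupValues, intern and emit_first are hypothetical, and std's HashMap stands in for the real lookup table. After release, emit_first skips the retain-and-renumber pass; the toy still shifts its values Vec on each emit, so it only removes the lookup-table cost.

```rust
use std::collections::HashMap;

/// Toy group-values store (hypothetical; not DataFusion's GroupValuesColumn).
struct ToyGroupValues {
    /// Lookup-only state used by `intern`; `None` once released.
    map: Option<HashMap<u64, usize>>,
    /// Group keys in group-index order.
    values: Vec<u64>,
}

impl ToyGroupValues {
    fn new() -> Self {
        Self { map: Some(HashMap::new()), values: Vec::new() }
    }

    /// Return the group index for `key`, creating a new group if needed.
    fn intern(&mut self, key: u64) -> usize {
        let map = self
            .map
            .as_mut()
            .expect("intern called after release_interning_state");
        let next = self.values.len();
        let idx = *map.entry(key).or_insert(next);
        if idx == next {
            self.values.push(key);
        }
        idx
    }

    /// Hypothetical terminal-output transition: drop lookup-only state
    /// because no further `intern` calls are expected.
    fn release_interning_state(&mut self) {
        self.map = None;
    }

    /// Emit the first `n` groups. The lookup map is only retained and
    /// renumbered while it still exists.
    fn emit_first(&mut self, n: usize) -> Vec<u64> {
        let n = n.min(self.values.len());
        let emitted: Vec<u64> = self.values.drain(..n).collect();
        if let Some(map) = self.map.as_mut() {
            map.retain(|_, idx| {
                if *idx < n {
                    false
                } else {
                    *idx -= n;
                    true
                }
            });
        }
        emitted
    }
}

fn main() {
    let mut gv = ToyGroupValues::new();
    for key in [10, 20, 30, 40, 50] {
        gv.intern(key);
    }
    // e.g. invoked once when the hash table switches to outputting
    gv.release_interning_state();
    while !gv.values.is_empty() {
        println!("{:?}", gv.emit_first(2));
    }
}
```

Making intern panic after release is one way to surface a violated "no more interning" assumption early; a real implementation might prefer a debug assertion or an error.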
Another option is to add an output-only cursor/range API for terminal output, separate from EmitTo::First, so EmitTo::First can keep its current destructive renumbering semantics for paths that still need it.
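An output-only cursor could look roughly like the sketch below, assuming group values are frozen once terminal output begins. OutputCursor and next_batch are hypothetical names, not DataFusion APIs. The cursor hands out contiguous index ranges and advances an offset, so nothing is removed or renumbered.

```rust
/// Hypothetical output-only cursor for terminal output: groups are
/// handed out as contiguous ranges and never removed or renumbered.
struct OutputCursor {
    /// Group keys in group-index order (frozen once outputting starts).
    values: Vec<u64>,
    /// First group index that has not been emitted yet.
    next: usize,
}

impl OutputCursor {
    fn new(values: Vec<u64>) -> Self {
        Self { values, next: 0 }
    }

    /// Return the next batch of at most `batch_size` groups, or `None`
    /// once every group has been emitted. Pending groups are left as-is.
    fn next_batch(&mut self, batch_size: usize) -> Option<&[u64]> {
        assert!(batch_size > 0, "batch_size must be positive");
        if self.next >= self.values.len() {
            return None;
        }
        let start = self.next;
        let end = (start + batch_size).min(self.values.len());
        self.next = end;
        Some(&self.values[start..end])
    }
}

fn main() {
    let mut cursor = OutputCursor::new(vec![10, 20, 30, 40, 50]);
    while let Some(batch) = cursor.next_batch(2) {
        println!("{:?}", batch);
    }
}
```

Keeping this separate from EmitTo::First leaves the destructive renumbering path intact for callers that still intern after emitting.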
A short-term fallback is to materialize all output once and slice it when the estimated output size is safe, but that has higher peak-memory risk.
To Reproduce
No response
Expected behavior
No response
Additional context
No response