HashAggregate output regression after #23055: repeated EmitTo::First maintains unused GroupValues lookup state #23178

Description

@hhhizzz

Describe the bug

Problem

After #23055, hash aggregate output uses EmitTo::First(batch_size) to emit result batches incrementally.

This can regress high-cardinality aggregate output. The reason is that EmitTo::First(n) currently means:

  1. emit the first n groups,
  2. remove them,
  3. shift all remaining group indexes down by n.
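The three steps above can be modeled with a small, self-contained sketch. This is not DataFusion code: `ToyGroups`, `intern`, and `emit_first` are hypothetical names, and a `std::collections::HashMap` stands in for the real lookup table. It only illustrates why every remaining lookup entry has to be touched when the first n groups are emitted.

```rust
use std::collections::HashMap;

/// Toy stand-in for a group-values store (hypothetical, not the DataFusion type).
/// `keys[i]` is the key of group index `i`; `lookup` maps a key back to its index
/// and is only needed while interning is still possible.
struct ToyGroups {
    keys: Vec<String>,
    lookup: HashMap<String, usize>,
}

impl ToyGroups {
    fn new() -> Self {
        Self { keys: Vec::new(), lookup: HashMap::new() }
    }

    /// Returns the group index for `key`, creating a new group if needed.
    fn intern(&mut self, key: &str) -> usize {
        if let Some(&idx) = self.lookup.get(key) {
            return idx;
        }
        let idx = self.keys.len();
        self.keys.push(key.to_string());
        self.lookup.insert(key.to_string(), idx);
        idx
    }

    /// Models the semantics of `EmitTo::First(n)` described above.
    fn emit_first(&mut self, n: usize) -> Vec<String> {
        let n = n.min(self.keys.len());
        // Steps 1 and 2: emit the first n groups and remove them.
        let emitted: Vec<String> = self.keys.drain(..n).collect();
        // Step 3: visit every lookup entry, dropping emitted groups and
        // shifting the remaining group indexes down by n.
        self.lookup.retain(|_, idx| {
            if *idx < n {
                false
            } else {
                *idx -= n;
                true
            }
        });
        emitted
    }
}
```

After `emit_first(2)` on groups `a, b, c, d`, the keys `c` and `d` map to indexes 0 and 1, so a later `intern` call still resolves correctly. That renumbering is exactly the work that has no consumer once output is terminal.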

That is useful when aggregation may continue after emitting. But in the final AggregateHashTableState::Outputting phase, no more intern calls are expected.

Despite that, GroupValuesColumn::emit(EmitTo::First(n)) still maintains lookup state for future interning. It calls HashTable::retain and rewrites remaining group indexes on every output batch.

So terminal output can spend roughly:

O(groups × output_batches)

maintaining lookup structures that are no longer needed.
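To make the bound concrete, here is a rough cost model, under the simplifying assumption that each EmitTo::First(batch_size) call visits every lookup entry still in the table. The function name `retain_visits` is hypothetical and the model ignores constant factors and any per-entry work.

```rust
/// Counts lookup entries visited across all output batches, assuming each
/// emit scans every entry that is still in the table (a simplifying assumption).
fn retain_visits(groups: u64, batch_size: u64) -> u64 {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut remaining = groups;
    let mut visits = 0;
    while remaining > 0 {
        // One scan over everything still present before this batch is removed.
        visits += remaining;
        remaining -= remaining.min(batch_size);
    }
    visits
}
```

Under this model the total is about groups × output_batches / 2. For example, 1,000,000 groups with a batch size of 8192 gives 123 output batches and roughly 61.5 million entry visits, compared with 1 million groups actually emitted.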

Reproduction

One way to reproduce is with TPC-DS SF10 query 23, using the same Arrow/DataFusion dependency setup and the same runtime options.

Compare the parent commit (322f6862e0) with the #23055 commit (681ba9bc7a).

Example command shape:

DATAFUSION_EXECUTION_TARGET_PARTITIONS=24 \
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS=true \
DATAFUSION_EXECUTION_PARQUET_PRUNING=true \
DATAFUSION_OPTIMIZER_ENABLE_DYNAMIC_FILTER_PUSHDOWN=true \
DATAFUSION_OPTIMIZER_ENABLE_JOIN_DYNAMIC_FILTER_PUSHDOWN=true \
DATAFUSION_OPTIMIZER_ENABLE_TOPK_DYNAMIC_FILTER_PUSHDOWN=true \
DATAFUSION_OPTIMIZER_ENABLE_AGGREGATE_DYNAMIC_FILTER_PUSHDOWN=true \
./target/release/dfbench tpcds \
  --query 23 \
  --iterations 3 \
  --path /path/to/tpcds_sf10 \
  --query_path datafusion/core/tests/tpc-ds \
  --prefer_hash_join true

For attribution, run once with --debug and inspect the AggregateExec metrics.

Observed TPC-DS performance

On SF10 q23 with 24 target partitions:

parent  322f6862e0: ~2.43s mean
#23055  681ba9bc7a: ~8.04s mean

Diagnostic run details:

parent perf iterations:
2466.594 ms, 2414.212 ms, 2415.373 ms

681ba9bc7a perf iterations:
8091.431 ms, 7987.224 ms, 8026.777 ms

The regression is isolated to aggregate output. The problematic node is a final partitioned aggregate:

AggregateExec: mode=FinalPartitioned,
  gby=[i_item_sk, d_date],
  aggr=[count(*)],
  output_rows ~= 18.36M,
  output_batches ~= 2.26K

At the regressed commit, debug metrics showed this node spending about 32s summed time in emitting_time, while aggregation time stayed around 3s.

A profile showed the hot path as:

AggregateHashTable::next_output_batch
  -> GroupValuesColumn::emit(EmitTo::First(batch_size))
     -> hashbrown::HashTable::retain

Possible fix

@2010YOUY01, could you take a look? I am happy to contribute the code.

Add an explicit terminal-output transition, for example:

GroupValues::release_interning_state()

called from AggregateHashTable::start_outputting().

For GroupValuesColumn, this could clear lookup-only state such as:

  • hash map,
  • collision lists,
  • temporary intern buffers.

After that, terminal output would no longer pay to maintain lookup state that will not be used again.
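A minimal sketch of that idea on a toy type follows. `ToyGroupValues` and its methods are hypothetical; the real GroupValues trait and GroupValuesColumn internals differ, and the proposed release_interning_state() does not exist yet. The point is only that once the lookup state is dropped, emitting no longer scans it.

```rust
use std::collections::HashMap;

/// Toy group store (hypothetical). `lookup` is `None` once interning state
/// has been released for terminal output.
struct ToyGroupValues {
    keys: Vec<u64>,
    lookup: Option<HashMap<u64, usize>>,
}

impl ToyGroupValues {
    fn new() -> Self {
        Self { keys: Vec::new(), lookup: Some(HashMap::new()) }
    }

    fn intern(&mut self, key: u64) -> usize {
        let lookup = self
            .lookup
            .as_mut()
            .expect("intern called after release_interning_state");
        if let Some(&idx) = lookup.get(&key) {
            return idx;
        }
        let idx = self.keys.len();
        self.keys.push(key);
        lookup.insert(key, idx);
        idx
    }

    /// Sketch of the proposed transition: drop lookup-only state.
    fn release_interning_state(&mut self) {
        self.lookup = None;
    }

    fn emit_first(&mut self, n: usize) -> Vec<u64> {
        let n = n.min(self.keys.len());
        // Note: draining a Vec still moves the remaining keys; this toy does
        // not model how the real column builders store group values.
        let emitted: Vec<u64> = self.keys.drain(..n).collect();
        // Only maintain the lookup table if interning may still happen.
        if let Some(lookup) = self.lookup.as_mut() {
            lookup.retain(|_, idx| {
                if *idx < n {
                    false
                } else {
                    *idx -= n;
                    true
                }
            });
        }
        emitted
    }
}
```

In this sketch, calling `release_interning_state()` once before the output loop turns each `emit_first` into work proportional to the emitted batch plus the key storage, with no per-batch scan of the lookup table.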

Another option is to add an output-only cursor/range API for terminal output, separate from EmitTo::First, so EmitTo::First can keep its current destructive renumbering semantics for paths that still need it.
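The cursor alternative could look roughly like the following, shown over a plain slice rather than Arrow arrays. `OutputCursor` and `next_batch` are hypothetical names, not an existing DataFusion API, and the sketch does not address when the underlying group storage can be freed.

```rust
/// Read-only cursor over already-finalized group output (hypothetical API).
/// It hands out consecutive ranges without removing or renumbering groups.
struct OutputCursor<'a, T> {
    values: &'a [T],
    pos: usize,
}

impl<'a, T> OutputCursor<'a, T> {
    fn new(values: &'a [T]) -> Self {
        Self { values, pos: 0 }
    }

    /// Returns the next range of at most `batch_size` items, or `None` when
    /// the output is exhausted (or `batch_size` is zero).
    fn next_batch(&mut self, batch_size: usize) -> Option<&'a [T]> {
        let values: &'a [T] = self.values;
        if batch_size == 0 || self.pos >= values.len() {
            return None;
        }
        let end = (self.pos + batch_size).min(values.len());
        let batch = &values[self.pos..end];
        self.pos = end;
        Some(batch)
    }
}
```

Each call costs work proportional to the batch it returns, so total output work stays linear in the number of groups, while EmitTo::First keeps its existing semantics for callers that continue aggregating after an emit.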

A short-term fallback is to materialize all output once and slice it when the estimated output size is safe, but that has higher peak-memory risk.
