
TopK early-exit doesn't fire when heap's dynamic filter rejects an entire batch #22849

@ajegou


Describe the bug

TopK::insert_batch (in datafusion/physical-plan/src/topk/mod.rs) calls attempt_early_completion(&batch) only inside the if replacements > 0 branch, i.e. only when the batch actually updated the heap. Before that branch is reached, the same function short-circuits with return Ok(()) when the heap's dynamic filter rejects every row in the batch:

if !filter.has_true() {
    // nothing to filter, so no need to update
    return Ok(());
}
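A minimal sketch of the control flow described above, under heavy simplifying assumptions: this is not DataFusion code, rows are plain `(a, b)` tuples ordered by `a ASC, b ASC` with `a` as the declared prefix, the heap is a sorted `Vec`, and the names `Row`, `TopKModel`, `passes_filter` and `cmp_rows` are hypothetical. It only reproduces the ordering of the early return and the `replacements > 0` gate.

```rust
use std::cmp::Ordering;

/// Hypothetical row type: (a, b), ordered by `a ASC, b ASC`; `a` is the prefix.
type Row = (i32, f64);

fn cmp_rows(x: &Row, y: &Row) -> Ordering {
    x.0.cmp(&y.0).then(x.1.partial_cmp(&y.1).expect("no NaN in this sketch"))
}

/// Hypothetical, simplified stand-in for TopK (not the DataFusion struct).
struct TopKModel {
    k: usize,
    heap: Vec<Row>, // kept sorted ascending; the last element is the worst row
    finished: bool,
}

impl TopKModel {
    fn new(k: usize) -> Self {
        TopKModel { k, heap: Vec::new(), finished: false }
    }

    /// Stand-in for the heap-derived dynamic filter: once the heap is full,
    /// only rows strictly better than the worst retained row pass.
    fn passes_filter(&self, row: &Row) -> bool {
        match self.heap.last() {
            Some(worst) if self.heap.len() == self.k => cmp_rows(row, worst).is_lt(),
            _ => true,
        }
    }

    /// Mirrors the current control flow: the early-exit check sits behind
    /// `replacements > 0`, so a fully rejected batch never reaches it.
    fn insert_batch(&mut self, batch: &[Row]) {
        let candidates: Vec<Row> =
            batch.iter().copied().filter(|r| self.passes_filter(r)).collect();
        if candidates.is_empty() {
            // Corresponds to `if !filter.has_true() { return Ok(()); }`.
            return;
        }
        let mut replacements = 0;
        for row in candidates {
            if self.heap.len() < self.k {
                self.heap.push(row);
            } else if cmp_rows(&row, self.heap.last().unwrap()).is_lt() {
                self.heap.pop(); // evict the current worst row
                self.heap.push(row);
            } else {
                continue; // not better than the current worst row
            }
            self.heap.sort_by(cmp_rows);
            replacements += 1;
        }
        if replacements > 0 {
            self.attempt_early_completion(batch);
        }
    }

    /// Simplified check: with a full heap, if the batch's last row has a
    /// strictly greater prefix than the worst row, and the input is sorted on
    /// the prefix, no later row can improve the heap.
    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if self.heap.len() < self.k {
            return;
        }
        if let (Some(last), Some(worst)) = (batch.last(), self.heap.last()) {
            if last.0 > worst.0 {
                self.finished = true;
            }
        }
    }
}
```

Feeding this model the two batches from the reproduction below (k = 3) leaves `finished == false` after the second batch, because the fully rejected batch returns before the gated check is reached.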

The heap's dynamic filter (TopKDynamicFilters, updated by update_filter) is derived from the heap's worst row. Every row whose sort prefix is strictly worse than the worst row's prefix fails that filter, so a batch made up only of such rows is rejected in full. That is precisely the situation attempt_early_completion is designed to detect ("the next batch is past the heap's boundary, we can stop"). Because the check is gated on replacements > 0, and the early return happens before that branch is reached, it never fires for those batches and finished = true is never set. Since finished is the signal that tells the surrounding stream to stop pulling from the input, the source keeps being polled long after the point where no further row can improve the heap.
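For the `ORDER BY a ASC, b ASC` case with `a` as the declared prefix, the relationship between the filter and the early-exit condition can be sketched as two predicates over the heap's worst row. The helper names are hypothetical, and the real implementation expresses the filter differently; these plain functions only capture the logic described in this issue.

```rust
// Hypothetical helpers for `ORDER BY a ASC, b ASC` with `a` as the common
// sort prefix; `worst` is the heap's worst retained row (heap.max).

/// Row-level filter derived from the worst row:
/// `a < worst.a OR (a = worst.a AND b < worst.b)`.
fn heap_filter(row: (i32, f64), worst: (i32, f64)) -> bool {
    row.0 < worst.0 || (row.0 == worst.0 && row.1 < worst.1)
}

/// Prefix-only early-exit condition: the row's prefix is strictly worse than
/// the worst row's prefix.
fn prefix_strictly_worse(row: (i32, f64), worst: (i32, f64)) -> bool {
    row.0 > worst.0
}

/// True when the filter rejects every row of the batch. Any row with a
/// strictly worse prefix fails `heap_filter`, but a row with an equal prefix
/// and a worse `b` also fails it without being past the prefix boundary.
fn batch_fully_rejected(batch: &[(i32, f64)], worst: (i32, f64)) -> bool {
    batch.iter().all(|&row| !heap_filter(row, worst))
}
```

The design point this illustrates: full rejection by the filter is not by itself a reason to stop, so the early-exit check still has to compare prefixes rather than treat every rejected batch as terminal.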

To Reproduce

  • Build a TopK over a 2-column schema (a Int32, b Float64) with ORDER BY a ASC, b ASC LIMIT 3. Input ordering is on a only (the prefix).
  • Batch 1: a = [1, 1, 2], b = [20.0, 15.0, 30.0]. Fills the heap; heap.max becomes (a=2, b=30.0). update_filter tightens the filter to a < 2 OR (a = 2 AND b < 30.0).
  • Batch 2: a = [3, 3], b = [10.0, 20.0]. Every row has a = 3, so the filter rejects them all.

After insert_batch(batch2), the expected state is topk.finished == true: the last row's prefix a = 3 is strictly greater than the heap's worst-row prefix a = 2, and because the input is sorted on a, no future batch can improve the heap. Observed: topk.finished == false.

Expected behavior

attempt_early_completion should be invoked for every batch where its preconditions hold (heap full, common sort prefix declared), regardless of whether the batch updated the heap. A batch fully rejected by the heap's dynamic filter should still be able to set finished = true if its last-row prefix is strictly worse than the heap's worst.
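One possible shape of the expected behavior, as a hypothetical simplified model rather than a patch against DataFusion: the early-exit check runs after every batch, so a batch that the filter rejects entirely can still set `finished`. It assumes `(a, b)` rows ordered by `a ASC, b ASC` with `a` as the declared prefix, a sorted `Vec` as the heap, `k > 0`, and hypothetical names throughout.

```rust
use std::cmp::Ordering;

/// Hypothetical row type: (a, b), ordered by `a ASC, b ASC`; `a` is the prefix.
type Row = (i32, f64);

fn cmp_rows(x: &Row, y: &Row) -> Ordering {
    x.0.cmp(&y.0).then(x.1.partial_cmp(&y.1).expect("no NaN in this sketch"))
}

/// Hypothetical, simplified stand-in for TopK (not the DataFusion struct).
struct TopKModel {
    k: usize,
    heap: Vec<Row>, // kept sorted ascending; the last element is the worst row
    finished: bool,
}

impl TopKModel {
    fn new(k: usize) -> Self {
        assert!(k > 0, "this sketch assumes k > 0");
        TopKModel { k, heap: Vec::new(), finished: false }
    }

    fn insert_batch(&mut self, batch: &[Row]) {
        for &row in batch {
            let full = self.heap.len() == self.k;
            // Heap-derived filter: once full, only rows strictly better than
            // the worst retained row can enter.
            if full && !cmp_rows(&row, self.heap.last().unwrap()).is_lt() {
                continue;
            }
            if full {
                self.heap.pop(); // evict the current worst row
            }
            self.heap.push(row);
            self.heap.sort_by(cmp_rows);
        }
        // Proposed change: run the check for every batch, whether or not any
        // row passed the filter or replaced a heap entry.
        self.attempt_early_completion(batch);
    }

    /// Preconditions: heap full; the common sort prefix is `a` in this model.
    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if self.heap.len() < self.k {
            return;
        }
        if let (Some(last), Some(worst)) = (batch.last(), self.heap.last()) {
            if last.0 > worst.0 {
                self.finished = true;
            }
        }
    }
}
```

With the reproduction's two batches and k = 3, this version ends with `finished == true`, while a batch such as `[(2, 40.0)]` (rejected by the filter but not past the prefix boundary) leaves it `false`.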

Additional context

The TopK partial-prefix early-termination optimization (the attempt_early_completion mechanism) was introduced by #15563, closing #15529. At that point there was no heap-derived dynamic filter on TopK; the natural and only call site for the check was right after a successful heap insertion.

Two months later, #15770 added dynamic-filter pushdown for TopK sorts. It introduced the !filter.has_true() short-circuit to handle batches the heap's filter rejects entirely. The two features address different problems and #15770 didn't connect its new short-circuit to #15563's check — which is how the gap this issue describes opened up.

The symptom is a silent performance regression (output of TopK is unchanged; only the source-stopping signal fails to fire), which is likely why it went unreported for ~5 months: default test suites verify TopK output rather than asserting on topk.finished after specific batch patterns.
