Describe the bug
TopK::insert_batch (in datafusion/physical-plan/src/topk/mod.rs) calls attempt_early_completion(&batch) only inside the if replacements > 0 branch — i.e. only when the batch successfully updated the heap. The same function short-circuits earlier with return Ok(()) when the heap's dynamic filter rejects every row in the batch:
    if !filter.has_true() {
        // nothing to filter, so no need to update
        return Ok(());
    }
The heap's dynamic filter (TopKDynamicFilters, updated by update_filter) is derived from the heap's worst row. A batch whose rows all come from a strictly worse sort prefix is exactly the batch the filter rejects entirely — which is precisely the signal attempt_early_completion is designed to detect ("the next batch is past the heap's boundary, we can stop"). Because the check is gated on replacements > 0, it never fires for those batches, and finished = true is never set. Since finished is the signal that tells the surrounding stream to stop pulling from the input, the source keeps being polled long past the point where no further row can improve the heap.
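To make the overlap between the two signals concrete, here is a minimal Rust sketch of both predicates, under stated assumptions. Rows are modeled as plain (i32, f64) tuples rather than Arrow arrays, and the helper names passes_heap_filter and prefix_past_boundary are hypothetical, not DataFusion functions. The sketch evaluates batch 2 from the reproduction steps against the worst row (2, 30.0).

```rust
// Hypothetical, simplified model (not DataFusion code): a row is an (a, b)
// tuple ordered by ORDER BY a ASC, b ASC, and `a` is the common sort prefix.
type Row = (i32, f64);

/// Filter shape from the issue: a < wa OR (a = wa AND b < wb).
/// Only rows passing it could still displace the heap's worst row.
fn passes_heap_filter(row: Row, worst: Row) -> bool {
    row.0 < worst.0 || (row.0 == worst.0 && row.1 < worst.1)
}

/// Early-completion condition: the batch's last row has a prefix strictly
/// greater than the worst row's prefix. With input sorted on `a`, every later
/// row sorts after the worst row, so the heap can no longer change.
fn prefix_past_boundary(last_row: Row, worst: Row) -> bool {
    last_row.0 > worst.0
}

fn main() {
    let worst: Row = (2, 30.0); // heap.max after batch 1
    let batch2: [Row; 2] = [(3, 10.0), (3, 20.0)];

    let any_row_passes = batch2.iter().any(|&r| passes_heap_filter(r, worst));
    let past_boundary = prefix_past_boundary(batch2[1], worst);

    println!("any row passes filter: {any_row_passes}"); // false
    println!("prefix past boundary: {past_boundary}"); // true
}
```

The implication only runs one way. A batch such as [(2, 40.0)] is also rejected by the filter, yet its prefix equals the worst row's prefix, so it does not justify stopping. This is why the prefix check still has to run on rejected batches, rather than treating rejection itself as completion.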
To Reproduce
- Build a TopK over a 2-column schema (a Int32, b Float64) with ORDER BY a ASC, b ASC LIMIT 3. Input ordering is on a only (the prefix).
- Batch 1: a = [1, 1, 2], b = [20.0, 15.0, 30.0]. Fills the heap; heap.max becomes (a=2, b=30.0). update_filter tightens the filter to a < 2 OR (a = 2 AND b < 30.0).
- Batch 2: a = [3, 3], b = [10.0, 20.0]. Every row has a = 3, so the filter rejects them all.
After insert_batch(batch2), the expected state is topk.finished == true: the last row's prefix a = 3 is strictly greater than the heap's worst-row prefix a = 2, and because the input is sorted on a, no future batch can improve the heap. Observed: topk.finished == false.
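The reproduction can be modeled without DataFusion. The following is a toy sketch under explicit assumptions: a sorted Vec stands in for TopK's heap, tuple rows stand in for a RecordBatch, and ToyTopK with its methods is a hypothetical stand-in that only mimics the control flow described in this issue (filter short-circuit first, early-completion check gated on replacements > 0).

```rust
use std::cmp::Ordering;

// Hypothetical toy model, not DataFusion code. Rows are (a, b); `a` is the prefix.
type Row = (i32, f64);

// Lexicographic order for ORDER BY a ASC, b ASC.
fn cmp_rows(x: &Row, y: &Row) -> Ordering {
    x.0.cmp(&y.0).then(x.1.total_cmp(&y.1))
}

struct ToyTopK {
    k: usize,
    heap: Vec<Row>, // kept sorted ascending; last() is the worst row
    finished: bool,
}

impl ToyTopK {
    // The worst row, but only once the heap is full.
    fn worst(&self) -> Option<Row> {
        self.heap.last().copied().filter(|_| self.heap.len() == self.k)
    }

    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if let (Some(worst), Some(last)) = (self.worst(), batch.last()) {
            if last.0 > worst.0 {
                self.finished = true;
            }
        }
    }

    fn insert_batch(&mut self, batch: &[Row]) {
        // Dynamic filter: keep only rows strictly better than the worst row.
        let candidates: Vec<Row> = match self.worst() {
            Some(w) => batch.iter().copied().filter(|r| cmp_rows(r, &w).is_lt()).collect(),
            None => batch.to_vec(),
        };
        if candidates.is_empty() {
            return; // models the `!filter.has_true()` short-circuit
        }
        let mut replacements = 0;
        for row in candidates {
            if self.heap.len() < self.k {
                self.heap.push(row);
                replacements += 1;
            } else if cmp_rows(&row, self.heap.last().unwrap()).is_lt() {
                self.heap.pop();
                self.heap.push(row);
                replacements += 1;
            }
            self.heap.sort_by(cmp_rows);
        }
        if replacements > 0 {
            self.attempt_early_completion(batch); // only reached when the heap changed
        }
    }
}

fn main() {
    let mut topk = ToyTopK { k: 3, heap: Vec::new(), finished: false };
    topk.insert_batch(&[(1, 20.0), (1, 15.0), (2, 30.0)]);
    topk.insert_batch(&[(3, 10.0), (3, 20.0)]);
    println!("finished = {}", topk.finished); // prints "finished = false"
}
```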
Expected behavior
attempt_early_completion should be invoked for every batch where its preconditions hold (heap full, common sort prefix declared), regardless of whether the batch updated the heap. A batch fully rejected by the heap's dynamic filter should still be able to set finished = true if its last-row prefix is strictly worse than the heap's worst.
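One possible shape for this behavior, sketched on a toy model: run the early-completion check after the heap-update step whether or not any row survived the filter. ToyTopK, its methods, tuple rows, and the sorted Vec used as the heap are all hypothetical simplifications. This is not DataFusion's code and not necessarily the patch upstream would choose.

```rust
use std::cmp::Ordering;

// Hypothetical toy model, not DataFusion code. Rows are (a, b); `a` is the prefix.
type Row = (i32, f64);

fn cmp_rows(x: &Row, y: &Row) -> Ordering {
    x.0.cmp(&y.0).then(x.1.total_cmp(&y.1))
}

struct ToyTopK { k: usize, heap: Vec<Row>, finished: bool }

impl ToyTopK {
    // The worst row, but only once the heap is full (a precondition of the check).
    fn worst(&self) -> Option<Row> {
        self.heap.last().copied().filter(|_| self.heap.len() == self.k)
    }

    fn attempt_early_completion(&mut self, batch: &[Row]) {
        if let (Some(w), Some(last)) = (self.worst(), batch.last()) {
            if last.0 > w.0 {
                self.finished = true;
            }
        }
    }

    fn insert_batch(&mut self, batch: &[Row]) {
        let candidates: Vec<Row> = match self.worst() {
            Some(w) => batch.iter().copied().filter(|r| cmp_rows(r, &w).is_lt()).collect(),
            None => batch.to_vec(),
        };
        if !candidates.is_empty() {
            // Merge surviving rows and keep the k best.
            self.heap.extend(candidates);
            self.heap.sort_by(cmp_rows);
            self.heap.truncate(self.k);
        }
        // The change: check early completion whether or not the heap was updated.
        self.attempt_early_completion(batch);
    }
}

fn main() {
    let mut topk = ToyTopK { k: 3, heap: Vec::new(), finished: false };
    topk.insert_batch(&[(1, 20.0), (1, 15.0), (2, 30.0)]);
    topk.insert_batch(&[(3, 10.0), (3, 20.0)]);
    println!("finished = {}", topk.finished); // prints "finished = true"
}
```

Running the check on fully rejected batches is safe under the precondition the check already relies on. In the ORDER BY a ASC example, the input is sorted on a, so every later row has a prefix at least as large as the batch's last row. That prefix is strictly greater than the worst row's prefix, so no later row can enter the heap.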
Additional context
The TopK partial-prefix early-termination optimization (the attempt_early_completion mechanism) was introduced by #15563, closing #15529. At that point there was no heap-derived dynamic filter on TopK; the natural and only call site for the check was right after a successful heap insertion.
Two months later, #15770 added dynamic-filter pushdown for TopK sorts. It introduced the !filter.has_true() short-circuit to handle batches the heap's filter rejects entirely. The two features address different problems and #15770 didn't connect its new short-circuit to #15563's check — which is how the gap this issue describes opened up.
The symptom is a silent performance regression (output of TopK is unchanged; only the source-stopping signal fails to fire), which is likely why it went unreported for ~5 months: default test suites verify TopK output rather than asserting on topk.finished after specific batch patterns.