fix(topk): call attempt_early_completion when filter rejects entire b…#22852
Open
ajegou wants to merge 1 commit into
Open
fix(topk): call attempt_early_completion when filter rejects entire b…#22852ajegou wants to merge 1 commit into
ajegou wants to merge 1 commit into
Conversation
…atch
`TopK::insert_batch` short-circuits when the heap's dynamic filter
rejects every row in a batch:
if !filter.has_true() {
// nothing to filter, so no need to update
return Ok(());
}
The early-exit check `attempt_early_completion(&batch)` lives later in
the same function, gated on `replacements > 0`. So a batch that the
filter rejects entirely bypasses the check.
The heap's dynamic filter is derived from the heap's worst row (via
`update_filter`). A batch whose rows all come from a strictly worse
sort prefix is exactly the batch the filter rejects entirely -- i.e.
the very signal `attempt_early_completion` is designed to detect ("the
next batch is past the heap's boundary, we can stop") is what causes
the function to short-circuit *before* the check runs.
This is a feature-interaction regression between two PRs that were
both correct in isolation. The `attempt_early_completion` mechanism
was added by apache#15563 (closing apache#15529). At the time, there was no
heap-derived dynamic filter on TopK, so the only sensible call site
was right after a successful heap insertion. Two months later, apache#15770
added the dynamic-filter pushdown for TopK sorts, introducing the
`!filter.has_true()` short-circuit. The two features address different
problems and the new short-circuit didn't connect to the existing
prefix-completion check -- which is how this gap opened up.
Consequence: on a TopK over an input ordered on the sort prefix,
`finished = true` is never set once the heap stabilizes. Since
`finished` is the signal `SortExec` uses to stop pulling from its
input, the source keeps being polled long past the point where no
further row can improve the heap.
Fix: call `attempt_early_completion(&batch)` immediately before the
`return Ok(())` in the `!filter.has_true()` branch.
Adds a regression test `test_try_finish_fires_when_filter_rejects_entire_batch`
that fails on unpatched code and passes with the fix.
Closes apache#22849.
kosiew
approved these changes
Jun 10, 2026
| @@ -255,7 +255,13 @@ impl TopK { | |||
| let array = filtered.into_array(num_rows)?; | |||
| let mut filter = array.as_boolean().clone(); | |||
| if !filter.has_true() { | |||
Contributor
There was a problem hiding this comment.
This comment is accurate, but it feels a bit longer than the control flow it explains. I think we could keep just the key invariant here: the heap is unchanged, but the rejected batch can still prove prefix completion.
For example:
// The heap is unchanged, but a fully rejected batch can still prove that
// the shared sort prefix has passed the heap boundary.
self.attempt_early_completion(&batch)?;
return Ok(());| /// filter rejects entirely — i.e. the very signal the early-exit was designed to | ||
| /// detect was being silently dropped. | ||
| #[tokio::test] | ||
| async fn test_try_finish_fires_when_filter_rejects_entire_batch() -> Result<()> { |
Contributor
There was a problem hiding this comment.
Nice regression coverage. One small maintainability thought: this repeats a lot of the setup from test_try_finish_marks_finished_with_prefix. A small helper for the (a, b) schema, sort expressions, and TopK construction could make future TopK prefix tests shorter and keep the main scenario easier to spot.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
…atch
Which issue does this PR close?
Rationale for this change
TopK::insert_batchshort-circuits when the heap's dynamic filter rejects every row in a batch:The early-exit check
attempt_early_completion(&batch)lives later in the same function, gated onreplacements > 0. So a batch that the filter rejects entirely bypasses the check.The heap's dynamic filter is derived from the heap's worst row (via
update_filter). A batch whose rows all come from a strictly worse sort prefix is exactly the batch the filter rejects entirely — i.e. the very signalattempt_early_completionis designed to detect ("the next batch is past the heap's boundary, we can stop") is what causes the function to short-circuit before the check runs.This is a feature-interaction regression between two PRs that were both correct in isolation. The
attempt_early_completionmechanism was added by #15563 (closing #15529). At the time, there was no heap-derived dynamic filter on TopK, so the only sensible call site was right after a successful heap insertion. Two months later, #15770 added the dynamic-filter pushdown for TopK sorts, introducing the!filter.has_true()short-circuit. The two features address different problems and the new short-circuit didn't connect to the existing prefix-completion check — which is how this gap opened up.Consequence: on a TopK over an input ordered on the sort prefix,
finished = trueis never set once the heap stabilizes. Sincefinishedis the signalSortExecuses to stop pulling from its input (viaPoll::Ready(None)from the TopK stream, which cascades into dropping the source stream), the source keeps being polled long past the point where no further row can improve the heap. The LIMIT optimization effectively degrades to "heap saves memory but reads everything"; sources with cancellable streams (e.g. networked sources) never receive the cancellation signal.What changes are included in this PR?
Single behavioral change in
datafusion/physical-plan/src/topk/mod.rs: callattempt_early_completion(&batch)immediately before thereturn Ok(())in the!filter.has_true()branch.Why this scope, not a broader restructuring:
attempt_early_completioncall insideif replacements > 0is load-bearing for a related case: a batch containing a mix of "still valuable" rows and "past the boundary" rows. The existingtest_try_finish_marks_finished_with_prefixtest covers this case — Batch 2 witha=[2,3], b=[10,20]against a heap whereheap.max.a = 2; the(2, 10)row must be inserted before the check on the(3, 20)last row triggers. Moving the call earlier would skip the insertion of valuable rows and break that test.filter.has_true() == truebutreplacements == 0(the filter accepts some rows butfind_new_topk_itemsends up inserting none of them), the existing call insideif replacements > 0is also skipped. This requires a divergence between the heap's filter predicate and the row-byte comparison used insidefind_new_topk_items, which shouldn't normally happen (the filter is derived from the heap's worst row using the same comparator). A deterministic synthetic repro would likely require concurrent heap updates from sibling partitions or boundary-value edge cases (NaN/NULL semantics, type coercion). Happy to send a follow-up if reviewers want it covered; the workload that motivated this fix was the filter-rejection case empirically.Are these changes tested?
Yes. Added a regression test
test_try_finish_fires_when_filter_rejects_entire_batch. The assertion target istopk.finished— the flag that signals "stop pulling from the source" to upstream consumers (read byTopKExec::poll_nextto emitPoll::Ready(None)). Asserting that the flag transitions on the fully-filter-rejected batch is equivalent to asserting that the source-stopping mechanism activates.(a, b)sort with prefixa, k=3.a ∈ {1, 2};update_filtertightens the filter toa < 2 OR (a = 2 AND b < 30).a = 3— filter rejects every row.insert_batchshort-circuits,topk.finishedstaysfalse. Test fails.attempt_early_completionfires (last-row prefixa = 3> heap.max prefixa = 2),topk.finishedbecomestrue. Test passes.The test also asserts the emitted top-K is unchanged from after batch 1, confirming no candidate row was incorrectly excluded by the early bail.
All 28 existing
topk::tests continue to pass (includingtest_try_finish_marks_finished_with_prefix, which exercises the mixed-prefix case).Are there any user-facing changes?
No public API or output changes. The fix only changes when TopK marks itself
finished = true— specifically, it now firesattempt_early_completionfor batches that are entirely rejected by the heap's dynamic filter, where previously it would silently skip the check. Output of TopK is unchanged; only the early-exit behavior improves.