Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 82 additions & 19 deletions datafusion/physical-plan/src/topk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ impl TopK {
let array = filtered.into_array(num_rows)?;
let mut filter = array.as_boolean().clone();
if !filter.has_true() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is accurate, but it feels a bit longer than the control flow it explains. I think we could keep just the key invariant here: the heap is unchanged, but the rejected batch can still prove prefix completion.

For example:

// The heap is unchanged, but a fully rejected batch can still prove that
// the shared sort prefix has passed the heap boundary.
self.attempt_early_completion(&batch)?;
return Ok(());

@ajegou ajegou Jun 10, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree — I've updated it, thanks.

// nothing to filter, so no need to update
// The heap is unchanged, but a fully rejected batch can still prove
// that the shared sort prefix has passed the heap boundary.
self.attempt_early_completion(&batch)?;
return Ok(());
}
// only update the keys / rows if the filter does not match all rows
Expand Down Expand Up @@ -1099,20 +1101,15 @@ mod tests {
assert_eq!(record_batch_store.batches_size, 0);
}

/// This test validates that the `try_finish` method marks the TopK operator as finished
/// when the prefix (on column "a") of the last row in the current batch is strictly greater
/// than the max top‑k row.
/// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a".
#[tokio::test]
async fn test_try_finish_marks_finished_with_prefix() -> Result<()> {
// Create a schema with two columns.
/// Builds an `(a Int32, b Float64)` schema and a `TopK` with full sort
/// `(a ASC, b ASC)`, input prefix `[a]`, `k = 3`, `batch_size = 2`. Used by
/// the prefix-completion tests below to keep their per-scenario logic in focus.
fn build_ab_prefix_topk() -> Result<(Arc<Schema>, TopK)> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Float64, false),
]));

// Create sort expressions.
// Full sort: first by "a", then by "b".
let sort_expr_a = PhysicalSortExpr {
expr: col("a", schema.as_ref())?,
options: SortOptions::default(),
Expand All @@ -1122,28 +1119,33 @@ mod tests {
options: SortOptions::default(),
};

// Input ordering uses only column "a" (a prefix of the full sort).
// Input ordering uses only column "a" (a prefix of the full sort on (a, b)).
let prefix = vec![sort_expr_a.clone()];
let full_expr = LexOrdering::from([sort_expr_a, sort_expr_b]);

// Create a dummy runtime environment and metrics.
let runtime = Arc::new(RuntimeEnv::default());
let metrics = ExecutionPlanMetricsSet::new();

// Create a TopK instance with k = 3 and batch_size = 2.
let mut topk = TopK::try_new(
let topk = TopK::try_new(
0,
Arc::clone(&schema),
prefix,
full_expr,
3,
2,
runtime,
&metrics,
Arc::new(RuntimeEnv::default()),
&ExecutionPlanMetricsSet::new(),
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
)))),
)?;
Ok((schema, topk))
}

/// This test validates that the `try_finish` method marks the TopK operator as finished
/// when the prefix (on column "a") of the last row in the current batch is strictly greater
/// than the max top‑k row.
/// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a".
#[tokio::test]
async fn test_try_finish_marks_finished_with_prefix() -> Result<()> {
let (schema, mut topk) = build_ab_prefix_topk()?;

// Create the first batch with two columns:
// Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0].
Expand Down Expand Up @@ -1196,6 +1198,67 @@ mod tests {
Ok(())
}

/// Regression test for #22849: a batch whose rows are entirely rejected by the
/// heap's dynamic filter must still trigger `attempt_early_completion` when its
/// last row's prefix is worse than the heap's worst.
///
/// Before the fix, the `!filter.has_true()` short-circuit returned without calling
/// `attempt_early_completion`. Because the heap's filter is itself derived from the
/// heap's worst row, a batch from a strictly-worse prefix is exactly the case the
/// filter rejects entirely — i.e. the very signal the early-exit was designed to
/// detect was being silently dropped.
#[tokio::test]
async fn test_try_finish_fires_when_filter_rejects_entire_batch() -> Result<()> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice regression coverage. One small maintainability thought: this repeats a lot of the setup from test_try_finish_marks_finished_with_prefix. A small helper for the (a, b) schema, sort expressions, and TopK construction could make future TopK prefix tests shorter and keep the main scenario easier to spot.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good idea — I just made the change.

let (schema, mut topk) = build_ab_prefix_topk()?;

// Batch 1 fills the heap with (1, 20.0), (1, 15.0), (2, 30.0).
// heap.max becomes (a=2, b=30.0); update_filter tightens the heap filter to
// a < 2 OR (a = 2 AND b < 30.0).
let array_a1: ArrayRef =
Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)]));
let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0]));
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?;
topk.insert_batch(batch1)?;
assert!(
!topk.finished,
"Expected 'finished' to be false after batch 1 \
(last row prefix a=2 equals heap.max prefix a=2, not strictly greater)."
);

// Batch 2: every row has a=3, so the heap's filter (a < 2 OR (a = 2 AND b < 30))
// rejects every row. Before the fix, `insert_batch` would short-circuit on
// `!filter.has_true()` and return without checking the prefix; `finished`
// would stay false even though no future batch could improve the heap.
let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(3)]));
let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0]));
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a2, array_b2])?;
topk.insert_batch(batch2)?;
assert!(
topk.finished,
"Expected 'finished' to be true after batch 2 \
(filter rejected every row, but the batch's last row prefix a=3 \
is strictly greater than heap.max prefix a=2)."
);

// The emitted top-k is unchanged from after batch 1 since none of batch 2's
// rows could improve the heap.
let results: Vec<_> = topk.emit()?.try_collect().await?;
assert_batches_eq!(
&[
"+---+------+",
"| a | b    |",
"+---+------+",
"| 1 | 15.0 |",
"| 1 | 20.0 |",
"| 2 | 30.0 |",
"+---+------+",
],
&results
);

Ok(())
}

/// This test verifies that the dynamic filter is marked as complete after TopK processing finishes.
#[tokio::test]
async fn test_topk_marks_filter_complete() -> Result<()> {
Expand Down
Loading