diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 9da606dc90db2..8a8bfd204ecb6 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -255,7 +255,9 @@ impl TopK { let array = filtered.into_array(num_rows)?; let mut filter = array.as_boolean().clone(); if !filter.has_true() { - // nothing to filter, so no need to update + // The heap is unchanged, but a fully rejected batch can still prove + // that the shared sort prefix has passed the heap boundary. + self.attempt_early_completion(&batch)?; return Ok(()); } // only update the keys / rows if the filter does not match all rows @@ -1099,20 +1101,15 @@ mod tests { assert_eq!(record_batch_store.batches_size, 0); } - /// This test validates that the `try_finish` method marks the TopK operator as finished - /// when the prefix (on column "a") of the last row in the current batch is strictly greater - /// than the max top‑k row. - /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". - #[tokio::test] - async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { - // Create a schema with two columns. + /// Builds an `(a Int32, b Float64)` schema and a `TopK` with full sort + /// `(a ASC, b ASC)`, input prefix `[a]`, `k = 3`, `batch_size = 2`. Used by + /// the prefix-completion tests below to keep their per-scenario logic in focus. + fn build_ab_prefix_topk() -> Result<(Arc<Schema>, TopK)> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Float64, false), ])); - // Create sort expressions. - // Full sort: first by "a", then by "b". let sort_expr_a = PhysicalSortExpr { expr: col("a", schema.as_ref())?, options: SortOptions::default(), @@ -1122,28 +1119,33 @@ mod tests { options: SortOptions::default(), }; - // Input ordering uses only column "a" (a prefix of the full sort). 
+ // Input ordering uses only column "a" (a prefix of the full sort on (a, b)). let prefix = vec![sort_expr_a.clone()]; let full_expr = LexOrdering::from([sort_expr_a, sort_expr_b]); - // Create a dummy runtime environment and metrics. - let runtime = Arc::new(RuntimeEnv::default()); - let metrics = ExecutionPlanMetricsSet::new(); - - // Create a TopK instance with k = 3 and batch_size = 2. - let mut topk = TopK::try_new( + let topk = TopK::try_new( 0, Arc::clone(&schema), prefix, full_expr, 3, 2, - runtime, - &metrics, + Arc::new(RuntimeEnv::default()), + &ExecutionPlanMetricsSet::new(), Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( DynamicFilterPhysicalExpr::new(vec![], lit(true)), )))), )?; + Ok((schema, topk)) + } + + /// This test validates that the `try_finish` method marks the TopK operator as finished + /// when the prefix (on column "a") of the last row in the current batch is strictly greater + /// than the max top‑k row. + /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". + #[tokio::test] + async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { + let (schema, mut topk) = build_ab_prefix_topk()?; // Create the first batch with two columns: // Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0]. @@ -1196,6 +1198,67 @@ mod tests { Ok(()) } + /// Regression test for #22849: a batch whose rows are entirely rejected by the + /// heap's dynamic filter must still trigger `attempt_early_completion` when its + /// last row's prefix is worse than the heap's worst. + /// + /// Before the fix, the `!filter.has_true()` short-circuit returned without calling + /// `attempt_early_completion`. Because the heap's filter is itself derived from the + /// heap's worst row, a batch from a strictly-worse prefix is exactly the case the + /// filter rejects entirely — i.e. the very signal the early-exit was designed to + /// detect was being silently dropped. 
+ #[tokio::test] + async fn test_try_finish_fires_when_filter_rejects_entire_batch() -> Result<()> { + let (schema, mut topk) = build_ab_prefix_topk()?; + + // Batch 1 fills the heap with (1, 20.0), (1, 15.0), (2, 30.0). + // heap.max becomes (a=2, b=30.0); update_filter tightens the heap filter to + // a < 2 OR (a = 2 AND b < 30.0). + let array_a1: ArrayRef = + Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)])); + let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0])); + let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?; + topk.insert_batch(batch1)?; + assert!( + !topk.finished, + "Expected 'finished' to be false after batch 1 \ + (last row prefix a=2 equals heap.max prefix a=2, not strictly greater)." + ); + + // Batch 2: every row has a=3, so the heap's filter (a < 2 OR (a = 2 AND b < 30)) + // rejects every row. Before the fix, `insert_batch` would short-circuit on + // `!filter.has_true()` and return without checking the prefix; `finished` + // would stay false even though no future batch could improve the heap. + let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(3)])); + let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0])); + let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a2, array_b2])?; + topk.insert_batch(batch2)?; + assert!( + topk.finished, + "Expected 'finished' to be true after batch 2 \ + (filter rejected every row, but the batch's last row prefix a=3 \ + is strictly greater than heap.max prefix a=2)." + ); + + // The emitted top-k is unchanged from after batch 1 since none of batch 2's + // rows could improve the heap. 
+ let results: Vec<_> = topk.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+---+------+", + "| a | b |", + "+---+------+", + "| 1 | 15.0 |", + "| 1 | 20.0 |", + "| 2 | 30.0 |", + "+---+------+", + ], + &results + ); + + Ok(()) + } + /// This test verifies that the dynamic filter is marked as complete after TopK processing finishes. #[tokio::test] async fn test_topk_marks_filter_complete() -> Result<()> {