-
Notifications
You must be signed in to change notification settings - Fork 2.2k
fix(topk): call attempt_early_completion when filter rejects entire batch #22852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -255,7 +255,9 @@ impl TopK { | |
| let array = filtered.into_array(num_rows)?; | ||
| let mut filter = array.as_boolean().clone(); | ||
| if !filter.has_true() { | ||
| // nothing to filter, so no need to update | ||
| // The heap is unchanged, but a fully rejected batch can still prove | ||
| // that the shared sort prefix has passed the heap boundary. | ||
| self.attempt_early_completion(&batch)?; | ||
| return Ok(()); | ||
| } | ||
| // only update the keys / rows if the filter does not match all rows | ||
|
|
@@ -1099,20 +1101,15 @@ mod tests { | |
| assert_eq!(record_batch_store.batches_size, 0); | ||
| } | ||
|
|
||
| /// This test validates that the `try_finish` method marks the TopK operator as finished | ||
| /// when the prefix (on column "a") of the last row in the current batch is strictly greater | ||
| /// than the max top‑k row. | ||
| /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". | ||
| #[tokio::test] | ||
| async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { | ||
| // Create a schema with two columns. | ||
| /// Builds an `(a Int32, b Float64)` schema and a `TopK` with full sort | ||
| /// `(a ASC, b ASC)`, input prefix `[a]`, `k = 3`, `batch_size = 2`. Used by | ||
| /// the prefix-completion tests below to keep their per-scenario logic in focus. | ||
| fn build_ab_prefix_topk() -> Result<(Arc<Schema>, TopK)> { | ||
| let schema = Arc::new(Schema::new(vec![ | ||
| Field::new("a", DataType::Int32, false), | ||
| Field::new("b", DataType::Float64, false), | ||
| ])); | ||
|
|
||
| // Create sort expressions. | ||
| // Full sort: first by "a", then by "b". | ||
| let sort_expr_a = PhysicalSortExpr { | ||
| expr: col("a", schema.as_ref())?, | ||
| options: SortOptions::default(), | ||
|
|
@@ -1122,28 +1119,33 @@ mod tests { | |
| options: SortOptions::default(), | ||
| }; | ||
|
|
||
| // Input ordering uses only column "a" (a prefix of the full sort). | ||
| // Input ordering uses only column "a" (a prefix of the full sort on (a, b)). | ||
| let prefix = vec![sort_expr_a.clone()]; | ||
| let full_expr = LexOrdering::from([sort_expr_a, sort_expr_b]); | ||
|
|
||
| // Create a dummy runtime environment and metrics. | ||
| let runtime = Arc::new(RuntimeEnv::default()); | ||
| let metrics = ExecutionPlanMetricsSet::new(); | ||
|
|
||
| // Create a TopK instance with k = 3 and batch_size = 2. | ||
| let mut topk = TopK::try_new( | ||
| let topk = TopK::try_new( | ||
| 0, | ||
| Arc::clone(&schema), | ||
| prefix, | ||
| full_expr, | ||
| 3, | ||
| 2, | ||
| runtime, | ||
| &metrics, | ||
| Arc::new(RuntimeEnv::default()), | ||
| &ExecutionPlanMetricsSet::new(), | ||
| Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( | ||
| DynamicFilterPhysicalExpr::new(vec![], lit(true)), | ||
| )))), | ||
| )?; | ||
| Ok((schema, topk)) | ||
| } | ||
|
|
||
| /// This test validates that the `try_finish` method marks the TopK operator as finished | ||
| /// when the prefix (on column "a") of the last row in the current batch is strictly greater | ||
| /// than the max top‑k row. | ||
| /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". | ||
| #[tokio::test] | ||
| async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { | ||
| let (schema, mut topk) = build_ab_prefix_topk()?; | ||
|
|
||
| // Create the first batch with two columns: | ||
| // Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0]. | ||
|
|
@@ -1196,6 +1198,67 @@ mod tests { | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Regression test for #22849: a batch whose rows are entirely rejected by the | ||
| /// heap's dynamic filter must still trigger `attempt_early_completion` when its | ||
| /// last row's prefix is worse than the heap's worst. | ||
| /// | ||
| /// Before the fix, the `!filter.has_true()` short-circuit returned without calling | ||
| /// `attempt_early_completion`. Because the heap's filter is itself derived from the | ||
| /// heap's worst row, a batch from a strictly-worse prefix is exactly the case the | ||
| /// filter rejects entirely — i.e. the very signal the early-exit was designed to | ||
| /// detect was being silently dropped. | ||
| #[tokio::test] | ||
| async fn test_try_finish_fires_when_filter_rejects_entire_batch() -> Result<()> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice regression coverage. One small maintainability thought: this repeats a lot of the setup from `test_try_finish_marks_finished_with_prefix` …
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, good idea. I just made the change. |
||
| let (schema, mut topk) = build_ab_prefix_topk()?; | ||
|
|
||
| // Batch 1 fills the heap with (1, 20.0), (1, 15.0), (2, 30.0). | ||
| // heap.max becomes (a=2, b=30.0); update_filter tightens the heap filter to | ||
| // a < 2 OR (a = 2 AND b < 30.0). | ||
| let array_a1: ArrayRef = | ||
| Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)])); | ||
| let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0])); | ||
| let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?; | ||
| topk.insert_batch(batch1)?; | ||
| assert!( | ||
| !topk.finished, | ||
| "Expected 'finished' to be false after batch 1 \ | ||
| (last row prefix a=2 equals heap.max prefix a=2, not strictly greater)." | ||
| ); | ||
|
|
||
| // Batch 2: every row has a=3, so the heap's filter (a < 2 OR (a = 2 AND b < 30.0)) | ||
| // rejects every row. Before the fix, `insert_batch` would short-circuit on | ||
| // `!filter.has_true()` and return without checking the prefix; `finished` | ||
| // would stay false even though no future batch could improve the heap. | ||
| let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(3)])); | ||
| let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0])); | ||
| let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a2, array_b2])?; | ||
| topk.insert_batch(batch2)?; | ||
| assert!( | ||
| topk.finished, | ||
| "Expected 'finished' to be true after batch 2 \ | ||
| (filter rejected every row, but the batch's last row prefix a=3 \ | ||
| is strictly greater than heap.max prefix a=2)." | ||
| ); | ||
|
|
||
| // The emitted top-k is unchanged from after batch 1 since none of batch 2's | ||
| // rows could improve the heap. | ||
| let results: Vec<_> = topk.emit()?.try_collect().await?; | ||
| assert_batches_eq!( | ||
| &[ | ||
| "+---+------+", | ||
| "| a | b |", | ||
| "+---+------+", | ||
| "| 1 | 15.0 |", | ||
| "| 1 | 20.0 |", | ||
| "| 2 | 30.0 |", | ||
| "+---+------+", | ||
| ], | ||
| &results | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// This test verifies that the dynamic filter is marked as complete after TopK processing finishes. | ||
| #[tokio::test] | ||
| async fn test_topk_marks_filter_complete() -> Result<()> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is accurate, but it feels a bit longer than the control flow it explains. I think we could keep just the key invariant here: the heap is unchanged, but the rejected batch can still prove prefix completion.
For example:
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, I updated it, thanks