Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datafusion-examples/examples/udf/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&arrow::array::BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 2, "two arguments to merge_batch");
Expand All @@ -280,7 +279,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
self.null_state.accumulate(
group_indices,
partial_counts,
opt_filter,
None,
total_num_groups,
|group_index, partial_count| {
self.counts[group_index] += partial_count;
Expand All @@ -292,7 +291,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
self.null_state.accumulate(
group_indices,
partial_prods,
opt_filter,
None,
total_num_groups,
|group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
let prod = &mut self.prods[group_index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,6 @@ impl GroupsAccumulator for TestGroupsAccumulator {
&mut self,
_values: &[ArrayRef],
_group_indices: &[usize],
_opt_filter: Option<&arrow::array::BooleanArray>,
_total_num_groups: usize,
) -> Result<()> {
Ok(())
Expand Down
6 changes: 4 additions & 2 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,14 @@ pub trait GroupsAccumulator: Send + std::any::Any {
///
/// * `values`: arrays produced from previously calling `state` on other accumulators.
///
/// Other arguments are the same as for [`Self::update_batch`].
/// Other arguments are the same as for [`Self::update_batch`], except that
/// there is no `opt_filter` — aggregate filters are applied during the
/// partial (update) phase, so by the time intermediate states are merged
/// no per-row filtering is needed.
fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()>;

Expand Down
23 changes: 2 additions & 21 deletions datafusion/ffi/src/udaf/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct FFI_GroupsAccumulator {
accumulator: &mut Self,
values: SVec<WrappedArray>,
group_indices: SVec<usize>,
opt_filter: FFI_Option<WrappedArray>,
total_num_groups: usize,
) -> FFI_Result<()>,

Expand Down Expand Up @@ -195,21 +194,14 @@ unsafe extern "C" fn merge_batch_fn_wrapper(
accumulator: &mut FFI_GroupsAccumulator,
values: SVec<WrappedArray>,
group_indices: SVec<usize>,
opt_filter: FFI_Option<WrappedArray>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is supposed to be a stable ABI (so different versions of DataFusion can use the same table provider). However, I am not sure whether this actually works in practice. Thus, I think maybe we should keep this FFI interface the same (but ignore the parameter)?

@timsaucer can you comment on whether/when it is OK to change the FFI APIs?

I didn't see anything in the FFI readme that addresses whether/when changes to the FFI interfaces are allowed.

This crate contains code to allow interoperability of [Apache DataFusion] with

Says

This crate contains code to allow interoperability of [Apache DataFusion] with
functions from other libraries and/or DataFusion versions using a stable
interface.

But that doesn't specify whether it is supposed to remain stable across major versions too 🤔

total_num_groups: usize,
) -> FFI_Result<()> {
unsafe {
let accumulator = accumulator.inner_mut();
let values = sresult_return!(process_values(values));
let group_indices: Vec<usize> = group_indices.into_iter().collect();
let opt_filter = sresult_return!(process_opt_filter(opt_filter));

sresult!(accumulator.merge_batch(
&values,
&group_indices,
opt_filter.as_ref(),
total_num_groups
))
sresult!(accumulator.merge_batch(&values, &group_indices, total_num_groups))
}
}

Expand Down Expand Up @@ -379,7 +371,6 @@ impl GroupsAccumulator for ForeignGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
unsafe {
Expand All @@ -388,20 +379,11 @@ impl GroupsAccumulator for ForeignGroupsAccumulator {
.map(WrappedArray::try_from)
.collect::<std::result::Result<Vec<_>, ArrowError>>()?;
let group_indices = group_indices.iter().cloned().collect();
let opt_filter = opt_filter
.map(|bool_array| to_ffi(&bool_array.to_data()))
.transpose()?
.map(|(array, schema)| WrappedArray {
array,
schema: WrappedSchema(schema),
})
.into();

df_result!((self.accumulator.merge_batch)(
&mut self.accumulator,
values.into_iter().collect(),
group_indices,
opt_filter,
total_num_groups
))
}
Expand Down Expand Up @@ -517,8 +499,7 @@ mod tests {
let second_states =
vec![make_array(create_array!(Boolean, vec![false]).to_data())];

let opt_filter = create_array!(Boolean, vec![true]);
foreign_accum.merge_batch(&second_states, &[0], Some(opt_filter.as_ref()), 1)?;
foreign_accum.merge_batch(&second_states, &[0], 1)?;
let groups_bool = foreign_accum.evaluate(EmitTo::All)?;
assert_eq!(groups_bool.len(), 1);
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ where
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> datafusion_common::Result<()> {
debug_assert_eq!(values.len(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,12 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
self.invoke_per_accumulator(
values,
group_indices,
opt_filter,
None,
total_num_groups,
|accumulator, values_to_accumulate| {
accumulator.merge_batch(values_to_accumulate)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@ where
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// update / merge are the same
self.update_batch(values, group_indices, opt_filter, total_num_groups)
self.update_batch(values, group_indices, None, total_num_groups)
}

fn size(&self) -> usize {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,10 @@ where
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// update / merge are the same
self.update_batch(values, group_indices, opt_filter, total_num_groups)
self.update_batch(values, group_indices, None, total_num_groups)
}

/// Converts an input batch directly to a state batch
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions-aggregate/benches/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ fn merge_bench(
Arc::clone(&is_set),
],
&group_indices,
opt_filter,
num_groups,
)
.unwrap(),
Expand Down
7 changes: 0 additions & 7 deletions datafusion/functions-aggregate/src/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,15 +705,8 @@ impl<H: HllValueHasher> GroupsAccumulator for HllGroupsAccumulator<H> {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
// Since aggregate filter should be applied in partial stage, in final stage there should be no filter
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert!(
opt_filter.is_none(),
"aggregate filter should be applied in partial stage, there should be no filter in final stage"
);

self.ensure_groups(total_num_groups);
let states = downcast_value!(values[0], BinaryArray);
let mut delta: isize = 0;
Expand Down
7 changes: 3 additions & 4 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,6 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
Expand Down Expand Up @@ -2019,7 +2018,7 @@ mod tests {

// Merge acc2's state into acc1
let state = acc2.state(EmitTo::All)?;
acc1.merge_batch(&state, &[0, 1], None, 2)?;
acc1.merge_batch(&state, &[0, 1], 2)?;

// Another update_batch on acc1 after the merge
let values: ArrayRef = Arc::new(Int32Array::from(vec![5, 6]));
Expand Down Expand Up @@ -2088,7 +2087,7 @@ mod tests {

// Feed state into a new accumulator via merge_batch
let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
acc2.merge_batch(&state, &[0, 0, 1], None, 2)?;
acc2.merge_batch(&state, &[0, 0, 1], 2)?;

// Group 0 received rows 0 ([1]) and 1 ([NULL]) → [1, NULL]
let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
Expand Down Expand Up @@ -2118,7 +2117,7 @@ mod tests {

// Feed state into a new accumulator via merge_batch
let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
acc2.merge_batch(&state, &[0, 0, 1, 1], None, 2)?;
acc2.merge_batch(&state, &[0, 0, 1, 1], 2)?;

// Group 0: received [1] and null (skipped) → [1]
let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
Expand Down
5 changes: 2 additions & 3 deletions datafusion/functions-aggregate/src/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,6 @@ where
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 2, "two arguments to merge_batch");
Expand All @@ -920,7 +919,7 @@ where
self.null_state.accumulate(
group_indices,
partial_counts,
opt_filter,
None,
total_num_groups,
|group_index, partial_count| {
// SAFETY: group_index is guaranteed to be in bounds
Expand All @@ -934,7 +933,7 @@ where
self.null_state.accumulate(
group_indices,
partial_sums,
opt_filter,
None,
total_num_groups,
|group_index, new_value: <T as ArrowPrimitiveType>::Native| {
// SAFETY: group_index is guaranteed to be in bounds
Expand Down
6 changes: 0 additions & 6 deletions datafusion/functions-aggregate/src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,6 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// Resize vectors to accommodate total number of groups
Expand All @@ -508,11 +507,6 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
let partial_sum_xx = values[4].as_primitive::<Float64Type>();
let partial_sum_yy = values[5].as_primitive::<Float64Type>();

assert!(
opt_filter.is_none(),
"aggregate filter should be applied in partial stage, there should be no filter in final stage"
);

accumulate_correlation_states(
group_indices,
(
Expand Down
2 changes: 0 additions & 2 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,6 @@ impl GroupsAccumulator for CountGroupsAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
// Since aggregate filter should be applied in partial stage, in final stage there should be no filter
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
Expand Down
28 changes: 12 additions & 16 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,6 @@ impl<S: ValueState + 'static> GroupsAccumulator for FirstLastGroupsAccumulator<S
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
self.resize_states(total_num_groups);
Expand All @@ -690,7 +689,7 @@ impl<S: ValueState + 'static> GroupsAccumulator for FirstLastGroupsAccumulator<S
let groups = self.get_filtered_extreme_of_each_group(
&val_and_order_cols[1..],
group_indices,
opt_filter,
None,
vals,
Some(is_set_arr),
)?;
Expand Down Expand Up @@ -1587,12 +1586,7 @@ mod tests {
group_acc.compute_size_of_orderings()
);

group_acc.merge_batch(
&state,
&[0, 1, 2],
Some(&BooleanArray::from(vec![true, false, false])),
3,
)?;
group_acc.merge_batch(&state, &[0, 1, 2], 3)?;

assert_eq!(
group_acc.size_of_orderings,
Expand All @@ -1608,8 +1602,11 @@ mod tests {
let binding = group_acc.evaluate(EmitTo::All)?;
let eval_result = binding.as_any().downcast_ref::<Int64Array>().unwrap();

// group 0 keeps merged value=1 (ordering=1).
// group 1 keeps merged value=-6 (ordering=-6 < 6, so -6 is "first").
// group 2 had no merged value (is_set=false), so update_batch value=6 wins.
let expect: PrimitiveArray<Int64Type> =
Int64Array::from(vec![Some(1), Some(6), Some(6), None]);
Int64Array::from(vec![Some(1), Some(-6), Some(6), None]);

assert_eq!(eval_result, &expect);

Expand Down Expand Up @@ -1680,7 +1677,7 @@ mod tests {
group_acc.compute_size_of_orderings()
);

group_acc.merge_batch(&s, &Vec::from_iter(0..s[0].len()), None, 100)?;
group_acc.merge_batch(&s, &Vec::from_iter(0..s[0].len()), 100)?;
assert_eq!(
group_acc.size_of_orderings,
group_acc.compute_size_of_orderings()
Expand Down Expand Up @@ -1753,12 +1750,7 @@ mod tests {
];
assert_eq!(state, expected_state);

group_acc.merge_batch(
&state,
&[0, 1, 2],
Some(&BooleanArray::from(vec![true, false, false])),
3,
)?;
group_acc.merge_batch(&state, &[0, 1, 2], 3)?;

val_with_orderings.clear();
val_with_orderings.push(Arc::new(Int64Array::from(vec![66, 6])));
Expand All @@ -1769,6 +1761,10 @@ mod tests {
let binding = group_acc.evaluate(EmitTo::All)?;
let eval_result = binding.as_any().downcast_ref::<Int64Array>().unwrap();

// group 0: merged value=1 (ordering=1, is_set=true), update not called.
// group 1: merged value=-6 (ordering=-6, is_set=true); update ordering=66 > -6
// → LAST_VALUE keeps the higher ordering, so group 1 becomes 66.
// group 2: is_set=false after merge; update_batch sets it to 6.
let expect: PrimitiveArray<Int64Type> =
Int64Array::from(vec![Some(1), Some(66), Some(6), None]);

Expand Down
2 changes: 0 additions & 2 deletions datafusion/functions-aggregate/src/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,6 @@ impl<T: ArrowNumericType + Send> GroupsAccumulator for MedianGroupsAccumulator<T
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
// Since aggregate filter should be applied in partial stage, in final stage there should be no filter
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
Expand Down
3 changes: 1 addition & 2 deletions datafusion/functions-aggregate/src/min_max/min_max_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,10 @@ impl GroupsAccumulator for MinMaxBytesAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// min/max are their own states (no transition needed)
self.update_batch(values, group_indices, opt_filter, total_num_groups)
self.update_batch(values, group_indices, None, total_num_groups)
}

fn convert_to_state(
Expand Down
3 changes: 1 addition & 2 deletions datafusion/functions-aggregate/src/min_max/min_max_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,10 @@ impl GroupsAccumulator for MinMaxStructAccumulator {
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
// min/max are their own states (no transition needed)
self.update_batch(values, group_indices, opt_filter, total_num_groups)
self.update_batch(values, group_indices, None, total_num_groups)
}

fn convert_to_state(
Expand Down
Loading
Loading