Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions datafusion/expr/src/higher_order_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,14 +718,13 @@ pub trait HigherOrderUDFImpl: Debug + DynEq + DynHash + Send + Sync + Any {
args: HigherOrderReturnFieldArgs,
) -> Result<FieldRef>;

/// Whether List or LargeList arguments should have it's non-empty null
/// sublists cleaned with [remove_list_null_values] before invoking this function
/// Whether List, LargeList or Map arguments should have their non-empty null
/// sublists/submaps cleaned before invoking this function
///
/// The default implementation always returns true and should only be implemented
/// if you want to handle non-empty null sublists yourself
/// if you want to handle non-empty null sublists/maps yourself
///
/// [remove_list_null_values]: datafusion_common::utils::remove_list_null_values
// todo: extend this to listview and maps when remove_list_null_values supports it
// todo: extend this to listview when remove_list_null_values supports it
fn clear_null_values(&self) -> bool {
// Default: ask for the cleanup. Override and return false only if the
// implementation handles non-empty null sublists itself.
true
}
Expand Down Expand Up @@ -963,7 +962,7 @@ impl HigherOrderUDF {
self.inner.return_field_from_args(args)
}

/// Whether List or LargeList arguments should have non-empty null sublists
/// Whether List or LargeList or Map arguments should have non-empty null sublists/maps
/// cleaned before invoking this function.
pub fn clear_null_values(&self) -> bool {
self.inner.clear_null_values()
Expand Down
158 changes: 150 additions & 8 deletions datafusion/physical-expr/src/higher_order_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use std::sync::Arc;
use crate::PhysicalExpr;
use crate::expressions::{LambdaExpr, Literal};

use arrow::array::{Array, RecordBatch};
use arrow::array::{make_array, Array, ArrayRef, AsArray, BooleanArray, Datum, GenericListArray, Int32Array, Int64Array, MapArray, MutableArrayData, RecordBatch};
use arrow::buffer::OffsetBuffer;
use arrow::compute::kernels::cmp::eq;
use arrow::datatypes::{DataType, FieldRef, Schema};
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::datatype::FieldExt;
Expand All @@ -45,6 +47,7 @@ use datafusion_common::{
Result, ScalarValue, exec_err, internal_datafusion_err, internal_err,
plan_datafusion_err, plan_err,
};
use datafusion_common::error::_exec_err;
use datafusion_expr::type_coercion::functions::value_fields_with_higher_order_udf;
use datafusion_expr::{
ColumnarValue, HigherOrderFunctionArgs, HigherOrderReturnFieldArgs, HigherOrderUDF,
Expand Down Expand Up @@ -359,16 +362,13 @@ impl PhysicalExpr for HigherOrderFunctionExpr {
let value = arg.evaluate(batch)?;

let value = if self.fun.clear_null_values()
&& matches!(
value.data_type(),
DataType::List(_) | DataType::LargeList(_)
)
&& support_clear_non_empty_null_values(&value.data_type())
{
let arr = value.into_array(batch.num_rows())?;
if arr.null_count() == 0 {
ColumnarValue::Array(arr)
} else {
ColumnarValue::Array(remove_list_null_values(&arr)?)
ColumnarValue::Array(remove_non_empty_null_values(&arr)?)
}
} else {
value
Expand Down Expand Up @@ -503,8 +503,87 @@ fn wrapped_lambda(expr: &Arc<dyn PhysicalExpr>) -> Option<&LambdaExpr> {
}
}


/// Returns true for the data types whose non-empty null entries can be
/// cleaned by `remove_non_empty_null_values`: `List`, `LargeList` and `Map`.
fn support_clear_non_empty_null_values(data_type: &DataType) -> bool {
    matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    )
}

/// Cleans the values hidden behind null slots of a nested array.
///
/// * `List` / `LargeList`: delegates to `remove_list_null_values`.
/// * `Map`: delegates to `remove_map_non_empty_null_values`.
/// * Any other data type: returns an execution error.
///
/// The two assertions keep this dispatch in sync with
/// `support_clear_non_empty_null_values`; they panic if one side is extended
/// without the other.
fn remove_non_empty_null_values(array: &ArrayRef) -> Result<ArrayRef> {
    // todo: handle list view
    let cleaned: ArrayRef = match array.data_type() {
        DataType::List(_) | DataType::LargeList(_) => remove_list_null_values(array)?,
        DataType::Map(_, _) => {
            let cleaned_map = remove_map_non_empty_null_values(array.as_map())?;
            Arc::new(cleaned_map)
        }
        dt => {
            // A type advertised as supported must have a branch above.
            assert!(
                !support_clear_non_empty_null_values(dt),
                "data type marked as supported but not implemented"
            );
            return _exec_err!("expected List/LargeList/Map, got {dt}");
        }
    };

    // Conversely, every type handled above must be advertised as supported.
    assert!(
        support_clear_non_empty_null_values(cleaned.data_type()),
        "data type marked as not supported but implemented"
    );

    Ok(cleaned)
}

/// Drops the entries that null slots of a [`MapArray`] still point at.
///
/// A null map slot may span a non-empty range of the entries array. This
/// rebuilds the map so that every null slot has length zero, copying only the
/// entries that belong to valid slots. The null buffer, the entries field and
/// the `ordered` flag are carried over unchanged.
///
/// Returns a clone of `map` when it has no nulls, or when every null slot is
/// already empty.
///
/// # Errors
///
/// Propagates errors from the `eq` kernel and from [`MapArray::try_new`].
fn remove_map_non_empty_null_values(map: &MapArray) -> Result<MapArray> {
    let Some(nulls) = map.nulls().filter(|nulls| nulls.null_count() > 0) else {
        return Ok(map.clone());
    };

    let offsets = map.offsets();

    // The lengths must be built WITHOUT nulls. The comparison kernel compares
    // the raw values, and a null slot created through `from_iter` stores the
    // default value 0. Nulling the lengths of null map slots would therefore
    // make every null slot look empty, and nothing would ever be cleaned.
    let lengths = Int32Array::from_iter_values(offsets.lengths().map(|len| len as i32));
    let zero: &dyn Datum = &Int32Array::new_scalar(0);

    // Bit i is set when slot i is empty or valid, i.e. when its entry range
    // can be copied as is. Unset bits are null slots that hide entries.
    let (mut valid_or_empty, _nulls) = eq(&lengths, zero)?.into_parts();
    valid_or_empty |= nulls.inner();
    let valid_or_empty = BooleanArray::from(valid_or_empty);

    if !valid_or_empty.has_false() {
        // Every null slot is already empty, nothing to remove.
        return Ok(map.clone());
    }

    let array_data = map.entries().to_data();
    // Upper bound: the number of entries referenced by this (possibly sliced) map.
    let capacity = offsets[offsets.len() - 1] - offsets[0];
    let mut mutable_array_data =
        MutableArrayData::new(vec![&array_data], false, capacity as usize);

    let (valid_or_empty, _nulls) = valid_or_empty.into_parts();

    // Copy the entries of each contiguous run of kept slots in one go.
    for (start, end) in valid_or_empty.set_slices() {
        mutable_array_data.extend(0, offsets[start] as usize, offsets[end] as usize);
    }

    // Null slots become zero-length; valid slots keep their length.
    let lengths = std::iter::zip(offsets.lengths(), nulls)
        .map(|(length, is_valid)| if is_valid { length } else { 0 });

    let offsets = OffsetBuffer::from_lengths(lengths);
    let entries = make_array(mutable_array_data.freeze());
    let entries = entries.as_struct().clone();

    let DataType::Map(field, ordered) = map.data_type() else {
        unreachable!("map array data type is not Map");
    };

    Ok(MapArray::try_new(
        Arc::clone(field),
        offsets,
        entries,
        map.nulls().cloned(),
        *ordered,
    )?)
}
Comment on lines +507 to +582

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to remove this once my arrow PR that implements this is merged and released:

so I did not make it public, nor did I modify the old function, to avoid increasing the breaking-change scope when this function and `remove_list_null_values` are removed.


#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use super::*;
Expand All @@ -513,11 +592,13 @@ mod tests {
use crate::expressions::NoOp;
use crate::expressions::lambda;
use crate::expressions::not;
use arrow::array::NullArray;
use arrow::array::{ListArray, NullArray, StructArray};
use arrow::array::RecordBatchOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field, Fields, Int32Type, Schema};
use datafusion_common::Result;
use datafusion_common::assert_contains;
use datafusion_common::utils::adjust_offsets_for_slice;
use datafusion_expr::{
HigherOrderFunctionArgs, HigherOrderSignature, HigherOrderUDF, HigherOrderUDFImpl,
};
Expand Down Expand Up @@ -715,4 +796,65 @@ mod tests {
"mock_function received a lambda via with_new_children at position 0 that wasn't a lambda before"
);
}

/// Test helper: builds a `MapArray` with `Int32` keys and `Int32` values from
/// flat key/value columns plus the map offsets and an optional null buffer.
///
/// The map is created with `ordered` set to false. Panics if any non-null map
/// contains a duplicate key, since `MapArray::new` does not check uniqueness.
fn create_i32_to_i32_map(
    keys: impl Into<Int32Array>,
    values: impl Into<Int32Array>,
    offsets: OffsetBuffer<i32>,
    nulls: Option<NullBuffer>,
) -> MapArray {
    let key_column: ArrayRef = Arc::new(keys.into());
    let value_column: ArrayRef = Arc::new(values.into());

    let key_field = Field::new("keys", key_column.data_type().clone(), false);
    let value_field = Field::new(
        "values",
        value_column.data_type().clone(),
        value_column.is_nullable(),
    );

    let entries = StructArray::new(
        Fields::from(vec![key_field, value_field]),
        vec![key_column, value_column],
        None,
    );

    let entries_field = Arc::new(Field::new("entries", entries.data_type().clone(), false));
    let map_array = MapArray::new(entries_field, offsets, entries, nulls, false);

    // check that the map is valid since uniqueness is not being checked in MapArray::new
    for entry in map_array.iter().flatten() {
        let entry_keys = entry.column(0).as_primitive::<Int32Type>();
        let distinct_keys: HashSet<Option<i32>> = entry_keys.iter().collect();
        assert_eq!(
            distinct_keys.len(),
            entry_keys.len(),
            "keys should be unique within a map"
        );
    }

    map_array
}

#[test]
fn test_remove_map_null_values_map() {
    // Slot layout (length / validity):
    //   0: 3 entries, valid
    //   1: 4 entries, null   <- hides entries that must be dropped
    //   2: 0 entries, null
    //   3: 2 entries, valid
    //   4: 0 entries, null
    let input: ArrayRef = Arc::new(create_i32_to_i32_map(
        vec![100, 20, 10, 0, 0, 0, -9, 1, 50],
        vec![101, 21, 11, 0, 1, 2, 3, 4, 5],
        OffsetBuffer::<i32>::from_lengths(vec![3, 4, 0, 2, 0]),
        Some(NullBuffer::from(vec![true, false, false, true, false])),
    ));

    let cleaned = remove_non_empty_null_values(&input).unwrap();
    let cleaned = cleaned.as_map();

    // Same validity, but the null slot 1 no longer spans any entries.
    let expected = create_i32_to_i32_map(
        vec![100, 20, 10, 1, 50],
        vec![101, 21, 11, 4, 5],
        OffsetBuffer::<i32>::from_lengths(vec![3, 0, 0, 2, 0]),
        Some(NullBuffer::from(vec![true, false, false, true, false])),
    );

    assert_eq!(cleaned, &expected);
    // The array equality above ignores what null slots point at, so also
    // compare the physical values and offsets.
    assert_eq!(cleaned.values(), expected.values());
    assert_eq!(cleaned.offsets(), expected.offsets());
}
}
Loading