Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions datafusion/physical-plan/benches/hash_join_semi_anti.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Criterion benchmarks for Hash Join with RightSemi/RightAnti joins with Int32 keys.
//! Criterion benchmarks for Hash Join with RightSemi/RightAnti/RightMark joins with Int32 keys.
//!
//! ## Key Benchmark Axes
//!
Expand All @@ -37,7 +37,7 @@
//! - **Hit Rate**: The percentage of probe rows that find a match in the build side.
//! This controls how often the join produces output rows.
//!
//! Semi/anti joins can short-circuit after finding the first match, so these
//! Semi/anti/mark joins can short-circuit after finding the first match, so these
//! benchmarks help evaluate optimization strategies for existence checks.

use std::sync::Arc;
Expand Down Expand Up @@ -292,6 +292,30 @@ fn bench_hash_join_semi_anti(c: &mut Criterion) {
);
}

// =========================================================================
// RightMark Join benchmarks
// =========================================================================

// RightMark - 100% Density, ~1% hit rate, fanout ~100
// Build keys are duplicated: 100K rows over 1K distinct keys. Matching
// probe rows only need a boolean mark, so duplicate build matches can be
// skipped.
{
let fanout_keys = 1_000;
let left_batches = build_batches(build_rows, fanout_keys, 0, &s);
let right_batches = build_batches(probe_rows, build_rows, 0, &s);
group.bench_function(
BenchmarkId::new("right_mark_fanout100_h1", probe_rows),
|b| {
b.iter(|| {
let left = make_exec(&left_batches, &s);
let right = make_exec(&right_batches, &s);
do_hash_join(left, right, JoinType::RightMark, &rt)
})
},
);
}

// =========================================================================
// RightAnti Join benchmarks
// =========================================================================
Expand Down
118 changes: 118 additions & 0 deletions datafusion/physical-plan/src/joins/array_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,35 @@ impl ArrayMap {
)
}

pub fn get_matching_indices_with_limit_offset(
&self,
prob_side_keys: &[ArrayRef],
limit: usize,
current_offset: MapOffset,
probe_indices: &mut Vec<u32>,
build_indices: &mut Vec<u64>,
) -> Result<Option<MapOffset>> {
if prob_side_keys.len() != 1 {
return internal_err!(
"ArrayMap expects 1 join key, but got {}",
prob_side_keys.len()
);
}
let array = &prob_side_keys[0];

downcast_supported_integer!(
array.data_type() => (
lookup_and_get_matching_indices,
self,
array,
limit,
current_offset,
probe_indices,
build_indices
)
)
}

fn lookup_and_get_indices<T: ArrowNumericType>(
&self,
array: &ArrayRef,
Expand Down Expand Up @@ -370,6 +399,61 @@ impl ArrayMap {
}
}

fn lookup_and_get_matching_indices<T: ArrowNumericType>(
&self,
array: &ArrayRef,
limit: usize,
current_offset: MapOffset,
probe_indices: &mut Vec<u32>,
build_indices: &mut Vec<u64>,
) -> Result<Option<MapOffset>>
where
T::Native: Copy + AsPrimitive<u64>,
{
probe_indices.clear();
build_indices.clear();

debug_assert!(
current_offset.1.is_none(),
"existence lookup never resumes within a single probe row"
);

let arr = array.as_primitive::<T>();
let have_null = arr.null_count() > 0;

for prob_idx in current_offset.0..arr.len() {
if have_null && arr.is_null(prob_idx) {
continue;
}

// SAFETY: prob_idx is guaranteed to be within bounds by the loop range.
let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_();
let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize;

if idx_in_build_side >= self.data.len() {
continue;
}

let build_idx = self.data[idx_in_build_side];
if build_idx == 0 {
continue;
}

probe_indices.push(prob_idx as u32);
build_indices.push((build_idx - 1) as u64);

if probe_indices.len() == limit {
return if prob_idx + 1 == arr.len() {
Ok(None)
} else {
Ok(Some((prob_idx + 1, None)))
};
}
}

Ok(None)
}

pub fn contain_keys(&self, probe_side_keys: &[ArrayRef]) -> Result<BooleanArray> {
if probe_side_keys.len() != 1 {
return internal_err!(
Expand Down Expand Up @@ -506,6 +590,40 @@ mod tests {
Ok(())
}

#[test]
fn test_array_map_matching_indices_emit_once_per_probe_key() -> Result<()> {
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
let map = ArrayMap::try_new(&build, 1, 2)?;
let probe = [Arc::new(Int32Array::from(vec![1, 2, 3, 1])) as ArrayRef];

let mut prob_idx = Vec::new();
let mut build_idx = Vec::new();
let next = map.get_matching_indices_with_limit_offset(
&probe,
2,
(0, None),
&mut prob_idx,
&mut build_idx,
)?;

assert_eq!(prob_idx, vec![0, 1]);
assert_eq!(build_idx, vec![0, 2]);
assert_eq!(next, Some((2, None)));

let next = map.get_matching_indices_with_limit_offset(
&probe,
2,
next.unwrap(),
&mut prob_idx,
&mut build_idx,
)?;

assert_eq!(prob_idx, vec![3]);
assert_eq!(build_idx, vec![0]);
assert_eq!(next, None);
Ok(())
}

#[test]
fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> {
// Build array with a mix of negative and positive i64 values, no duplicates
Expand Down
58 changes: 58 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,16 @@ fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize {
count
}

fn can_use_probe_existence_fast_path(
join_type: JoinType,
filter: &Option<JoinFilter>,
) -> bool {
matches!(
join_type,
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark
) && filter.is_none()
}

impl HashJoinStream {
#[expect(clippy::too_many_arguments)]
pub(super) fn new(
Expand Down Expand Up @@ -781,9 +791,37 @@ impl HashJoinStream {
return Ok(StatefulStreamResult::Continue);
}

let use_probe_existence_fast_path =
can_use_probe_existence_fast_path(self.join_type, &self.filter);

// get the matched by join keys indices
let (left_indices, right_indices, next_offset) = match build_side.left_data.map()
{
Map::HashMap(map)
if use_probe_existence_fast_path && map.may_contain_hash_chains() =>
{
let next_offset = map.get_matching_indices_with_limit_offset(
&self.hashes_buffer,
build_side.left_data.values(),
&state.values,
self.null_equality,
self.batch_size,
state.offset,
&mut self.probe_indices_buffer,
&mut self.build_indices_buffer,
)?;
(
UInt64Array::from(std::mem::replace(
&mut self.build_indices_buffer,
Vec::with_capacity(self.batch_size),
)),
UInt32Array::from(std::mem::replace(
&mut self.probe_indices_buffer,
Vec::with_capacity(self.batch_size),
)),
next_offset,
)
}
Map::HashMap(map) => lookup_join_hashmap(
map.as_ref(),
build_side.left_data.values(),
Expand All @@ -795,6 +833,26 @@ impl HashJoinStream {
&mut self.probe_indices_buffer,
&mut self.build_indices_buffer,
)?,
Map::ArrayMap(array_map) if use_probe_existence_fast_path => {
let next_offset = array_map.get_matching_indices_with_limit_offset(
&state.values,
self.batch_size,
state.offset,
&mut self.probe_indices_buffer,
&mut self.build_indices_buffer,
)?;
(
UInt64Array::from(std::mem::replace(
&mut self.build_indices_buffer,
Vec::with_capacity(self.batch_size),
)),
UInt32Array::from(std::mem::replace(
&mut self.probe_indices_buffer,
Vec::with_capacity(self.batch_size),
)),
next_offset,
)
}
Map::ArrayMap(array_map) => {
let next_offset = array_map.get_matched_indices_with_limit_offset(
&state.values,
Expand Down
Loading
Loading