From 90b9484f5544dacbb5c32b501d546656ee136eed Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 May 2026 13:27:00 -0400 Subject: [PATCH 1/7] Bespoke columnar MergeBatcher --- .../src/columnar/arrangement/mod.rs | 5 +- .../src/columnar/arrangement/trie_merger.rs | 572 +++++++++--------- differential-dataflow/src/columnar/batcher.rs | 120 ++++ differential-dataflow/src/columnar/mod.rs | 1 + 4 files changed, 394 insertions(+), 304 deletions(-) create mode 100644 differential-dataflow/src/columnar/batcher.rs diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index be980fcdd..801ecbe0a 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -4,7 +4,6 @@ //! into DD's trace machinery. //! - `Coltainer` wraps a columnar `C::Container` as a DD `BatchContainer`. //! - `TrieChunker` strips `RecordedUpdates` down to `UpdatesTyped` for the merge batcher. -//! - `batcher` contains required trait stubs for `UpdatesTyped`. //! - `trie_merger` is the batch-at-a-time merging logic. //! - `builder::ValMirror` is the `trace::Builder` that seals melded chunks into //! an `OrdValBatch`. @@ -21,7 +20,7 @@ pub mod trie_merger; /// A trace implementation backed by columnar storage. pub type ValSpine = Spine>>>; /// A batcher for columnar storage. -pub type ValBatcher = ValBatcher2<(K,V,T,R)>; +pub type ValBatcher = super::batcher::MergeBatcher<(K,V,T,R)>; /// A builder for columnar storage. pub type ValBuilder = RcBuilder>; @@ -124,8 +123,6 @@ pub mod batch_container { use super::updates::UpdatesTyped; use super::RecordedUpdates; -use crate::trace::implementations::merge_batcher::MergeBatcher; -type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; /// A chunker that unwraps `RecordedUpdates` into bare `UpdatesTyped` for the merge batcher. /// diff --git a/differential-dataflow/src/columnar/arrangement/trie_merger.rs b/differential-dataflow/src/columnar/arrangement/trie_merger.rs index fb88140ca..20d68cef9 100644 --- a/differential-dataflow/src/columnar/arrangement/trie_merger.rs +++ b/differential-dataflow/src/columnar/arrangement/trie_merger.rs @@ -1,6 +1,6 @@ //! Batch-at-a-time merging of sorted, consolidated `UpdatesTyped` chains. //! -//! The core is `TrieMerger::merge_batches`, which walks pairs of chunks via +//! The core is `merge_batches`, which walks pairs of chunks via //! `merge_batch`, building a chain of merged outputs with `ChainBuilder`. //! `survey` maps the interleaving of the two inputs at each trie layer, //! `write_from_surveys` (via `write_layer` and `write_diffs`) copies the @@ -8,20 +8,10 @@ use columnar::{Columnar, Len}; use timely::progress::frontier::{Antichain, AntichainRef}; -use crate::trace::implementations::merge_batcher::Merger; use super::super::layout::ColumnarUpdate as Update; use super::super::updates::UpdatesTyped; -/// Merge-batcher merger that melds sorted, consolidated `UpdatesTyped` tries. -pub struct TrieMerger { - _marker: std::marker::PhantomData, -} - -impl Default for TrieMerger { - fn default() -> Self { Self { _marker: std::marker::PhantomData } } -} - /// A merging iterator over two sorted iterators. struct Merging { iter1: std::iter::Peekable, @@ -69,318 +59,300 @@ fn form_chunks<'a, U: Update>( } } -impl Merger for TrieMerger +/// Partition `merged` into chunks ready to ship (times strictly less than `upper`) +/// and chunks kept for future seals (times at-or-after `upper`). Updates `frontier` +/// to the antichain of kept times. +pub fn extract( + mut merged: Vec>, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec>, + kept: &mut Vec>, +) where U::Time: 'static, { - type Chunk = UpdatesTyped; - type Time = U::Time; - - fn merge( - &mut self, - list1: Vec>, - list2: Vec>, - output: &mut Vec>, - _stash: &mut Vec>, - ) { - Self::merge_batches(list1, list2, output, _stash); - } - - fn extract( - &mut self, - mut merged: Vec, - upper: AntichainRef, - frontier: &mut Antichain, - ship: &mut Vec, - kept: &mut Vec, - _stash: &mut Vec, - ) { - use columnar::{Container, ContainerOf, Index, Push}; - use columnar::primitive::offsets::Strides; - use crate::columnar::updates::{Lists, retain_items}; - - // TODO: rework to move from trie structure to trie structure. - let mut time_owned = U::Time::default(); - let mut bitmap = Vec::new(); // update should be kept. - for chunk in merged.drain(..) { - bitmap.clear(); - let view = chunk.view(); - let times = view.times.values; - for idx in 0 .. times.len() { - Columnar::copy_from(&mut time_owned, times.get(idx)); - if upper.less_equal(&time_owned) { - frontier.insert_ref(&time_owned); - bitmap.push(true); - } - else { bitmap.push(false); } + use columnar::{Container, ContainerOf, Index, Push}; + use columnar::primitive::offsets::Strides; + use crate::columnar::updates::{Lists, retain_items}; + + // TODO: rework to move from trie structure to trie structure. + let mut time_owned = U::Time::default(); + let mut bitmap = Vec::new(); // update should be kept. + for chunk in merged.drain(..) { + bitmap.clear(); + let view = chunk.view(); + let times = view.times.values; + for idx in 0 .. times.len() { + Columnar::copy_from(&mut time_owned, times.get(idx)); + if upper.less_equal(&time_owned) { + frontier.insert_ref(&time_owned); + bitmap.push(true); } - if bitmap.iter().all(|x| *x) { kept.push(chunk); } - else if bitmap.iter().all(|x| !*x) { ship.push(chunk); } - else { - - let (times, temp) = retain_items::>(view.times, &bitmap[..]); - let (vals, temp) = retain_items::>(view.vals, &temp[..]); - let (keys, _temp) = retain_items::>(view.keys, &temp[..]); - let d_borrow = view.diffs; - let mut diffs = > as Container>::with_capacity_for([d_borrow].into_iter()); - for (index, bit) in bitmap.iter().enumerate() { - if *bit { diffs.values.push(d_borrow.values.get(index)); } - } - diffs.bounds = Strides::new(1, times.values.len() as u64); - kept.push(UpdatesTyped { - keys, - vals, - times, - diffs, - }); - - for bit in bitmap.iter_mut() { *bit = !*bit; } - - let (times, temp) = retain_items::>(view.times, &bitmap[..]); - let (vals, temp) = retain_items::>(view.vals, &temp[..]); - let (keys, _temp) = retain_items::>(view.keys, &temp[..]); - let d_borrow = view.diffs; - let mut diffs = > as Container>::with_capacity_for([d_borrow].into_iter()); - for (index, bit) in bitmap.iter().enumerate() { - if *bit { diffs.values.push(d_borrow.values.get(index)); } - } - diffs.bounds = Strides::new(1, times.values.len() as u64); - ship.push(UpdatesTyped { - keys, - vals, - times, - diffs, - }); + else { bitmap.push(false); } + } + if bitmap.iter().all(|x| *x) { kept.push(chunk); } + else if bitmap.iter().all(|x| !*x) { ship.push(chunk); } + else { + + let (times, temp) = retain_items::>(view.times, &bitmap[..]); + let (vals, temp) = retain_items::>(view.vals, &temp[..]); + let (keys, _temp) = retain_items::>(view.keys, &temp[..]); + let d_borrow = view.diffs; + let mut diffs = > as Container>::with_capacity_for([d_borrow].into_iter()); + for (index, bit) in bitmap.iter().enumerate() { + if *bit { diffs.values.push(d_borrow.values.get(index)); } } + diffs.bounds = Strides::new(1, times.values.len() as u64); + kept.push(UpdatesTyped { + keys, + vals, + times, + diffs, + }); + + for bit in bitmap.iter_mut() { *bit = !*bit; } + + let (times, temp) = retain_items::>(view.times, &bitmap[..]); + let (vals, temp) = retain_items::>(view.vals, &temp[..]); + let (keys, _temp) = retain_items::>(view.keys, &temp[..]); + let d_borrow = view.diffs; + let mut diffs = > as Container>::with_capacity_for([d_borrow].into_iter()); + for (index, bit) in bitmap.iter().enumerate() { + if *bit { diffs.values.push(d_borrow.values.get(index)); } + } + diffs.bounds = Strides::new(1, times.values.len() as u64); + ship.push(UpdatesTyped { + keys, + vals, + times, + diffs, + }); } } - - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { - use timely::Accountable; - (chunk.record_count() as usize, 0, 0, 0) - } } -impl TrieMerger +/// Iterator-based merge: flatten, merge, consolidate, form. +/// Correct but slow — used as fallback. +#[allow(dead_code)] +fn merge_iterator( + list1: &[UpdatesTyped], + list2: &[UpdatesTyped], + output: &mut Vec>, +) where U::Time: 'static, { - /// Iterator-based merge: flatten, merge, consolidate, form. - /// Correct but slow — used as fallback. - #[allow(dead_code)] - fn merge_iterator( - list1: &[UpdatesTyped], - list2: &[UpdatesTyped], - output: &mut Vec>, - ) { - let iter1 = list1.iter().flat_map(|chunk| chunk.iter()); - let iter2 = list2.iter().flat_map(|chunk| chunk.iter()); - - let merged = Merging { - iter1: iter1.peekable(), - iter2: iter2.peekable(), - }; - - form_chunks::(merged, output); - } + let iter1 = list1.iter().flat_map(|chunk| chunk.iter()); + let iter2 = list2.iter().flat_map(|chunk| chunk.iter()); - /// A merge implementation that operates batch-at-a-time. - #[inline(never)] - fn merge_batches( - list1: Vec>, - list2: Vec>, - output: &mut Vec>, - stash: &mut Vec>, - ) { - - // The design for efficient "batch" merginging of chains of links is: - // 0. We choose a target link size, K, and will keep the average link size at least K and the max size at 2k. - // K should be large enough to amortize some set-up, but not so large that one or two extra break the bank. - // 1. We will repeatedly consider pairs of links, and fully merge one with a prefix of the other. - // The last elements of each link will tell us which of the two suffixes must be held back. - // 2. We then have a chain of as many links as we started with, with potential defects to correct: - // a. A link may contain some number of zeros: we can remove them if we are eager, based on size. - // b. A link may contain more than 2K updates; we can split it. - // c. Two adjacent links may contain fewer than 2K updates; we can meld (careful append) them. - // 3. After a pass of the above, we should have restored the invariant. - // We can try and me smarter and fuse some of the above work rather than explicitly stage results. - // - // The challenging moment is the merge that can start with a suffix of one link, involving a prefix of one link. - // These could be the same link, different links, and generally there is the potential for complexity here. - - let mut builder = ChainBuilder::default(); - - let mut queue1: std::collections::VecDeque<_> = list1.into(); - let mut queue2: std::collections::VecDeque<_> = list2.into(); - - // The first unconsumed update in each block, via (k_idx, v_idx, t_idx), or None if exhausted. - // These are (0,0,0) for a new block, and should become None once there are no remaining updates. - let mut cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); - let mut cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); - - // For each pair of batches - while cursor1.is_some() && cursor2.is_some() { - Self::merge_batch(&mut cursor1, &mut cursor2, &mut builder, stash); - if cursor1.is_none() { cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); } - if cursor2.is_none() { cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); } - } + let merged = Merging { + iter1: iter1.peekable(), + iter2: iter2.peekable(), + }; - // TODO: create batch for the non-empty cursor. - if let Some(((k,v,t),batch)) = cursor1 { - let mut out_batch = stash.pop().unwrap_or_default(); - let empty: UpdatesTyped = Default::default(); - let view = batch.view(); - write_from_surveys( - &batch, - &empty, - &[Report::This(0, 1)], - &[Report::This(k, view.keys.values.len())], - &[Report::This(v, view.vals.values.len())], - &[Report::This(t, view.times.values.len())], - &mut out_batch, - ); - builder.push(out_batch); - } - if let Some(((k,v,t),batch)) = cursor2 { - let mut out_batch = stash.pop().unwrap_or_default(); - let empty: UpdatesTyped = Default::default(); - let view = batch.view(); - write_from_surveys( - &empty, - &batch, - &[Report::That(0, 1)], - &[Report::That(k, view.keys.values.len())], - &[Report::That(v, view.vals.values.len())], - &[Report::That(t, view.times.values.len())], - &mut out_batch, - ); - builder.push(out_batch); - } + form_chunks::(merged, output); +} + +/// A merge implementation that operates batch-at-a-time. +#[inline(never)] +pub fn merge_batches( + list1: Vec>, + list2: Vec>, + output: &mut Vec>, +) +where + U::Time: 'static, +{ - builder.extend(queue1); - builder.extend(queue2); - *output = builder.done(); - // TODO: Tidy output to satisfy structural invariants. + // The design for efficient "batch" merginging of chains of links is: + // 0. We choose a target link size, K, and will keep the average link size at least K and the max size at 2k. + // K should be large enough to amortize some set-up, but not so large that one or two extra break the bank. + // 1. We will repeatedly consider pairs of links, and fully merge one with a prefix of the other. + // The last elements of each link will tell us which of the two suffixes must be held back. + // 2. We then have a chain of as many links as we started with, with potential defects to correct: + // a. A link may contain some number of zeros: we can remove them if we are eager, based on size. + // b. A link may contain more than 2K updates; we can split it. + // c. Two adjacent links may contain fewer than 2K updates; we can meld (careful append) them. + // 3. After a pass of the above, we should have restored the invariant. + // We can try and me smarter and fuse some of the above work rather than explicitly stage results. + // + // The challenging moment is the merge that can start with a suffix of one link, involving a prefix of one link. + // These could be the same link, different links, and generally there is the potential for complexity here. + + let mut builder = ChainBuilder::default(); + + let mut queue1: std::collections::VecDeque<_> = list1.into(); + let mut queue2: std::collections::VecDeque<_> = list2.into(); + + // The first unconsumed update in each block, via (k_idx, v_idx, t_idx), or None if exhausted. + // These are (0,0,0) for a new block, and should become None once there are no remaining updates. + let mut cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); + let mut cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); + + // For each pair of batches + while cursor1.is_some() && cursor2.is_some() { + merge_batch(&mut cursor1, &mut cursor2, &mut builder); + if cursor1.is_none() { cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); } + if cursor2.is_none() { cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); } } - /// Merge two batches, one completely and another through the corresponding prefix. - /// - /// Each invocation determines the maximum amount of both batches we can merge, determined - /// by comparing the elements at the tails of each batch, and locating the lesser in other. - /// We will merge the whole of the batch containing the lesser, and the prefix up through - /// the lesser element in the other batch, setting the cursor to the first element strictly - /// greater than that lesser element. - /// - /// The algorithm uses a list of `Report` findings to map the interleavings of the layers. - /// Each indicates either a range exclusive to one of the inputs, or a one element common - /// to the layers from both inputs, which must be further explored. This map would normally - /// allow the full merge to happen, but we need to carefully start at each cursor, and end - /// just before the first element greater than the lesser bound. - /// - /// The consumed prefix and disjoint suffix should be single report entries, and it seems - /// fine to first produce all reports and then reflect on the cursors, rather than use the - /// cursors as part of the mapping. - #[inline(never)] - fn merge_batch( - batch1: &mut Option<((usize, usize, usize), UpdatesTyped)>, - batch2: &mut Option<((usize, usize, usize), UpdatesTyped)>, - builder: &mut ChainBuilder, - stash: &mut Vec>, - ) { - // TODO: Optimization for one batch exceeding the other. - - let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); - let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); - - let view0 = updates0.view(); - let view1 = updates1.view(); - let keys0 = view0.keys; - let keys1 = view1.keys; - let vals0 = view0.vals; - let vals1 = view1.vals; - let times0 = view0.times; - let times1 = view1.times; - - // Survey the interleaving of the two inputs. - let mut key_survey = survey::>(keys0, keys1, &[Report::Both(0,0)]); - let mut val_survey = survey::>(vals0, vals1, &key_survey); - let mut time_survey = survey::>(times0, times1, &val_survey); - - // We now know enough to start writing into an output batch. - // We should update the input surveys to reflect the subset - // of data that we want. - // - // At most one cursor should be non-zero (assert!). - // A non-zero cursor must correspond to the first entry of the surveys, - // as there is at least one consumed update that precedes the other batch. - // We need to nudge that report forward to align with the cursor, potentially - // squeezing the report to nothing (to the upper bound). - - // We start by updating the surveys to reflect the cursors. - // If either cursor is set, then its batch has an element strictly less than the other batch. - // We therefore expect to find a prefix of This/That at the start of the survey. - if (k0_idx, v0_idx, t0_idx) != (0,0,0) { - let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } } - let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } } - let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } } - } + // TODO: create batch for the non-empty cursor. + if let Some(((k,v,t),batch)) = cursor1 { + let mut out_batch = UpdatesTyped::::default(); + let empty: UpdatesTyped = Default::default(); + let view = batch.view(); + write_from_surveys( + &batch, + &empty, + &[Report::This(0, 1)], + &[Report::This(k, view.keys.values.len())], + &[Report::This(v, view.vals.values.len())], + &[Report::This(t, view.times.values.len())], + &mut out_batch, + ); + builder.push(out_batch); + } + if let Some(((k,v,t),batch)) = cursor2 { + let mut out_batch = UpdatesTyped::::default(); + let empty: UpdatesTyped = Default::default(); + let view = batch.view(); + write_from_surveys( + &empty, + &batch, + &[Report::That(0, 1)], + &[Report::That(k, view.keys.values.len())], + &[Report::That(v, view.vals.values.len())], + &[Report::That(t, view.times.values.len())], + &mut out_batch, + ); + builder.push(out_batch); + } - if (k1_idx, v1_idx, t1_idx) != (0,0,0) { - let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } } - let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } } - let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } } - } + builder.extend(queue1); + builder.extend(queue2); + *output = builder.done(); + // TODO: Tidy output to satisfy structural invariants. +} - // We want to trim the tails of the surveys to only cover ranges present in both inputs. - // We can determine which was "longer" by looking at the last entry of the bottom layer, - // which tells us which input (or both) contained the last element. - // - // From the bottom layer up, we'll identify the index of the last item, and then determine - // the index of the list it belongs to. We use that index in the next layer, to locate the - // index of the list it belongs to, on upward. - let next_cursor = match time_survey.last().unwrap() { - Report::This(_,_) => { - // Collect the last value indexes known to strictly exceed an entry in the other batch. - let mut t = times0.values.len(); - while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } - let mut v = vals0.values.len(); - while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } - let mut k = keys0.values.len(); - while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } - // Now we may need to correct by nudging down. - if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; } - if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; } - Some(Ok((k,v,t))) - } - Report::Both(_,_) => { None } - Report::That(_,_) => { - // Collect the last value indexes known to strictly exceed an entry in the other batch. - let mut t = times1.values.len(); - while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } - let mut v = vals1.values.len(); - while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } - let mut k = keys1.values.len(); - while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } - // Now we may need to correct by nudging down. - if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; } - if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; } - Some(Err((k,v,t))) - } - }; +/// Merge two batches, one completely and another through the corresponding prefix. +/// +/// Each invocation determines the maximum amount of both batches we can merge, determined +/// by comparing the elements at the tails of each batch, and locating the lesser in other. +/// We will merge the whole of the batch containing the lesser, and the prefix up through +/// the lesser element in the other batch, setting the cursor to the first element strictly +/// greater than that lesser element. +/// +/// The algorithm uses a list of `Report` findings to map the interleavings of the layers. +/// Each indicates either a range exclusive to one of the inputs, or a one element common +/// to the layers from both inputs, which must be further explored. This map would normally +/// allow the full merge to happen, but we need to carefully start at each cursor, and end +/// just before the first element greater than the lesser bound. +/// +/// The consumed prefix and disjoint suffix should be single report entries, and it seems +/// fine to first produce all reports and then reflect on the cursors, rather than use the +/// cursors as part of the mapping. +#[inline(never)] +fn merge_batch( + batch1: &mut Option<((usize, usize, usize), UpdatesTyped)>, + batch2: &mut Option<((usize, usize, usize), UpdatesTyped)>, + builder: &mut ChainBuilder, +) +where + U::Time: 'static, +{ + // TODO: Optimization for one batch exceeding the other. - // Having updated the surveys, we now copy over the ranges they identify. - let mut out_batch = stash.pop().unwrap_or_default(); - // TODO: We should be able to size `out_batch` pretty accurately from the survey. - write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch); - builder.push(out_batch); + let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); + let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); - match next_cursor { - Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); } - Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); } - None => { } - } + let view0 = updates0.view(); + let view1 = updates1.view(); + let keys0 = view0.keys; + let keys1 = view1.keys; + let vals0 = view0.vals; + let vals1 = view1.vals; + let times0 = view0.times; + let times1 = view1.times; + + // Survey the interleaving of the two inputs. + let mut key_survey = survey::>(keys0, keys1, &[Report::Both(0,0)]); + let mut val_survey = survey::>(vals0, vals1, &key_survey); + let mut time_survey = survey::>(times0, times1, &val_survey); + + // We now know enough to start writing into an output batch. + // We should update the input surveys to reflect the subset + // of data that we want. + // + // At most one cursor should be non-zero (assert!). + // A non-zero cursor must correspond to the first entry of the surveys, + // as there is at least one consumed update that precedes the other batch. + // We need to nudge that report forward to align with the cursor, potentially + // squeezing the report to nothing (to the upper bound). + + // We start by updating the surveys to reflect the cursors. + // If either cursor is set, then its batch has an element strictly less than the other batch. + // We therefore expect to find a prefix of This/That at the start of the survey. + if (k0_idx, v0_idx, t0_idx) != (0,0,0) { + let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } } + } + + if (k1_idx, v1_idx, t1_idx) != (0,0,0) { + let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } } } + // We want to trim the tails of the surveys to only cover ranges present in both inputs. + // We can determine which was "longer" by looking at the last entry of the bottom layer, + // which tells us which input (or both) contained the last element. + // + // From the bottom layer up, we'll identify the index of the last item, and then determine + // the index of the list it belongs to. We use that index in the next layer, to locate the + // index of the list it belongs to, on upward. + let next_cursor = match time_survey.last().unwrap() { + Report::This(_,_) => { + // Collect the last value indexes known to strictly exceed an entry in the other batch. + let mut t = times0.values.len(); + while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } + let mut v = vals0.values.len(); + while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } + let mut k = keys0.values.len(); + while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } + // Now we may need to correct by nudging down. + if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; } + if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; } + Some(Ok((k,v,t))) + } + Report::Both(_,_) => { None } + Report::That(_,_) => { + // Collect the last value indexes known to strictly exceed an entry in the other batch. + let mut t = times1.values.len(); + while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } + let mut v = vals1.values.len(); + while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } + let mut k = keys1.values.len(); + while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } + // Now we may need to correct by nudging down. + if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; } + if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; } + Some(Err((k,v,t))) + } + }; + + // Having updated the surveys, we now copy over the ranges they identify. + let mut out_batch = UpdatesTyped::::default(); + // TODO: We should be able to size `out_batch` pretty accurately from the survey. + write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch); + builder.push(out_batch); + + match next_cursor { + Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); } + Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); } + None => { } + } } /// Write merged output from four levels of survey reports. diff --git a/differential-dataflow/src/columnar/batcher.rs b/differential-dataflow/src/columnar/batcher.rs new file mode 100644 index 000000000..0c622b50a --- /dev/null +++ b/differential-dataflow/src/columnar/batcher.rs @@ -0,0 +1,120 @@ +//! A `Batcher` for `RecordedUpdates` streams that consolidates input via +//! `TrieChunker` and merges sorted chains via the free functions in `trie_merger`. + +use timely::progress::frontier::AntichainRef; +use timely::progress::{frontier::Antichain, Timestamp}; +use timely::container::{ContainerBuilder, PushInto}; + +use crate::logging::Logger; +use crate::trace::{Batcher, Builder, Description}; + +use super::layout::ColumnarUpdate as Update; +use super::updates::UpdatesTyped; +use super::RecordedUpdates; +use super::arrangement::TrieChunker; +use super::arrangement::trie_merger; + +/// Creates batches from `RecordedUpdates` streams. +pub struct MergeBatcher { + /// Transforms input streams to chunks of sorted, consolidated data. + chunker: TrieChunker, + /// A sequence of power-of-two length lists of sorted, consolidated containers. + chains: Vec>>, + /// Current lower frontier, we sealed up to here. + lower: Antichain, + /// The lower-bound frontier of the data, after the last call to seal. + frontier: Antichain, +} + +impl> Batcher for MergeBatcher { + type Input = RecordedUpdates; + type Time = U::Time; + type Output = UpdatesTyped; + + fn new(_logger: Option, _operator_id: usize) -> Self { + Self { + chunker: TrieChunker::default(), + chains: Vec::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(U::Time::minimum()), + } + } + + /// Push a container of data into this merge batcher. Updates the internal chain structure if + /// needed. + fn push_container(&mut self, container: &mut RecordedUpdates) { + self.chunker.push_into(container); + while let Some(chunk) = self.chunker.extract() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + } + + // Sealing a batch means finding those updates with times not greater or equal to any time + // in `upper`. All updates must have time greater or equal to the previously used `upper`, + // which we call `lower`, by assumption that after sealing a batcher we receive no more + // updates with times not greater or equal to `upper`. + fn seal>(&mut self, upper: Antichain) -> B::Output { + // Finish + while let Some(chunk) = self.chunker.finish() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + + // Merge all remaining chains into a single chain. + while self.chains.len() > 1 { + let list1 = self.chains.pop().unwrap(); + let list2 = self.chains.pop().unwrap(); + let merged = Self::merge_by(list1, list2); + self.chains.push(merged); + } + let merged = self.chains.pop().unwrap_or_default(); + + // Extract readied data. + let mut kept = Vec::new(); + let mut readied = Vec::new(); + self.frontier.clear(); + + trie_merger::extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept); + + if !kept.is_empty() { + self.chains.push(kept); + } + + let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(U::Time::minimum())); + let seal = B::seal(&mut readied, description); + self.lower = upper; + seal + } + + /// The frontier of elements remaining after the most recent call to `self.seal`. + #[inline] + fn frontier(&mut self) -> AntichainRef<'_, U::Time> { + self.frontier.borrow() + } +} + +impl MergeBatcher { + /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered + /// by decreasing length. + fn insert_chain(&mut self, chain: Vec>) { + if !chain.is_empty() { + self.chains.push(chain); + while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) { + let list1 = self.chains.pop().unwrap(); + let list2 = self.chains.pop().unwrap(); + let merged = Self::merge_by(list1, list2); + self.chains.push(merged); + } + } + } + + // merges two sorted input lists into one sorted output list. + fn merge_by(list1: Vec>, list2: Vec>) -> Vec> { + // TODO: `list1` and `list2` get dropped; would be better to reuse? + let mut output = Vec::with_capacity(list1.len() + list2.len()); + trie_merger::merge_batches(list1, list2, &mut output); + + output + } +} diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index 0a22398b9..e38b4581a 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -36,6 +36,7 @@ pub mod updates; pub mod builder; pub mod exchange; pub mod arrangement; +pub mod batcher; pub use updates::UpdatesTyped; pub use builder::ValBuilder as ValColBuilder; From 627e936484b1954cdc5033d38d1319808cc9f9ec Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 May 2026 15:35:01 -0400 Subject: [PATCH 2/7] Introduce spill traits --- differential-dataflow/src/columnar/mod.rs | 1 + differential-dataflow/src/columnar/spill.rs | 49 +++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 differential-dataflow/src/columnar/spill.rs diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index e38b4581a..a0a104a18 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -37,6 +37,7 @@ pub mod builder; pub mod exchange; pub mod arrangement; pub mod batcher; +pub mod spill; pub use updates::UpdatesTyped; pub use builder::ValBuilder as ValColBuilder; diff --git a/differential-dataflow/src/columnar/spill.rs b/differential-dataflow/src/columnar/spill.rs new file mode 100644 index 000000000..0b3e8a7e7 --- /dev/null +++ b/differential-dataflow/src/columnar/spill.rs @@ -0,0 +1,49 @@ +//! Traits for paging chunks of merge-batcher state to and from backing storage. +//! +//! Modeled on timely's pager traits in +//! `timely-dataflow/communication/src/allocator/zero_copy/spill.rs` +//! (`SpillPolicy`, `BytesSpill`, `BytesFetch`), but parameterized over a chunk +//! type `C` rather than fixed to `timely::bytes::arc::Bytes`. For the columnar +//! batcher we expect `C = Updates`; that wiring lives elsewhere — this file +//! only defines the trait shapes. + +use std::collections::VecDeque; + +/// A queue entry: either an in-memory chunk or a handle that can fetch one +/// (or several) from backing storage. +pub enum Entry { + /// In-memory chunk. + Typed(C), + /// Paged-out chunk(s); fetch via the handle. + Paged(Box>), +} + +/// Decides which queue entries to spill out and which to keep resident. +/// +/// Invoked at well-defined moments by the holder of the queue (e.g., after +/// pushing a new chunk). The implementation may rewrite entries in either +/// direction: convert `Typed` to `Paged` (spill out) or `Paged` to `Typed` +/// (fetch back). +pub trait SpillPolicy { + /// Optionally transform the queue. + fn apply(&mut self, queue: &mut VecDeque>); +} + +/// Move in-memory chunks to backing storage, returning fetch handles. +/// +/// The implementation should drain from `chunks` and push to `handles` as it +/// goes; on failure it may stop partway, leaving the lists in a consistent +/// state that will be retried in the future. If it cannot leave the lists in +/// a consistent state it should panic. +pub trait Spill { + /// Spill `chunks` to storage, producing one fetch handle per spilled group. + fn spill(&mut self, chunks: &mut Vec, handles: &mut Vec>>); +} + +/// Handle to spilled chunk(s). Consume to retrieve them from storage. +pub trait Fetch { + /// Consume the handle and return the spilled chunks. + /// + /// On failure, the handle is returned so the caller can retry later. + fn fetch(self: Box) -> Result, Box>>; +} From e99c2ec1d878cb08b3b18ed0155dc692a0d35264 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 May 2026 20:59:40 -0400 Subject: [PATCH 3/7] Introduce fetching iteration --- differential-dataflow/Cargo.toml | 1 + .../examples/columnar_spill.rs | 681 ++++++++++++++++++ .../src/columnar/arrangement/trie_merger.rs | 116 +-- differential-dataflow/src/columnar/batcher.rs | 159 +++- 4 files changed, 892 insertions(+), 65 deletions(-) create mode 100644 differential-dataflow/examples/columnar_spill.rs diff --git a/differential-dataflow/Cargo.toml b/differential-dataflow/Cargo.toml index 5a42ce9b2..eddb1d88a 100644 --- a/differential-dataflow/Cargo.toml +++ b/differential-dataflow/Cargo.toml @@ -25,6 +25,7 @@ itertools="^0.13" graph_map = "0.1" bytemuck = "1.18.0" mimalloc = "0.1.48" +tempfile = "3" [dependencies] columnar = { workspace = true } diff --git a/differential-dataflow/examples/columnar_spill.rs b/differential-dataflow/examples/columnar_spill.rs new file mode 100644 index 000000000..6a59c8d88 --- /dev/null +++ b/differential-dataflow/examples/columnar_spill.rs @@ -0,0 +1,681 @@ +//! Example: file-backed spill for the columnar `MergeBatcher`. +//! +//! Demonstrates `Spill` / `Fetch` / `SpillPolicy` impls modeled on TD's +//! `communication/examples/spill_stress.rs`. Spills `UpdatesTyped` chunks +//! to a tempfile via per-column `Stash::write_bytes`, fetches them back via +//! `Stash::try_from_bytes` and `Updates::into_typed`. +//! +//! Run with: `cargo run --example columnar_spill` + +use std::io::{Read, Seek, SeekFrom, Write}; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +// Static spill-policy config, read by `SpillBatcher::new` when `arrange_core` +// constructs each worker's batcher (we can't pass parameters through the +// `Batcher::new(logger, op_id)` constructor). +static ENABLE_SPILL: AtomicBool = AtomicBool::new(true); +static HEAD: AtomicUsize = AtomicUsize::new(10_000_000); +static THRESH: AtomicUsize = AtomicUsize::new(50_000_000); + +/// Cross-worker registry of `Threshold` stats so we can sum them after a run. +static SHARED_STATS: OnceLock>>> = OnceLock::new(); + +fn register_stats(stats: Arc) { + SHARED_STATS + .get_or_init(|| Mutex::new(Vec::new())) + .lock() + .unwrap() + .push(stats); +} + +fn collect_stats() -> (usize, usize) { + if let Some(m) = SHARED_STATS.get() { + let v = m.lock().unwrap(); + let fires: usize = v.iter().map(|s| s.fires.load(Ordering::Relaxed)).sum(); + let chunks: usize = v.iter().map(|s| s.chunks_spilled.load(Ordering::Relaxed)).sum(); + (fires, chunks) + } else { + (0, 0) + } +} + +fn reset_stats() { + if let Some(m) = SHARED_STATS.get() { + m.lock().unwrap().clear(); + } +} + +use columnar::Push; +use columnar::bytes::stash::Stash; + +use differential_dataflow::columnar::{RecordedUpdates, ValBuilder, ValColBuilder, ValSpine}; +use differential_dataflow::columnar::batcher::MergeBatcher; +use differential_dataflow::columnar::layout::ColumnarUpdate as Update; +use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy}; +use differential_dataflow::columnar::updates::{Updates, UpdatesTyped}; +use differential_dataflow::logging::Logger; +use differential_dataflow::operators::arrange::arrangement::arrange_core; +use differential_dataflow::trace::{Batcher, Builder}; +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe}; +use timely::dataflow::operators::Input; +use timely::dataflow::InputHandle; +use timely::progress::frontier::AntichainRef; +use timely::progress::{frontier::Antichain, Timestamp}; + +/// File-backed `Spill`. Serializes each chunk into a reusable `Vec` and +/// writes it with one `write_all` per chunk — one syscall per spill, vs. one +/// per column. +pub struct FileSpill { + file: Arc>, + /// Cumulative byte offset for the next write. + offset: u64, + /// Reusable serialization buffer; grows to fit the largest chunk seen, + /// then sticks at that capacity (no per-chunk allocation). + buf: Vec, + _marker: PhantomData, +} + +impl FileSpill { + pub fn new() -> std::io::Result { + let file = tempfile::tempfile()?; + Ok(Self { + file: Arc::new(Mutex::new(file)), + offset: 0, + buf: Vec::new(), + _marker: PhantomData, + }) + } +} + +impl Spill> for FileSpill { + fn spill( + &mut self, + chunks: &mut Vec>, + handles: &mut Vec>>>, + ) { + while let Some(chunk) = chunks.pop() { + let updates: Updates> = chunk.into(); + let keys_len = updates.keys.length_in_bytes() as u64; + let vals_len = updates.vals.length_in_bytes() as u64; + let times_len = updates.times.length_in_bytes() as u64; + let diffs_len = updates.diffs.length_in_bytes() as u64; + let total = 32 + keys_len + vals_len + times_len + diffs_len; + + // Serialize the whole chunk (header + four columns) into the + // reusable buffer, then issue a single write_all to the file. + self.buf.clear(); + self.buf.extend_from_slice(&keys_len.to_le_bytes()); + self.buf.extend_from_slice(&vals_len.to_le_bytes()); + self.buf.extend_from_slice(×_len.to_le_bytes()); + self.buf.extend_from_slice(&diffs_len.to_le_bytes()); + updates.keys.write_bytes(&mut self.buf).unwrap(); + updates.vals.write_bytes(&mut self.buf).unwrap(); + updates.times.write_bytes(&mut self.buf).unwrap(); + updates.diffs.write_bytes(&mut self.buf).unwrap(); + debug_assert_eq!(self.buf.len() as u64, total); + + let start = self.offset; + let mut file = self.file.lock().unwrap(); + file.seek(SeekFrom::Start(start)).unwrap(); + file.write_all(&self.buf).unwrap(); + drop(file); + self.offset += total; + + handles.push(Box::new(FileFetch:: { + file: self.file.clone(), + offset: start, + _marker: PhantomData, + })); + } + } +} + +/// Per-chunk fetch handle. Reads a 32-byte header (four column lengths) at the +/// recorded offset, then four `Stash::try_from_bytes` payloads. +pub struct FileFetch { + file: Arc>, + offset: u64, + _marker: PhantomData, +} + +impl Fetch> for FileFetch { + fn fetch(self: Box) -> Result>, Box>>> { + let mut file = self.file.lock().unwrap(); + file.seek(SeekFrom::Start(self.offset)).unwrap(); + let mut header = [0u8; 32]; + file.read_exact(&mut header).unwrap(); + let keys_len = u64::from_le_bytes(header[0..8].try_into().unwrap()) as usize; + let vals_len = u64::from_le_bytes(header[8..16].try_into().unwrap()) as usize; + let times_len = u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize; + let diffs_len = u64::from_le_bytes(header[24..32].try_into().unwrap()) as usize; + + let mut keys_bytes = vec![0u8; keys_len]; + file.read_exact(&mut keys_bytes).unwrap(); + let mut vals_bytes = vec![0u8; vals_len]; + file.read_exact(&mut vals_bytes).unwrap(); + let mut times_bytes = vec![0u8; times_len]; + file.read_exact(&mut times_bytes).unwrap(); + let mut diffs_bytes = vec![0u8; diffs_len]; + file.read_exact(&mut diffs_bytes).unwrap(); + drop(file); + + let keys = Stash::try_from_bytes(keys_bytes).unwrap(); + let vals = Stash::try_from_bytes(vals_bytes).unwrap(); + let times = Stash::try_from_bytes(times_bytes).unwrap(); + let diffs = Stash::try_from_bytes(diffs_bytes).unwrap(); + let updates: Updates> = Updates { keys, vals, times, diffs }; + Ok(vec![updates.into_typed()]) + } +} + +/// Trivial `SpillPolicy`: page out every `Typed` entry on each apply. +/// Useful for direct queue exercise; not intended as a real policy. +pub struct SpillEverything { + spill: FileSpill, +} + +impl SpillPolicy> for SpillEverything { + fn apply(&mut self, queue: &mut std::collections::VecDeque>>) { + let mut new_queue = std::collections::VecDeque::with_capacity(queue.len()); + let mut buf = Vec::new(); + let mut handles: Vec>>> = Vec::new(); + for entry in queue.drain(..) { + match entry { + Entry::Typed(c) => { + buf.push(c); + self.spill.spill(&mut buf, &mut handles); + let handle = handles.pop().expect("FileSpill produces a handle per chunk"); + new_queue.push_back(Entry::Paged(handle)); + } + Entry::Paged(h) => new_queue.push_back(Entry::Paged(h)), + } + } + *queue = new_queue; + } +} + +/// Threshold-based spill policy adapted from timely's +/// `communication::allocator::zero_copy::spill::threshold::Threshold`. +/// +/// Counts records (not bytes) for the threshold check. When the queue's +/// resident records exceed `head_reserve_records + threshold_records`, spill +/// chunks past the head reserve. Unlike TD we don't carve out the last +/// entry — TD's last entry is a `try_merge` target being extended in place; +/// our chunks are all finished, so any of them can be spilled. +pub struct Threshold { + spill: FileSpill, + /// Records near the head of the queue stay resident. + pub head_reserve_records: usize, + /// Spillable surplus: trigger when resident exceeds head + threshold. + pub threshold_records: usize, + /// Counters shared with the caller (chunks_spilled, fires). + pub stats: Arc, +} + +#[derive(Default)] +pub struct ThresholdStats { + pub fires: AtomicUsize, + pub chunks_spilled: AtomicUsize, +} + +impl Threshold { + pub fn new(spill: FileSpill, head_reserve_records: usize, threshold_records: usize) -> Self { + Self { + spill, + head_reserve_records, + threshold_records, + stats: Arc::new(ThresholdStats::default()), + } + } +} + +impl SpillPolicy> for Threshold { + fn apply(&mut self, queue: &mut std::collections::VecDeque>>) { + let resident: usize = queue.iter().map(|e| match e { + Entry::Typed(c) => c.len(), + Entry::Paged(_) => 0, + }).sum(); + if resident <= self.head_reserve_records + self.threshold_records { + return; + } + + // Walk the queue, accumulating a head reserve. Past the reserve, mark + // every Typed entry for spill. + let mut cumulative = 0usize; + let mut target_indices: Vec = Vec::new(); + for (i, entry) in queue.iter().enumerate() { + if let Entry::Typed(c) = entry { + if cumulative >= self.head_reserve_records { + target_indices.push(i); + } + cumulative += c.len(); + } + } + if target_indices.is_empty() { return; } + + // Take the targeted chunks out, leaving empty placeholders we overwrite below. + let mut targets: Vec> = Vec::with_capacity(target_indices.len()); + for &i in &target_indices { + if let Entry::Typed(c) = &mut queue[i] { + targets.push(std::mem::take(c)); + } + } + + let mut handles: Vec>>> = Vec::new(); + self.spill.spill(&mut targets, &mut handles); + // FileSpill drains via pop (LIFO); reverse so handles align with target_indices order. + handles.reverse(); + assert_eq!(target_indices.len(), handles.len()); + self.stats.fires.fetch_add(1, Ordering::Relaxed); + self.stats.chunks_spilled.fetch_add(handles.len(), Ordering::Relaxed); + for (i, handle) in target_indices.into_iter().zip(handles) { + queue[i] = Entry::Paged(handle); + } + } +} + +/// `Batcher` wrapper that installs a `Threshold` policy on a `MergeBatcher` +/// at construction time, reading config from `HEAD` / `THRESH` / `ENABLE_SPILL` +/// statics. Slots into `arrange_core` in place of `ValBatcher` and lets the +/// timely operator drive a spilling merger without surgery to the `Batcher` +/// trait signature. +pub struct SpillBatcher(MergeBatcher<(K, V, T, R)>) +where + (K, V, T, R): Update; + +impl Batcher for SpillBatcher +where + K: columnar::Columnar + 'static, + V: columnar::Columnar + 'static, + T: columnar::Columnar + Timestamp + 'static, + R: columnar::Columnar + 'static, + (K, V, T, R): Update