fix: count shared buffers once in hash join build-side memory accounting#22862
fix: count shared buffers once in hash join build-side memory accounting#22862jordepic wants to merge 1 commit into
Conversation
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you! Should be good to go after CI passes.
I left a suggestion for you to consider.
| /// [`get_record_batch_memory_size`] on each such slice counts the shared | ||
| /// buffers once per slice, while sharing `counted_buffers` across the calls | ||
| /// counts each buffer exactly once. | ||
| pub fn count_record_batch_memory_size( |
There was a problem hiding this comment.
We could make it a deeper module like
/// Tracks memory already accounted for across multiple `RecordBatch`es.
///
/// Some batches may share the same underlying Arrow buffers, for example when
/// they are zero-copy slices of a larger batch. This counter remembers buffer
/// start addresses so shared buffers are counted only once.
#[derive(Default)]
struct RecordBatchMemoryCounter {
accounted_buffers: HashSet<usize>,
memory_usage: usize,
}
impl RecordBatchMemoryCounter {
/// Accounts for buffers in `batch` that have not already been seen.
fn count_batch(&mut self, batch: &RecordBatch) {
self.memory_usage += count_record_batch_memory_size(
batch,
&mut self.accounted_buffers,
);
}
/// Returns the total memory accounted for so far.
fn memory_usage(&self) -> usize {
self.memory_usage
}
}The benefit is that users such as HashJoinExec do not need to know about the implementation details (e.g. the HashSet used to track buffer addresses). It also makes the intent clearer for readers who are not already familiar with this context.
Since this issue is not specific to hash joins, a simpler abstraction could make it easier to reuse.
There was a problem hiding this comment.
@2010YOUY01 I went ahead and implemented that, thank you for the speedy review! Feel free to merge it once CI passes :)
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
bcb4574 to
6443c06
Compare
Samyak2
left a comment
There was a problem hiding this comment.
There was some discussion here regarding this: #22526 (comment)
Looks good to me. We have been using a very similar API internally to fix such issues. Left a minor comment
| pub struct RecordBatchMemoryCounter { | ||
| /// Start addresses of `Buffer`s that have already been counted (instead of | ||
| /// actual used data region's pointer represented by current `Array`) | ||
| counted_buffers: HashSet<usize>, |
There was a problem hiding this comment.
We could use NonZero<usize> instead and then use as_ptr().addr() below.
|
I agree that fixing the root issue in hash aggregate is the long-term solution. The idea that operators should avoid returning small sliced batches backed by large buffers sounds like a reasonable property to follow. So I think it's a good idea to be defensive in memory-intensive operators, and this PR makes the situation better (arguably in an entropy-reducing way). It also makes sense to apply the same approach to other memory-intensive operators. I'm wondering whether you have a better alternative in mind, or if there are other concerns that would suggest we should not merge this? |
For this particular issue I think Another concern I have is that operators should follow the same approach for reserving RecordBatches. As I mentioned before, this issue applies to other downstream consumers of HashAggregate. |
I also think we should design the memory accounting changes more intentionally, understand how we want the memory accounting feature to work and deduplicate the issues, e.g. I've raised a similar issue in #22526 |
Yes, I agree. I believe the root cause of many existing memory-limited query bugs is that the current memory tracking protocol is ambiguous, and the behavior of operators and the memory pool is inconsistent. We should definitely think through a better design for both operators and the memory pool, and then specify a clear protocol, so we can coordinate efforts around this issue.
I'm not sure which solution is better at this point. This requires deeper, more holistic thinking about the overall spilling-query design and may challenge some of our existing assumptions. I'll spend more time thinking about it later. Regarding this PR, I don't think it a major architectural commitment. Even if we later decide to switch to a |
Continuing from #22526 (comment), this happens with the current memory tracking too. I see this PR as a strict improvement over the current
I would imagine so. As an example, the same problem currently exists with Repartition over Aggregate. The same fix would be needed in any operator that stores a sequence of For Repartition specifically, we would need to extend this API to be able to remove a record batch too. |
|
Ok, I agree it's improving the existing state and there's no better short-term solution. |
The hash join build side reserves get_record_batch_memory_size(&batch) per collected batch. That function deduplicates shared buffers only within a single batch, so when the build input emits zero-copy slices of one larger batch (e.g. GroupedHashAggregateStream emitting its result in batch_size chunks), every slice is charged the full parent allocation: an aggregate output of S bytes in n slices reserves n * S for S bytes of physical memory. Since the build collection cannot spill, the inflated reservation aborts queries that fit in memory with large headroom (observed: 26GB reserved for 136MB resident). Add RecordBatchMemoryCounter, which tracks the buffers counted so far across a sequence of batches and counts each buffer exactly once, and use it in the build-side collection so each buffer is reserved exactly once.
6443c06 to
783414b
Compare
|
Thanks for the detailed discussion all, sorry I was asleep for all of it! Understood on all points looking for a longer term solution, but as things stand this is a pretty nefarious issue in datafusion and I think a short term patch that's easily modifiable (and not really part of the public API) is the best route to take. |
| memory_usage: usize, | ||
| } | ||
|
|
||
| impl RecordBatchMemoryCounter { |
There was a problem hiding this comment.
It would be useful to also have a clear method for operators which spill and want to reset the memory counter
There was a problem hiding this comment.
Can you elaborate on the API you're expecting? I'm also happy to just cross that bridge as we need it, as nobody is calling it just yet
There was a problem hiding this comment.
a clear method which resets memory_usage to 0 and clears the counted_buffers hash_set
There was a problem hiding this comment.
I had an alternate API in mind, but I don't know the details of spilling, so not sure if this is viable.
A uncount_batch (or similarly named) method that stops tracking a batch. This would mean a HashMap of pointer -> number of occurrences instead of a HashSet. This API is needed for RepartitionExec after a batch is consumed from the channel. I was thinking we could use the same for spill.
I would also be inclined towards adding this when we need it, instead of making this PR bigger.
Which issue does this PR close?
Rationale for this change
When using DataFusion comet I noticed that my hash join operator was failing with the following error:
Failed to acquire 142606336 bytes where 17142251456 bytes already reserved and the fair limit is 17179869184 bytes, 4 registered. Looking into this more, DataFusion asks to reserve memory for each batch (by default 8192 rows) of the build side of a hash join - and tries to reserve (without actually allocating it) num_batches * batch_size. This is problematic when these are batches are zero-copy slices of a larger batch (e.g. GroupedHashAggregateStream), since the slice size is evaluated to be the size of the larger buffer. This is because the reference to the slice actually keeps the entire buffer from being freed. DataFusion doesn't overallocate memory (the underlying data is the same), but it does over-request it (in the centralized accounting system), which can lead to these "ResourcesExhausted" exceptions.What changes are included in this PR?
In this change, we keep track of all of the buffers that we've already counted via a set of pointers. This way, we don't redundantly request memory for the whole arrow buffer for each sub-slice of it. We choose this approach as opposed to just requesting a smaller amount of memory per batch, because as mentioned before, the pointer to each batch technically keeps the entire arrow-buffer from being freed.
Are these changes tested?
The new hash join test fails on main with ResourcesExhausted and passes with this change.
Are there any user-facing changes?
No breaking changes. Adds a new public helper count_record_batch_memory_size to datafusion-common.