Is your feature request related to a problem or challenge?
When a spilling operator (hash aggregate, sort) merges its spill files, the number of
spill files opened simultaneously in a single merge pass is bounded only by available
memory, never by a file-count ceiling.
In MultiLevelMergeBuilder::get_sorted_spill_files_to_merge
(datafusion-physical-plan/src/sorts/multi_level_merge.rs), the loop keeps adding spill
files to the current pass as long as reservation.try_grow(...) succeeds — i.e. as long
as the (often large, shared) memory pool has room. There is no upper bound on how many
files are opened at once; the only floor is "at least 2 streams". DiskManager exposes
max_temp_directory_size (total bytes on disk) but nothing for the number of open
files / merge fan-in.
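To make the behavior described above concrete, here is a minimal toy model of a memory-driven selection loop. It is a sketch, not the DataFusion source: `ToyReservation` and `files_for_pass` are hypothetical names invented for illustration, the per-file byte counts are made up, and the "at least 2 streams" floor is not modeled.

```rust
/// Toy stand-in for a memory reservation. This is NOT DataFusion's
/// `MemoryReservation`; it only mimics a `try_grow`-style check.
struct ToyReservation {
    used: usize,
    limit: usize,
}

impl ToyReservation {
    fn new(limit: usize) -> Self {
        Self { used: 0, limit }
    }

    /// Reserve `bytes` more, or fail if the limit would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.used.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.used = total;
                Ok(())
            }
            _ => Err(format!(
                "cannot reserve {bytes} more bytes (limit {})",
                self.limit
            )),
        }
    }
}

/// Count how many spill files are selected for one merge pass when memory
/// is the only limit: keep adding files until the reservation refuses.
fn files_for_pass(per_file_bytes: &[usize], reservation: &mut ToyReservation) -> usize {
    let mut count = 0;
    for &bytes in per_file_bytes {
        if reservation.try_grow(bytes).is_err() {
            break;
        }
        count += 1;
    }
    count
}
```

In this model, 1000 spill files that each need 1 MiB of merge memory are all selected for a single pass when the limit is 2 GiB, and only 100 are selected when the limit is 100 MiB. The file count is purely a function of memory headroom.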
This becomes a process-level problem on high-core machines. The aggregation runs
target_partitions streams in parallel (= num CPUs by default), and each partition runs
its own multi-level merge. With a generous memory pool, each merge opens many spill files
at once, so peak open file descriptors ≈ target_partitions × fan_in_per_pass. On a
many-core node a heavy GROUP BY / COUNT(DISTINCT ...) over many spilled files fails
with:
    IO error: Too many open files (os error 24)
on a spill temp file. The error is a Result and is handled gracefully, but EMFILE is a
process-wide condition: at the peak it can transiently break unrelated operations in the
same process (opening other files, network connections, logging, other concurrent queries).
Today the only real control is the OS ulimit -n. Lowering target_partitions works but
penalizes every query on the node — including the vast majority that never spill — which
is the wrong trade-off, since non-spilling queries never enter the merge path at all.
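The peak-descriptor approximation above can be written down as a small helper. This is only a back-of-the-envelope sketch: the function names are hypothetical, and the estimate counts spill files only, ignoring every other descriptor the process holds.

```rust
/// Rough estimate of spill-file descriptors open at the peak of a merge,
/// using the approximation from this issue:
/// peak ≈ target_partitions × fan_in_per_pass.
fn estimated_peak_spill_fds(target_partitions: usize, fan_in_per_pass: usize) -> usize {
    target_partitions.saturating_mul(fan_in_per_pass)
}

/// True when the spill files alone would exceed the `ulimit -n` value.
fn exceeds_fd_limit(
    target_partitions: usize,
    fan_in_per_pass: usize,
    nofile_limit: usize,
) -> bool {
    estimated_peak_spill_fds(target_partitions, fan_in_per_pass) > nofile_limit
}
```

As an illustrative case (the numbers are assumptions, not measurements), 64 partitions each merging 32 spill files at once gives an estimate of 2048 descriptors, twice a 1024 limit, while a fan-in of 8 gives 512 and stays under it.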
Describe the solution you'd like
A configuration knob that caps the merge fan-in — the maximum number of spill files
opened simultaneously in a single merge pass — independently of available memory. When
the cap is hit, the merge simply performs more passes (slower, but bounded file
descriptors), which is exactly the desired trade-off for "make a very heavy aggregation
feasible, even if slower".
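To get a feel for the cost side of that trade-off, the following sketch counts merge passes under a fan-in cap. It is a simplified model and an assumption about the shape of the algorithm, not the actual implementation: each pass is assumed to merge up to `fan_in` files into a single new file, and the last pass is the one that produces the final output. `merge_passes` is a hypothetical name.

```rust
/// Number of merge passes needed to reduce `files` sorted spill files to a
/// single sorted output when at most `fan_in` files are open per pass.
/// Simplified model: every pass replaces the files it reads with one file.
fn merge_passes(mut files: usize, fan_in: usize) -> usize {
    // Mirror the "at least 2 streams per merge" floor from the issue.
    let fan_in = fan_in.max(2);
    let mut passes = 0;
    while files > 1 {
        let merged = files.min(fan_in);
        // `merged` inputs are consumed and one merged output takes their place.
        files = files - merged + 1;
        passes += 1;
    }
    passes
}
```

Under this model, 100 spill files need 1 pass with a fan-in of 100, 11 passes with a fan-in of 10, and 99 passes with a fan-in of 2. The cap trades a bounded number of open files for more passes over the data.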
Proposed shape:
- Add max_merge_fanin: usize (0 = unbounded, the current behavior and the default) to
  DiskManagerBuilder / DiskManager in datafusion-execution, alongside the existing
  max_temp_directory_size. It is a natural home: it is the type that already carries the
  spill/disk caps, and SpillManager already reaches it via env.disk_manager.
- In MultiLevelMergeBuilder::get_sorted_spill_files_to_merge, after the existing
  memory-driven loop, clamp the count:

      // max_merge_fanin() is the proposed accessor; 0 means "no cap".
      let cap = self.spill_manager.env().disk_manager.max_merge_fanin();
      if cap != 0 {
          number_of_spills_to_read_for_current_phase =
              number_of_spills_to_read_for_current_phase.min(cap.max(2));
      }

  The .max(2) preserves the existing "at least 2 streams per merge" invariant, and the
  surrounding algorithm already handles leftover spill files by doing additional passes.
Crucially, this is zero-cost for queries that do not spill: MultiLevelMergeBuilder is
only constructed when there are spill files to merge, so non-spilling queries never
execute this branch. target_partitions can stay high (full parallelism for everyone),
and only the heavy spilling operators pay the price — more merge passes in exchange for a
peak FD bound of target_partitions × max_merge_fanin.
Optionally surface it as an execution config option (e.g.
datafusion.execution.max_merge_fanin) so it can be set per-session.
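The clamp semantics proposed above can be isolated and checked without any DataFusion types. In the sketch below, `MergeFanInLimit` and its methods are hypothetical stand-ins invented for illustration. The proposal itself puts the setting on DiskManagerBuilder / DiskManager, which this block does not attempt to reproduce.

```rust
/// Hypothetical stand-in for the proposed setting. Not the real
/// `DiskManager`; it only carries the one value under discussion.
#[derive(Debug, Clone, Copy, Default)]
struct MergeFanInLimit {
    /// 0 means unbounded, which is the current behavior and the default.
    max_merge_fanin: usize,
}

impl MergeFanInLimit {
    fn new(max_merge_fanin: usize) -> Self {
        Self { max_merge_fanin }
    }

    /// Clamp the count chosen by the memory-driven loop.
    /// A non-zero cap below 2 is treated as 2, so a merge always has
    /// at least two input streams.
    fn clamp(&self, memory_driven_count: usize) -> usize {
        if self.max_merge_fanin == 0 {
            memory_driven_count
        } else {
            memory_driven_count.min(self.max_merge_fanin.max(2))
        }
    }
}
```

With the default of 0, a memory-driven count of 500 passes through unchanged. With a cap of 16 it becomes 16, a count of 4 stays 4, and a cap of 1 is raised to 2. Keeping 0 as "unbounded" means existing deployments see no behavior change unless they opt in.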
Describe alternatives you've considered
- Lowering target_partitions: reduces the FD multiplier, but it is a per-node global
  that slows down all queries, including the majority that never spill. Wrong altitude:
  it taxes the common case to fix the rare heavy-aggregation case.
- Lowering the memory pool size: indirectly shrinks the per-pass fan-in (toward the floor
  of 2), but the pool is shared across all concurrent queries, the relationship to FD count
  is indirect, and a smaller pool makes more queries spill.
- Raising ulimit -n / LimitNOFILE: a necessary operational mitigation, but it only
  moves the ceiling; it does not give the engine a way to bound its own FD usage, and the
  bound still scales with cores × data.
- max_temp_directory_size: caps total bytes on disk, not the number of files open at once,
  so it does not address EMFILE.
None of these give a direct, memory-independent bound on simultaneously-open spill files
that is also free for non-spilling queries — which the proposed fan-in cap does.
Additional context
Observed on DataFusion 53.1.0; the same code path is present on main.
Repro shape: a SELECT COUNT(DISTINCT col_b), col_a FROM t GROUP BY col_a over a large
dataset that forces the hash aggregate to spill many files, on a many-core machine with a
default-ish ulimit -n (e.g. 1024). COUNT(DISTINCT ...) is expanded into two stacked
aggregation layers, both of which can spill, compounding the FD pressure.
Relevant code:
- datafusion-physical-plan/src/sorts/multi_level_merge.rs:
  MultiLevelMergeBuilder::get_sorted_spill_files_to_merge (the unbounded, memory-driven
  fan-in loop).
- datafusion-physical-plan/src/aggregates/row_hash.rs: the hash aggregate hands all
  accumulated spill files to StreamingMergeBuilder::with_sorted_spill_files.
- datafusion-execution/src/disk_manager.rs: DiskManagerBuilder / DiskManager, where
  max_temp_directory_size lives and where max_merge_fanin would be added.
Related: #7858 touches spilling improvements in the hash aggregate and notes the
"too many open files" concern in passing, but does not propose a fan-in cap.
Happy to open a PR implementing the above if the approach sounds reasonable.