
Add a configurable cap on spill-file merge fan-in (max open files during external merge) #22848

@junknown


Is your feature request related to a problem or challenge?

When a spilling operator (hash aggregate, sort) merges its spill files, the number of
spill files opened simultaneously in a single merge pass is bounded only by available
memory, never by a file-count ceiling.

In MultiLevelMergeBuilder::get_sorted_spill_files_to_merge
(datafusion-physical-plan/src/sorts/multi_level_merge.rs), the loop keeps adding spill
files to the current pass as long as reservation.try_grow(...) succeeds — i.e. as long
as the (often large, shared) memory pool has room. There is no upper bound on how many
files are opened at once; the only floor is "at least 2 streams". DiskManager exposes
max_temp_directory_size (total bytes on disk) but nothing for the number of open
files / merge fan-in.

This becomes a process-level problem on high-core machines. The aggregation runs
target_partitions streams in parallel (target_partitions defaults to the number of CPU
cores), and each partition runs its own multi-level merge. With a generous memory pool,
each merge opens many spill files at once, so peak open file descriptors ≈
target_partitions × fan_in_per_pass. On a many-core node, a heavy GROUP BY /
COUNT(DISTINCT ...) over many spilled files fails with:

  IO error: Too many open files (os error 24)

on a spill temp file. The error surfaces as a Result and is handled gracefully, but EMFILE is a
process-wide condition: at the peak it can transiently break unrelated operations in the
same process (opening other files, network connections, logging, other concurrent queries).
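The scaling can be made concrete with a few illustrative numbers (these are not measurements from the failure above). The function only encodes the estimate target_partitions × fan_in_per_pass; it ignores the descriptors the process already holds for other purposes, which makes the real headroom even smaller.

```rust
/// Rough peak number of spill files open at once: every partition runs its own
/// merge, and each merge pass opens `fan_in_per_pass` files.
fn peak_open_spill_files(target_partitions: usize, fan_in_per_pass: usize) -> usize {
    target_partitions * fan_in_per_pass
}

fn main() {
    // Illustrative inputs: 64 partitions (e.g. the default on a 64-core node),
    // each merge pass opening 100 spill files, against a ulimit -n of 1024.
    let ulimit_nofile: usize = 1024;
    let peak = peak_open_spill_files(64, 100);
    println!(
        "estimated peak: {peak} files (limit {ulimit_nofile}), exceeds limit: {}",
        peak > ulimit_nofile
    );
}
```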

Today the only real control is the OS ulimit -n. Lowering target_partitions works but
penalizes every query on the node — including the vast majority that never spill — which
is the wrong trade-off, since non-spilling queries never enter the merge path at all.

Describe the solution you'd like

A configuration knob that caps the merge fan-in — the maximum number of spill files
opened simultaneously in a single merge pass — independently of available memory. When
the cap is hit, the merge simply performs more passes (slower, but bounded file
descriptors), which is exactly the desired trade-off for "make a very heavy aggregation
feasible, even if slower".

Proposed shape:

  • Add max_merge_fanin: usize (0 = unbounded, the current behavior and the default) to
    DiskManagerBuilder / DiskManager in datafusion-execution, alongside the existing
    max_temp_directory_size. It is a natural home: it is the type that already carries the
    spill/disk caps, and SpillManager already reaches it via env.disk_manager.

  • In MultiLevelMergeBuilder::get_sorted_spill_files_to_merge, after the existing
    memory-driven loop, clamp the count:

    let cap = self.spill_manager.env().disk_manager.max_merge_fanin();
    if cap != 0 {
        number_of_spills_to_read_for_current_phase =
            number_of_spills_to_read_for_current_phase.min(cap.max(2));
    }
    

    The .max(2) preserves the existing "at least 2 streams per merge" invariant, and the
    surrounding algorithm already handles leftover spill files by doing additional passes.
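A self-contained sketch of the selection with the cap applied follows. `Reservation`, `effective_cap`, and `files_for_pass` are hypothetical stand-ins, not DataFusion types or functions, and the per-file byte counts replace whatever the real code reserves per spill file. A `max_merge_fanin` of 0 reproduces the current, memory-only behavior.

```rust
/// Hypothetical stand-in for the merge's memory reservation.
struct Reservation {
    capacity: usize,
    used: usize,
}

impl Reservation {
    /// Grows the reservation if the pool has room; returns false otherwise.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.capacity {
            return false;
        }
        self.used += bytes;
        true
    }
}

/// Interprets the proposed `max_merge_fanin`: 0 means unbounded (current
/// behavior); any other value is raised to at least 2.
fn effective_cap(max_merge_fanin: usize) -> usize {
    if max_merge_fanin == 0 {
        usize::MAX
    } else {
        max_merge_fanin.max(2)
    }
}

/// Number of spill files to open in this pass: stops at whichever limit is hit
/// first, memory or the fan-in cap. Returns `None` when fewer than two files
/// could be selected (the real code's handling of that case is not modeled).
fn files_for_pass(res: &mut Reservation, per_file_bytes: &[usize], max_merge_fanin: usize) -> Option<usize> {
    let cap = effective_cap(max_merge_fanin);
    let mut n = 0;
    for &bytes in per_file_bytes {
        // Check the cap before growing, so no memory is reserved for a file
        // that this pass will not open.
        if n == cap || !res.try_grow(bytes) {
            break;
        }
        n += 1;
    }
    if n >= 2 { Some(n) } else { None }
}

fn main() {
    let files: Vec<usize> = vec![1 << 20; 500]; // 500 spill files, ~1 MiB each
    let mut unbounded = Reservation { capacity: 1 << 30, used: 0 }; // 1 GiB pool
    let mut capped = Reservation { capacity: 1 << 30, used: 0 };
    println!("max_merge_fanin = 0:  {:?}", files_for_pass(&mut unbounded, &files, 0));
    println!("max_merge_fanin = 32: {:?}", files_for_pass(&mut capped, &files, 32));
}
```

One design difference from the snippet in the proposal: this sketch checks the cap inside the loop instead of clamping afterwards. Because the existing loop grows the reservation once per added file, clamping after the loop would leave memory reserved for files that the pass does not open, unless the reservation is shrunk back afterwards.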

Crucially, this is zero-cost for queries that do not spill: MultiLevelMergeBuilder is
only constructed when there are spill files to merge, so non-spilling queries never
execute this branch. target_partitions can stay high (full parallelism for everyone),
and only the heavy spilling operators pay the price — more merge passes in exchange for a
peak FD bound of target_partitions × max_merge_fanin.
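The bound can also be read the other way round, to pick a cap for a given node. The helper below is hypothetical (not part of DataFusion or of this proposal) and applies only the target_partitions × max_merge_fanin estimate. It ignores the extra file an intermediate pass writes and the fact that stacked aggregation layers, such as the two produced by COUNT(DISTINCT ...), may merge at the same time, so its result is an optimistic upper bound.

```rust
/// Hypothetical helper: the largest `max_merge_fanin` that keeps the estimated
/// peak (target_partitions * fan_in) within `nofile_limit - reserved`, where
/// `reserved` is headroom left for everything else the process opens.
/// Returns `None` if even the minimum fan-in of 2 does not fit.
fn max_fanin_within_limit(nofile_limit: usize, reserved: usize, target_partitions: usize) -> Option<usize> {
    let budget = nofile_limit.checked_sub(reserved)?;
    let fan_in = budget / target_partitions.max(1);
    if fan_in >= 2 { Some(fan_in) } else { None }
}

fn main() {
    // Illustrative: ulimit -n of 1024, 256 descriptors kept for sockets, logs, etc.
    println!("64 partitions:  {:?}", max_fanin_within_limit(1024, 256, 64));
    println!("512 partitions: {:?}", max_fanin_within_limit(1024, 256, 512));
}
```

With those inputs, 64 partitions allow a fan-in of up to 12, while 512 partitions leave no room for even 2 files per merge, in which case the descriptor limit itself has to be raised.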

Optionally surface it as an execution config option (e.g.
datafusion.execution.max_merge_fanin) so it can be set per-session.

Describe alternatives you've considered

  • Lowering target_partitions: reduces the FD multiplier, but it is a per-node global
    that slows down all queries, including the majority that never spill. Wrong altitude:
    it taxes the common case to fix the rare heavy-aggregation case.

  • Lowering the memory pool size: indirectly shrinks the per-pass fan-in (toward the floor
    of 2), but the pool is shared across all concurrent queries, the relationship to FD count
    is indirect, and a smaller pool makes more queries spill.

  • Raising ulimit -n / LimitNOFILE: a necessary operational mitigation, but it only
    moves the ceiling; it does not give the engine a way to bound its own FD usage, and the
    bound still scales with cores × data.

  • max_temp_directory_size: caps total bytes on disk, not the number of files open at once,
    so it does not address EMFILE.

None of these gives a direct, memory-independent bound on simultaneously open spill files
that is also free for non-spilling queries, which the proposed fan-in cap does.

Additional context

Observed on DataFusion 53.1.0; the same code path is present on main.

Repro shape: a SELECT COUNT(DISTINCT col_b), col_a FROM t GROUP BY col_a over a large
dataset that forces the hash aggregate to spill many files, on a many-core machine with a
typical default ulimit -n (e.g. 1024). COUNT(DISTINCT ...) is expanded into two stacked
aggregation layers, both of which can spill, compounding the FD pressure.

Relevant code:

  • datafusion-physical-plan/src/sorts/multi_level_merge.rs —
    MultiLevelMergeBuilder::get_sorted_spill_files_to_merge (the unbounded, memory-driven
    fan-in loop).
  • datafusion-physical-plan/src/aggregates/row_hash.rs — the hash aggregate hands all
    accumulated spill files to StreamingMergeBuilder::with_sorted_spill_files.
  • datafusion-execution/src/disk_manager.rs — DiskManagerBuilder / DiskManager, where
    max_temp_directory_size lives and where max_merge_fanin would be added.

Related: #7858 touches spilling improvements in the hash aggregate and notes the
"too many open files" concern in passing, but does not propose a fan-in cap.

Happy to open a PR implementing the above if the approach sounds reasonable.

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
