Skip to content

perf(add_files): stream manifest entries for duplicate-files check#3287

Open
laichunpongben wants to merge 6 commits intoapache:mainfrom
laichunpongben:fix-add-files-dup-check-perf
Open

perf(add_files): stream manifest entries for duplicate-files check#3287
laichunpongben wants to merge 6 commits intoapache:mainfrom
laichunpongben:fix-add-files-dup-check-perf

Conversation

@laichunpongben
Copy link
Copy Markdown

@laichunpongben laichunpongben commented Apr 25, 2026

Fixes #3286.

The hot path today

if check_duplicate_files:
    import pyarrow.compute as pc

    expr = pc.field("file_path").isin(file_paths)
    referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]

What this actually does, per call:

  1. inspect.data_files() — for every manifest in the current snapshot, calls _get_files_from_manifest (pyiceberg/table/inspect.py:548), which for each ManifestEntry builds a Python dict with ~17 fields. The expensive ones:
    • readable_metrics — for every column in the table schema, decodes lower/upper bound bytes via from_bytes and packs the result into a per-column dict. This is the single biggest cost on wide tables.
    • partition — decodes the partition struct into a name → value dict.
    • column_sizes, value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds — each materialized as a Python dict per file.
  2. The dicts are batched into a pyarrow.Table per manifest.
  3. pa.concat_tables glues all manifests' Tables together.
  4. .filter(expr) applies an Arrow-compute isin over the concatenated Table.
  5. .to_pylist() converts back to Python dicts.
  6. The list comprehension throws away 16 of the 17 columns and keeps only file_path.

For a backfill that calls add_files once per day on a growing table, per-call cost is O(snapshot file count); cumulative cost is O(N²). After ~15 daily commits on a wide-schema table, dup-check time dominates: each call takes ~10–15 minutes vs seconds early on.

The workaround in #2132 / docs PR #2249 is check_duplicate_files=False, which trades away the idempotency guarantee that re-running a partial-failure resume is safe.

Benchmark — before vs after

tests/benchmark/bench_add_files_dup_check.py (added in this PR) runs 10 sequential add_files(check_duplicate_files=True) calls on an InMemoryCatalog table with a 30-column schema, 200 small parquet files per call. Measures wall-clock and tracemalloc peak per call. Run on macOS arm64 / Python 3.11.

Before (upstream main):

batch   wall_s    tracemalloc_peak_MB  cumulative_files
    0     1.05                    5.5               200
    1     1.00                    9.2               400
    2     1.06                   12.7               600
    3     1.13                   18.5               800
    4     1.18                   20.2              1000
    5     1.26                   23.4              1200
    6     1.32                   30.2              1400
    7     1.39                   34.0              1600
    8     1.46                   29.9              1800
    9     1.51                   39.8              2000

Wall climbs ~44%; tracemalloc peak grows ~7.2×.

After (this PR):

batch   wall_s    tracemalloc_peak_MB  cumulative_files
    0     1.05                    5.5               200
    1     1.56                    5.6               400
    2     0.96                    6.2               600
    3     0.97                    6.3               800
    4     0.98                    6.5              1000
    5     1.00                    6.8              1200
    6     1.00                    6.6              1400
    7     1.03                    8.2              1600
    8     1.04                    7.2              1800
    9     1.07                    6.9              2000

Wall flat at ~1s; tracemalloc peak flat at ~6–8 MB. The growth disappears because the dup-check no longer materializes per-file dicts / pyarrow Tables / readable_metrics — it just does set containment on file_path while streaming manifest entries.

This is a 10-batch run on a small, narrow workload. Real backfills with wider schemas (more columns × more row groups), more files per batch, and many more batches see the constant factor amplify; the production workload that motivated this PR was hitting ~10–15 minutes per call after 15 commits.

What this PR does

Replace the materialize-then-filter with a streaming scan that reuses the existing _open_manifest helper (pyiceberg/table/__init__.py:1918) — the canonical "open a manifest, fetch entries with discard_deleted=True, apply data-file predicates" pattern already used by DataScan.scan_plan_helper (line 2050). Delete manifests are skipped at the top level (same shape as _min_sequence_number).

The loop body becomes a set containment check on data_file.file_path, scheduled via executor.map and flattened with chain.from_iterable — same idiom as the existing scan path.

The same approach Spark's add_files action takes: predicate-based against the new paths only, no pre-scan of all data files.

What this is and isn't

  • Is: a constant-factor reduction. The Avro decode of manifest entries is unchanged (still happens via fetch_manifest_entry), but everything downstream of the read — readable_metrics computation, partition decode, per-file dict construction, pyarrow Table construction, concat_tables, filter, to_pylist — is gone. That post-processing was the bulk of the time, not the Avro read.
  • Isn't: an asymptotic fix. Per-call cost is still O(snapshot file count) for the manifest entry reads; cumulative backfill cost is still O(N²). Truly eliminating the linear scan would need file_path lower/upper bounds at the ManifestFile level so most manifests can be pruned without opening — that's a spec extension and a follow-up.

Compatibility / behavior preservation

Audited the change for any behavioral divergence from the old inspect.data_files().filter(...) path:

  • Public API: add_files signature and exception message unchanged. Existing integration tests at tests/integration/test_add_files.py:test_add_files_that_referenced_by_current_snapshot{,_with_check_duplicate_files_true,_with_check_duplicate_files_false} exercise the dup-check contract and assert the exact error string — both preserved verbatim.
  • Callers: only Table.add_files (pyiceberg/table/__init__.py:1491). No subclass overrides exist (e.g. CreateTableTransaction doesn't redefine it). Transaction.upsert/append/overwrite, _FastAppendFiles, MergingSnapshotProducer don't share the dup-check path.
  • File set scanned: inspect.data_files() filtered per-entry on DataFileContent.DATA; new code filters at ManifestContent.DATA. These are theoretically distinct but produce identical sets per the Iceberg spec — delete entries cannot live in DATA manifests.
  • discard_deleted: both paths use True (fetch_manifest_entry defaults to True; _open_manifest passes it explicitly).
  • Snapshot scope: both paths use current_snapshot()inspect.data_files() via _get_snapshot(None), new code directly via self.table_metadata.current_snapshot().
  • Empty file_paths: same result (empty list) and same exceptions either way. Slight efficiency regression in this edge case — the new code still walks data manifests where the old code short-circuited via pc.field("file_path").isin([]). Not user-visible; can be optimized in a follow-up if anyone cares.
  • Side effects: both paths are read-only; no manifest cache state mutation, no transaction state changes.
  • Concurrency: both submit to the shared ExecutorFactory.get_or_create() thread pool.
  • Branch parameter: add_files accepts a branch argument, but the dup-check has always run against current_snapshot() (i.e. main) regardless. This is a pre-existing inconsistency, not introduced by this PR. Preserved exactly to keep this change behavior-preserving.

Refs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

add_files(check_duplicate_files=True) is O(N²) across incremental backfill — predicate-pushdown into manifest scan

2 participants