From dfeb2c577583acb4f0c65c5b21be68aa95eedadf Mon Sep 17 00:00:00 2001
From: Ben Lai
Date: Sat, 25 Apr 2026 20:31:18 +0800
Subject: [PATCH 1/6] perf(add_files): stream manifest entries for duplicate-files check

---
 pyiceberg/table/__init__.py | 40 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 7f1524642b..f154f9a0ac 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -861,6 +861,40 @@ def upsert(
 
         return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
 
+    def _referenced_data_files(self, file_paths: list[str]) -> list[str]:
+        """Find any file_paths already referenced by data files in the current snapshot.
+
+        Streams ManifestEntries from each data manifest and filters each
+        entry's file_path against the candidate set. Avoids the full
+        ``inspect.data_files()`` materialization, which builds a pyarrow Table
+        with every column (including readable_metrics, lower/upper bounds,
+        partition records) for every DataFile in the snapshot — a cost
+        dominated by stats decoding that a path-membership check does not need.
+
+        Returns the file_paths that are already referenced; empty list if none.
+        """
+        snapshot = self.table_metadata.current_snapshot()
+        if snapshot is None or not file_paths:
+            return []
+
+        candidates = set(file_paths)
+        io = self._table.io
+
+        def _scan(manifest: ManifestFile) -> list[str]:
+            if manifest.content != ManifestContent.DATA:
+                return []
+            return [
+                entry.data_file.file_path
+                for entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
+                if entry.data_file.file_path in candidates
+            ]
+
+        executor = ExecutorFactory.get_or_create()
+        matches: list[str] = []
+        for partial in executor.map(_scan, snapshot.manifests(io)):
+            matches.extend(partial)
+        return matches
+
     def add_files(
         self,
         file_paths: list[str],
@@ -883,11 +917,7 @@ def add_files(
             raise ValueError("File paths must be unique")
 
         if check_duplicate_files:
-            import pyarrow.compute as pc
-
-            expr = pc.field("file_path").isin(file_paths)
-            referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
-
+            referenced_files = self._referenced_data_files(file_paths)
             if referenced_files:
                 raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
 

From c12c1c2cf0dd0b0b643f7ac06b2b56391bdc15e9 Mon Sep 17 00:00:00 2001
From: Ben Lai
Date: Sat, 25 Apr 2026 20:35:43 +0800
Subject: [PATCH 2/6] refactor(add_files): reuse _open_manifest + chain.from_iterable

---
 pyiceberg/table/__init__.py | 40 ++++++++++++++--------------------------
 1 file changed, 14 insertions(+), 26 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index f154f9a0ac..d990bb9122 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -861,39 +861,27 @@ def upsert(
 
         return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
 
-    def _referenced_data_files(self, file_paths: list[str]) -> list[str]:
-        """Find any file_paths already referenced by data files in the current snapshot.
-
-        Streams ManifestEntries from each data manifest and filters each
-        entry's file_path against the candidate set. Avoids the full
-        ``inspect.data_files()`` materialization, which builds a pyarrow Table
-        with every column (including readable_metrics, lower/upper bounds,
-        partition records) for every DataFile in the snapshot — a cost
-        dominated by stats decoding that a path-membership check does not need.
-
-        Returns the file_paths that are already referenced; empty list if none.
-        """
+    def _find_referenced_data_files(self, file_paths: list[str]) -> list[str]:
+        """Return file_paths already referenced by data files in the current snapshot."""
         snapshot = self.table_metadata.current_snapshot()
-        if snapshot is None or not file_paths:
+        if snapshot is None:
             return []
 
         candidates = set(file_paths)
         io = self._table.io
+        data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]
 
-        def _scan(manifest: ManifestFile) -> list[str]:
-            if manifest.content != ManifestContent.DATA:
-                return []
-            return [
-                entry.data_file.file_path
-                for entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
-                if entry.data_file.file_path in candidates
-            ]
+        path_filter: Callable[[DataFile], bool] = lambda df: df.file_path in candidates
+        always_true: Callable[[DataFile], bool] = lambda _: True
 
         executor = ExecutorFactory.get_or_create()
-        matches: list[str] = []
-        for partial in executor.map(_scan, snapshot.manifests(io)):
-            matches.extend(partial)
-        return matches
+        entries = chain.from_iterable(
+            executor.map(
+                lambda args: _open_manifest(*args),
+                [(io, manifest, path_filter, always_true) for manifest in data_manifests],
+            )
+        )
+        return [entry.data_file.file_path for entry in entries]
 
     def add_files(
         self,
@@ -917,7 +905,7 @@ def add_files(
             raise ValueError("File paths must be unique")
 
         if check_duplicate_files:
-            referenced_files = self._referenced_data_files(file_paths)
+            referenced_files = self._find_referenced_data_files(file_paths)
             if referenced_files:
                 raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
 

From 4ed64d4d9ca254e7f973a7f51be4c8bc68f7b827 Mon Sep 17 00:00:00 2001
From: Ben Lai
Date: Sat, 25 Apr 2026 20:42:49 +0800
Subject: [PATCH 3/6] =?UTF-8?q?fix:=20ruff=20E731=20=E2=80=94=20convert=20?=
 =?UTF-8?q?path=5Ffilter=20lambda=20to=20def?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pyiceberg/table/__init__.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index d990bb9122..87eddcc01f 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -871,14 +871,14 @@ def _find_referenced_data_files(self, file_paths: list[str]) -> list[str]:
         io = self._table.io
         data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]
 
-        path_filter: Callable[[DataFile], bool] = lambda df: df.file_path in candidates
-        always_true: Callable[[DataFile], bool] = lambda _: True
+        def path_filter(data_file: DataFile) -> bool:
+            return data_file.file_path in candidates
 
         executor = ExecutorFactory.get_or_create()
         entries = chain.from_iterable(
             executor.map(
                 lambda args: _open_manifest(*args),
-                [(io, manifest, path_filter, always_true) for manifest in data_manifests],
+                [(io, manifest, path_filter, lambda _: True) for manifest in data_manifests],
             )
         )
         return [entry.data_file.file_path for entry in entries]

From f7d63b65822a5763c8f21eb9e454af3e5be343c1 Mon Sep 17 00:00:00 2001
From: Ben Lai
Date: Sat, 25 Apr 2026 20:52:15 +0800
Subject: [PATCH 4/6] test(benchmark): add_files dup-check wall + tracemalloc growth

---
 tests/benchmark/bench_add_files_dup_check.py | 114 +++++++++++++++++++
 1 file changed, 114 insertions(+)
 create mode 100644 tests/benchmark/bench_add_files_dup_check.py

diff --git a/tests/benchmark/bench_add_files_dup_check.py b/tests/benchmark/bench_add_files_dup_check.py
new file mode 100644
index 0000000000..6112e583c0
--- /dev/null
+++ b/tests/benchmark/bench_add_files_dup_check.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Standalone benchmark for `add_files(check_duplicate_files=True)`.
+
+Measures wall-clock and `tracemalloc` peak for the dup-check phase across
+N consecutive `add_files` calls on a growing table. Run before and after
+the fix to compare; this script doesn't import unreleased code, so it
+works against any pyiceberg checkout.
+
+Usage:
+    cd /path/to/iceberg-python
+    uv run python tests/benchmark/bench_add_files_dup_check.py
+"""
+
+from __future__ import annotations
+
+import gc
+import tempfile
+import time
+import tracemalloc
+from pathlib import Path
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+
+def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
+    """Build a wide-ish schema so per-column stats decoding has work to do."""
+    iceberg_fields = [NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)]
+    for i in range(2, num_columns + 1):
+        iceberg_fields.append(
+            NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False)
+        )
+    iceberg_schema = Schema(*iceberg_fields)
+    arrow_schema = pa.schema([pa.field("id", pa.int32(), nullable=False)] + [
+        pa.field(f"col_{i}", pa.string(), nullable=True) for i in range(2, num_columns + 1)
+    ])
+    return iceberg_schema, arrow_schema
+
+
+def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
+    """Write `n_files` tiny parquet files; return their absolute file:// paths."""
+    paths: list[str] = []
+    rows = pa.Table.from_pydict(
+        {
+            name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)]
+            for name in arrow_schema.names
+        },
+        schema=arrow_schema,
+    )
+    for i in range(n_files):
+        p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
+        pq.write_table(rows, p)
+        paths.append(f"file://{p}")
+    return paths
+
+
+def main() -> None:
+    num_batches = 10
+    files_per_batch = 200
+
+    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)
+
+    with tempfile.TemporaryDirectory() as tmp_root:
+        warehouse = Path(tmp_root) / "warehouse"
+        data_dir = Path(tmp_root) / "data"
+        warehouse.mkdir()
+        data_dir.mkdir()
+
+        catalog = InMemoryCatalog("bench", warehouse=f"file://{warehouse}")
+        catalog.create_namespace("default")
+        table = catalog.create_table("default.bench", schema=iceberg_schema)
+
+        gc.collect()
+        tracemalloc.start()
+
+        print("\nadd_files(check_duplicate_files=True) benchmark")
+        print(f"  batches={num_batches}, files_per_batch={files_per_batch}, columns={len(arrow_schema.names)}")
+        print(f"{'batch':>5} {'wall_s':>8} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")
+
+        cumulative = 0
+        for b in range(num_batches):
+            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
+            tracemalloc.reset_peak()
+            t0 = time.perf_counter()
+            table.add_files(file_paths=paths, check_duplicate_files=True)
+            wall = time.perf_counter() - t0
+            _, peak = tracemalloc.get_traced_memory()
+            cumulative += files_per_batch
+            print(f"{b:>5d} {wall:>8.2f} {peak / (1024 * 1024):>22.1f} {cumulative:>17d}")
+
+        tracemalloc.stop()
+
+
+if __name__ == "__main__":
+    main()

From 9c0e129497e7c2b0b6ae9cde3ce5ed3cb9c64d0d Mon Sep 17 00:00:00 2001
From: Ben Lai
Date: Sat, 25 Apr 2026 20:58:06 +0800
Subject: [PATCH 5/6] test(benchmark): convert add_files dup-check bench to pytest style

---
 tests/benchmark/bench_add_files_dup_check.py  | 114 ---------------
 .../test_add_files_dup_check_benchmark.py     | 134 ++++++++++++++++++
 2 files changed, 134 insertions(+), 114 deletions(-)
 delete mode 100644 tests/benchmark/bench_add_files_dup_check.py
 create mode 100644 tests/benchmark/test_add_files_dup_check_benchmark.py

diff --git a/tests/benchmark/bench_add_files_dup_check.py b/tests/benchmark/bench_add_files_dup_check.py
deleted file mode 100644
index 6112e583c0..0000000000
--- a/tests/benchmark/bench_add_files_dup_check.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Standalone benchmark for `add_files(check_duplicate_files=True)`.
-
-Measures wall-clock and `tracemalloc` peak for the dup-check phase across
-N consecutive `add_files` calls on a growing table. Run before and after
-the fix to compare; this script doesn't import unreleased code, so it
-works against any pyiceberg checkout.
-
-Usage:
-    cd /path/to/iceberg-python
-    uv run python tests/benchmark/bench_add_files_dup_check.py
-"""
-
-from __future__ import annotations
-
-import gc
-import tempfile
-import time
-import tracemalloc
-from pathlib import Path
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-
-from pyiceberg.catalog.memory import InMemoryCatalog
-from pyiceberg.schema import Schema
-from pyiceberg.types import IntegerType, NestedField, StringType
-
-
-def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
-    """Build a wide-ish schema so per-column stats decoding has work to do."""
-    iceberg_fields = [NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)]
-    for i in range(2, num_columns + 1):
-        iceberg_fields.append(
-            NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False)
-        )
-    iceberg_schema = Schema(*iceberg_fields)
-    arrow_schema = pa.schema([pa.field("id", pa.int32(), nullable=False)] + [
-        pa.field(f"col_{i}", pa.string(), nullable=True) for i in range(2, num_columns + 1)
-    ])
-    return iceberg_schema, arrow_schema
-
-
-def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
-    """Write `n_files` tiny parquet files; return their absolute file:// paths."""
-    paths: list[str] = []
-    rows = pa.Table.from_pydict(
-        {
-            name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)]
-            for name in arrow_schema.names
-        },
-        schema=arrow_schema,
-    )
-    for i in range(n_files):
-        p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
-        pq.write_table(rows, p)
-        paths.append(f"file://{p}")
-    return paths
-
-
-def main() -> None:
-    num_batches = 10
-    files_per_batch = 200
-
-    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)
-
-    with tempfile.TemporaryDirectory() as tmp_root:
-        warehouse = Path(tmp_root) / "warehouse"
-        data_dir = Path(tmp_root) / "data"
-        warehouse.mkdir()
-        data_dir.mkdir()
-
-        catalog = InMemoryCatalog("bench", warehouse=f"file://{warehouse}")
-        catalog.create_namespace("default")
-        table = catalog.create_table("default.bench", schema=iceberg_schema)
-
-        gc.collect()
-        tracemalloc.start()
-
-        print("\nadd_files(check_duplicate_files=True) benchmark")
-        print(f"  batches={num_batches}, files_per_batch={files_per_batch}, columns={len(arrow_schema.names)}")
-        print(f"{'batch':>5} {'wall_s':>8} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")
-
-        cumulative = 0
-        for b in range(num_batches):
-            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
-            tracemalloc.reset_peak()
-            t0 = time.perf_counter()
-            table.add_files(file_paths=paths, check_duplicate_files=True)
-            wall = time.perf_counter() - t0
-            _, peak = tracemalloc.get_traced_memory()
-            cumulative += files_per_batch
-            print(f"{b:>5d} {wall:>8.2f} {peak / (1024 * 1024):>22.1f} {cumulative:>17d}")
-
-        tracemalloc.stop()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/tests/benchmark/test_add_files_dup_check_benchmark.py b/tests/benchmark/test_add_files_dup_check_benchmark.py
new file mode 100644
index 0000000000..1a0059c385
--- /dev/null
+++ b/tests/benchmark/test_add_files_dup_check_benchmark.py
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmark for `add_files(check_duplicate_files=True)`.
+
+Reproduces the per-call cost of the duplicate-files check on a growing
+table. Before fix: each call materializes every DataFile in the snapshot
+into a pyarrow Table (with readable_metrics, partition decode, full stats
+dicts) and post-filters on file_path — peak memory grows roughly linearly
+with cumulative file count, dominated by per-column stats decoding.
+After fix: streaming manifest scan with set containment on file_path,
+peak memory stays flat.
+
+Run with: uv run pytest tests/benchmark/test_add_files_dup_check_benchmark.py -v -s -m benchmark
+"""
+
+from __future__ import annotations
+
+import gc
+import tempfile
+import tracemalloc
+from pathlib import Path
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
+def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
+    """Build a wide-ish schema so per-column stats decoding has work to do."""
+    iceberg_fields = [NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)]
+    for i in range(2, num_columns + 1):
+        iceberg_fields.append(NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False))
+    iceberg_schema = Schema(*iceberg_fields)
+    arrow_schema = pa.schema(
+        [pa.field("id", pa.int32(), nullable=False)]
+        + [pa.field(f"col_{i}", pa.string(), nullable=True) for i in range(2, num_columns + 1)]
+    )
+    return iceberg_schema, arrow_schema
+
+
+def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
+    paths: list[str] = []
+    rows = pa.Table.from_pydict(
+        {
+            name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)]
+            for name in arrow_schema.names
+        },
+        schema=arrow_schema,
+    )
+    for i in range(n_files):
+        p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
+        pq.write_table(rows, p)
+        paths.append(f"file://{p}")
+    return paths
+
+
+@pytest.mark.benchmark
+def test_add_files_dup_check_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Peak memory per `add_files(check_duplicate_files=True)` call should stay
+    flat across consecutive calls on a growing table.
+
+    With the materialize-then-filter implementation, peak grows roughly linearly
+    with cumulative file count (per-column stats decoding into a pyarrow Table).
+    With the streaming-scan implementation, peak stays bounded by the per-call
+    workload.
+    """
+    num_batches = 10
+    files_per_batch = 200
+    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)
+
+    with tempfile.TemporaryDirectory() as tmp_root:
+        data_dir = Path(tmp_root) / "data"
+        data_dir.mkdir()
+        table = memory_catalog.create_table("default.add_files_bench", schema=iceberg_schema)
+
+        gc.collect()
+        tracemalloc.start()
+
+        peaks_mb: list[float] = []
+        print(f"\n--- add_files dup-check benchmark ({num_batches} batches × {files_per_batch} files, 30 cols) ---")
+        print(f"{'batch':>5} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")
+
+        cumulative = 0
+        for b in range(num_batches):
+            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
+            tracemalloc.reset_peak()
+            table.add_files(file_paths=paths, check_duplicate_files=True)
+            _, peak = tracemalloc.get_traced_memory()
+            peak_mb = peak / (1024 * 1024)
+            peaks_mb.append(peak_mb)
+            cumulative += files_per_batch
+            print(f"{b:>5d} {peak_mb:>22.1f} {cumulative:>17d}")
+
+        tracemalloc.stop()
+
+        # Growth ratio: last call peak vs first call peak.
+        # Materialize-then-filter (pre-fix): observed ~7× on this workload.
+        # Streaming scan (post-fix): observed ~1×–1.5× (mostly noise).
+        # Threshold of 3× catches the regression while tolerating variance.
+        first_peak = peaks_mb[0]
+        last_peak = peaks_mb[-1]
+        ratio = last_peak / first_peak if first_peak > 0 else float("inf")
+        print(f"\n  Peak ratio (last / first): {ratio:.1f}×")
+        max_ratio = 3.0
+        assert ratio < max_ratio, (
+            f"Peak memory ratio ({ratio:.1f}×) exceeds {max_ratio}×. "
+            "Dup-check materializes the full snapshot rather than streaming on file_path."
+        )

From f9e9b138b369821103f6189fb6f43bcddaf901a2 Mon Sep 17 00:00:00 2001
From: Ben Lai
Date: Sat, 25 Apr 2026 21:04:47 +0800
Subject: [PATCH 6/6] fix(benchmark): ruff format + mypy type annotation

---
 .../benchmark/test_add_files_dup_check_benchmark.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/tests/benchmark/test_add_files_dup_check_benchmark.py b/tests/benchmark/test_add_files_dup_check_benchmark.py
index 1a0059c385..731305bf02 100644
--- a/tests/benchmark/test_add_files_dup_check_benchmark.py
+++ b/tests/benchmark/test_add_files_dup_check_benchmark.py
@@ -33,6 +33,7 @@
 import tempfile
 import tracemalloc
 from pathlib import Path
+from typing import Any
 
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -66,13 +67,10 @@ def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
 
 def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
     paths: list[str] = []
-    rows = pa.Table.from_pydict(
-        {
-            name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)]
-            for name in arrow_schema.names
-        },
-        schema=arrow_schema,
-    )
+    columns: dict[str, list[Any]] = {
+        name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)] for name in arrow_schema.names
+    }
+    rows = pa.Table.from_pydict(columns, schema=arrow_schema)
     for i in range(n_files):
         p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
         pq.write_table(rows, p)