diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc
index 4dcc72d6c..8068811e6 100644
--- a/src/iceberg/test/expire_snapshots_test.cc
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -350,6 +350,11 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
 
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  // Force ReachableFileCleanup: the empty current manifest list makes this an
+  // unreachable-orphan scenario, which Incremental cannot detect (added-in-ancestor
+  // files are preserved unless an explicit DELETED entry exists). Specifying the
+  // snapshot id forces the dispatch to Reachable.
+  update->ExpireSnapshotId(kExpiredSnapshotId);
   update->DeleteWith(
       [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
 
@@ -388,6 +393,8 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
 
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  // See note above; same scenario.
+  update->ExpireSnapshotId(kExpiredSnapshotId);
   update->DeleteWith(
       [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
 
@@ -573,4 +580,75 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
   EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
 }
 
+// Linear-ancestry, no specified ID: dispatch must pick IncrementalFileCleanup.
+// Since the expired snapshot is an ancestor and the expired manifest only
+// contains ADDED entries (no DELETED), incremental correctly preserves the
+// data file (matches Java IncrementalFileCleanup semantics) and only removes
+// the manifest, manifest list, and (if any) delete manifest.
+TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFiles) {
+  const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
+  const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-manifest-list.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-manifest-list.avro";
+
+  auto expired_data_manifest = WriteDataManifest(
+      expired_data_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
+                 MakeDataFile(expired_data_file_path))});
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber,
+                    {expired_data_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {});
+  RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  // No ExpireSnapshotId -> incremental dispatch.
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  // Manifest + manifest list deleted, data file preserved.
+  EXPECT_THAT(deleted_files, testing::Contains(expired_data_manifest_path));
+  EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path));
+  EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path)));
+}
+
+// Same fixture, but force the Reachable path with ExpireSnapshotId. Reachable
+// detects the data file as unreachable from the (empty) current state and
+// deletes it -- demonstrating the dispatch actually selects different code paths.
+TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) {
+  const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
+  const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-manifest-list.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-manifest-list.avro";
+
+  auto expired_data_manifest = WriteDataManifest(
+      expired_data_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
+                 MakeDataFile(expired_data_file_path))});
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber,
+                    {expired_data_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {});
+  RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->ExpireSnapshotId(kExpiredSnapshotId);
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
+                                                           expired_data_manifest_path,
+                                                           expired_manifest_list_path));
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc
index ce65882c9..5ccd56d89 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -331,6 +331,303 @@ class ReachableFileCleanup : public FileCleanupStrategy {
   }
 };
 
+/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup.
+/// Only safe when:
+/// * No snapshot IDs were explicitly listed for expiration.
+/// * No removed snapshots lived outside the current main ancestry.
+/// * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so
+/// two snapshot passes are enough -- one over retained snapshots to learn which
+/// manifests are still live, one over expired snapshots to learn which manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata_before_expiration,
+                    const TableMetadata& metadata_after_expiration,
+                    const std::unordered_set<int64_t>& expired_snapshot_ids,
+                    CleanupLevel level) override {
+    if (expired_snapshot_ids.empty()) {
+      return {};
+    }
+
+    std::unordered_set<int64_t> valid_ids;
+    valid_ids.reserve(metadata_after_expiration.snapshots.size());
+    for (const auto& snapshot : metadata_after_expiration.snapshots) {
+      if (snapshot) {
+        valid_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    auto current_result = metadata_before_expiration.SnapshotById(
+        metadata_before_expiration.current_snapshot_id);
+    if (!current_result.has_value() || current_result.value() == nullptr) {
+      return {};
+    }
+
+    // Ancestors of the current table state. Files deleted in a non-ancestor
+    // snapshot may still belong to the current state (rolled-back commits),
+    // so we only physically delete files removed by ancestor snapshots.
+    auto ancestors_result = SnapshotUtil::AncestorsOf(
+        current_result.value()->snapshot_id, [&metadata_before_expiration](int64_t id) {
+          return metadata_before_expiration.SnapshotById(id);
+        });
+    if (!ancestors_result.has_value()) {
+      return {};
+    }
+    std::unordered_set<int64_t> ancestor_ids;
+    ancestor_ids.reserve(ancestors_result.value().size());
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (ancestor) ancestor_ids.insert(ancestor->snapshot_id);
+    }
+
+    // Cherry-pick protection: snapshots whose changes were picked into the
+    // current ancestry under a different snapshot id should not be cleaned up.
+    // Iterate the ancestor pointers we already have rather than re-looking-up
+    // each snapshot by id.
+    std::unordered_set<int64_t> picked_ancestor_snapshot_ids;
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (!ancestor) continue;
+      const auto& summary = ancestor->summary;
+      auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+      if (it == summary.end()) continue;
+      try {
+        picked_ancestor_snapshot_ids.insert(std::stoll(it->second));
+      } catch (...) {
+        // Malformed source-snapshot-id; skip rather than fail cleanup.
+      }
+    }
+
+    // Find manifests still referenced by a valid snapshot but written by an
+    // expired snapshot. Their deleted entries point at data files now safe to
+    // remove and become candidates for manifests_to_scan below.
+    std::unordered_set<std::string> valid_manifests;
+    std::vector<ManifestFile> manifests_to_scan;
+    for (const auto& snapshot : metadata_after_expiration.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io_);
+      if (!manifests_result.has_value()) continue;  // best-effort
+      for (const auto& manifest : manifests_result.value()) {
+        valid_manifests.insert(manifest.manifest_path);
+
+        int64_t writer_id = manifest.added_snapshot_id;
+        bool from_valid_snapshots = valid_ids.contains(writer_id);
+        bool is_from_ancestor = ancestor_ids.contains(writer_id);
+        bool is_picked = picked_ancestor_snapshot_ids.contains(writer_id);
+        if (!from_valid_snapshots && (is_from_ancestor || is_picked) &&
+            manifest.has_deleted_files()) {
+          manifests_to_scan.push_back(manifest);
+        }
+      }
+    }
+
+    // Find manifests that were only referenced by snapshots that have expired,
+    // and split them by what kind of cleanup they need:
+    //  - manifests_to_delete: not referenced by any retained snapshot;
+    //  - manifests_to_scan: from a current-state ancestor and has deleted
+    //    entries (data files now safe to drop);
+    //  - manifests_to_revert: written by an expiring non-ancestor snapshot
+    //    and contains added entries -- those data files were never adopted.
+    std::unordered_set<std::string> manifest_lists_to_delete;
+    std::unordered_set<std::string> manifests_to_delete;
+    std::vector<ManifestFile> manifests_to_revert;
+    for (const auto& snapshot : metadata_before_expiration.snapshots) {
+      if (!snapshot) continue;
+      int64_t snapshot_id = snapshot->snapshot_id;
+      if (valid_ids.contains(snapshot_id)) continue;
+
+      // Skip cherry-picked snapshots; the picked snapshot owns its cleanup.
+      if (picked_ancestor_snapshot_ids.contains(snapshot_id)) {
+        continue;
+      }
+
+      int64_t source_snapshot_id = -1;
+      auto src_it = snapshot->summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+      if (src_it != snapshot->summary.end()) {
+        try {
+          source_snapshot_id = std::stoll(src_it->second);
+        } catch (...) {
+          source_snapshot_id = -1;
+        }
+      }
+      // If this commit was cherry-picked from a still-live snapshot, skip --
+      // removing its data files would revert additions still in the table.
+      if (ancestor_ids.contains(source_snapshot_id) ||
+          picked_ancestor_snapshot_ids.contains(source_snapshot_id)) {
+        continue;
+      }
+
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io_);
+      if (manifests_result.has_value()) {
+        for (const auto& manifest : manifests_result.value()) {
+          if (valid_manifests.contains(manifest.manifest_path)) continue;
+          manifests_to_delete.insert(manifest.manifest_path);
+
+          int64_t writer_id = manifest.added_snapshot_id;
+          bool is_from_ancestor = ancestor_ids.contains(writer_id);
+          bool is_from_expiring_snapshot = expired_snapshot_ids.contains(writer_id);
+
+          if (is_from_ancestor && manifest.has_deleted_files()) {
+            manifests_to_scan.push_back(manifest);
+          }
+          if (!is_from_ancestor && is_from_expiring_snapshot &&
+              manifest.has_added_files()) {
+            // Files added in this manifest never made it into the current
+            // ancestry. The is_from_expiring_snapshot guard ensures full
+            // ancestry between when the manifest was written and now is
+            // known -- otherwise missing history could hide that the
+            // snapshot is in fact an ancestor.
+            manifests_to_revert.push_back(manifest);
+          }
+        }
+      }
+
+      if (!snapshot->manifest_list.empty()) {
+        manifest_lists_to_delete.insert(snapshot->manifest_list);
+      }
+    }
+
+    // Deleting data files
+    if (level == CleanupLevel::kAll) {
+      // Manifests may reference partition specs that were pruned during expiration
+      // when CleanExpiredMetadata is enabled, so resolve schemas/specs against the
+      // pre-expiration metadata. Mirrors Java's findFilesToDelete which is called
+      // with beforeExpiration.specsById().
+      auto files_to_delete = FindFilesToDelete(
+          metadata_before_expiration, manifests_to_scan, manifests_to_revert, valid_ids);
+      DeleteFiles(files_to_delete);
+    }
+
+    // Deleting manifest files
+    DeleteFiles(manifests_to_delete);
+
+    // Deleting manifest-list files
+    DeleteFiles(manifest_lists_to_delete);
+
+    // Deleting statistics files
+    if (HasAnyStatisticsFiles(metadata_before_expiration) ||
+        HasAnyStatisticsFiles(metadata_after_expiration)) {
+      DeleteFiles(
+          StatisticsFilesToDelete(metadata_before_expiration, metadata_after_expiration));
+    }
+
+    return {};
+  }
+
+ private:
+  /// \brief Resolve the data files that the incremental pass identified for deletion.
+  ///
+  /// For manifests_to_scan: read DELETED entries whose snapshot id is no longer
+  /// valid -- those are files an expired-but-ancestral snapshot removed.
+  /// For manifests_to_revert: read every ADDED entry -- those are files a
+  /// non-ancestral expired snapshot introduced and the current state never adopted.
+  std::unordered_set<std::string> FindFilesToDelete(
+      const TableMetadata& metadata, const std::vector<ManifestFile>& manifests_to_scan,
+      const std::vector<ManifestFile>& manifests_to_revert,
+      const std::unordered_set<int64_t>& valid_ids) {
+    std::unordered_set<std::string> files_to_delete;
+
+    for (const auto& manifest : manifests_to_scan) {
+      auto reader_result = MakeManifestReader(manifest, file_io_, metadata);
+      if (!reader_result.has_value()) continue;
+      auto entries_result = reader_result.value()->Entries();
+      if (!entries_result.has_value()) continue;
+      for (const auto& entry : entries_result.value()) {
+        if (entry.status == ManifestStatus::kDeleted && entry.snapshot_id.has_value() &&
+            !valid_ids.contains(entry.snapshot_id.value()) && entry.data_file) {
+          files_to_delete.insert(entry.data_file->file_path);
+        }
+      }
+    }
+
+    for (const auto& manifest : manifests_to_revert) {
+      auto reader_result = MakeManifestReader(manifest, file_io_, metadata);
+      if (!reader_result.has_value()) continue;
+      auto entries_result = reader_result.value()->Entries();
+      if (!entries_result.has_value()) continue;
+      for (const auto& entry : entries_result.value()) {
+        if (entry.status == ManifestStatus::kAdded && entry.data_file) {
+          files_to_delete.insert(entry.data_file->file_path);
+        }
+      }
+    }
+
+    return files_to_delete;
+  }
+};
+
+/// \brief True if any retained snapshot sits outside the current main ancestry.
+/// Mirrors Java's RemoveSnapshots::hasNonMainSnapshots.
+bool HasNonMainSnapshots(const TableMetadata& metadata) {
+  auto current_result = metadata.SnapshotById(metadata.current_snapshot_id);
+  if (!current_result.has_value() || current_result.value() == nullptr) {
+    return !metadata.snapshots.empty();
+  }
+  auto ancestors_result = SnapshotUtil::AncestorsOf(
+      current_result.value()->snapshot_id,
+      [&metadata](int64_t id) { return metadata.SnapshotById(id); });
+  if (!ancestors_result.has_value()) {
+    return true;
+  }
+  std::unordered_set<int64_t> main_ancestors;
+  for (const auto& a : ancestors_result.value()) {
+    if (a) main_ancestors.insert(a->snapshot_id);
+  }
+  for (const auto& snapshot : metadata.snapshots) {
+    if (snapshot && !main_ancestors.contains(snapshot->snapshot_id)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/// \brief True if any expired snapshot lived outside the current main ancestry.
+/// Mirrors Java's RemoveSnapshots::hasRemovedNonMainAncestors.
+///
+/// When `before` has no current snapshot, the main-ancestor set is empty (matching
+/// Java's mainAncestors()); any removed snapshot then counts as "non-main" and
+/// returns true. This guards the dispatch in Finalize() against picking incremental
+/// cleanup when before-state has snapshots but no current pointer.
+bool HasRemovedNonMainAncestors(const TableMetadata& before, const TableMetadata& after) {
+  std::unordered_set<int64_t> main_ancestors;
+  auto current_result = before.SnapshotById(before.current_snapshot_id);
+  if (current_result.has_value() && current_result.value() != nullptr) {
+    auto ancestors_result = SnapshotUtil::AncestorsOf(
+        current_result.value()->snapshot_id,
+        [&before](int64_t id) { return before.SnapshotById(id); });
+    if (!ancestors_result.has_value()) {
+      return true;
+    }
+    for (const auto& a : ancestors_result.value()) {
+      if (a) main_ancestors.insert(a->snapshot_id);
+    }
+  }
+  std::unordered_set<int64_t> after_ids;
+  after_ids.reserve(after.snapshots.size());
+  for (const auto& s : after.snapshots) {
+    if (s) after_ids.insert(s->snapshot_id);
+  }
+  for (const auto& snapshot : before.snapshots) {
+    if (!snapshot) continue;
+    bool removed = !after_ids.contains(snapshot->snapshot_id);
+    bool in_main = main_ancestors.contains(snapshot->snapshot_id);
+    if (removed && !in_main) {
+      return true;
+    }
+  }
+  return false;
+}
+
 }  // namespace
 
 Result<std::unique_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
@@ -613,11 +910,23 @@ Status ExpireSnapshots::Finalize(Result<TableMetadata> commit_result) {
   apply_result_.reset();
 
   // File cleanup is best-effort: log and continue on individual file deletion failures
-  ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
-  return strategy.CleanFiles(metadata_before_expiration, metadata_after_expiration,
-                             expired_ids, cleanup_level_);
+  // Pick incremental cleanup when the expiration is a simple linear-ancestry walk:
+  // no explicit snapshot IDs, no removed snapshots outside main ancestry, and no
+  // retained snapshots outside main ancestry. Mirrors Java RemoveSnapshots's
+  // dispatch in cleanExpiredSnapshots().
+  bool can_use_incremental = !specified_snapshot_id_ &&
+                             !HasRemovedNonMainAncestors(metadata_before_expiration,
+                                                         metadata_after_expiration) &&
+                             !HasNonMainSnapshots(metadata_after_expiration);
+
+  std::unique_ptr<FileCleanupStrategy> strategy;
+  if (can_use_incremental) {
+    strategy = std::make_unique<IncrementalFileCleanup>(ctx_->table->io(), delete_func_);
+  } else {
+    strategy = std::make_unique<ReachableFileCleanup>(ctx_->table->io(), delete_func_);
+  }
+  return strategy->CleanFiles(metadata_before_expiration, metadata_after_expiration,
+                              expired_ids, cleanup_level_);
 }
 
-// TODO(shangxinli): add IncrementalFileCleanup strategy for linear ancestry optimization.
-
 }  // namespace iceberg