diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc
index 4dcc72d6c..651386096 100644
--- a/src/iceberg/test/expire_snapshots_test.cc
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -19,6 +19,7 @@
 #include "iceberg/update/expire_snapshots.h"
 
+#include <mutex>
 #include
 #include
 #include
 
@@ -244,10 +245,13 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
 }
 
 TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   // Apply first so apply_result_ is cached
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
@@ -262,11 +266,14 @@
 }
 
 TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
   update->CleanupLevel(CleanupLevel::kNone);
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
@@ -278,10 +285,13 @@
 }
 
 TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
@@ -294,11 +304,14 @@
 }
 
 TEST_F(ExpireSnapshotsTest, FinalizeSkipsWhenNothingExpired) {
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
   update->RetainLast(2);
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
@@ -348,10 +361,13 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
                               kCurrentSequenceNumber, {});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
@@ -386,10 +402,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
                               kCurrentSequenceNumber, {});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
@@ -424,11 +443,14 @@ TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) {
                               kCurrentSequenceNumber, {});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
   update->CleanupLevel(CleanupLevel::kMetadataOnly);
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
@@ -462,10 +484,13 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainedDeleteManifestSkipsDataDeletion) {
                               kCurrentSequenceNumber, {current_delete_manifest});
   RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
@@ -487,10 +512,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredStats) {
       expired_manifest_list_path, current_manifest_list_path,
       {MakeStatisticsFile(kExpiredSnapshotId, expired_statistics_path)}, {});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
@@ -513,10 +541,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedStats) {
        MakeStatisticsFile(kCurrentSnapshotId, reused_statistics_path)},
       {});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
@@ -538,10 +569,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredPartitionStats) {
       expired_manifest_list_path, current_manifest_list_path, {},
      {MakePartitionStatisticsFile(kExpiredSnapshotId, expired_statistics_path)});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
@@ -564,10 +598,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
      {MakePartitionStatisticsFile(kExpiredSnapshotId, reused_statistics_path),
       MakePartitionStatisticsFile(kCurrentSnapshotId, reused_statistics_path)});
 
+  std::mutex deleted_files_mu;
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
-  update->DeleteWith(
-      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+  update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
+    std::lock_guard<std::mutex> lock(deleted_files_mu);
+    deleted_files.push_back(path);
+  });
 
   EXPECT_THAT(update->Commit(), IsOk());
   EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc
index ce65882c9..708010706 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -20,11 +20,15 @@
 #include "iceberg/update/expire_snapshots.h"
 
 #include
+#include <chrono>
 #include
+#include <future>
 #include
 #include
 #include
+#include <span>
 #include
+#include <thread>
 #include
 #include
@@ -54,6 +58,64 @@ Result<std::unique_ptr<ManifestReader>> MakeManifestReader(
   return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
 }
 
+/// \brief Cap on per-CleanFiles worker concurrency.
+///
+/// Java's RemoveSnapshots takes ExecutorServices from the table operations layer.
+/// C++ has no shared executor today, so each strategy spins up an ad-hoc pool via
+/// std::async. Cap concurrency to avoid swamping FileIO with hundreds of in-flight
+/// requests on hosts with very high core counts.
+constexpr std::size_t kMaxParallelism = 8;
+
+std::size_t WorkerCount(std::size_t item_count) {
+  if (item_count <= 1) return 1;
+  std::size_t hw = std::thread::hardware_concurrency();
+  if (hw == 0) hw = 2;
+  return std::min({item_count, kMaxParallelism, hw});
+}
+
+/// \brief Run `work` over `items` using up to WorkerCount(items) std::async workers.
+///
+/// Each worker drains a contiguous slice of `items`. Exceptions thrown by `work` are
+/// swallowed -- callers that need to know whether a unit succeeded should record it
+/// inside `work` (e.g. via an atomic counter or a thread-safe collection).
+template <typename T, typename Fn>
+void RunInParallel(std::span<T> items, Fn&& work) {
+  if (items.empty()) return;
+  std::size_t n = WorkerCount(items.size());
+  if (n <= 1) {
+    for (const auto& item : items) {
+      try {
+        work(item);
+      } catch (...) {
+        // best-effort
+      }
+    }
+    return;
+  }
+
+  std::vector<std::future<void>> futures;
+  futures.reserve(n);
+  std::size_t per = (items.size() + n - 1) / n;
+  for (std::size_t i = 0; i < n; ++i) {
+    std::size_t begin = i * per;
+    if (begin >= items.size()) break;
+    std::size_t end = std::min(begin + per, items.size());
+    auto slice = items.subspan(begin, end - begin);
+    futures.emplace_back(std::async(std::launch::async, [slice, &work]() {
+      for (const auto& item : slice) {
+        try {
+          work(item);
+        } catch (...) {
+          // best-effort: see RunInParallel doc
+        }
+      }
+    }));
+  }
+  for (auto& f : futures) {
+    f.wait();
+  }
+}
+
 /// \brief Abstract strategy for cleaning up files after snapshot expiration.
 class FileCleanupStrategy {
  public:
@@ -75,24 +137,53 @@
                            CleanupLevel level) = 0;
 
  protected:
-  /// \brief Delete a single file
+  /// Number of attempts for a single best-effort delete. Mirrors Java's
+  /// Tasks.foreach(...).retry(3).
+  static constexpr int kDeleteMaxAttempts = 3;
+
+  /// \brief Delete a file, suppressing errors (best-effort) with bounded retries.
+  ///
+  /// Uses the custom delete function if set, otherwise FileIO::DeleteFile. Retries
+  /// up to kDeleteMaxAttempts times on transient FileIO errors; stops immediately
+  /// when the underlying status is kNotFound (matching Java's stopRetryOn
+  /// NotFoundException). Custom delete callbacks are invoked exactly once -- caller
+  /// retry policy is opaque to us.
   void DeleteFile(const std::string& path) {
-    try {
-      if (delete_func_) {
+    if (delete_func_) {
+      try {
         delete_func_(path);
-      } else {
-        std::ignore = file_io_->DeleteFile(path);
+      } catch (...) {
+        // Suppress all exceptions during file cleanup to match Java's
+        // suppressFailureWhenFinished behavior.
       }
-    } catch (...) {
-      /// TODO(shangxinli): add retry
+      return;
+    }
+
+    for (int attempt = 1; attempt <= kDeleteMaxAttempts; ++attempt) {
+      try {
+        auto status = file_io_->DeleteFile(path);
+        if (status.has_value()) return;
+        if (status.error().kind == ErrorKind::kNotFound) return;
+        if (attempt == kDeleteMaxAttempts) return;
+      } catch (...) {
+        if (attempt == kDeleteMaxAttempts) return;
+      }
+      // Linear backoff (10ms, 20ms). Tiny on purpose -- this is best-effort cleanup,
+      // not a critical write path; we just want to ride out a transient blip.
+      std::this_thread::sleep_for(std::chrono::milliseconds(10 * attempt));
     }
   }
 
-  /// TODO(shangxinli): Add bulk deletion
+  /// \brief Delete a batch of files, parallelized via RunInParallel.
+  ///
+  /// TODO(shangxinli): When FileIO grows a SupportsBulkOperations-style
+  /// `DeleteFiles(span<const std::string>)` API, prefer the bulk path here (mirroring
+  /// Java's FileCleanupStrategy.deleteFiles).
   void DeleteFiles(const std::unordered_set<std::string>& paths) {
-    for (const auto& path : paths) {
-      DeleteFile(path);
-    }
+    if (paths.empty()) return;
+    std::vector<std::string> as_vec(paths.begin(), paths.end());
+    RunInParallel(std::span(as_vec),
+                  [this](const std::string& p) { DeleteFile(p); });
   }
 
   bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h
index 7c1588aa5..c47e2488a 100644
--- a/src/iceberg/update/expire_snapshots.h
+++ b/src/iceberg/update/expire_snapshots.h
@@ -114,6 +114,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
   /// If this method is not called, unnecessary manifests and data files will still be
   /// deleted.
   ///
+  /// \note The supplied function may be invoked concurrently from worker threads
+  /// when bulk deletion is parallelized; implementations must be thread-safe.
+  ///
   /// \param delete_func A function that will be called to delete manifests and data files
   /// \return Reference to this for method chaining.
   ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func);