Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 61 additions & 24 deletions src/iceberg/test/expire_snapshots_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/update/expire_snapshots.h"

#include <mutex>
#include <optional>
#include <string>
#include <vector>
Expand Down Expand Up @@ -244,10 +245,13 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
}

TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

// Apply first so apply_result_ is cached
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
Expand All @@ -262,11 +266,14 @@ TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
}

TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->CleanupLevel(CleanupLevel::kNone);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
Expand All @@ -278,10 +285,13 @@ TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
}

TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
Expand All @@ -294,11 +304,14 @@ TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
}

TEST_F(ExpireSnapshotsTest, FinalizeSkipsWhenNothingExpired) {
std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->RetainLast(2);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
Expand Down Expand Up @@ -348,10 +361,13 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
kCurrentSequenceNumber, {});
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
Expand Down Expand Up @@ -386,10 +402,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
kCurrentSequenceNumber, {});
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
Expand Down Expand Up @@ -424,11 +443,14 @@ TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) {
kCurrentSequenceNumber, {});
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->CleanupLevel(CleanupLevel::kMetadataOnly);
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
Expand Down Expand Up @@ -462,10 +484,13 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainedDeleteManifestSkipsDataDeletion) {
kCurrentSequenceNumber, {current_delete_manifest});
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
Expand All @@ -487,10 +512,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredStats) {
expired_manifest_list_path, current_manifest_list_path,
{MakeStatisticsFile(kExpiredSnapshotId, expired_statistics_path)}, {});

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
Expand All @@ -513,10 +541,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedStats) {
MakeStatisticsFile(kCurrentSnapshotId, reused_statistics_path)},
{});

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
Expand All @@ -538,10 +569,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredPartitionStats) {
expired_manifest_list_path, current_manifest_list_path, {},
{MakePartitionStatisticsFile(kExpiredSnapshotId, expired_statistics_path)});

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
Expand All @@ -564,10 +598,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
{MakePartitionStatisticsFile(kExpiredSnapshotId, reused_statistics_path),
MakePartitionStatisticsFile(kCurrentSnapshotId, reused_statistics_path)});

std::mutex deleted_files_mu;
std::vector<std::string> deleted_files;
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
update->DeleteWith(
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
std::lock_guard<std::mutex> lock(deleted_files_mu);
deleted_files.push_back(path);
});

EXPECT_THAT(update->Commit(), IsOk());
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
Expand Down
113 changes: 102 additions & 11 deletions src/iceberg/update/expire_snapshots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
#include "iceberg/update/expire_snapshots.h"

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <future>
#include <iterator>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

Expand Down Expand Up @@ -54,6 +58,64 @@ Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
}

/// \brief Upper bound on the number of workers one CleanFiles pass may use.
///
/// Java's RemoveSnapshots receives ExecutorServices from the table operations
/// layer. C++ has no shared executor today, so each strategy spins up an ad-hoc
/// pool via std::async. The cap keeps hosts with very high core counts from
/// swamping FileIO with hundreds of in-flight requests.
constexpr std::size_t kMaxParallelism = 8;

/// \brief Choose how many workers to use for `item_count` units of work.
///
/// \param item_count Number of items that need processing.
/// \return 1 when there is at most one item; otherwise the smallest of
///         `item_count`, kMaxParallelism and the hardware concurrency reported by
///         the runtime (a reported value of 0 is treated as 2).
std::size_t WorkerCount(std::size_t item_count) {
  if (item_count <= 1) {
    return 1;
  }
  // hardware_concurrency() may report 0; fall back to a small fixed value then.
  const std::size_t reported = std::thread::hardware_concurrency();
  const std::size_t cores = (reported != 0) ? reported : 2;
  const std::size_t capped = std::min(item_count, kMaxParallelism);
  return std::min(capped, cores);
}

/// \brief Run `work` over `items` using up to WorkerCount(items.size()) workers.
///
/// `items` is split into contiguous slices, one per worker, and each slice is
/// handed to a std::async(std::launch::async) task. When only one worker is
/// warranted the items are processed inline on the calling thread. The call
/// returns only after every item has been visited.
///
/// Best-effort semantics: exceptions thrown by `work` are swallowed -- callers
/// that need to know whether a unit succeeded should record it inside `work`
/// (e.g. via an atomic counter or a thread-safe collection). If a worker task
/// cannot be launched (std::async throws), its slice is processed inline on the
/// calling thread instead of aborting the whole batch.
///
/// \param items Units of work; the viewed storage must stay valid until this
///        function returns.
/// \param work Callable invoked once per item. It can be invoked concurrently
///        from several worker threads, so it must be thread-safe.
template <typename Item, typename Fn>
void RunInParallel(std::span<const Item> items, Fn&& work) {
  if (items.empty()) return;

  // Visits every item of one slice. Failures are isolated per item so that one
  // throwing item does not prevent the rest of the slice from being processed.
  auto drain = [&work](std::span<const Item> slice) {
    for (const auto& item : slice) {
      try {
        work(item);
      } catch (...) {
        // best-effort: see RunInParallel doc
      }
    }
  };

  const std::size_t n = WorkerCount(items.size());
  if (n <= 1) {
    drain(items);
    return;
  }

  std::vector<std::future<void>> futures;
  futures.reserve(n);
  // ceil(size / n): yields at most n slices, so `futures` never reallocates.
  const std::size_t per = (items.size() + n - 1) / n;
  for (std::size_t begin = 0; begin < items.size(); begin += per) {
    const std::size_t count = std::min(per, items.size() - begin);
    const auto slice = items.subspan(begin, count);
    try {
      futures.emplace_back(
          std::async(std::launch::async, [slice, &drain]() { drain(slice); }));
    } catch (...) {
      // std::async throws when no new thread can be started. Letting that escape
      // would skip the remaining slices and break the best-effort contract, so
      // degrade to processing this slice on the calling thread.
      drain(slice);
    }
  }
  // `drain` and `work` are captured by reference; every task must finish before
  // this frame is left.
  for (auto& f : futures) {
    f.wait();
  }
}

/// \brief Abstract strategy for cleaning up files after snapshot expiration.
class FileCleanupStrategy {
public:
Expand All @@ -75,24 +137,53 @@ class FileCleanupStrategy {
CleanupLevel level) = 0;

protected:
/// \brief Delete a single file
/// Number of attempts for a single best-effort delete. Mirrors Java's
/// Tasks.foreach(...).retry(3).
static constexpr int kDeleteMaxAttempts = 3;

/// \brief Delete a file, suppressing errors (best-effort) with bounded retries.
///
/// Uses the custom delete function if set, otherwise FileIO::DeleteFile. Retries
/// up to kDeleteMaxAttempts times on transient FileIO errors; stops immediately
/// when the underlying status is kNotFound (matching Java's stopRetryOn
/// NotFoundException). Custom delete callbacks are invoked exactly once -- caller
/// retry policy is opaque to us.
void DeleteFile(const std::string& path) {
try {
if (delete_func_) {
if (delete_func_) {
try {
delete_func_(path);
} else {
std::ignore = file_io_->DeleteFile(path);
} catch (...) {
// Suppress all exceptions during file cleanup to match Java's
// suppressFailureWhenFinished behavior.
}
} catch (...) {
/// TODO(shangxinli): add retry
return;
}

for (int attempt = 1; attempt <= kDeleteMaxAttempts; ++attempt) {
try {
auto status = file_io_->DeleteFile(path);
if (status.has_value()) return;
if (status.error().kind == ErrorKind::kNotFound) return;
if (attempt == kDeleteMaxAttempts) return;
} catch (...) {
if (attempt == kDeleteMaxAttempts) return;
}
// Linear backoff (10ms, 20ms). Tiny on purpose -- this is best-effort cleanup,
// not a critical write path; we just want to ride out a transient blip.
std::this_thread::sleep_for(std::chrono::milliseconds(10 * attempt));
}
}

/// TODO(shangxinli): Add bulk deletion
/// \brief Delete a batch of files, parallelized via RunInParallel.
///
/// TODO(shangxinli): When FileIO grows a SupportsBulkOperations-style
/// `DeleteFiles(span<string>)` API, prefer the bulk path here (mirroring
/// Java's FileCleanupStrategy.deleteFiles).
void DeleteFiles(const std::unordered_set<std::string>& paths) {
for (const auto& path : paths) {
DeleteFile(path);
}
if (paths.empty()) return;
std::vector<std::string> as_vec(paths.begin(), paths.end());
RunInParallel<std::string>(std::span<const std::string>(as_vec),
[this](const std::string& p) { DeleteFile(p); });
}

bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/update/expire_snapshots.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
/// If this method is not called, unnecessary manifests and data files will still be
/// deleted.
///
/// \note The supplied function may be invoked concurrently from worker threads
/// when bulk deletion is parallelized; implementations must be thread-safe.
///
/// \param delete_func A function that will be called to delete manifests and data files
/// \return Reference to this for method chaining.
ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func);
Expand Down
Loading