From 22cd14fa229877b633614ddbc67b2e2ed3c26573 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 17:13:14 +0800 Subject: [PATCH 01/10] Add idle shrink benchmarks and tests --- CMakeLists.txt | 6 ++ README.md | 2 + README.zh.md | 2 + benchmarks/bench_latency.cpp | 30 ++++++- benchmarks/bench_linearizable.cpp | 135 ++++++++++++++++++++++++++++-- benchmarks/bench_throughput.cpp | 117 +++++++++++++++----------- include/daking/MPSC_queue.hpp | 22 +++++ tests/test_MPSC.cpp | 18 ++++ 8 files changed, 273 insertions(+), 59 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1b28383..dfa0db3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,6 +51,12 @@ set(HDR_HISTOGRAM_BUILD_SHARED OFF CACHE BOOL "" FORCE) set(HDR_LOG_REQUIRED "DISABLED" CACHE STRING "" FORCE) FetchContent_MakeAvailable(hdrhistogram) +if(TARGET hdr_histogram_static) + target_compile_options(hdr_histogram_static PRIVATE + $<$:-Wno-incompatible-pointer-types -Wno-deprecated-declarations> + ) +endif() + # moodycamel ConcurrentQueue FetchContent_Declare( concurrentqueue diff --git a/README.md b/README.md index e4f1c74..b96b6db 100644 --- a/README.md +++ b/README.md @@ -284,6 +284,8 @@ We get below performance: 2. `ThreadLocalCapacity` (thread-local capacity) is fixed at **compile time**. 3. Pointer chasing cannot be avoided because it is a pure linked-list structure. +If you need to reclaim memory while keeping the queue instance alive, call `shrink_to_fit()` only after the queue has been fully drained and all producers have stopped. This is an idle-time reclamation path, not a concurrent shrink path. + ## Features 1. Multiple-Producer, Single-Consumer (MPSC). The closer the contention scenario is to SPSC, the closer the throughput gets to the SPSC benchmark performance. diff --git a/README.zh.md b/README.zh.md index 3422e84..3c10bcc 100644 --- a/README.zh.md +++ b/README.zh.md @@ -286,6 +286,8 @@ cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark 2. `ThreadLocalCapacity`(线程本地容量)在**编译时**已固定。 3. 无法避免指针追逐,因为是纯链表结构。 +如果你需要在队列实例仍然存在时回收内存,可以在队列已经完全清空并且所有生产者停止之后调用 `shrink_to_fit()`。它是一个空闲期回收接口,不是并发缩容接口。 + ## 特性 (FEATURES) 1. 
多生产者,单消费者(MPSC), 若竞争场景越接近SPSC,吞吐量越接近SPSC基准测试性能。 diff --git a/benchmarks/bench_latency.cpp b/benchmarks/bench_latency.cpp index e1871c0..c6b274b 100644 --- a/benchmarks/bench_latency.cpp +++ b/benchmarks/bench_latency.cpp @@ -132,8 +132,36 @@ static void BM_MPSC_PureDequeueLatency(benchmark::State& state) { hdr_close(hist); } +static void BM_MPSC_ShrinkToFitLatency(benchmark::State& state) { + hdr_histogram* hist; + hdr_init(1, 1000000, 3, &hist); + + for (auto _ : state) { + TestQueue q; + TestQueue::reserve_global_chunk(64); + for (int i = 0; i < 10000; ++i) { + q.enqueue(i); + } + int val = 0; + while (q.try_dequeue(val)) {} + auto start = __rdtsc(); + bool ok = q.shrink_to_fit(); + auto end = __rdtsc(); + if (!ok) { + state.SkipWithError("shrink_to_fit failed unexpectedly"); + break; + } + hdr_record_value(hist, end - start); + } + + state.counters["P99_ns"] = hdr_value_at_percentile(hist, 99.0) / CYCLES_PER_NS; + state.counters["P99.9_ns"] = hdr_value_at_percentile(hist, 99.9) / CYCLES_PER_NS; + hdr_close(hist); +} + BENCHMARK(BM_MPSC_PureEnqueueLatency)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->Unit(benchmark::kMicrosecond); BENCHMARK(BM_MPSC_PureDequeueLatency)->Unit(benchmark::kMicrosecond); +BENCHMARK(BM_MPSC_ShrinkToFitLatency)->Unit(benchmark::kMicrosecond); BENCHMARK_MAIN(); @@ -181,4 +209,4 @@ int main(int argc, char** argv) { return 0; } -#endif \ No newline at end of file +#endif diff --git a/benchmarks/bench_linearizable.cpp b/benchmarks/bench_linearizable.cpp index f5c3b60..a6bba39 100644 --- a/benchmarks/bench_linearizable.cpp +++ b/benchmarks/bench_linearizable.cpp @@ -7,8 +7,6 @@ #include "daking/MPSC_queue.hpp" -#if DAKING_HAS_CXX20_OR_ABOVE - struct Message { int producer_id; uint64_t seq; @@ -16,6 +14,119 @@ struct Message { using LinearQueue = daking::MPSC_queue; +static void BM_MPSC_Test_Linearizable_TryDequeue(benchmark::State& state) { + const int num_producers = (int)state.range(0); + const uint64_t ops_per_producer = 1000000; + + for (auto _ : state) { + state.PauseTiming(); + + LinearQueue q; + std::atomic start_signal{false}; + std::atomic linear_error{false}; + std::vector last_seq(num_producers, 0); + + std::thread consumer([&]() { + while (!start_signal.load(std::memory_order_acquire)); + const uint64_t total_expected = ops_per_producer * num_producers; + uint64_t received = 0; + while (received < total_expected) { + Message msg; + if (!q.try_dequeue(msg)) { + continue; + } + if (msg.seq <= last_seq[msg.producer_id] && last_seq[msg.producer_id] != 0) { + linear_error.store(true, std::memory_order_relaxed); + } + last_seq[msg.producer_id] = msg.seq; + ++received; + } + }); + + std::vector producers; + producers.reserve(num_producers); + for (int p_id = 0; p_id < num_producers; ++p_id) { + producers.emplace_back([&, p_id]() { + while (!start_signal.load(std::memory_order_acquire)); + for (uint64_t s = 1; s <= ops_per_producer; ++s) { + q.enqueue({p_id, s}); + } + }); + } + + state.ResumeTiming(); + start_signal.store(true, std::memory_order_release); + + for (auto& p : producers) p.join(); + consumer.join(); + + if (linear_error.load(std::memory_order_relaxed)) { + state.SkipWithError("Linearizability Violation Detected!"); + break; + } + } + + state.SetItemsProcessed(state.range(0) * ops_per_producer * state.iterations()); + state.SetLabel("TryDequeue linearizable when " + std::to_string(state.range(0)) + "P."); +} + +static void BM_MPSC_Test_Linearizable_AfterShrink_TryDequeue(benchmark::State& state) { + const int num_producers = (int)state.range(0); + 
const uint64_t ops_per_producer = 200000; + + for (auto _ : state) { + state.PauseTiming(); + LinearQueue q; + q.shrink_to_fit(); + std::atomic start_signal{false}; + std::atomic linear_error{false}; + std::vector last_seq(num_producers, 0); + + std::thread consumer([&]() { + while (!start_signal.load(std::memory_order_acquire)); + const uint64_t total_expected = ops_per_producer * num_producers; + uint64_t received = 0; + while (received < total_expected) { + Message msg; + if (!q.try_dequeue(msg)) { + continue; + } + if (msg.seq <= last_seq[msg.producer_id] && last_seq[msg.producer_id] != 0) { + linear_error.store(true, std::memory_order_relaxed); + } + last_seq[msg.producer_id] = msg.seq; + ++received; + } + }); + + std::vector producers; + producers.reserve(num_producers); + for (int p_id = 0; p_id < num_producers; ++p_id) { + producers.emplace_back([&, p_id]() { + while (!start_signal.load(std::memory_order_acquire)); + for (uint64_t s = 1; s <= ops_per_producer; ++s) { + q.enqueue({p_id, s}); + } + }); + } + + state.ResumeTiming(); + start_signal.store(true, std::memory_order_release); + for (auto& p : producers) p.join(); + consumer.join(); + + if (linear_error.load(std::memory_order_relaxed)) { + state.SkipWithError("Linearizability Violation Detected After Shrink!"); + break; + } + } + + state.SetItemsProcessed(state.range(0) * ops_per_producer * state.iterations()); + state.SetLabel("TryDequeue linearizable after shrink when " + std::to_string(state.range(0)) + "P."); +} + +#if DAKING_HAS_CXX20_OR_ABOVE + static void BM_MPSC_Test_Linearizable(benchmark::State& state) { const int num_producers = (int)state.range(0); const uint64_t ops_per_producer = 1000000; @@ -79,13 +190,19 @@ BENCHMARK(BM_MPSC_Test_Linearizable) ->UseRealTime() ->Unit(benchmark::kMillisecond); -BENCHMARK_MAIN(); +#endif -#else +BENCHMARK(BM_MPSC_Test_Linearizable_TryDequeue) + ->Arg(1) + ->Arg(4) + ->Arg(16) + ->UseRealTime() + ->Unit(benchmark::kMillisecond); -int main(int argc, char** argv) { - std::cout << "This test is only for C++20 and above. 
Modify CMakeLists.txt \"set(CMAKE_CXX_STANDARD 17)\"->\"set(CMAKE_CXX_STANDARD 20)\"" << std::endl; - return 0; -} +BENCHMARK(BM_MPSC_Test_Linearizable_AfterShrink_TryDequeue) + ->Arg(1) + ->Arg(4) + ->UseRealTime() + ->Unit(benchmark::kMillisecond); -#endif \ No newline at end of file +BENCHMARK_MAIN(); diff --git a/benchmarks/bench_throughput.cpp b/benchmarks/bench_throughput.cpp index a3a129d..3e5d3e2 100644 --- a/benchmarks/bench_throughput.cpp +++ b/benchmarks/bench_throughput.cpp @@ -11,6 +11,13 @@ constexpr size_t TOTAL_OPS = 100000000; // using TestQueue = moodycamel::ConcurrentQueue; using TestQueue = daking::MPSC_queue; +struct BenchmarkRunConfig { + int producers; + size_t reserve_chunks; + bool shrink_before_run; + bool bulk_mode; +}; + void producer_thread(TestQueue* q, size_t items_to_push, std::atomic_bool* start) { while (!start->load(std::memory_order_acquire)) { std::this_thread::yield(); @@ -20,6 +27,16 @@ void producer_thread(TestQueue* q, size_t items_to_push, std::atomic_bool* start } } +void producer_thread_enqueue_bulk(TestQueue* q, size_t items_to_push, std::atomic_bool* start) { + while (!start->load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + int nums[32]{}; + for (size_t i = 0; i < items_to_push; i += 32) { + q->enqueue_bulk(nums, std::min(items_to_push - i, (std::size_t)32)); + } +} + void consumer_thread(TestQueue* q, size_t total_items_to_pop, std::atomic_bool* start) { while (!start->load(std::memory_order_acquire)) { std::this_thread::yield(); @@ -33,20 +50,30 @@ void consumer_thread(TestQueue* q, size_t total_items_to_pop, std::atomic_bool* } } -static void BM_MPSC_Throughput(benchmark::State& state) { - const int num_producers = (int)state.range(0); - const size_t items_per_producer = TOTAL_OPS / num_producers; - - TestQueue q; +static void RunThroughputScenario(benchmark::State& state, const BenchmarkRunConfig& cfg) { + const size_t items_per_producer = TOTAL_OPS / cfg.producers; for (auto _ : state) { + TestQueue q; + if (cfg.reserve_chunks != 0) { + TestQueue::reserve_global_chunk(cfg.reserve_chunks); + } + if (cfg.shrink_before_run) { + q.shrink_to_fit(); + } + state.PauseTiming(); std::vector producers; - producers.reserve(num_producers); - std::atomic_bool start{ false }; + producers.reserve(cfg.producers); + std::atomic_bool start{ false }; std::thread consumer(consumer_thread, &q, TOTAL_OPS, &start); - for (int i = 0; i < num_producers; ++i) { - producers.emplace_back(producer_thread, &q, items_per_producer, &start); + for (int i = 0; i < cfg.producers; ++i) { + if (cfg.bulk_mode) { + producers.emplace_back(producer_thread_enqueue_bulk, &q, items_per_producer, &start); + } + else { + producers.emplace_back(producer_thread, &q, items_per_producer, &start); + } } state.ResumeTiming(); start.store(true, std::memory_order_release); @@ -61,7 +88,11 @@ static void BM_MPSC_Throughput(benchmark::State& state) { } state.SetItemsProcessed(TOTAL_OPS * state.iterations()); - state.SetLabel("P=" + std::to_string(num_producers) + ", C=1"); +} + +static void BM_MPSC_Throughput(benchmark::State& state) { + RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 0, false, false }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", warm"); } BENCHMARK(BM_MPSC_Throughput) @@ -129,47 +160,11 @@ BENCHMARK(BM_4x_UnevenWave_SPSClike_Aggregation) ->UseRealTime() ->MinWarmUpTime(2.0); -void producer_thread_enqueue_bulk(TestQueue* q, size_t items_to_push, std::atomic_bool* start) { - while 
(!start->load(std::memory_order_acquire)) { - std::this_thread::yield(); - } - int nums[32]{}; - for (size_t i = 0; i < items_to_push; i += 32) { - q->enqueue_bulk(nums, std::min(items_to_push - i, (std::size_t)32)); - } -} - // consumer thread remains the same, because dequeue_bulk is just a while loop of try_dequeue static void BM_MPSC_Throughput_Bulk(benchmark::State& state) { - const int num_producers = (int)state.range(0); - const size_t items_per_producer = TOTAL_OPS / num_producers; - - TestQueue q; - - for (auto _ : state) { - state.PauseTiming(); - std::vector producers; - producers.reserve(num_producers); - std::atomic_bool start{ false }; - std::thread consumer(consumer_thread, &q, TOTAL_OPS, &start); - for (int i = 0; i < num_producers; ++i) { - producers.emplace_back(producer_thread_enqueue_bulk, &q, items_per_producer, &start); - } - state.ResumeTiming(); - start.store(true, std::memory_order_release); - if (consumer.joinable()) { - consumer.join(); - } - state.PauseTiming(); - for (auto& p : producers) { - p.join(); - } - state.ResumeTiming(); - } - - state.SetItemsProcessed(TOTAL_OPS * state.iterations()); - state.SetLabel("P=" + std::to_string(num_producers) + ", C=1"); + RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 0, false, true }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", bulk"); } BENCHMARK(BM_MPSC_Throughput_Bulk) @@ -181,6 +176,30 @@ BENCHMARK(BM_MPSC_Throughput_Bulk) ->UseRealTime() ->MinWarmUpTime(2.0); +static void BM_MPSC_Throughput_ColdStart(benchmark::State& state) { + RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 0, true, false }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", cold-shrink"); +} + +BENCHMARK(BM_MPSC_Throughput_ColdStart) + ->Args({ 1 }) + ->Args({ 4 }) + ->Args({ 16 }) + ->UseRealTime() + ->MinWarmUpTime(2.0); + +static void BM_MPSC_Throughput_PreReserved(benchmark::State& state) { + RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 256, false, false }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", pre-reserved"); +} + +BENCHMARK(BM_MPSC_Throughput_PreReserved) + ->Args({ 1 }) + ->Args({ 4 }) + ->Args({ 16 }) + ->UseRealTime() + ->MinWarmUpTime(2.0); + BENCHMARK_MAIN(); /* @@ -236,4 +255,4 @@ BM_MPSC_Throughput_Bulk/2/min_warmup_time:2.000/real_time 1451039 BM_MPSC_Throughput_Bulk/4/min_warmup_time:2.000/real_time 1491950000 ns 0.000 ns 1 items_per_second=67.0264M/s P=4, C=1 BM_MPSC_Throughput_Bulk/8/min_warmup_time:2.000/real_time 1674253600 ns 0.000 ns 1 items_per_second=59.7281M/s P=8, C=1 BM_MPSC_Throughput_Bulk/16/min_warmup_time:2.000/real_time 2299654400 ns 0.000 ns 1 items_per_second=43.4848M/s P=16, C=1 -*/ \ No newline at end of file +*/ diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp index 499eaeb..4213164 100644 --- a/include/daking/MPSC_queue.hpp +++ b/include/daking/MPSC_queue.hpp @@ -644,6 +644,28 @@ namespace daking { return global_manager_instance_ ? 
_reserve_global_external(chunk_count) : false; } + DAKING_ALWAYS_INLINE bool shrink_to_fit() { + if (!empty()) { + return false; + } + + std::lock_guard lock(global_mutex_); + if (global_instance_count_.load(std::memory_order_acquire) != 1) { + return false; + } + + if (!empty()) { + return false; + } + + _free_global(); + _get_global_manager().reserve(thread_local_capacity); + node_t* dummy = _allocate(); + tail_ = dummy; + head_.store(dummy, std::memory_order_release); + return true; + } + private: DAKING_ALWAYS_INLINE static manager_t& _get_global_manager() noexcept { return *global_manager_instance_; diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp index 58988ff..cfc70ef 100644 --- a/tests/test_MPSC.cpp +++ b/tests/test_MPSC.cpp @@ -97,6 +97,24 @@ TEST(MPSCQueueMemoryTest, ReserveGlobalChunk) { EXPECT_EQ(Q::global_node_size_apprx(), reserved_size); } +TEST(MPSCQueueMemoryTest, ShrinkToFit_ReclaimsPagesWhenIdle) { + using Q = MPSC_queue; + Q q; + + Q::reserve_global_chunk(12); + size_t before = Q::global_node_size_apprx(); + EXPECT_GE(before, (size_t)12 * 64); + + int value = 0; + q.enqueue(7); + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_TRUE(q.empty()); + + EXPECT_TRUE(q.shrink_to_fit()); + EXPECT_TRUE(q.empty()); + EXPECT_EQ(Q::global_node_size_apprx(), (size_t)64); +} + // ------------------------------------------------------------------------- // III. Bulk Operation Tests // ------------------------------------------------------------------------- From 958e7e9345be8e9d4a7bc3225bb7e70fe57dbc3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 17:33:08 +0800 Subject: [PATCH 02/10] Document idle shrink boundaries --- README.md | 48 ++++++++++++++++++++++----------------------- README.zh.md | 48 ++++++++++++++++++++++----------------------- tests/test_MPSC.cpp | 32 ++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index b96b6db..2f4bad5 100644 --- a/README.md +++ b/README.md @@ -203,45 +203,45 @@ Command: ```powershell cmake -S . 
-B out/build/clang-local -G Ninja -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Release cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark -.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.1s --benchmark_repetitions=1 --benchmark_counters_tabular=true +.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.5s --benchmark_repetitions=1 --benchmark_counters_tabular=true ``` Uniform single-element enqueue: | Queue | P (Producers) | C (Consumer) | **Throughput (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **86.78** | -| daking | 2 | 1 | **32.29** | -| daking | 4 | 1 | **32.52** | -| daking | 8 | 1 | 31.16 | -| moodycamel | 1 | 1 | 22.74 | -| moodycamel | 2 | 1 | 27.82 | -| moodycamel | 4 | 1 | 29.79 | -| **moodycamel** | 8 | 1 | **32.07** | +| **daking** | 1 | 1 | **82.21** | +| daking | 2 | 1 | **29.69** | +| daking | 4 | 1 | **32.22** | +| daking | 8 | 1 | 28.22 | +| moodycamel | 1 | 1 | 23.21 | +| moodycamel | 2 | 1 | 28.50 | +| moodycamel | 4 | 1 | 30.75 | +| **moodycamel** | 8 | 1 | **30.86** | Uneven sequential burst: | Queue | P (Producers) | C (Consumer) | Relay % | **Throughput (M items/s)** | | :--- | :--- | :--- | :--- | :--- | -| daking | 4 | 1 | $50.0\%$ | **33.89** | -| daking | 4 | 1 | $90.0\%$ | **61.72** | -| daking | 4 | 1 | $98.0\%$ | **73.72** | -| moodycamel | 4 | 1 | $50.0\%$ | 23.84 | -| moodycamel | 4 | 1 | $90.0\%$ | 22.25 | -| moodycamel | 4 | 1 | $98.0\%$ | 17.64 | +| daking | 4 | 1 | $50.0\%$ | **32.62** | +| daking | 4 | 1 | $90.0\%$ | **58.54** | +| daking | 4 | 1 | $98.0\%$ | **68.39** | +| moodycamel | 4 | 1 | $50.0\%$ | 25.31 | +| moodycamel | 4 | 1 | $90.0\%$ | 22.75 | +| moodycamel | 4 | 1 | $98.0\%$ | 21.69 | Bulk enqueue: | Queue | P (Producers) | C (Consumer) | **Throughput (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **102.25** | -| **daking** | 2 | 1 | **102.37** | -| **daking** | 4 | 1 | **85.23** | -| **daking** | 8 | 1 | **77.16** | -| moodycamel | 1 | 1 | 17.02 | -| moodycamel | 2 | 1 | 18.85 | -| moodycamel | 4 | 1 | 18.06 | -| moodycamel | 8 | 1 | 16.02 | +| **daking** | 1 | 1 | **167.47** | +| **daking** | 2 | 1 | **191.27** | +| **daking** | 4 | 1 | **195.14** | +| **daking** | 8 | 1 | **159.11** | +| moodycamel | 1 | 1 | 36.24 | +| moodycamel | 2 | 1 | 36.23 | +| moodycamel | 4 | 1 | 34.82 | +| moodycamel | 8 | 1 | 33.99 | **Part V: Enqueue/Dequeue Latency** @@ -284,7 +284,7 @@ We get below performance: 2. `ThreadLocalCapacity` (thread-local capacity) is fixed at **compile time**. 3. Pointer chasing cannot be avoided because it is a pure linked-list structure. -If you need to reclaim memory while keeping the queue instance alive, call `shrink_to_fit()` only after the queue has been fully drained and all producers have stopped. This is an idle-time reclamation path, not a concurrent shrink path. +If you need to reclaim memory while keeping the queue instance alive, call `shrink_to_fit()` only after the queue has been fully drained and all producers have stopped. This is an idle-time reclamation path, not a concurrent shrink path. It is intentionally conservative and is not designed for online elastic memory reclamation. ## Features diff --git a/README.zh.md b/README.zh.md index 3c10bcc..68543cf 100644 --- a/README.zh.md +++ b/README.zh.md @@ -203,45 +203,45 @@ Compiler: Clang 22.1.1 Release ```powershell cmake -S . 
-B out/build/clang-local -G Ninja -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Release cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark -.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.1s --benchmark_repetitions=1 --benchmark_counters_tabular=true +.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.5s --benchmark_repetitions=1 --benchmark_counters_tabular=true ``` 均匀单元素入队: | 队列 | P (生产者) | C (消费者) | **吞吐量 (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **86.78** | -| daking | 2 | 1 | **32.29** | -| daking | 4 | 1 | **32.52** | -| daking | 8 | 1 | 31.16 | -| moodycamel | 1 | 1 | 22.74 | -| moodycamel | 2 | 1 | 27.82 | -| moodycamel | 4 | 1 | 29.79 | -| **moodycamel** | 8 | 1 | **32.07** | +| **daking** | 1 | 1 | **82.21** | +| daking | 2 | 1 | **29.69** | +| daking | 4 | 1 | **32.22** | +| daking | 8 | 1 | 28.22 | +| moodycamel | 1 | 1 | 23.21 | +| moodycamel | 2 | 1 | 28.50 | +| moodycamel | 4 | 1 | 30.75 | +| **moodycamel** | 8 | 1 | **30.86** | 不均匀顺序爆发: | 队列 | P (生产者) | C (消费者) | 接力百分比 | **吞吐量 (M items/s)** | | :--- | :--- | :--- | :--- | :--- | -| daking | 4 | 1 | $50.0\%$ | **33.89** | -| daking | 4 | 1 | $90.0\%$ | **61.72** | -| daking | 4 | 1 | $98.0\%$ | **73.72** | -| moodycamel | 4 | 1 | $50.0\%$ | 23.84 | -| moodycamel | 4 | 1 | $90.0\%$ | 22.25 | -| moodycamel | 4 | 1 | $98.0\%$ | 17.64 | +| daking | 4 | 1 | $50.0\%$ | **32.62** | +| daking | 4 | 1 | $90.0\%$ | **58.54** | +| daking | 4 | 1 | $98.0\%$ | **68.39** | +| moodycamel | 4 | 1 | $50.0\%$ | 25.31 | +| moodycamel | 4 | 1 | $90.0\%$ | 22.75 | +| moodycamel | 4 | 1 | $98.0\%$ | 21.69 | 批量入队: | 队列 | P (生产者) | C (消费者) | **吞吐量 (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **102.25** | -| **daking** | 2 | 1 | **102.37** | -| **daking** | 4 | 1 | **85.23** | -| **daking** | 8 | 1 | **77.16** | -| moodycamel | 1 | 1 | 17.02 | -| moodycamel | 2 | 1 | 18.85 | -| moodycamel | 4 | 1 | 18.06 | -| moodycamel | 8 | 1 | 16.02 | +| **daking** | 1 | 1 | **167.47** | +| **daking** | 2 | 1 | **191.27** | +| **daking** | 4 | 1 | **195.14** | +| **daking** | 8 | 1 | **159.11** | +| moodycamel | 1 | 1 | 36.24 | +| moodycamel | 2 | 1 | 36.23 | +| moodycamel | 4 | 1 | 34.82 | +| moodycamel | 8 | 1 | 33.99 | **第五部分:Enqueue/Dequeue Latency** @@ -286,7 +286,7 @@ cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark 2. `ThreadLocalCapacity`(线程本地容量)在**编译时**已固定。 3. 
无法避免指针追逐,因为是纯链表结构。 -如果你需要在队列实例仍然存在时回收内存,可以在队列已经完全清空并且所有生产者停止之后调用 `shrink_to_fit()`。它是一个空闲期回收接口,不是并发缩容接口。 +如果你需要在队列实例仍然存在时回收内存,可以在队列已经完全清空并且所有生产者停止之后调用 `shrink_to_fit()`。它是一个空闲期回收接口,不是并发缩容接口,也不是在线弹性回收设计。 ## 特性 (FEATURES) diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp index cfc70ef..54a1f16 100644 --- a/tests/test_MPSC.cpp +++ b/tests/test_MPSC.cpp @@ -115,6 +115,38 @@ TEST(MPSCQueueMemoryTest, ShrinkToFit_ReclaimsPagesWhenIdle) { EXPECT_EQ(Q::global_node_size_apprx(), (size_t)64); } +TEST(MPSCQueueMemoryTest, ShrinkToFit_FailsWhenQueueIsNotEmpty) { + using Q = MPSC_queue; + Q q; + + q.enqueue(7); + EXPECT_FALSE(q.shrink_to_fit()); + + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 7); + EXPECT_TRUE(q.empty()); +} + +TEST(MPSCQueueMemoryTest, ShrinkToFit_FailsWhileAnotherInstanceLives) { + using Q = MPSC_queue; + Q primary; + + { + Q secondary; + int value = 0; + + Q::reserve_global_chunk(4); + primary.enqueue(11); + EXPECT_TRUE(primary.try_dequeue(value)); + EXPECT_TRUE(primary.empty()); + EXPECT_FALSE(primary.shrink_to_fit()); + } + + EXPECT_TRUE(primary.shrink_to_fit()); + EXPECT_EQ(Q::global_node_size_apprx(), (size_t)64); +} + // ------------------------------------------------------------------------- // III. Bulk Operation Tests // ------------------------------------------------------------------------- From bb4fcb7b82f45d997ff6140ea9baac7c592dde06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 18:07:16 +0800 Subject: [PATCH 03/10] Add memory lifecycle benchmark --- CMakeLists.txt | 14 ++++ README.md | 24 ++++++- README.zh.md | 24 ++++++- benchmarks/bench_memory_cycle.cpp | 114 ++++++++++++++++++++++++++++++ 4 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 benchmarks/bench_memory_cycle.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index dfa0db3..39604d4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,6 +91,20 @@ target_link_libraries(mpsc_bench_throughput ) target_compile_options(mpsc_bench_throughput ${COMMON_TARGET_PROPERTIES}) +add_executable(mpsc_bench_memory_cycle benchmarks/bench_memory_cycle.cpp) +target_include_directories(mpsc_bench_memory_cycle + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_SOURCE_DIR} +) +target_link_libraries(mpsc_bench_memory_cycle + PRIVATE + benchmark::benchmark_main + Threads::Threads + ${ATOMIC_LIBRARY} +) +target_compile_options(mpsc_bench_memory_cycle ${COMMON_TARGET_PROPERTIES}) + add_executable(mpsc_bench_latency benchmarks/bench_latency.cpp) target_include_directories(mpsc_bench_latency PRIVATE diff --git a/README.md b/README.md index 2f4bad5..5c361bd 100644 --- a/README.md +++ b/README.md @@ -243,7 +243,29 @@ Bulk enqueue: | moodycamel | 4 | 1 | 34.82 | | moodycamel | 8 | 1 | 33.99 | -**Part V: Enqueue/Dequeue Latency** +**Part V: Memory Lifecycle Benchmark (Stable vs Idle Reclaim)** + +This benchmark runs two production/drain bursts on the same queue. The `stable` mode keeps the warm global pool between bursts. The `idle_reclaim` mode calls `shrink_to_fit()` after the first burst and then measures the next burst from the reclaimed state. 
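+
+A minimal sketch of the lifecycle being measured (hypothetical single-threaded
+driver code written against the queue's public API; the benchmark automates the
+same cycle with producer/consumer threads):
+
+```cpp
+#include "daking/MPSC_queue.hpp"
+
+using Q = daking::MPSC_queue<int>;  // default policy allows shrink_to_fit()
+
+int main() {
+    Q q;
+    for (int i = 0; i < 100000; ++i) q.enqueue(i);  // burst 1 grows the pool
+    int v = 0;
+    while (q.try_dequeue(v)) {}                     // drain completely
+
+    // Quiescent point: queue drained, no other producers or live instances.
+    // `stable` mode skips this call; `idle_reclaim` shrinks the warm pool.
+    const bool reclaimed = q.shrink_to_fit();
+    (void)reclaimed;
+
+    for (int i = 0; i < 100000; ++i) q.enqueue(i);  // burst 2 is measured
+    while (q.try_dequeue(v)) {}                     // from the reclaimed state
+}
+```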
+ +Command: + +```powershell +cmake --build out/build/clang-local --target mpsc_bench_memory_cycle +.\out\build\clang-local\mpsc_bench_memory_cycle.exe --benchmark_min_time=0.2s --benchmark_repetitions=1 --benchmark_counters_tabular=true +``` + +| Mode | P (Producers) | Nodes Before | Nodes After | **Throughput (M items/s)** | Shrink Time (us) | +| :--- | :--- | :--- | :--- | :--- | :--- | +| stable | 1 | 16.384k | 16.384k | 96.69 | - | +| stable | 4 | 16.384k | 16.384k | 32.20 | - | +| stable | 16 | 32.768k | 32.768k | 37.54 | - | +| idle_reclaim | 1 | 32.768k | 256 | 93.94 | 3.5 | +| idle_reclaim | 4 | 32.768k | 256 | 31.98 | 5.5 | +| idle_reclaim | 16 | 262.144k | 256 | 37.01 | 135.2 | + +The idle reclaim path aggressively returns the global pool to the minimum chunk count while keeping the next burst in the same throughput band. It should still be treated as a quiescent-state operation, not an online elastic reclamation mechanism. + +**Part VI: Enqueue/Dequeue Latency** (Based on HdrHistogram, Test on Linux) We get below performance: diff --git a/README.zh.md b/README.zh.md index 68543cf..a8bda5c 100644 --- a/README.zh.md +++ b/README.zh.md @@ -243,7 +243,29 @@ cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark | moodycamel | 4 | 1 | 34.82 | | moodycamel | 8 | 1 | 33.99 | -**第五部分:Enqueue/Dequeue Latency** +**第五部分:内存生命周期基准测试(稳定模式 vs 空闲回收)** + +这个基准测试在同一个队列上连续执行两轮生产/清空 burst。`stable` 模式会保留第一轮之后的全局池热状态;`idle_reclaim` 模式会在第一轮后调用 `shrink_to_fit()`,再从回收后的状态执行下一轮 burst。 + +运行命令: + +```powershell +cmake --build out/build/clang-local --target mpsc_bench_memory_cycle +.\out\build\clang-local\mpsc_bench_memory_cycle.exe --benchmark_min_time=0.2s --benchmark_repetitions=1 --benchmark_counters_tabular=true +``` + +| 模式 | P (生产者) | 回收前节点数 | 回收后节点数 | **吞吐量 (M items/s)** | Shrink 耗时 (us) | +| :--- | :--- | :--- | :--- | :--- | :--- | +| stable | 1 | 16.384k | 16.384k | 96.69 | - | +| stable | 4 | 16.384k | 16.384k | 32.20 | - | +| stable | 16 | 32.768k | 32.768k | 37.54 | - | +| idle_reclaim | 1 | 32.768k | 256 | 93.94 | 3.5 | +| idle_reclaim | 4 | 32.768k | 256 | 31.98 | 5.5 | +| idle_reclaim | 16 | 262.144k | 256 | 37.01 | 135.2 | + +空闲回收路径可以把全局池激进地回收到最小 chunk 数,同时下一轮 burst 的吞吐仍处在同一档。但它仍然应该被视为静止期操作,不是在线弹性回收机制。 + +**第六部分:Enqueue/Dequeue Latency** (此部分基于HdrHistogram,在Linux平台测试) 我们得到了以下延迟表现: diff --git a/benchmarks/bench_memory_cycle.cpp b/benchmarks/bench_memory_cycle.cpp new file mode 100644 index 0000000..3960944 --- /dev/null +++ b/benchmarks/bench_memory_cycle.cpp @@ -0,0 +1,114 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "daking/MPSC_queue.hpp" + +namespace { + +using TestQueue = daking::MPSC_queue; + +constexpr std::size_t kItemsPerProducer = 200000; + +void producer_thread(TestQueue* q, std::size_t items_to_push, std::atomic_bool* start) { + while (!start->load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + for (std::size_t i = 0; i < items_to_push; ++i) { + q->enqueue(1); + } +} + +void consumer_thread(TestQueue* q, std::size_t total_items_to_pop, std::atomic_bool* start) { + while (!start->load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + + std::size_t popped_count = 0; + int val = 0; + while (popped_count < total_items_to_pop) { + if (q->try_dequeue(val)) { + ++popped_count; + } + else { + std::this_thread::yield(); + } + } +} + +void run_burst(TestQueue& q, int producers, std::size_t items_per_producer) { + const std::size_t total_items = items_per_producer * static_cast(producers); + 
std::atomic_bool start{false}; + + std::thread consumer(consumer_thread, &q, total_items, &start); + + std::vector producer_threads; + producer_threads.reserve(static_cast(producers)); + for (int i = 0; i < producers; ++i) { + producer_threads.emplace_back(producer_thread, &q, items_per_producer, &start); + } + + start.store(true, std::memory_order_release); + for (auto& producer : producer_threads) { + producer.join(); + } + consumer.join(); +} + +template +static void BM_MPSC_MemoryCycle(benchmark::State& state) { + const int producers = static_cast(state.range(0)); + const std::size_t items_per_producer = kItemsPerProducer; + const std::size_t total_items_per_burst = items_per_producer * static_cast(producers); + + for (auto _ : state) { + TestQueue q; + TestQueue::reserve_global_chunk(64); + + run_burst(q, producers, items_per_producer); + if (!q.empty()) { + state.SkipWithError("Queue should be empty after first burst"); + break; + } + + const auto nodes_before = TestQueue::global_node_size_apprx(); + double shrink_us = 0.0; + if constexpr (ReclaimBetweenBursts) { + const auto shrink_start = std::chrono::steady_clock::now(); + if (!q.shrink_to_fit()) { + state.SkipWithError("shrink_to_fit failed unexpectedly"); + break; + } + const auto shrink_end = std::chrono::steady_clock::now(); + shrink_us = std::chrono::duration(shrink_end - shrink_start).count(); + } + const auto nodes_after = TestQueue::global_node_size_apprx(); + + run_burst(q, producers, items_per_producer); + if (!q.empty()) { + state.SkipWithError("Queue should be empty after second burst"); + break; + } + + state.counters["global_nodes_before"] = static_cast(nodes_before); + state.counters["global_nodes_after"] = static_cast(nodes_after); + if constexpr (ReclaimBetweenBursts) { + state.counters["shrink_us"] = shrink_us; + } + } + + state.SetItemsProcessed(static_cast(total_items_per_burst * 2 * state.iterations())); + state.SetLabel(std::string(ReclaimBetweenBursts ? "idle_reclaim" : "stable") + ", P=" + std::to_string(producers)); +} + +} // namespace + +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, false)->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, true)->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); + +BENCHMARK_MAIN(); From 494835395cf1d7ae4a88be73085a19690600fe4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 19:14:14 +0800 Subject: [PATCH 04/10] Add memory policy scaffold --- docs/elastic-reclaim-design.md | 134 +++++++++++++++++++++++++++++++++ include/daking/MPSC_queue.hpp | 29 ++++++- tests/test_MPSC.cpp | 12 +++ 3 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 docs/elastic-reclaim-design.md diff --git a/docs/elastic-reclaim-design.md b/docs/elastic-reclaim-design.md new file mode 100644 index 0000000..3cbd4aa --- /dev/null +++ b/docs/elastic-reclaim-design.md @@ -0,0 +1,134 @@ +# Elastic Reclaim Design Notes + +This document records the design direction for an online elastic reclamation mode. +The existing queue is optimized for stable, low-jitter throughput and should remain +the default behavior until the elastic design is proven by benchmarks and stress +tests. + +## Current Memory Model + +The current implementation uses: + +- Per-thread local node caches. +- A global lock-free chunk stack. +- Page allocation owned by a type-level global manager. 
+- Freely mixed chunks: nodes from different pages can be combined into the same + logical chunk. + +This is fast because a producer or consumer only deals with a local node list in +the hot path, and the global stack only moves chunk heads with a tagged pointer. + +## Why Online Page Reclaim Is Not Safe Today + +The current page list knows which contiguous allocation owns nodes, but the global +chunk stack does not preserve page ownership. After enough push/pop cycles, a chunk +can contain nodes from many pages. Because of that, the manager cannot prove that +all nodes from a page are back in the global pool while producers and the consumer +are still running. + +There is also a producer publication window: + +1. Producer allocates a node. +2. Producer exchanges `head_`. +3. Producer publishes `old_head->next_`. + +During that window, `empty()` can be insufficient as a global quiescence test. +This is why the current `shrink_to_fit()` is intentionally limited to idle-time +usage after all producers have stopped. + +## Target Modes + +### stable + +The stable mode keeps the current memory model: + +- Freely mixed chunks. +- Minimal hot-path metadata. +- No online page reclamation. +- Lowest jitter and strongest throughput target. + +### idle_reclaim + +The idle reclaim mode is the current conservative `shrink_to_fit()` path: + +- Requires a drained queue. +- Requires no active producers. +- Requires a single same-template queue instance. +- Rebuilds the minimum global pool after releasing all pages. + +This mode is already benchmarked by `mpsc_bench_memory_cycle`. + +### elastic + +The future elastic mode must preserve page ownership well enough to reclaim pages +online. The important design point is that this cannot be a small modification of +the current global chunk stack. It needs a different ownership layout. + +The likely direction: + +- Allocate fixed-size slabs/pages. +- Keep chunks page-local or attach page ownership metadata to chunk groups. +- Track per-page free node counts. +- Reclaim only pages with all nodes returned and no active references. +- Avoid adding atomics to the single-element enqueue fast path unless the benchmark + proves the trade-off is acceptable. + +## Proposed Architecture + +Use a policy split rather than a runtime branch: + +```cpp +namespace daking::memory_policy { + struct stable; + struct idle_reclaim; + struct elastic; +} +``` + +The queue can then grow toward: + +```cpp +template < + typename Ty, + std::size_t ThreadLocalCapacity = 256, + std::size_t Align = 64, + typename Alloc = std::allocator, + typename MemoryPolicy = memory_policy::idle_reclaim +> +class MPSC_queue; +``` + +The default should keep today's behavior. Experimental policies should be opt-in. + +Current branch status: + +- `memory_policy::stable` exists and disables `shrink_to_fit()`. +- `memory_policy::idle_reclaim` is the default and keeps today's behavior. +- `memory_policy::elastic` exists only as an experimental placeholder. It does + not implement online reclaim yet. + +## Benchmark Gates + +An elastic implementation should not be considered successful unless it passes: + +- Existing functional tests. +- Linearizability benchmarks before and after reclaim. +- `mpsc_bench_memory_cycle`. +- A long-running grow/drain/reclaim/regrow soak. +- A throughput comparison against stable mode. +- Tail latency checks for enqueue and dequeue. + +Acceptance target: + +- No correctness regression. +- Online reclaim must reduce retained global nodes under bursty workloads. 
+- Stable mode throughput must not regress.
+- Elastic mode overhead must be explicit and benchmarked, not assumed.
+
+## Implementation Plan
+
+1. Introduce policy type names without changing hot-path behavior. Done.
+2. Add benchmarks that can compare policy types side by side.
+3. Prototype page-local chunks behind an opt-in `elastic` policy.
+4. Add reclaim accounting and tests.
+5. Run soak and latency benchmarks before considering a PR upstream.
diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp
index 4213164..75bb84e 100644
--- a/include/daking/MPSC_queue.hpp
+++ b/include/daking/MPSC_queue.hpp
@@ -102,6 +102,20 @@ SOFTWARE.
 
 namespace daking {
 
+    namespace memory_policy {
+        struct stable {
+            static constexpr bool can_shrink_to_fit = false;
+        };
+
+        struct idle_reclaim {
+            static constexpr bool can_shrink_to_fit = true;
+        };
+
+        struct elastic {
+            static constexpr bool can_shrink_to_fit = false;
+        };
+    }
+
     /*
         SC MP
           [tail]->[]->[]->[]->[]->[head]
@@ -382,7 +396,8 @@ namespace daking {
         typename Ty,
         std::size_t ThreadLocalCapacity = 256,
         std::size_t Align = 64, /* std::hardware_destructive_interference_size */
-        typename Alloc = std::allocator<Ty>
+        typename Alloc = std::allocator<Ty>,
+        typename MemoryPolicy = memory_policy::idle_reclaim
     > class MPSC_queue {
     public:
@@ -395,6 +410,7 @@ namespace daking {
         using pointer = typename std::allocator_traits<Alloc>::pointer;
         using reference = Ty&;
         using const_reference = const Ty&;
+        using memory_policy_type = MemoryPolicy;
 
         static constexpr std::size_t thread_local_capacity = ThreadLocalCapacity;
         static constexpr std::size_t align = Align;
@@ -420,6 +436,13 @@ namespace daking {
             "Alloc should have a template constructor like 'Alloc(const Alloc& alloc)' to meet internal conversion."
         );
 
+        static_assert(
+            std::is_same_v<MemoryPolicy, memory_policy::stable> ||
+            std::is_same_v<MemoryPolicy, memory_policy::idle_reclaim> ||
+            std::is_same_v<MemoryPolicy, memory_policy::elastic>,
+            "MemoryPolicy must be one of daking::memory_policy::{stable, idle_reclaim, elastic}."
+        );
+
         friend thread_hook_t;
         friend manager_t;
         friend altraits_node_t;
@@ -645,6 +668,10 @@ namespace daking {
         }
 
         DAKING_ALWAYS_INLINE bool shrink_to_fit() {
+            if constexpr (!memory_policy_type::can_shrink_to_fit) {
+                return false;
+            }
+
             if (!empty()) {
                 return false;
             }
diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp
index 54a1f16..39bab84 100644
--- a/tests/test_MPSC.cpp
+++ b/tests/test_MPSC.cpp
@@ -147,6 +147,18 @@ TEST(MPSCQueueMemoryTest, ShrinkToFit_FailsWhileAnotherInstanceLives) {
     EXPECT_EQ(Q::global_node_size_apprx(), (size_t)64);
 }
 
+TEST(MPSCQueueMemoryTest, ShrinkToFit_DisabledForStablePolicy) {
+    using StableQ = MPSC_queue<int, 64, 64, std::allocator<int>, daking::memory_policy::stable>;
+    StableQ q;
+
+    q.enqueue(9);
+    int value = 0;
+    EXPECT_TRUE(q.try_dequeue(value));
+    EXPECT_TRUE(q.empty());
+
+    EXPECT_FALSE(q.shrink_to_fit());
+}
+
 // -------------------------------------------------------------------------
 // III. 
Bulk Operation Tests
 // -------------------------------------------------------------------------

From e3b7be6be1d73e73889df9178de616def59cb8bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com>
Date: Wed, 6 May 2026 19:47:21 +0800
Subject: [PATCH 05/10] Prototype elastic page reclaim

---
 benchmarks/bench_memory_cycle.cpp |  88 ++++++++----
 docs/elastic-reclaim-design.md    |  37 +++++-
 include/daking/MPSC_queue.hpp     | 186 ++++++++++++++++++++++++++++--
 tests/test_MPSC.cpp               |  41 +++++++
 4 files changed, 316 insertions(+), 36 deletions(-)

diff --git a/benchmarks/bench_memory_cycle.cpp b/benchmarks/bench_memory_cycle.cpp
index 3960944..6813dd8 100644
--- a/benchmarks/bench_memory_cycle.cpp
+++ b/benchmarks/bench_memory_cycle.cpp
@@ -11,11 +11,20 @@
 
 namespace {
 
-using TestQueue = daking::MPSC_queue<int>;
+using StableQueue = daking::MPSC_queue<int, 256, 64, std::allocator<int>, daking::memory_policy::stable>;
+using IdleReclaimQueue = daking::MPSC_queue<int>;
+using ElasticQueue = daking::MPSC_queue<int, 256, 64, std::allocator<int>, daking::memory_policy::elastic>;
 
 constexpr std::size_t kItemsPerProducer = 200000;
 
+enum class ReclaimMode {
+    none,
+    shrink_to_fit,
+    free_pages
+};
+
-void producer_thread(TestQueue* q, std::size_t items_to_push, std::atomic_bool* start) {
+template <typename Queue>
+void producer_thread(Queue* q, std::size_t items_to_push, std::atomic_bool* start) {
     while (!start->load(std::memory_order_acquire)) {
         std::this_thread::yield();
     }
@@ -24,7 +33,8 @@ void producer_thread(TestQueue* q, std::size_t items_to_push, std::atomic_bool*
     }
 }
 
-void consumer_thread(TestQueue* q, std::size_t total_items_to_pop, std::atomic_bool* start) {
+template <typename Queue>
+void consumer_thread(Queue* q, std::size_t total_items_to_pop, std::atomic_bool* start) {
     while (!start->load(std::memory_order_acquire)) {
         std::this_thread::yield();
     }
@@ -41,16 +51,17 @@ void consumer_thread(TestQueue* q, std::size_t total_items_to_pop, std::atomic_b
     }
 }
 
-void run_burst(TestQueue& q, int producers, std::size_t items_per_producer) {
+template <typename Queue>
+void run_burst(Queue& q, int producers, std::size_t items_per_producer) {
     const std::size_t total_items = items_per_producer * static_cast<std::size_t>(producers);
     std::atomic_bool start{false};
 
-    std::thread consumer(consumer_thread, &q, total_items, &start);
+    std::thread consumer(consumer_thread<Queue>, &q, total_items, &start);
 
     std::vector<std::thread> producer_threads;
     producer_threads.reserve(static_cast<std::size_t>(producers));
     for (int i = 0; i < producers; ++i) {
-        producer_threads.emplace_back(producer_thread, &q, items_per_producer, &start);
+        producer_threads.emplace_back(producer_thread<Queue>, &q, items_per_producer, &start);
     }
 
     start.store(true, std::memory_order_release);
@@ -60,15 +71,15 @@ void run_burst(TestQueue& q, int producers, std::size_t items_per_producer) {
     consumer.join();
 }
 
-template <bool ReclaimBetweenBursts>
+template <typename Queue, ReclaimMode Mode>
 static void BM_MPSC_MemoryCycle(benchmark::State& state) {
     const int producers = static_cast<int>(state.range(0));
     const std::size_t items_per_producer = kItemsPerProducer;
     const std::size_t total_items_per_burst = items_per_producer * static_cast<std::size_t>(producers);
 
     for (auto _ : state) {
-        TestQueue q;
-        TestQueue::reserve_global_chunk(64);
+        Queue q;
+        Queue::reserve_global_chunk(64);
 
         run_burst(q, producers, items_per_producer);
         if (!q.empty()) {
@@ -76,18 +87,27 @@ static void BM_MPSC_MemoryCycle(benchmark::State& state) {
             break;
         }
 
-        const auto nodes_before = TestQueue::global_node_size_apprx();
-        double shrink_us = 0.0;
-        if constexpr (ReclaimBetweenBursts) {
-            const auto shrink_start = std::chrono::steady_clock::now();
+        const auto nodes_before 
= Queue::global_node_size_apprx(); + double reclaim_us = 0.0; + std::size_t reclaimed_nodes = 0; + if constexpr (Mode == ReclaimMode::shrink_to_fit) { + const auto reclaim_start = std::chrono::steady_clock::now(); if (!q.shrink_to_fit()) { state.SkipWithError("shrink_to_fit failed unexpectedly"); break; } - const auto shrink_end = std::chrono::steady_clock::now(); - shrink_us = std::chrono::duration(shrink_end - shrink_start).count(); + const auto reclaim_end = std::chrono::steady_clock::now(); + reclaim_us = std::chrono::duration(reclaim_end - reclaim_start).count(); + reclaimed_nodes = nodes_before - Queue::global_node_size_apprx(); + } + else if constexpr (Mode == ReclaimMode::free_pages) { + const auto reclaim_start = std::chrono::steady_clock::now(); + reclaimed_nodes = Queue::reclaim_free_pages(); + const auto reclaim_end = std::chrono::steady_clock::now(); + reclaim_us = std::chrono::duration(reclaim_end - reclaim_start).count(); + reclaimed_nodes = nodes_before - Queue::global_node_size_apprx(); } - const auto nodes_after = TestQueue::global_node_size_apprx(); + const auto nodes_after = Queue::global_node_size_apprx(); run_burst(q, producers, items_per_producer); if (!q.empty()) { @@ -97,18 +117,44 @@ static void BM_MPSC_MemoryCycle(benchmark::State& state) { state.counters["global_nodes_before"] = static_cast(nodes_before); state.counters["global_nodes_after"] = static_cast(nodes_after); - if constexpr (ReclaimBetweenBursts) { - state.counters["shrink_us"] = shrink_us; + if constexpr (Mode != ReclaimMode::none) { + state.counters["reclaim_us"] = reclaim_us; + state.counters["reclaimed_nodes"] = static_cast(reclaimed_nodes); } } state.SetItemsProcessed(static_cast(total_items_per_burst * 2 * state.iterations())); - state.SetLabel(std::string(ReclaimBetweenBursts ? "idle_reclaim" : "stable") + ", P=" + std::to_string(producers)); + const char* policy = "unknown"; + const char* mode = "none"; + if constexpr (std::is_same_v) { + policy = "stable"; + } + else if constexpr (std::is_same_v) { + policy = "idle_reclaim"; + } + else if constexpr (std::is_same_v) { + policy = "elastic"; + } + + if constexpr (Mode == ReclaimMode::shrink_to_fit) { + mode = "shrink_to_fit"; + } + else if constexpr (Mode == ReclaimMode::free_pages) { + mode = "reclaim_free_pages"; + } + + state.SetLabel(std::string(policy) + ", " + mode + ", P=" + std::to_string(producers)); } } // namespace -BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, false)->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); -BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, true)->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, StableQueue, ReclaimMode::none) + ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, IdleReclaimQueue, ReclaimMode::shrink_to_fit) + ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, ElasticQueue, ReclaimMode::none) + ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, ElasticQueue, ReclaimMode::free_pages) + ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); BENCHMARK_MAIN(); diff --git a/docs/elastic-reclaim-design.md b/docs/elastic-reclaim-design.md index 3cbd4aa..efd389a 100644 --- a/docs/elastic-reclaim-design.md +++ b/docs/elastic-reclaim-design.md @@ -104,8 +104,35 @@ Current branch status: - `memory_policy::stable` exists and disables `shrink_to_fit()`. 
- `memory_policy::idle_reclaim` is the default and keeps today's behavior. -- `memory_policy::elastic` exists only as an experimental placeholder. It does - not implement online reclaim yet. +- `memory_policy::elastic` is an opt-in prototype. It uses per-node page metadata, + per-page free counts, and `reclaim_free_pages()` to release pages whose nodes + have all returned to the global free stack. + +## Current Elastic Prototype + +The current prototype intentionally favors correctness and observability over +final throughput: + +- It is selected only by `memory_policy::elastic`. +- Default `idle_reclaim` and `stable` allocation paths keep their thread-local + chunk caches. +- Elastic allocation and deallocation still use thread-local chunk caches. The + global mutex is only used when a thread refills or flushes a chunk and when + reclamation runs. +- Each elastic node records its owning page. Each page tracks an approximate + free-node count. +- Reclamation drains the global free-list under the mutex, optionally sweeps + recycled thread-local caches when no live thread hooks remain, marks fully + free pages, returns nodes from live pages to the free-list, and deallocates + reclaimable pages. +- A page containing the queue dummy node or any queued item is not reclaimed. +- The current public `reclaim_free_pages()` path is a quiescent reclaim hook. It + is not yet a fully concurrent online shrinker for active producers. + +This is not the final low-jitter elastic architecture. It is a first correctness +prototype for validating the page ownership model, the public API shape, and +benchmark coverage before investing in page-local chunk caches or lower-contention +reclaim bookkeeping. ## Benchmark Gates @@ -128,7 +155,7 @@ Acceptance target: ## Implementation Plan 1. Introduce policy type names without changing hot-path behavior. Done. -2. Add benchmarks that can compare policy types side by side. -3. Prototype page-local chunks behind an opt-in `elastic` policy. -4. Add reclaim accounting and tests. +2. Add benchmarks that can compare policy types side by side. Done. +3. Prototype page-aware reclaim behind an opt-in `elastic` policy. In progress. +4. Add reclaim accounting and tests. In progress. 5. Run soak and latency benchmarks before considering a PR upstream. 
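+
+As a usage sketch of the policy opt-in (the parameter order and defaults follow
+the current header on this branch; treat the `elastic` alias as experimental):
+
+```cpp
+#include <memory>
+
+#include "daking/MPSC_queue.hpp"
+
+// Default: conservative idle-time reclamation (today's behavior).
+using DefaultQueue = daking::MPSC_queue<int>;
+
+// Lowest-jitter profile: shrink_to_fit() always returns false.
+using StableQueue = daking::MPSC_queue<
+    int, 256, 64, std::allocator<int>, daking::memory_policy::stable>;
+
+// Opt-in prototype: shrink_to_fit() is disabled; the quiescent
+// reclaim_free_pages() hook is the reclamation entry point instead.
+using ElasticQueue = daking::MPSC_queue<
+    int, 256, 64, std::allocator<int>, daking::memory_policy::elastic>;
+```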
diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp index 75bb84e..96a9ae8 100644 --- a/include/daking/MPSC_queue.hpp +++ b/include/daking/MPSC_queue.hpp @@ -163,7 +163,18 @@ namespace daking { namespace detail { template - struct MPSC_node { + struct MPSC_page; + + template + struct MPSC_node_page_link {}; + + template + struct MPSC_node_page_link { + MPSC_page* page_ = nullptr; + }; + + template + struct MPSC_node : MPSC_node_page_link { using value_type = typename Queue::value_type; using node_t = MPSC_node; @@ -188,12 +199,14 @@ namespace daking { using page_t = MPSC_page; MPSC_page(node_t* node, size_type count, page_t* next) - : node_(node), count_(count), next_(next) {} + : node_(node), count_(count), next_(next), free_count_(count) {} ~MPSC_page() = default; size_type count_; node_t* node_; page_t* next_; + std::atomic free_count_; + bool reclaiming_ = false; }; template @@ -344,20 +357,124 @@ namespace daking { altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_); global_page_list_ = new_page; - for (size_type i = 0; i < count; i++) { - new_nodes[i].next_ = new_nodes + i + 1; // seq_cst - if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { - // chunk_count = count / ThreadLocalCapacity - new_nodes[i].next_ = nullptr; - std::atomic_thread_fence(std::memory_order_acq_rel); - // mutex don't protect global_chunk_stack_ - Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + if constexpr (Queue::uses_elastic_reclaim) { + for (size_type i = 0; i < count; i++) { + new_nodes[i].page_ = new_page; + new_nodes[i].next_ = new_nodes + i + 1; // seq_cst + if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { + new_nodes[i].next_ = nullptr; + std::atomic_thread_fence(std::memory_order_acq_rel); + Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + } + } + } + else { + for (size_type i = 0; i < count; i++) { + new_nodes[i].next_ = new_nodes + i + 1; // seq_cst + if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { + // chunk_count = count / ThreadLocalCapacity + new_nodes[i].next_ = nullptr; + std::atomic_thread_fence(std::memory_order_acq_rel); + // mutex don't protect global_chunk_stack_ + Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + } } } global_node_count_.store(global_node_count_ + count, std::memory_order_release); } + size_type reclaim_free_pages() { + static_assert(Queue::uses_elastic_reclaim); + + std::vector free_nodes; + std::unordered_map global_free_by_page; + node_t* node = nullptr; + while (Queue::global_chunk_stack_.try_pop(node)) { + for (size_type i = 0; i < Queue::thread_local_capacity && node; ++i) { + free_nodes.push_back(node); + ++global_free_by_page[node->page_]; + node = node->next_.load(std::memory_order_relaxed); + } + } + + if (global_thread_local_manager_.empty()) { + auto collect_thread_local = [&](thread_local_t* pair_ptr) { + node_t* local_node = pair_ptr->first; + pair_ptr->first = nullptr; + pair_ptr->second = 0; + + while (local_node) { + free_nodes.push_back(local_node); + ++global_free_by_page[local_node->page_]; + node_t* next = local_node->next_.load(std::memory_order_relaxed); + local_node->next_.store(nullptr, std::memory_order_relaxed); + local_node = next; + } + }; + + for (auto& pair_ptr : global_thread_local_recycler_) { + collect_thread_local(pair_ptr.get()); + 
} + } + + size_type reclaimed_nodes = 0; + for (page_t* page = global_page_list_; page; page = page->next_) { + page->reclaiming_ = false; + if (page->free_count_.load(std::memory_order_acquire) == page->count_ && + global_free_by_page[page] == page->count_) { + page->reclaiming_ = true; + reclaimed_nodes += page->count_; + } + } + + node_t* chunk_head = nullptr; + node_t* chunk_tail = nullptr; + size_type chunk_size = 0; + for (node_t* free_node : free_nodes) { + if (!free_node->page_->reclaiming_) { + free_node->next_.store(nullptr, std::memory_order_relaxed); + if (!chunk_head) { + chunk_head = free_node; + } + else { + chunk_tail->next_.store(free_node, std::memory_order_relaxed); + } + chunk_tail = free_node; + + if (++chunk_size == Queue::thread_local_capacity) { + Queue::global_chunk_stack_.push(chunk_head); + chunk_head = nullptr; + chunk_tail = nullptr; + chunk_size = 0; + } + } + } + + if (chunk_head) DAKING_UNLIKELY { + Queue::global_chunk_stack_.push(chunk_head); + } + + page_t** current = &global_page_list_; + while (*current) { + page_t* page = *current; + if (!page->reclaiming_) { + current = &page->next_; + continue; + } + + *current = page->next_; + altraits_node_t::deallocate(*this, page->node_, page->count_); + altraits_page_t::deallocate(*this, page, 1); + } + + if (reclaimed_nodes) { + global_node_count_.fetch_sub(reclaimed_nodes, std::memory_order_acq_rel); + } + + return reclaimed_nodes; + } + DAKING_ALWAYS_INLINE thread_local_t* register_for(std::thread::id tid) { /* Already locked */ if (!global_thread_local_recycler_.empty()) { @@ -414,6 +531,8 @@ namespace daking { static constexpr std::size_t thread_local_capacity = ThreadLocalCapacity; static constexpr std::size_t align = Align; + static constexpr bool uses_elastic_reclaim = + std::is_same_v; private: using node_t = detail::MPSC_node; @@ -667,6 +786,19 @@ namespace daking { return global_manager_instance_ ? 
_reserve_global_external(chunk_count) : false; } + DAKING_ALWAYS_INLINE static size_type reclaim_free_pages() { + if constexpr (!uses_elastic_reclaim) { + return 0; + } + else { + if (!global_manager_instance_) { + return 0; + } + std::lock_guard lock(global_mutex_); + return _get_global_manager().reclaim_free_pages(); + } + } + DAKING_ALWAYS_INLINE bool shrink_to_fit() { if constexpr (!memory_policy_type::can_shrink_to_fit) { return false; @@ -723,6 +855,25 @@ namespace daking { } DAKING_ALWAYS_INLINE node_t* _allocate() { + if constexpr (uses_elastic_reclaim) { + node_t*& thread_local_node_list = _get_thread_local_node_list(); + size_type& thread_local_node_size = _get_thread_local_node_size(); + if (thread_local_node_size == 0) DAKING_UNLIKELY { + std::lock_guard lock(global_mutex_); + while (!global_chunk_stack_.try_pop(thread_local_node_list)) { + _get_global_manager().reserve(std::max(thread_local_capacity, _get_global_manager().node_count())); + } + thread_local_node_size = thread_local_capacity; + } + thread_local_node_size--; + DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list); + DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list->next_); + node_t* res = std::exchange(thread_local_node_list, thread_local_node_list->next_.load(std::memory_order_relaxed)); + res->page_->free_count_.fetch_sub(1, std::memory_order_acq_rel); + res->next_.store(nullptr, std::memory_order_relaxed); + return res; + } + node_t*& thread_local_node_list = _get_thread_local_node_list(); size_type& thread_local_node_size = _get_thread_local_node_size(); if (thread_local_node_size == 0) DAKING_UNLIKELY { @@ -740,6 +891,21 @@ namespace daking { } DAKING_ALWAYS_INLINE void _deallocate(node_t* node) noexcept { + if constexpr (uses_elastic_reclaim) { + node_t*& thread_local_node_list = _get_thread_local_node_list(); + node->next_.store(thread_local_node_list, std::memory_order_relaxed); + thread_local_node_list = node; + node->page_->free_count_.fetch_add(1, std::memory_order_acq_rel); + DAKING_TSAN_ANNOTATE_RELEASE(node); + if (++_get_thread_local_node_size() >= thread_local_capacity) DAKING_UNLIKELY { + std::lock_guard lock(global_mutex_); + global_chunk_stack_.push(thread_local_node_list); + thread_local_node_list = nullptr; + _get_thread_local_node_size() = 0; + } + return; + } + node_t*& thread_local_node_list = _get_thread_local_node_list(); node->next_.store(thread_local_node_list, std::memory_order_relaxed); thread_local_node_list = node; diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp index 39bab84..8dceb5b 100644 --- a/tests/test_MPSC.cpp +++ b/tests/test_MPSC.cpp @@ -159,6 +159,47 @@ TEST(MPSCQueueMemoryTest, ShrinkToFit_DisabledForStablePolicy) { EXPECT_FALSE(q.shrink_to_fit()); } +TEST(MPSCQueueMemoryTest, ElasticPolicy_ReclaimsFullyFreePages) { + using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; + ElasticQ q; + + ElasticQ::reserve_global_chunk(5); + const size_t before = ElasticQ::global_node_size_apprx(); + EXPECT_GE(before, (size_t)5 * 8); + + const size_t reclaimed = ElasticQ::reclaim_free_pages(); + const size_t after = ElasticQ::global_node_size_apprx(); + + EXPECT_GT(reclaimed, (size_t)0); + EXPECT_EQ(after + reclaimed, before); + EXPECT_GE(after, (size_t)8); + + q.enqueue(17); + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 17); + EXPECT_TRUE(q.empty()); +} + +TEST(MPSCQueueMemoryTest, ElasticPolicy_DoesNotReclaimLivePage) { + using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; + ElasticQ q; + + const size_t initial = 
ElasticQ::global_node_size_apprx(); + q.enqueue(23); + + EXPECT_EQ(ElasticQ::reclaim_free_pages(), (size_t)0); + EXPECT_EQ(ElasticQ::global_node_size_apprx(), initial); + + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 23); + EXPECT_TRUE(q.empty()); + + EXPECT_EQ(ElasticQ::reclaim_free_pages(), (size_t)0); + EXPECT_EQ(ElasticQ::global_node_size_apprx(), initial); +} + // ------------------------------------------------------------------------- // III. Bulk Operation Tests // ------------------------------------------------------------------------- From ff42534afe271a2aee2ef3a91a3218cccdb0c577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 20:25:49 +0800 Subject: [PATCH 06/10] Refine elastic reclaim hot path --- docs/elastic-reclaim-design.md | 33 +++--- include/daking/MPSC_queue.hpp | 185 +++++++++++++++++---------------- tests/test_MPSC.cpp | 27 +++++ 3 files changed, 141 insertions(+), 104 deletions(-) diff --git a/docs/elastic-reclaim-design.md b/docs/elastic-reclaim-design.md index efd389a..8baaf97 100644 --- a/docs/elastic-reclaim-design.md +++ b/docs/elastic-reclaim-design.md @@ -68,10 +68,8 @@ The likely direction: - Allocate fixed-size slabs/pages. - Keep chunks page-local or attach page ownership metadata to chunk groups. -- Track per-page free node counts. -- Reclaim only pages with all nodes returned and no active references. -- Avoid adding atomics to the single-element enqueue fast path unless the benchmark - proves the trade-off is acceptable. +- Avoid per-node atomics on the single-element enqueue/dequeue fast path. +- Reclaim only pages whose nodes are all visible in free caches at reclaim time. ## Proposed Architecture @@ -104,9 +102,9 @@ Current branch status: - `memory_policy::stable` exists and disables `shrink_to_fit()`. - `memory_policy::idle_reclaim` is the default and keeps today's behavior. -- `memory_policy::elastic` is an opt-in prototype. It uses per-node page metadata, - per-page free counts, and `reclaim_free_pages()` to release pages whose nodes - have all returned to the global free stack. +- `memory_policy::elastic` is an opt-in prototype. It uses per-node page metadata + and fixed-size page slabs, then scans free caches in `reclaim_free_pages()` to + release pages whose nodes are all free. ## Current Elastic Prototype @@ -116,18 +114,19 @@ final throughput: - It is selected only by `memory_policy::elastic`. - Default `idle_reclaim` and `stable` allocation paths keep their thread-local chunk caches. -- Elastic allocation and deallocation still use thread-local chunk caches. The - global mutex is only used when a thread refills or flushes a chunk and when - reclamation runs. -- Each elastic node records its owning page. Each page tracks an approximate - free-node count. -- Reclamation drains the global free-list under the mutex, optionally sweeps - recycled thread-local caches when no live thread hooks remain, marks fully - free pages, returns nodes from live pages to the free-list, and deallocates - reclaimable pages. +- Elastic allocation reserves one page per thread-local chunk so page reclamation + has chunk-sized granularity instead of one large ever-growing page. +- Elastic allocation and deallocation use the same thread-local chunk cache path + as the default policy. There is no per-node page counter update in the enqueue + or dequeue hot path. +- Each elastic node records its owning page. 
Reclamation drains the global
+  free-list and the quiescent thread-local caches under the mutex, counts free
+  nodes per page, returns free nodes from live pages to the pool, and deallocates
+  fully free pages.
 - A page containing the queue dummy node or any queued item is not reclaimed.
 - The current public `reclaim_free_pages()` path is a quiescent reclaim hook. It
-  is not yet a fully concurrent online shrinker for active producers.
+  runs only while exactly one thread-local hook is registered and refuses to run
+  otherwise, so it is not a fully concurrent online shrinker for active producers.
 
 This is not the final low-jitter elastic architecture. It is a first correctness
 prototype for validating the page ownership model, the public API shape, and
diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp
index 96a9ae8..c31898f 100644
--- a/include/daking/MPSC_queue.hpp
+++ b/include/daking/MPSC_queue.hpp
@@ -199,14 +199,12 @@ namespace daking {
             using page_t = MPSC_page;
 
             MPSC_page(node_t* node, size_type count, page_t* next)
-                : node_(node), count_(count), next_(next), free_count_(count) {}
+                : node_(node), count_(count), next_(next) {}
 
             ~MPSC_page() = default;
 
             size_type count_;
             node_t* node_;
             page_t* next_;
-            std::atomic<size_type> free_count_;
-            bool reclaiming_ = false;
         };
 
         template
@@ -352,23 +350,31 @@
 
         void reserve(size_type count) {
             /* Already locked */
-            node_t* new_nodes = altraits_node_t::allocate(*this, count);
-            page_t* new_page = altraits_page_t::allocate(*this, 1);
-            altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_);
-            global_page_list_ = new_page;
-
             if constexpr (Queue::uses_elastic_reclaim) {
-                for (size_type i = 0; i < count; i++) {
-                    new_nodes[i].page_ = new_page;
-                    new_nodes[i].next_ = new_nodes + i + 1; // seq_cst
-                    if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY {
-                        new_nodes[i].next_ = nullptr;
-                        std::atomic_thread_fence(std::memory_order_acq_rel);
-                        Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]);
+                size_type page_count = count / Queue::thread_local_capacity;
+                for (size_type p = 0; p < page_count; ++p) {
+                    node_t* new_nodes = altraits_node_t::allocate(*this, Queue::thread_local_capacity);
+                    page_t* new_page = altraits_page_t::allocate(*this, 1);
+                    altraits_page_t::construct(*this, new_page, new_nodes, Queue::thread_local_capacity, global_page_list_);
+                    global_page_list_ = new_page;
+
+                    for (size_type i = 0; i < Queue::thread_local_capacity; ++i) {
+                        new_nodes[i].page_ = new_page;
+                        new_nodes[i].next_ = new_nodes + i + 1; // seq_cst
+                        if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY {
+                            new_nodes[i].next_ = nullptr;
+                            std::atomic_thread_fence(std::memory_order_acq_rel);
+                            Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]);
+                        }
                     }
                 }
             }
             else {
+                node_t* new_nodes = altraits_node_t::allocate(*this, count);
+                page_t* new_page = altraits_page_t::allocate(*this, 1);
+                altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_);
+                global_page_list_ = new_page;
+
                 for (size_type i = 0; i < count; i++) {
                     new_nodes[i].next_ = new_nodes + i + 1; // seq_cst
                     if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY {
@@ -387,52 +393,48 @@
         size_type reclaim_free_pages() {
             static_assert(Queue::uses_elastic_reclaim);
 
+            if (global_thread_local_manager_.size() != 1) {
+                return 0;
+            }
+
             std::vector<node_t*> free_nodes;
-
std::unordered_map global_free_by_page; + std::unordered_map free_by_page; node_t* node = nullptr; while (Queue::global_chunk_stack_.try_pop(node)) { for (size_type i = 0; i < Queue::thread_local_capacity && node; ++i) { free_nodes.push_back(node); - ++global_free_by_page[node->page_]; + ++free_by_page[node->page_]; node = node->next_.load(std::memory_order_relaxed); } } - if (global_thread_local_manager_.empty()) { - auto collect_thread_local = [&](thread_local_t* pair_ptr) { - node_t* local_node = pair_ptr->first; - pair_ptr->first = nullptr; - pair_ptr->second = 0; - - while (local_node) { - free_nodes.push_back(local_node); - ++global_free_by_page[local_node->page_]; - node_t* next = local_node->next_.load(std::memory_order_relaxed); - local_node->next_.store(nullptr, std::memory_order_relaxed); - local_node = next; - } - }; - - for (auto& pair_ptr : global_thread_local_recycler_) { - collect_thread_local(pair_ptr.get()); + auto collect_thread_local = [&](thread_local_t* pair_ptr) { + node_t* local_node = pair_ptr->first; + pair_ptr->first = nullptr; + pair_ptr->second = 0; + + while (local_node) { + free_nodes.push_back(local_node); + ++free_by_page[local_node->page_]; + node_t* next = local_node->next_.load(std::memory_order_relaxed); + local_node->next_.store(nullptr, std::memory_order_relaxed); + local_node = next; } - } + }; - size_type reclaimed_nodes = 0; - for (page_t* page = global_page_list_; page; page = page->next_) { - page->reclaiming_ = false; - if (page->free_count_.load(std::memory_order_acquire) == page->count_ && - global_free_by_page[page] == page->count_) { - page->reclaiming_ = true; - reclaimed_nodes += page->count_; - } + for (auto& [tid, pair_ptr] : global_thread_local_manager_) { + collect_thread_local(pair_ptr.get()); + } + for (auto& pair_ptr : global_thread_local_recycler_) { + collect_thread_local(pair_ptr.get()); } - node_t* chunk_head = nullptr; - node_t* chunk_tail = nullptr; - size_type chunk_size = 0; - for (node_t* free_node : free_nodes) { - if (!free_node->page_->reclaiming_) { + auto republish_free_nodes = [&] { + node_t* chunk_head = nullptr; + node_t* chunk_tail = nullptr; + size_type chunk_size = 0; + + for (node_t* free_node : free_nodes) { free_node->next_.store(nullptr, std::memory_order_relaxed); if (!chunk_head) { chunk_head = free_node; @@ -449,29 +451,72 @@ namespace daking { chunk_size = 0; } } + + if (chunk_head) DAKING_UNLIKELY { + auto& pair_ptr = global_thread_local_manager_.begin()->second; + pair_ptr->first = chunk_head; + pair_ptr->second = chunk_size; + } + }; + + if (!global_page_list_) { + republish_free_nodes(); + return 0; + } + + page_t* forced_keep_page = nullptr; + bool has_live_page = false; + for (page_t* page = global_page_list_; page; page = page->next_) { + auto it = free_by_page.find(page); + if (it == free_by_page.end() || it->second != page->count_) { + has_live_page = true; + break; + } } + if (!has_live_page) { + forced_keep_page = global_page_list_; + } + + auto should_reclaim = [&](page_t* page) { + if (page == forced_keep_page) { + return false; + } + auto it = free_by_page.find(page); + return it != free_by_page.end() && it->second == page->count_; + }; - if (chunk_head) DAKING_UNLIKELY { - Queue::global_chunk_stack_.push(chunk_head); + std::vector kept_free_nodes; + kept_free_nodes.reserve(free_nodes.size()); + for (node_t* free_node : free_nodes) { + if (!should_reclaim(free_node->page_)) { + kept_free_nodes.push_back(free_node); + } } + free_nodes.swap(kept_free_nodes); + size_type reclaimed_nodes = 0; 
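+            // Unlink sweep: detach every page that should_reclaim() marks as
+            // fully free, then release its node slab and its page header.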
page_t** current = &global_page_list_; while (*current) { page_t* page = *current; - if (!page->reclaiming_) { + if (!should_reclaim(page)) { current = &page->next_; continue; } *current = page->next_; + reclaimed_nodes += page->count_; altraits_node_t::deallocate(*this, page->node_, page->count_); altraits_page_t::deallocate(*this, page, 1); } - if (reclaimed_nodes) { - global_node_count_.fetch_sub(reclaimed_nodes, std::memory_order_acq_rel); + if (!reclaimed_nodes) { + republish_free_nodes(); + return 0; } + republish_free_nodes(); + global_node_count_.fetch_sub(reclaimed_nodes, std::memory_order_acq_rel); + return reclaimed_nodes; } @@ -855,25 +900,6 @@ namespace daking { } DAKING_ALWAYS_INLINE node_t* _allocate() { - if constexpr (uses_elastic_reclaim) { - node_t*& thread_local_node_list = _get_thread_local_node_list(); - size_type& thread_local_node_size = _get_thread_local_node_size(); - if (thread_local_node_size == 0) DAKING_UNLIKELY { - std::lock_guard lock(global_mutex_); - while (!global_chunk_stack_.try_pop(thread_local_node_list)) { - _get_global_manager().reserve(std::max(thread_local_capacity, _get_global_manager().node_count())); - } - thread_local_node_size = thread_local_capacity; - } - thread_local_node_size--; - DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list); - DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list->next_); - node_t* res = std::exchange(thread_local_node_list, thread_local_node_list->next_.load(std::memory_order_relaxed)); - res->page_->free_count_.fetch_sub(1, std::memory_order_acq_rel); - res->next_.store(nullptr, std::memory_order_relaxed); - return res; - } - node_t*& thread_local_node_list = _get_thread_local_node_list(); size_type& thread_local_node_size = _get_thread_local_node_size(); if (thread_local_node_size == 0) DAKING_UNLIKELY { @@ -891,21 +917,6 @@ namespace daking { } DAKING_ALWAYS_INLINE void _deallocate(node_t* node) noexcept { - if constexpr (uses_elastic_reclaim) { - node_t*& thread_local_node_list = _get_thread_local_node_list(); - node->next_.store(thread_local_node_list, std::memory_order_relaxed); - thread_local_node_list = node; - node->page_->free_count_.fetch_add(1, std::memory_order_acq_rel); - DAKING_TSAN_ANNOTATE_RELEASE(node); - if (++_get_thread_local_node_size() >= thread_local_capacity) DAKING_UNLIKELY { - std::lock_guard lock(global_mutex_); - global_chunk_stack_.push(thread_local_node_list); - thread_local_node_list = nullptr; - _get_thread_local_node_size() = 0; - } - return; - } - node_t*& thread_local_node_list = _get_thread_local_node_list(); node->next_.store(thread_local_node_list, std::memory_order_relaxed); thread_local_node_list = node; diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp index 8dceb5b..e111e01 100644 --- a/tests/test_MPSC.cpp +++ b/tests/test_MPSC.cpp @@ -181,6 +181,33 @@ TEST(MPSCQueueMemoryTest, ElasticPolicy_ReclaimsFullyFreePages) { EXPECT_TRUE(q.empty()); } +TEST(MPSCQueueMemoryTest, ElasticPolicy_ReclaimsOnlyFullyFreePages) { + using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; + ElasticQ q; + + ElasticQ::reserve_global_chunk(6); + const size_t before = ElasticQ::global_node_size_apprx(); + EXPECT_GE(before, (size_t)6 * 8); + + q.enqueue(31); + const size_t reclaimed = ElasticQ::reclaim_free_pages(); + const size_t after = ElasticQ::global_node_size_apprx(); + + EXPECT_GT(reclaimed, (size_t)0); + EXPECT_LT(after, before); + EXPECT_GE(after, (size_t)8); + + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 31); + EXPECT_TRUE(q.empty()); + + 
q.enqueue(32); + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 32); + EXPECT_TRUE(q.empty()); +} + TEST(MPSCQueueMemoryTest, ElasticPolicy_DoesNotReclaimLivePage) { using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; ElasticQ q; From c1dc1af3a9dfb4e169c2a46ce8501fd26c05181a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 20:36:45 +0800 Subject: [PATCH 07/10] Reduce elastic reclaim hot-path overhead --- benchmarks/bench_memory_cycle.cpp | 8 +++---- docs/elastic-reclaim-design.md | 13 ++++++++++++ include/daking/MPSC_queue.hpp | 35 +++++++++++++++++++------------ 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/benchmarks/bench_memory_cycle.cpp b/benchmarks/bench_memory_cycle.cpp index 6813dd8..784bb33 100644 --- a/benchmarks/bench_memory_cycle.cpp +++ b/benchmarks/bench_memory_cycle.cpp @@ -149,12 +149,12 @@ static void BM_MPSC_MemoryCycle(benchmark::State& state) { } // namespace BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, StableQueue, ReclaimMode::none) - ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, IdleReclaimQueue, ReclaimMode::shrink_to_fit) - ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, ElasticQueue, ReclaimMode::none) - ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, ElasticQueue, ReclaimMode::free_pages) - ->Arg(1)->Arg(4)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); BENCHMARK_MAIN(); diff --git a/docs/elastic-reclaim-design.md b/docs/elastic-reclaim-design.md index 8baaf97..00909c4 100644 --- a/docs/elastic-reclaim-design.md +++ b/docs/elastic-reclaim-design.md @@ -133,6 +133,19 @@ prototype for validating the page ownership model, the public API shape, and benchmark coverage before investing in page-local chunk caches or lower-contention reclaim bookkeeping. +## Current Benchmark Read + +The latest short-run matrix shows: + +- `elastic` without reclaim is now essentially on the same band as `stable`. +- The reclaim path still costs real time, which is expected because it is a + quiescent scan, not a hot-path feature. +- The remaining performance risk is reclaim-time scanning, not steady-state + enqueue/dequeue. + +That means the next optimization target is the reclaim path itself, not the +default queue fast path. 
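+
+For context, calling the quiescent reclaim hook from application code looks
+roughly like the sketch below. The `ElasticQ` alias is a best-guess
+instantiation (element type `int`, thread-local capacity 8, a standard
+allocator); the real template parameter list may differ:
+
+```cpp
+#include <cassert>
+#include <cstddef>
+#include <memory>
+
+#include "daking/MPSC_queue.hpp"
+
+// Hypothetical instantiation; only the capacity of 8 is implied by the tests.
+using ElasticQ = daking::MPSC_queue<int, 8, std::allocator<int>,
+                                    daking::memory_policy::elastic>;
+
+int main() {
+    ElasticQ q;
+    for (int i = 0; i < 1024; ++i) q.enqueue(i);
+
+    int value = 0;
+    while (q.try_dequeue(value)) {}  // drain fully; reclaim is a quiescent path
+
+    const std::size_t before    = ElasticQ::global_node_size_apprx();
+    const std::size_t reclaimed = ElasticQ::reclaim_free_pages();
+
+    // Fully free pages are released; the remaining free nodes are republished.
+    assert(ElasticQ::global_node_size_apprx() + reclaimed == before);
+    return 0;
+}
+```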
+ ## Benchmark Gates An elastic implementation should not be considered successful unless it passes: diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp index c31898f..107ad45 100644 --- a/include/daking/MPSC_queue.hpp +++ b/include/daking/MPSC_queue.hpp @@ -165,16 +165,8 @@ namespace daking { template struct MPSC_page; - template - struct MPSC_node_page_link {}; - - template - struct MPSC_node_page_link { - MPSC_page* page_ = nullptr; - }; - template - struct MPSC_node : MPSC_node_page_link { + struct MPSC_node { using value_type = typename Queue::value_type; using node_t = MPSC_node; @@ -359,7 +351,6 @@ namespace daking { global_page_list_ = new_page; for (size_type i = 0; i < Queue::thread_local_capacity; ++i) { - new_nodes[i].page_ = new_page; new_nodes[i].next_ = new_nodes + i + 1; // seq_cst if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { new_nodes[i].next_ = nullptr; @@ -399,11 +390,25 @@ namespace daking { std::vector free_nodes; std::unordered_map free_by_page; + + auto find_page_for = [&](node_t* free_node) { + for (page_t* page = global_page_list_; page; page = page->next_) { + if (free_node >= page->node_ && free_node < page->node_ + page->count_) { + return page; + } + } + return static_cast(nullptr); + }; + node_t* node = nullptr; while (Queue::global_chunk_stack_.try_pop(node)) { for (size_type i = 0; i < Queue::thread_local_capacity && node; ++i) { free_nodes.push_back(node); - ++free_by_page[node->page_]; + page_t* page = find_page_for(node); + if (!page) DAKING_UNLIKELY { + return 0; + } + ++free_by_page[page]; node = node->next_.load(std::memory_order_relaxed); } } @@ -415,7 +420,10 @@ namespace daking { while (local_node) { free_nodes.push_back(local_node); - ++free_by_page[local_node->page_]; + page_t* page = find_page_for(local_node); + if (page) DAKING_LIKELY { + ++free_by_page[page]; + } node_t* next = local_node->next_.load(std::memory_order_relaxed); local_node->next_.store(nullptr, std::memory_order_relaxed); local_node = next; @@ -488,7 +496,8 @@ namespace daking { std::vector kept_free_nodes; kept_free_nodes.reserve(free_nodes.size()); for (node_t* free_node : free_nodes) { - if (!should_reclaim(free_node->page_)) { + page_t* page = find_page_for(free_node); + if (!page || !should_reclaim(page)) { kept_free_nodes.push_back(free_node); } } From f0c3729873ec93a7646833d1f620b385d0d8ff09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 20:50:08 +0800 Subject: [PATCH 08/10] Speed up elastic reclaim lookup --- include/daking/MPSC_queue.hpp | 91 ++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 27 deletions(-) diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp index 107ad45..489059d 100644 --- a/include/daking/MPSC_queue.hpp +++ b/include/daking/MPSC_queue.hpp @@ -94,6 +94,8 @@ SOFTWARE. 
#include #include #include +#include +#include #include #include #include @@ -197,6 +199,7 @@ namespace daking { size_type count_; node_t* node_; page_t* next_; + size_type reclaim_free_count_ = 0; }; template @@ -388,27 +391,57 @@ namespace daking { return 0; } - std::vector free_nodes; - std::unordered_map free_by_page; + struct page_range { + std::uintptr_t begin_; + std::uintptr_t end_; + page_t* page_; + }; - auto find_page_for = [&](node_t* free_node) { - for (page_t* page = global_page_list_; page; page = page->next_) { - if (free_node >= page->node_ && free_node < page->node_ + page->count_) { - return page; - } + std::vector page_ranges; + for (page_t* page = global_page_list_; page; page = page->next_) { + page->reclaim_free_count_ = 0; + const auto begin = reinterpret_cast(page->node_); + page_ranges.push_back(page_range{ + begin, + begin + sizeof(node_t) * page->count_, + page + }); + } + std::sort(page_ranges.begin(), page_ranges.end(), [](const page_range& lhs, const page_range& rhs) { + return lhs.begin_ < rhs.begin_; + }); + + auto find_page_for = [&](node_t* free_node) -> page_t* { + const auto addr = reinterpret_cast(free_node); + auto it = std::upper_bound(page_ranges.begin(), page_ranges.end(), addr, [](std::uintptr_t value, const page_range& range) { + return value < range.begin_; + }); + if (it == page_ranges.begin()) { + return nullptr; } - return static_cast(nullptr); + --it; + return addr < it->end_ ? it->page_ : nullptr; + }; + + struct free_node_record { + node_t* node_; + page_t* page_; }; + std::vector free_nodes; + bool lookup_failed = false; + node_t* node = nullptr; while (Queue::global_chunk_stack_.try_pop(node)) { for (size_type i = 0; i < Queue::thread_local_capacity && node; ++i) { - free_nodes.push_back(node); page_t* page = find_page_for(node); if (!page) DAKING_UNLIKELY { - return 0; + lookup_failed = true; + } + else { + ++page->reclaim_free_count_; } - ++free_by_page[page]; + free_nodes.push_back(free_node_record{node, page}); node = node->next_.load(std::memory_order_relaxed); } } @@ -419,11 +452,14 @@ namespace daking { pair_ptr->second = 0; while (local_node) { - free_nodes.push_back(local_node); page_t* page = find_page_for(local_node); if (page) DAKING_LIKELY { - ++free_by_page[page]; + ++page->reclaim_free_count_; } + else { + lookup_failed = true; + } + free_nodes.push_back(free_node_record{local_node, page}); node_t* next = local_node->next_.load(std::memory_order_relaxed); local_node->next_.store(nullptr, std::memory_order_relaxed); local_node = next; @@ -442,15 +478,19 @@ namespace daking { node_t* chunk_tail = nullptr; size_type chunk_size = 0; - for (node_t* free_node : free_nodes) { - free_node->next_.store(nullptr, std::memory_order_relaxed); + for (free_node_record free_node : free_nodes) { + if (!free_node.node_) DAKING_UNLIKELY { + continue; + } + node_t* node = free_node.node_; + node->next_.store(nullptr, std::memory_order_relaxed); if (!chunk_head) { - chunk_head = free_node; + chunk_head = node; } else { - chunk_tail->next_.store(free_node, std::memory_order_relaxed); + chunk_tail->next_.store(node, std::memory_order_relaxed); } - chunk_tail = free_node; + chunk_tail = node; if (++chunk_size == Queue::thread_local_capacity) { Queue::global_chunk_stack_.push(chunk_head); @@ -467,7 +507,7 @@ namespace daking { } }; - if (!global_page_list_) { + if (lookup_failed || !global_page_list_) { republish_free_nodes(); return 0; } @@ -475,8 +515,7 @@ namespace daking { page_t* forced_keep_page = nullptr; bool has_live_page = false; for 
(page_t* page = global_page_list_; page; page = page->next_) { - auto it = free_by_page.find(page); - if (it == free_by_page.end() || it->second != page->count_) { + if (page->reclaim_free_count_ != page->count_) { has_live_page = true; break; } @@ -489,15 +528,13 @@ namespace daking { if (page == forced_keep_page) { return false; } - auto it = free_by_page.find(page); - return it != free_by_page.end() && it->second == page->count_; + return page->reclaim_free_count_ == page->count_; }; - std::vector kept_free_nodes; + std::vector kept_free_nodes; kept_free_nodes.reserve(free_nodes.size()); - for (node_t* free_node : free_nodes) { - page_t* page = find_page_for(free_node); - if (!page || !should_reclaim(page)) { + for (free_node_record free_node : free_nodes) { + if (!free_node.page_ || !should_reclaim(free_node.page_)) { kept_free_nodes.push_back(free_node); } } From 208041dccccdd5989211885843d75a8f9b87bcc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com> Date: Wed, 6 May 2026 21:01:21 +0800 Subject: [PATCH 09/10] Clarify memory cycle benchmark timing --- benchmarks/bench_memory_cycle.cpp | 2 ++ docs/elastic-reclaim-design.md | 3 +++ 2 files changed, 5 insertions(+) diff --git a/benchmarks/bench_memory_cycle.cpp b/benchmarks/bench_memory_cycle.cpp index 784bb33..b487336 100644 --- a/benchmarks/bench_memory_cycle.cpp +++ b/benchmarks/bench_memory_cycle.cpp @@ -78,8 +78,10 @@ static void BM_MPSC_MemoryCycle(benchmark::State& state) { const std::size_t total_items_per_burst = items_per_producer * static_cast(producers); for (auto _ : state) { + state.PauseTiming(); Queue q; Queue::reserve_global_chunk(64); + state.ResumeTiming(); run_burst(q, producers, items_per_producer); if (!q.empty()) { diff --git a/docs/elastic-reclaim-design.md b/docs/elastic-reclaim-design.md index 00909c4..fc5892f 100644 --- a/docs/elastic-reclaim-design.md +++ b/docs/elastic-reclaim-design.md @@ -142,6 +142,9 @@ The latest short-run matrix shows: quiescent scan, not a hot-path feature. - The remaining performance risk is reclaim-time scanning, not steady-state enqueue/dequeue. +- `mpsc_bench_memory_cycle` excludes queue construction and explicit pre-reserve + setup from timing, so the reported throughput focuses on the two burst cycles + and the optional reclaim step. That means the next optimization target is the reclaim path itself, not the default queue fast path. 
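As a reference for the timing change above, the measured region in
`bench_memory_cycle.cpp` now has this shape (a minimal sketch; `Queue`,
`run_burst`, `producers`, and `items_per_producer` stand in for the benchmark's
own template parameter and locals):

```cpp
static void BM_Sketch(benchmark::State& state) {
    for (auto _ : state) {
        state.PauseTiming();              // clock stopped: per-iteration setup
        Queue q;                          // queue construction is not measured
        Queue::reserve_global_chunk(64);  // explicit pre-reserve is not measured
        state.ResumeTiming();             // clock running: only the burst is timed
        run_burst(q, producers, items_per_producer);
    }
}
```

`PauseTiming()`/`ResumeTiming()` add a small per-iteration cost of their own,
which is acceptable here because each burst is long relative to the fence.
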
From cafb554fe7fe14089f73e81a49d0929c16204ae17 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9B=B9=E6=A2=A6=E8=BD=A9?= <114896873+caomengxuan666@users.noreply.github.com>
Date: Wed, 6 May 2026 21:27:58 +0800
Subject: [PATCH 10/10] Add reclaim benchmark

---
 CMakeLists.txt               | 14 +++++++++++
 benchmarks/bench_reclaim.cpp | 49 ++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+)
 create mode 100644 benchmarks/bench_reclaim.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 39604d4..fe5c95d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -105,6 +105,20 @@ target_link_libraries(mpsc_bench_memory_cycle
 )
 target_compile_options(mpsc_bench_memory_cycle ${COMMON_TARGET_PROPERTIES})
 
+add_executable(mpsc_bench_reclaim benchmarks/bench_reclaim.cpp)
+target_include_directories(mpsc_bench_reclaim
+    PRIVATE
+        ${CMAKE_CURRENT_SOURCE_DIR}/include
+        ${CMAKE_CURRENT_SOURCE_DIR}
+)
+target_link_libraries(mpsc_bench_reclaim
+    PRIVATE
+        benchmark::benchmark_main
+        Threads::Threads
+        ${ATOMIC_LIBRARY}
+)
+target_compile_options(mpsc_bench_reclaim ${COMMON_TARGET_PROPERTIES})
+
 add_executable(mpsc_bench_latency benchmarks/bench_latency.cpp)
 target_include_directories(mpsc_bench_latency
     PRIVATE
diff --git a/benchmarks/bench_reclaim.cpp b/benchmarks/bench_reclaim.cpp
new file mode 100644
index 0000000..473d82b
--- /dev/null
+++ b/benchmarks/bench_reclaim.cpp
@@ -0,0 +1,49 @@
+#include <benchmark/benchmark.h>
+
+#include <chrono>
+#include <cstddef>
+#include <string>
+
+#include "daking/MPSC_queue.hpp"
+
+namespace {
+
+using ElasticQueue = daking::MPSC_queue<int, 8, std::allocator<int>, daking::memory_policy::elastic>;
+
+static void BM_MPSC_ReclaimFreePages(benchmark::State& state) {
+    const std::size_t reserve_chunks = static_cast<std::size_t>(state.range(0));
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        ElasticQueue q;
+        ElasticQueue::reserve_global_chunk(reserve_chunks);
+        const auto nodes_before = ElasticQueue::global_node_size_apprx();
+        state.ResumeTiming();
+
+        const auto reclaim_start = std::chrono::steady_clock::now();
+        const auto reclaimed_nodes = ElasticQueue::reclaim_free_pages();
+        const auto reclaim_end = std::chrono::steady_clock::now();
+        const auto nodes_after = ElasticQueue::global_node_size_apprx();
+
+        state.counters["global_nodes_before"] = static_cast<double>(nodes_before);
+        state.counters["global_nodes_after"] = static_cast<double>(nodes_after);
+        state.counters["reclaimed_nodes"] = static_cast<double>(reclaimed_nodes);
+        state.counters["reclaim_us"] = std::chrono::duration<double, std::micro>(reclaim_end - reclaim_start).count();
+    }
+
+    state.SetItemsProcessed(static_cast<std::size_t>(state.iterations()));
+    state.SetLabel("elastic, reclaim-only, chunks=" + std::to_string(static_cast<std::size_t>(state.range(0))));
+}
+
+} // namespace
+
+BENCHMARK(BM_MPSC_ReclaimFreePages)
+    ->Arg(1)
+    ->Arg(4)
+    ->Arg(16)
+    ->Arg(64)
+    ->Arg(256)
+    ->UseRealTime()
+    ->MinWarmUpTime(1.0);
+
+BENCHMARK_MAIN();
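
One note on reading the new counters: plain assignments to `state.counters[...]`
overwrite each other across iterations, so the reported `reclaim_us` reflects
only the last iteration. If a mean cost per reclaim call is preferred, a
possible tweak (sketch only, using Google Benchmark's averaging counter mode
on top of the file above) is:

```cpp
// Accumulate across iterations instead of overwriting per iteration.
double reclaim_us_total = 0.0;
// ...inside the measurement loop:
reclaim_us_total +=
    std::chrono::duration<double, std::micro>(reclaim_end - reclaim_start).count();
// ...after the loop, let the library divide by the iteration count:
state.counters["reclaim_us"] =
    benchmark::Counter(reclaim_us_total, benchmark::Counter::kAvgIterations);
```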