diff --git a/CMakeLists.txt b/CMakeLists.txt index 1b28383..fe5c95d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,6 +51,12 @@ set(HDR_HISTOGRAM_BUILD_SHARED OFF CACHE BOOL "" FORCE) set(HDR_LOG_REQUIRED "DISABLED" CACHE STRING "" FORCE) FetchContent_MakeAvailable(hdrhistogram) +if(TARGET hdr_histogram_static) + target_compile_options(hdr_histogram_static PRIVATE + $<$:-Wno-incompatible-pointer-types -Wno-deprecated-declarations> + ) +endif() + # moodycamel ConcurrentQueue FetchContent_Declare( concurrentqueue @@ -85,6 +91,34 @@ target_link_libraries(mpsc_bench_throughput ) target_compile_options(mpsc_bench_throughput ${COMMON_TARGET_PROPERTIES}) +add_executable(mpsc_bench_memory_cycle benchmarks/bench_memory_cycle.cpp) +target_include_directories(mpsc_bench_memory_cycle + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_SOURCE_DIR} +) +target_link_libraries(mpsc_bench_memory_cycle + PRIVATE + benchmark::benchmark_main + Threads::Threads + ${ATOMIC_LIBRARY} +) +target_compile_options(mpsc_bench_memory_cycle ${COMMON_TARGET_PROPERTIES}) + +add_executable(mpsc_bench_reclaim benchmarks/bench_reclaim.cpp) +target_include_directories(mpsc_bench_reclaim + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_SOURCE_DIR} +) +target_link_libraries(mpsc_bench_reclaim + PRIVATE + benchmark::benchmark_main + Threads::Threads + ${ATOMIC_LIBRARY} +) +target_compile_options(mpsc_bench_reclaim ${COMMON_TARGET_PROPERTIES}) + add_executable(mpsc_bench_latency benchmarks/bench_latency.cpp) target_include_directories(mpsc_bench_latency PRIVATE diff --git a/README.md b/README.md index e4f1c74..5c361bd 100644 --- a/README.md +++ b/README.md @@ -203,47 +203,69 @@ Command: ```powershell cmake -S . 
-B out/build/clang-local -G Ninja -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Release cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark -.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.1s --benchmark_repetitions=1 --benchmark_counters_tabular=true +.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.5s --benchmark_repetitions=1 --benchmark_counters_tabular=true ``` Uniform single-element enqueue: | Queue | P (Producers) | C (Consumer) | **Throughput (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **86.78** | -| daking | 2 | 1 | **32.29** | -| daking | 4 | 1 | **32.52** | -| daking | 8 | 1 | 31.16 | -| moodycamel | 1 | 1 | 22.74 | -| moodycamel | 2 | 1 | 27.82 | -| moodycamel | 4 | 1 | 29.79 | -| **moodycamel** | 8 | 1 | **32.07** | +| **daking** | 1 | 1 | **82.21** | +| daking | 2 | 1 | **29.69** | +| daking | 4 | 1 | **32.22** | +| daking | 8 | 1 | 28.22 | +| moodycamel | 1 | 1 | 23.21 | +| moodycamel | 2 | 1 | 28.50 | +| moodycamel | 4 | 1 | 30.75 | +| **moodycamel** | 8 | 1 | **30.86** | Uneven sequential burst: | Queue | P (Producers) | C (Consumer) | Relay % | **Throughput (M items/s)** | | :--- | :--- | :--- | :--- | :--- | -| daking | 4 | 1 | $50.0\%$ | **33.89** | -| daking | 4 | 1 | $90.0\%$ | **61.72** | -| daking | 4 | 1 | $98.0\%$ | **73.72** | -| moodycamel | 4 | 1 | $50.0\%$ | 23.84 | -| moodycamel | 4 | 1 | $90.0\%$ | 22.25 | -| moodycamel | 4 | 1 | $98.0\%$ | 17.64 | +| daking | 4 | 1 | $50.0\%$ | **32.62** | +| daking | 4 | 1 | $90.0\%$ | **58.54** | +| daking | 4 | 1 | $98.0\%$ | **68.39** | +| moodycamel | 4 | 1 | $50.0\%$ | 25.31 | +| moodycamel | 4 | 1 | $90.0\%$ | 22.75 | +| moodycamel | 4 | 1 | $98.0\%$ | 21.69 | Bulk enqueue: | Queue | P (Producers) | C (Consumer) | **Throughput (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **102.25** | -| **daking** | 2 | 1 | **102.37** | -| **daking** | 4 | 1 | **85.23** | -| **daking** | 8 | 
1 | **77.16** | -| moodycamel | 1 | 1 | 17.02 | -| moodycamel | 2 | 1 | 18.85 | -| moodycamel | 4 | 1 | 18.06 | -| moodycamel | 8 | 1 | 16.02 | +| **daking** | 1 | 1 | **167.47** | +| **daking** | 2 | 1 | **191.27** | +| **daking** | 4 | 1 | **195.14** | +| **daking** | 8 | 1 | **159.11** | +| moodycamel | 1 | 1 | 36.24 | +| moodycamel | 2 | 1 | 36.23 | +| moodycamel | 4 | 1 | 34.82 | +| moodycamel | 8 | 1 | 33.99 | -**Part V: Enqueue/Dequeue Latency** +**Part V: Memory Lifecycle Benchmark (Stable vs Idle Reclaim)** + +This benchmark runs two production/drain bursts on the same queue. The `stable` mode keeps the warm global pool between bursts. The `idle_reclaim` mode calls `shrink_to_fit()` after the first burst and then measures the next burst from the reclaimed state. + +Command: + +```powershell +cmake --build out/build/clang-local --target mpsc_bench_memory_cycle +.\out\build\clang-local\mpsc_bench_memory_cycle.exe --benchmark_min_time=0.2s --benchmark_repetitions=1 --benchmark_counters_tabular=true +``` + +| Mode | P (Producers) | Nodes Before | Nodes After | **Throughput (M items/s)** | Shrink Time (us) | +| :--- | :--- | :--- | :--- | :--- | :--- | +| stable | 1 | 16.384k | 16.384k | 96.69 | - | +| stable | 4 | 16.384k | 16.384k | 32.20 | - | +| stable | 16 | 32.768k | 32.768k | 37.54 | - | +| idle_reclaim | 1 | 32.768k | 256 | 93.94 | 3.5 | +| idle_reclaim | 4 | 32.768k | 256 | 31.98 | 5.5 | +| idle_reclaim | 16 | 262.144k | 256 | 37.01 | 135.2 | + +The idle reclaim path aggressively returns the global pool to the minimum chunk count while keeping the next burst in the same throughput band. It should still be treated as a quiescent-state operation, not an online elastic reclamation mechanism. + +**Part VI: Enqueue/Dequeue Latency** (Based on HdrHistogram, Test on Linux) We get below performance: @@ -284,6 +306,8 @@ We get below performance: 2. `ThreadLocalCapacity` (thread-local capacity) is fixed at **compile time**. 3. 
Pointer chasing cannot be avoided because it is a pure linked-list structure. +If you need to reclaim memory while keeping the queue instance alive, call `shrink_to_fit()` only after the queue has been fully drained and all producers have stopped. This is an idle-time reclamation path, not a concurrent shrink path. It is intentionally conservative and is not designed for online elastic memory reclamation. + ## Features 1. Multiple-Producer, Single-Consumer (MPSC). The closer the contention scenario is to SPSC, the closer the throughput gets to the SPSC benchmark performance. diff --git a/README.zh.md b/README.zh.md index 3422e84..a8bda5c 100644 --- a/README.zh.md +++ b/README.zh.md @@ -203,47 +203,69 @@ Compiler: Clang 22.1.1 Release ```powershell cmake -S . -B out/build/clang-local -G Ninja -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Release cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark -.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.1s --benchmark_repetitions=1 --benchmark_counters_tabular=true +.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.5s --benchmark_repetitions=1 --benchmark_counters_tabular=true ``` 均匀单元素入队: | 队列 | P (生产者) | C (消费者) | **吞吐量 (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **86.78** | -| daking | 2 | 1 | **32.29** | -| daking | 4 | 1 | **32.52** | -| daking | 8 | 1 | 31.16 | -| moodycamel | 1 | 1 | 22.74 | -| moodycamel | 2 | 1 | 27.82 | -| moodycamel | 4 | 1 | 29.79 | -| **moodycamel** | 8 | 1 | **32.07** | +| **daking** | 1 | 1 | **82.21** | +| daking | 2 | 1 | **29.69** | +| daking | 4 | 1 | **32.22** | +| daking | 8 | 1 | 28.22 | +| moodycamel | 1 | 1 | 23.21 | +| moodycamel | 2 | 1 | 28.50 | +| moodycamel | 4 | 1 | 30.75 | +| **moodycamel** | 8 | 1 | **30.86** | 不均匀顺序爆发: | 队列 | P (生产者) | C (消费者) | 接力百分比 | **吞吐量 (M items/s)** | | :--- | :--- | :--- | :--- | :--- | -| daking | 4 | 1 | $50.0\%$ | **33.89** | -| daking | 4 | 1 | $90.0\%$ | 
**61.72** | -| daking | 4 | 1 | $98.0\%$ | **73.72** | -| moodycamel | 4 | 1 | $50.0\%$ | 23.84 | -| moodycamel | 4 | 1 | $90.0\%$ | 22.25 | -| moodycamel | 4 | 1 | $98.0\%$ | 17.64 | +| daking | 4 | 1 | $50.0\%$ | **32.62** | +| daking | 4 | 1 | $90.0\%$ | **58.54** | +| daking | 4 | 1 | $98.0\%$ | **68.39** | +| moodycamel | 4 | 1 | $50.0\%$ | 25.31 | +| moodycamel | 4 | 1 | $90.0\%$ | 22.75 | +| moodycamel | 4 | 1 | $98.0\%$ | 21.69 | 批量入队: | 队列 | P (生产者) | C (消费者) | **吞吐量 (M items/s)** | | :--- | :--- | :--- | :--- | -| **daking** | 1 | 1 | **102.25** | -| **daking** | 2 | 1 | **102.37** | -| **daking** | 4 | 1 | **85.23** | -| **daking** | 8 | 1 | **77.16** | -| moodycamel | 1 | 1 | 17.02 | -| moodycamel | 2 | 1 | 18.85 | -| moodycamel | 4 | 1 | 18.06 | -| moodycamel | 8 | 1 | 16.02 | +| **daking** | 1 | 1 | **167.47** | +| **daking** | 2 | 1 | **191.27** | +| **daking** | 4 | 1 | **195.14** | +| **daking** | 8 | 1 | **159.11** | +| moodycamel | 1 | 1 | 36.24 | +| moodycamel | 2 | 1 | 36.23 | +| moodycamel | 4 | 1 | 34.82 | +| moodycamel | 8 | 1 | 33.99 | -**第五部分:Enqueue/Dequeue Latency** +**第五部分:内存生命周期基准测试(稳定模式 vs 空闲回收)** + +这个基准测试在同一个队列上连续执行两轮生产/清空 burst。`stable` 模式会保留第一轮之后的全局池热状态;`idle_reclaim` 模式会在第一轮后调用 `shrink_to_fit()`,再从回收后的状态执行下一轮 burst。 + +运行命令: + +```powershell +cmake --build out/build/clang-local --target mpsc_bench_memory_cycle +.\out\build\clang-local\mpsc_bench_memory_cycle.exe --benchmark_min_time=0.2s --benchmark_repetitions=1 --benchmark_counters_tabular=true +``` + +| 模式 | P (生产者) | 回收前节点数 | 回收后节点数 | **吞吐量 (M items/s)** | Shrink 耗时 (us) | +| :--- | :--- | :--- | :--- | :--- | :--- | +| stable | 1 | 16.384k | 16.384k | 96.69 | - | +| stable | 4 | 16.384k | 16.384k | 32.20 | - | +| stable | 16 | 32.768k | 32.768k | 37.54 | - | +| idle_reclaim | 1 | 32.768k | 256 | 93.94 | 3.5 | +| idle_reclaim | 4 | 32.768k | 256 | 31.98 | 5.5 | +| idle_reclaim | 16 | 262.144k | 256 | 37.01 | 135.2 | + +空闲回收路径可以把全局池激进地回收到最小 chunk 数,同时下一轮 burst 
的吞吐仍处在同一档。但它仍然应该被视为静止期操作,不是在线弹性回收机制。 + +**第六部分:Enqueue/Dequeue Latency** (此部分基于HdrHistogram,在Linux平台测试) 我们得到了以下延迟表现: @@ -286,6 +308,8 @@ cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark 2. `ThreadLocalCapacity`(线程本地容量)在**编译时**已固定。 3. 无法避免指针追逐,因为是纯链表结构。 +如果你需要在队列实例仍然存在时回收内存,可以在队列已经完全清空并且所有生产者停止之后调用 `shrink_to_fit()`。它是一个空闲期回收接口,不是并发缩容接口,也不是在线弹性回收设计。 + ## 特性 (FEATURES) 1. 多生产者,单消费者(MPSC), 若竞争场景越接近SPSC,吞吐量越接近SPSC基准测试性能。 diff --git a/benchmarks/bench_latency.cpp b/benchmarks/bench_latency.cpp index e1871c0..c6b274b 100644 --- a/benchmarks/bench_latency.cpp +++ b/benchmarks/bench_latency.cpp @@ -132,8 +132,36 @@ static void BM_MPSC_PureDequeueLatency(benchmark::State& state) { hdr_close(hist); } +static void BM_MPSC_ShrinkToFitLatency(benchmark::State& state) { + hdr_histogram* hist; + hdr_init(1, 1000000, 3, &hist); + + for (auto _ : state) { + TestQueue q; + TestQueue::reserve_global_chunk(64); + for (int i = 0; i < 10000; ++i) { + q.enqueue(i); + } + int val = 0; + while (q.try_dequeue(val)) {} + auto start = __rdtsc(); + bool ok = q.shrink_to_fit(); + auto end = __rdtsc(); + if (!ok) { + state.SkipWithError("shrink_to_fit failed unexpectedly"); + break; + } + hdr_record_value(hist, end - start); + } + + state.counters["P99_ns"] = hdr_value_at_percentile(hist, 99.0) / CYCLES_PER_NS; + state.counters["P99.9_ns"] = hdr_value_at_percentile(hist, 99.9) / CYCLES_PER_NS; + hdr_close(hist); +} + BENCHMARK(BM_MPSC_PureEnqueueLatency)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->Unit(benchmark::kMicrosecond); BENCHMARK(BM_MPSC_PureDequeueLatency)->Unit(benchmark::kMicrosecond); +BENCHMARK(BM_MPSC_ShrinkToFitLatency)->Unit(benchmark::kMicrosecond); BENCHMARK_MAIN(); @@ -181,4 +209,4 @@ int main(int argc, char** argv) { return 0; } -#endif \ No newline at end of file +#endif diff --git a/benchmarks/bench_linearizable.cpp b/benchmarks/bench_linearizable.cpp index f5c3b60..a6bba39 100644 --- a/benchmarks/bench_linearizable.cpp +++ 
b/benchmarks/bench_linearizable.cpp @@ -7,8 +7,6 @@ #include "daking/MPSC_queue.hpp" -#if DAKING_HAS_CXX20_OR_ABOVE - struct Message { int producer_id; uint64_t seq; @@ -16,6 +14,119 @@ struct Message { using LinearQueue = daking::MPSC_queue; +static void BM_MPSC_Test_Linearizable_TryDequeue(benchmark::State& state) { + const int num_producers = (int)state.range(0); + const uint64_t ops_per_producer = 1000000; + + for (auto _ : state) { + state.PauseTiming(); + + LinearQueue q; + std::atomic start_signal{false}; + std::atomic linear_error{false}; + std::vector last_seq(num_producers, 0); + + std::thread consumer([&]() { + while (!start_signal.load(std::memory_order_acquire)); + const uint64_t total_expected = ops_per_producer * num_producers; + uint64_t received = 0; + while (received < total_expected) { + Message msg; + if (!q.try_dequeue(msg)) { + continue; + } + if (msg.seq <= last_seq[msg.producer_id] && last_seq[msg.producer_id] != 0) { + linear_error.store(true, std::memory_order_relaxed); + } + last_seq[msg.producer_id] = msg.seq; + ++received; + } + }); + + std::vector producers; + producers.reserve(num_producers); + for (int p_id = 0; p_id < num_producers; ++p_id) { + producers.emplace_back([&, p_id]() { + while (!start_signal.load(std::memory_order_acquire)); + for (uint64_t s = 1; s <= ops_per_producer; ++s) { + q.enqueue({p_id, s}); + } + }); + } + + state.ResumeTiming(); + start_signal.store(true, std::memory_order_release); + + for (auto& p : producers) p.join(); + consumer.join(); + + if (linear_error.load(std::memory_order_relaxed)) { + state.SkipWithError("Linearizability Violation Detected!"); + break; + } + } + + state.SetItemsProcessed(state.range(0) * ops_per_producer * state.iterations()); + state.SetLabel("TryDequeue linearizable when " + std::to_string(state.range(0)) + "P."); +} + +static void BM_MPSC_Test_Linearizable_AfterShrink_TryDequeue(benchmark::State& state) { + const int num_producers = (int)state.range(0); + const uint64_t 
ops_per_producer = 200000; + + for (auto _ : state) { + state.PauseTiming(); + LinearQueue q; + q.shrink_to_fit(); + std::atomic start_signal{false}; + std::atomic linear_error{false}; + std::vector last_seq(num_producers, 0); + + std::thread consumer([&]() { + while (!start_signal.load(std::memory_order_acquire)); + const uint64_t total_expected = ops_per_producer * num_producers; + uint64_t received = 0; + while (received < total_expected) { + Message msg; + if (!q.try_dequeue(msg)) { + continue; + } + if (msg.seq <= last_seq[msg.producer_id] && last_seq[msg.producer_id] != 0) { + linear_error.store(true, std::memory_order_relaxed); + } + last_seq[msg.producer_id] = msg.seq; + ++received; + } + }); + + std::vector producers; + producers.reserve(num_producers); + for (int p_id = 0; p_id < num_producers; ++p_id) { + producers.emplace_back([&, p_id]() { + while (!start_signal.load(std::memory_order_acquire)); + for (uint64_t s = 1; s <= ops_per_producer; ++s) { + q.enqueue({p_id, s}); + } + }); + } + + state.ResumeTiming(); + start_signal.store(true, std::memory_order_release); + for (auto& p : producers) p.join(); + consumer.join(); + + if (linear_error.load(std::memory_order_relaxed)) { + state.SkipWithError("Linearizability Violation Detected After Shrink!"); + break; + } + } + + state.SetItemsProcessed(state.range(0) * ops_per_producer * state.iterations()); + state.SetLabel("TryDequeue linearizable after shrink when " + std::to_string(state.range(0)) + "P."); +} + +#if DAKING_HAS_CXX20_OR_ABOVE + static void BM_MPSC_Test_Linearizable(benchmark::State& state) { const int num_producers = (int)state.range(0); const uint64_t ops_per_producer = 1000000; @@ -79,13 +190,19 @@ BENCHMARK(BM_MPSC_Test_Linearizable) ->UseRealTime() ->Unit(benchmark::kMillisecond); -BENCHMARK_MAIN(); +#endif -#else +BENCHMARK(BM_MPSC_Test_Linearizable_TryDequeue) + ->Arg(1) + ->Arg(4) + ->Arg(16) + ->UseRealTime() + ->Unit(benchmark::kMillisecond); -int main(int argc, char** argv) { - 
std::cout << "This test is only for C++20 and above. Modify CMakeLists.txt \"set(CMAKE_CXX_STANDARD 17)\"->\"set(CMAKE_CXX_STANDARD 20)\"" << std::endl; - return 0; -} +BENCHMARK(BM_MPSC_Test_Linearizable_AfterShrink_TryDequeue) + ->Arg(1) + ->Arg(4) + ->UseRealTime() + ->Unit(benchmark::kMillisecond); -#endif \ No newline at end of file +BENCHMARK_MAIN(); diff --git a/benchmarks/bench_memory_cycle.cpp b/benchmarks/bench_memory_cycle.cpp new file mode 100644 index 0000000..b487336 --- /dev/null +++ b/benchmarks/bench_memory_cycle.cpp @@ -0,0 +1,162 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "daking/MPSC_queue.hpp" + +namespace { + +using StableQueue = daking::MPSC_queue, daking::memory_policy::stable>; +using IdleReclaimQueue = daking::MPSC_queue; +using ElasticQueue = daking::MPSC_queue, daking::memory_policy::elastic>; + +constexpr std::size_t kItemsPerProducer = 200000; + +enum class ReclaimMode { + none, + shrink_to_fit, + free_pages +}; + +template +void producer_thread(Queue* q, std::size_t items_to_push, std::atomic_bool* start) { + while (!start->load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + for (std::size_t i = 0; i < items_to_push; ++i) { + q->enqueue(1); + } +} + +template +void consumer_thread(Queue* q, std::size_t total_items_to_pop, std::atomic_bool* start) { + while (!start->load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + + std::size_t popped_count = 0; + int val = 0; + while (popped_count < total_items_to_pop) { + if (q->try_dequeue(val)) { + ++popped_count; + } + else { + std::this_thread::yield(); + } + } +} + +template +void run_burst(Queue& q, int producers, std::size_t items_per_producer) { + const std::size_t total_items = items_per_producer * static_cast(producers); + std::atomic_bool start{false}; + + std::thread consumer(consumer_thread, &q, total_items, &start); + + std::vector producer_threads; + producer_threads.reserve(static_cast(producers)); + 
for (int i = 0; i < producers; ++i) { + producer_threads.emplace_back(producer_thread, &q, items_per_producer, &start); + } + + start.store(true, std::memory_order_release); + for (auto& producer : producer_threads) { + producer.join(); + } + consumer.join(); +} + +template +static void BM_MPSC_MemoryCycle(benchmark::State& state) { + const int producers = static_cast(state.range(0)); + const std::size_t items_per_producer = kItemsPerProducer; + const std::size_t total_items_per_burst = items_per_producer * static_cast(producers); + + for (auto _ : state) { + state.PauseTiming(); + Queue q; + Queue::reserve_global_chunk(64); + state.ResumeTiming(); + + run_burst(q, producers, items_per_producer); + if (!q.empty()) { + state.SkipWithError("Queue should be empty after first burst"); + break; + } + + const auto nodes_before = Queue::global_node_size_apprx(); + double reclaim_us = 0.0; + std::size_t reclaimed_nodes = 0; + if constexpr (Mode == ReclaimMode::shrink_to_fit) { + const auto reclaim_start = std::chrono::steady_clock::now(); + if (!q.shrink_to_fit()) { + state.SkipWithError("shrink_to_fit failed unexpectedly"); + break; + } + const auto reclaim_end = std::chrono::steady_clock::now(); + reclaim_us = std::chrono::duration(reclaim_end - reclaim_start).count(); + reclaimed_nodes = nodes_before - Queue::global_node_size_apprx(); + } + else if constexpr (Mode == ReclaimMode::free_pages) { + const auto reclaim_start = std::chrono::steady_clock::now(); + reclaimed_nodes = Queue::reclaim_free_pages(); + const auto reclaim_end = std::chrono::steady_clock::now(); + reclaim_us = std::chrono::duration(reclaim_end - reclaim_start).count(); + reclaimed_nodes = nodes_before - Queue::global_node_size_apprx(); + } + const auto nodes_after = Queue::global_node_size_apprx(); + + run_burst(q, producers, items_per_producer); + if (!q.empty()) { + state.SkipWithError("Queue should be empty after second burst"); + break; + } + + state.counters["global_nodes_before"] = 
static_cast(nodes_before); + state.counters["global_nodes_after"] = static_cast(nodes_after); + if constexpr (Mode != ReclaimMode::none) { + state.counters["reclaim_us"] = reclaim_us; + state.counters["reclaimed_nodes"] = static_cast(reclaimed_nodes); + } + } + + state.SetItemsProcessed(static_cast(total_items_per_burst * 2 * state.iterations())); + const char* policy = "unknown"; + const char* mode = "none"; + if constexpr (std::is_same_v) { + policy = "stable"; + } + else if constexpr (std::is_same_v) { + policy = "idle_reclaim"; + } + else if constexpr (std::is_same_v) { + policy = "elastic"; + } + + if constexpr (Mode == ReclaimMode::shrink_to_fit) { + mode = "shrink_to_fit"; + } + else if constexpr (Mode == ReclaimMode::free_pages) { + mode = "reclaim_free_pages"; + } + + state.SetLabel(std::string(policy) + ", " + mode + ", P=" + std::to_string(producers)); +} + +} // namespace + +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, StableQueue, ReclaimMode::none) + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, IdleReclaimQueue, ReclaimMode::shrink_to_fit) + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, ElasticQueue, ReclaimMode::none) + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); +BENCHMARK_TEMPLATE(BM_MPSC_MemoryCycle, ElasticQueue, ReclaimMode::free_pages) + ->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16)->UseRealTime()->MinWarmUpTime(1.0); + +BENCHMARK_MAIN(); diff --git a/benchmarks/bench_reclaim.cpp b/benchmarks/bench_reclaim.cpp new file mode 100644 index 0000000..473d82b --- /dev/null +++ b/benchmarks/bench_reclaim.cpp @@ -0,0 +1,49 @@ +#include + +#include +#include +#include + +#include "daking/MPSC_queue.hpp" + +namespace { + +using ElasticQueue = daking::MPSC_queue, daking::memory_policy::elastic>; + +static void BM_MPSC_ReclaimFreePages(benchmark::State& state) { + const std::size_t 
reserve_chunks = static_cast(state.range(0)); + + for (auto _ : state) { + state.PauseTiming(); + ElasticQueue q; + ElasticQueue::reserve_global_chunk(reserve_chunks); + const auto nodes_before = ElasticQueue::global_node_size_apprx(); + state.ResumeTiming(); + + const auto reclaim_start = std::chrono::steady_clock::now(); + const auto reclaimed_nodes = ElasticQueue::reclaim_free_pages(); + const auto reclaim_end = std::chrono::steady_clock::now(); + const auto nodes_after = ElasticQueue::global_node_size_apprx(); + + state.counters["global_nodes_before"] = static_cast(nodes_before); + state.counters["global_nodes_after"] = static_cast(nodes_after); + state.counters["reclaimed_nodes"] = static_cast(reclaimed_nodes); + state.counters["reclaim_us"] = std::chrono::duration(reclaim_end - reclaim_start).count(); + } + + state.SetItemsProcessed(static_cast(state.iterations())); + state.SetLabel("elastic, reclaim-only, chunks=" + std::to_string(static_cast(state.range(0)))); +} + +} // namespace + +BENCHMARK(BM_MPSC_ReclaimFreePages) + ->Arg(1) + ->Arg(4) + ->Arg(16) + ->Arg(64) + ->Arg(256) + ->UseRealTime() + ->MinWarmUpTime(1.0); + +BENCHMARK_MAIN(); diff --git a/benchmarks/bench_throughput.cpp b/benchmarks/bench_throughput.cpp index a3a129d..3e5d3e2 100644 --- a/benchmarks/bench_throughput.cpp +++ b/benchmarks/bench_throughput.cpp @@ -11,6 +11,13 @@ constexpr size_t TOTAL_OPS = 100000000; // using TestQueue = moodycamel::ConcurrentQueue; using TestQueue = daking::MPSC_queue; +struct BenchmarkRunConfig { + int producers; + size_t reserve_chunks; + bool shrink_before_run; + bool bulk_mode; +}; + void producer_thread(TestQueue* q, size_t items_to_push, std::atomic_bool* start) { while (!start->load(std::memory_order_acquire)) { std::this_thread::yield(); @@ -20,6 +27,16 @@ void producer_thread(TestQueue* q, size_t items_to_push, std::atomic_bool* start } } +void producer_thread_enqueue_bulk(TestQueue* q, size_t items_to_push, std::atomic_bool* start) { + while 
(!start->load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + int nums[32]{}; + for (size_t i = 0; i < items_to_push; i += 32) { + q->enqueue_bulk(nums, std::min(items_to_push - i, (std::size_t)32)); + } +} + void consumer_thread(TestQueue* q, size_t total_items_to_pop, std::atomic_bool* start) { while (!start->load(std::memory_order_acquire)) { std::this_thread::yield(); @@ -33,20 +50,30 @@ void consumer_thread(TestQueue* q, size_t total_items_to_pop, std::atomic_bool* } } -static void BM_MPSC_Throughput(benchmark::State& state) { - const int num_producers = (int)state.range(0); - const size_t items_per_producer = TOTAL_OPS / num_producers; - - TestQueue q; +static void RunThroughputScenario(benchmark::State& state, const BenchmarkRunConfig& cfg) { + const size_t items_per_producer = TOTAL_OPS / cfg.producers; for (auto _ : state) { + TestQueue q; + if (cfg.reserve_chunks != 0) { + TestQueue::reserve_global_chunk(cfg.reserve_chunks); + } + if (cfg.shrink_before_run) { + q.shrink_to_fit(); + } + state.PauseTiming(); std::vector producers; - producers.reserve(num_producers); - std::atomic_bool start{ false }; + producers.reserve(cfg.producers); + std::atomic_bool start{ false }; std::thread consumer(consumer_thread, &q, TOTAL_OPS, &start); - for (int i = 0; i < num_producers; ++i) { - producers.emplace_back(producer_thread, &q, items_per_producer, &start); + for (int i = 0; i < cfg.producers; ++i) { + if (cfg.bulk_mode) { + producers.emplace_back(producer_thread_enqueue_bulk, &q, items_per_producer, &start); + } + else { + producers.emplace_back(producer_thread, &q, items_per_producer, &start); + } } state.ResumeTiming(); start.store(true, std::memory_order_release); @@ -61,7 +88,11 @@ static void BM_MPSC_Throughput(benchmark::State& state) { } state.SetItemsProcessed(TOTAL_OPS * state.iterations()); - state.SetLabel("P=" + std::to_string(num_producers) + ", C=1"); +} + +static void BM_MPSC_Throughput(benchmark::State& state) { + 
RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 0, false, false }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", warm"); } BENCHMARK(BM_MPSC_Throughput) @@ -129,47 +160,11 @@ BENCHMARK(BM_4x_UnevenWave_SPSClike_Aggregation) ->UseRealTime() ->MinWarmUpTime(2.0); -void producer_thread_enqueue_bulk(TestQueue* q, size_t items_to_push, std::atomic_bool* start) { - while (!start->load(std::memory_order_acquire)) { - std::this_thread::yield(); - } - int nums[32]{}; - for (size_t i = 0; i < items_to_push; i += 32) { - q->enqueue_bulk(nums, std::min(items_to_push - i, (std::size_t)32)); - } -} - // consumer thread remains the same, because dequeue_bulk is just a while loop of try_dequeue static void BM_MPSC_Throughput_Bulk(benchmark::State& state) { - const int num_producers = (int)state.range(0); - const size_t items_per_producer = TOTAL_OPS / num_producers; - - TestQueue q; - - for (auto _ : state) { - state.PauseTiming(); - std::vector producers; - producers.reserve(num_producers); - std::atomic_bool start{ false }; - std::thread consumer(consumer_thread, &q, TOTAL_OPS, &start); - for (int i = 0; i < num_producers; ++i) { - producers.emplace_back(producer_thread_enqueue_bulk, &q, items_per_producer, &start); - } - state.ResumeTiming(); - start.store(true, std::memory_order_release); - if (consumer.joinable()) { - consumer.join(); - } - state.PauseTiming(); - for (auto& p : producers) { - p.join(); - } - state.ResumeTiming(); - } - - state.SetItemsProcessed(TOTAL_OPS * state.iterations()); - state.SetLabel("P=" + std::to_string(num_producers) + ", C=1"); + RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 0, false, true }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", bulk"); } BENCHMARK(BM_MPSC_Throughput_Bulk) @@ -181,6 +176,30 @@ BENCHMARK(BM_MPSC_Throughput_Bulk) ->UseRealTime() ->MinWarmUpTime(2.0); +static void BM_MPSC_Throughput_ColdStart(benchmark::State& state) { + 
RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 0, true, false }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", cold-shrink"); +} + +BENCHMARK(BM_MPSC_Throughput_ColdStart) + ->Args({ 1 }) + ->Args({ 4 }) + ->Args({ 16 }) + ->UseRealTime() + ->MinWarmUpTime(2.0); + +static void BM_MPSC_Throughput_PreReserved(benchmark::State& state) { + RunThroughputScenario(state, BenchmarkRunConfig{ (int)state.range(0), 256, false, false }); + state.SetLabel("P=" + std::to_string((int)state.range(0)) + ", pre-reserved"); +} + +BENCHMARK(BM_MPSC_Throughput_PreReserved) + ->Args({ 1 }) + ->Args({ 4 }) + ->Args({ 16 }) + ->UseRealTime() + ->MinWarmUpTime(2.0); + BENCHMARK_MAIN(); /* @@ -236,4 +255,4 @@ BM_MPSC_Throughput_Bulk/2/min_warmup_time:2.000/real_time 1451039 BM_MPSC_Throughput_Bulk/4/min_warmup_time:2.000/real_time 1491950000 ns 0.000 ns 1 items_per_second=67.0264M/s P=4, C=1 BM_MPSC_Throughput_Bulk/8/min_warmup_time:2.000/real_time 1674253600 ns 0.000 ns 1 items_per_second=59.7281M/s P=8, C=1 BM_MPSC_Throughput_Bulk/16/min_warmup_time:2.000/real_time 2299654400 ns 0.000 ns 1 items_per_second=43.4848M/s P=16, C=1 -*/ \ No newline at end of file +*/ diff --git a/docs/elastic-reclaim-design.md b/docs/elastic-reclaim-design.md new file mode 100644 index 0000000..fc5892f --- /dev/null +++ b/docs/elastic-reclaim-design.md @@ -0,0 +1,176 @@ +# Elastic Reclaim Design Notes + +This document records the design direction for an online elastic reclamation mode. +The existing queue is optimized for stable, low-jitter throughput and should remain +the default behavior until the elastic design is proven by benchmarks and stress +tests. + +## Current Memory Model + +The current implementation uses: + +- Per-thread local node caches. +- A global lock-free chunk stack. +- Page allocation owned by a type-level global manager. +- Freely mixed chunks: nodes from different pages can be combined into the same + logical chunk. 
+ +This is fast because a producer or consumer only deals with a local node list in +the hot path, and the global stack only moves chunk heads with a tagged pointer. + +## Why Online Page Reclaim Is Not Safe Today + +The current page list knows which contiguous allocation owns nodes, but the global +chunk stack does not preserve page ownership. After enough push/pop cycles, a chunk +can contain nodes from many pages. Because of that, the manager cannot prove that +all nodes from a page are back in the global pool while producers and the consumer +are still running. + +There is also a producer publication window: + +1. Producer allocates a node. +2. Producer exchanges `head_`. +3. Producer publishes `old_head->next_`. + +During that window, `empty()` can be insufficient as a global quiescence test. +This is why the current `shrink_to_fit()` is intentionally limited to idle-time +usage after all producers have stopped. + +## Target Modes + +### stable + +The stable mode keeps the current memory model: + +- Freely mixed chunks. +- Minimal hot-path metadata. +- No online page reclamation. +- Lowest jitter and strongest throughput target. + +### idle_reclaim + +The idle reclaim mode is the current conservative `shrink_to_fit()` path: + +- Requires a drained queue. +- Requires no active producers. +- Requires a single same-template queue instance. +- Rebuilds the minimum global pool after releasing all pages. + +This mode is already benchmarked by `mpsc_bench_memory_cycle`. + +### elastic + +The future elastic mode must preserve page ownership well enough to reclaim pages +online. The important design point is that this cannot be a small modification of +the current global chunk stack. It needs a different ownership layout. + +The likely direction: + +- Allocate fixed-size slabs/pages. +- Keep chunks page-local or attach page ownership metadata to chunk groups. +- Avoid per-node atomics on the single-element enqueue/dequeue fast path. 
+- Reclaim only pages whose nodes are all visible in free caches at reclaim time. + +## Proposed Architecture + +Use a policy split rather than a runtime branch: + +```cpp +namespace daking::memory_policy { + struct stable; + struct idle_reclaim; + struct elastic; +} +``` + +The queue can then grow toward: + +```cpp +template < + typename Ty, + std::size_t ThreadLocalCapacity = 256, + std::size_t Align = 64, + typename Alloc = std::allocator, + typename MemoryPolicy = memory_policy::idle_reclaim +> +class MPSC_queue; +``` + +The default should keep today's behavior. Experimental policies should be opt-in. + +Current branch status: + +- `memory_policy::stable` exists and disables `shrink_to_fit()`. +- `memory_policy::idle_reclaim` is the default and keeps today's behavior. +- `memory_policy::elastic` is an opt-in prototype. It uses per-node page metadata + and fixed-size page slabs, then scans free caches in `reclaim_free_pages()` to + release pages whose nodes are all free. + +## Current Elastic Prototype + +The current prototype intentionally favors correctness and observability over +final throughput: + +- It is selected only by `memory_policy::elastic`. +- Default `idle_reclaim` and `stable` allocation paths keep their thread-local + chunk caches. +- Elastic allocation reserves one page per thread-local chunk so page reclamation + has chunk-sized granularity instead of one large ever-growing page. +- Elastic allocation and deallocation use the same thread-local chunk cache path + as the default policy. There is no per-node page counter update in the enqueue + or dequeue hot path. +- Each elastic node records its owning page. Reclamation drains the global + free-list and the quiescent thread-local caches under the mutex, counts free + nodes per page, returns free nodes from live pages to the pool, and deallocates + fully free pages. +- A page containing the queue dummy node or any queued item is not reclaimed. 
+- The current public `reclaim_free_pages()` path is a quiescent reclaim hook. It + refuses to run when more than one thread-local hook is active, so it is not a + fully concurrent online shrinker for active producers. + +This is not the final low-jitter elastic architecture. It is a first correctness +prototype for validating the page ownership model, the public API shape, and +benchmark coverage before investing in page-local chunk caches or lower-contention +reclaim bookkeeping. + +## Current Benchmark Read + +The latest short-run matrix shows: + +- `elastic` without reclaim is now essentially on the same band as `stable`. +- The reclaim path still costs real time, which is expected because it is a + quiescent scan, not a hot-path feature. +- The remaining performance risk is reclaim-time scanning, not steady-state + enqueue/dequeue. +- `mpsc_bench_memory_cycle` excludes queue construction and explicit pre-reserve + setup from timing, so the reported throughput focuses on the two burst cycles + and the optional reclaim step. + +That means the next optimization target is the reclaim path itself, not the +default queue fast path. + +## Benchmark Gates + +An elastic implementation should not be considered successful unless it passes: + +- Existing functional tests. +- Linearizability benchmarks before and after reclaim. +- `mpsc_bench_memory_cycle`. +- A long-running grow/drain/reclaim/regrow soak. +- A throughput comparison against stable mode. +- Tail latency checks for enqueue and dequeue. + +Acceptance target: + +- No correctness regression. +- Online reclaim must reduce retained global nodes under bursty workloads. +- Stable mode throughput must not regress. +- Elastic mode overhead must be explicit and benchmarked, not assumed. + +## Implementation Plan + +1. Introduce policy type names without changing hot-path behavior. Done. +2. Add benchmarks that can compare policy types side by side. Done. +3. 
Prototype page-aware reclaim behind an opt-in `elastic` policy. In progress. +4. Add reclaim accounting and tests. In progress. +5. Run soak and latency benchmarks before considering a PR upstream. diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp index 499eaeb..489059d 100644 --- a/include/daking/MPSC_queue.hpp +++ b/include/daking/MPSC_queue.hpp @@ -94,6 +94,8 @@ SOFTWARE. #include #include #include +#include +#include #include #include #include @@ -102,6 +104,20 @@ SOFTWARE. namespace daking { + namespace memory_policy { /* Compile-time tags selecting the queue's memory behavior; can_shrink_to_fit gates MPSC_queue::shrink_to_fit(). */ + struct stable { /* shrink_to_fit() returns false; reclaim_free_pages() returns 0 */ + static constexpr bool can_shrink_to_fit = false; + }; + + struct idle_reclaim { /* default policy: shrink_to_fit() is enabled */ + static constexpr bool can_shrink_to_fit = true; + }; + + struct elastic { /* shrink_to_fit() returns false; pages are released through reclaim_free_pages() instead */ + static constexpr bool can_shrink_to_fit = false; + }; + } + /* SC MP [tail]->[]->[]->[]->[]->[head] @@ -148,6 +164,9 @@ namespace daking { */ namespace detail { + template <typename Queue> + struct MPSC_page; + template <typename Queue> struct MPSC_node { using value_type = typename Queue::value_type; @@ -180,6 +199,7 @@ namespace daking { size_type count_; node_t* node_; page_t* next_; + size_type reclaim_free_count_ = 0; /* scratch counter: free nodes of this page seen by the current reclaim_free_pages() pass */ }; template @@ -325,25 +345,227 @@ namespace daking { void reserve(size_type count) { /* Already locked */ - node_t* new_nodes = altraits_node_t::allocate(*this, count); - page_t* new_page = altraits_page_t::allocate(*this, 1); - altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_); - global_page_list_ = new_page; - - for (size_type i = 0; i < count; i++) { - new_nodes[i].next_ = new_nodes + i + 1; // seq_cst - if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { - // chunk_count = count / ThreadLocalCapacity - new_nodes[i].next_ = nullptr; - std::atomic_thread_fence(std::memory_order_acq_rel); - // mutex don't protect global_chunk_stack_ - Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + if constexpr (Queue::uses_elastic_reclaim) { /* elastic: allocate one page per thread-local chunk so each page can be released on its own */ + size_type
page_count = count / Queue::thread_local_capacity; + for (size_type p = 0; p < page_count; ++p) { + node_t* new_nodes = altraits_node_t::allocate(*this, Queue::thread_local_capacity); + page_t* new_page = altraits_page_t::allocate(*this, 1); + altraits_page_t::construct(*this, new_page, new_nodes, Queue::thread_local_capacity, global_page_list_); + global_page_list_ = new_page; + + for (size_type i = 0; i < Queue::thread_local_capacity; ++i) { + new_nodes[i].next_ = new_nodes + i + 1; // seq_cst + if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { + new_nodes[i].next_ = nullptr; + std::atomic_thread_fence(std::memory_order_acq_rel); + Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + } + } + } + } + else { + node_t* new_nodes = altraits_node_t::allocate(*this, count); + page_t* new_page = altraits_page_t::allocate(*this, 1); + altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_); + global_page_list_ = new_page; + + for (size_type i = 0; i < count; i++) { + new_nodes[i].next_ = new_nodes + i + 1; // seq_cst + if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { + // chunk_count = count / ThreadLocalCapacity + new_nodes[i].next_ = nullptr; + std::atomic_thread_fence(std::memory_order_acq_rel); + // mutex don't protect global_chunk_stack_ + Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + } } } global_node_count_.store(global_node_count_ + count, std::memory_order_release); } + size_type reclaim_free_pages() { /* Deallocates every page whose nodes are all currently free and returns the number of nodes released (0 if none). Invoked by MPSC_queue::reclaim_free_pages() with global_mutex_ held. Free nodes are first pulled out of the global chunk stack and the thread-local caches; the nodes that survive are republished before returning. */ + static_assert(Queue::uses_elastic_reclaim); + + if (global_thread_local_manager_.size() != 1) { /* guard: proceed only when exactly one thread-local cache is registered */ + return 0; + } + + struct page_range { /* half-open address range [begin_, end_) covered by one page's node array */ + std::uintptr_t begin_; + std::uintptr_t end_; + page_t* page_; + }; + + std::vector<page_range> page_ranges; + for (page_t* page = global_page_list_; page; page = page->next_) { + page->reclaim_free_count_ = 0; + const auto begin =
reinterpret_cast<std::uintptr_t>(page->node_); + page_ranges.push_back(page_range{ + begin, + begin + sizeof(node_t) * page->count_, + page + }); + } + std::sort(page_ranges.begin(), page_ranges.end(), [](const page_range& lhs, const page_range& rhs) { + return lhs.begin_ < rhs.begin_; + }); + + auto find_page_for = [&](node_t* free_node) -> page_t* { /* binary search over the sorted ranges; nullptr when the address lies in no known page */ + const auto addr = reinterpret_cast<std::uintptr_t>(free_node); + auto it = std::upper_bound(page_ranges.begin(), page_ranges.end(), addr, [](std::uintptr_t value, const page_range& range) { + return value < range.begin_; + }); + if (it == page_ranges.begin()) { + return nullptr; + } + --it; + return addr < it->end_ ? it->page_ : nullptr; + }; + + struct free_node_record { /* a free node together with the page it was resolved to (page_ may be nullptr) */ + node_t* node_; + page_t* page_; + }; + + std::vector<free_node_record> free_nodes; + bool lookup_failed = false; + + node_t* node = nullptr; + while (Queue::global_chunk_stack_.try_pop(node)) { /* drain the global chunk stack, walking at most thread_local_capacity nodes per chunk */ + for (size_type i = 0; i < Queue::thread_local_capacity && node; ++i) { + page_t* page = find_page_for(node); + if (!page) DAKING_UNLIKELY { + lookup_failed = true; + } + else { + ++page->reclaim_free_count_; + } + free_nodes.push_back(free_node_record{node, page}); + node = node->next_.load(std::memory_order_relaxed); + } + } + + auto collect_thread_local = [&](thread_local_t* pair_ptr) { /* empty one thread-local cache into free_nodes, unlinking every node on the way */ + node_t* local_node = pair_ptr->first; + pair_ptr->first = nullptr; + pair_ptr->second = 0; + + while (local_node) { + page_t* page = find_page_for(local_node); + if (page) DAKING_LIKELY { + ++page->reclaim_free_count_; + } + else { + lookup_failed = true; + } + free_nodes.push_back(free_node_record{local_node, page}); + node_t* next = local_node->next_.load(std::memory_order_relaxed); + local_node->next_.store(nullptr, std::memory_order_relaxed); + local_node = next; + } + }; + + for (auto& [tid, pair_ptr] : global_thread_local_manager_) { + collect_thread_local(pair_ptr.get()); + } + for (auto& pair_ptr : global_thread_local_recycler_) { + collect_thread_local(pair_ptr.get()); + } + + auto republish_free_nodes = [&]
{ /* relink free_nodes into chunks of thread_local_capacity and push each full chunk back onto the global stack */ + node_t* chunk_head = nullptr; + node_t* chunk_tail = nullptr; + size_type chunk_size = 0; + + for (free_node_record free_node : free_nodes) { + if (!free_node.node_) DAKING_UNLIKELY { + continue; + } + node_t* node = free_node.node_; + node->next_.store(nullptr, std::memory_order_relaxed); + if (!chunk_head) { + chunk_head = node; + } + else { + chunk_tail->next_.store(node, std::memory_order_relaxed); + } + chunk_tail = node; + + if (++chunk_size == Queue::thread_local_capacity) { + Queue::global_chunk_stack_.push(chunk_head); + chunk_head = nullptr; + chunk_tail = nullptr; + chunk_size = 0; + } + } + + if (chunk_head) DAKING_UNLIKELY { /* a trailing partial chunk goes to the first registered thread-local cache (emptied above) */ + auto& pair_ptr = global_thread_local_manager_.begin()->second; + pair_ptr->first = chunk_head; + pair_ptr->second = chunk_size; + } + }; + + if (lookup_failed || !global_page_list_) { /* unknown node address or no pages at all: put everything back and release nothing */ + republish_free_nodes(); + return 0; + } + + page_t* forced_keep_page = nullptr; + bool has_live_page = false; + for (page_t* page = global_page_list_; page; page = page->next_) { + if (page->reclaim_free_count_ != page->count_) { + has_live_page = true; + break; + } + } + if (!has_live_page) { /* every page is fully free: keep the list head so one page always survives */ + forced_keep_page = global_page_list_; + } + + auto should_reclaim = [&](page_t* page) { + if (page == forced_keep_page) { + return false; + } + return page->reclaim_free_count_ == page->count_; + }; + + std::vector<free_node_record> kept_free_nodes; + kept_free_nodes.reserve(free_nodes.size()); + for (free_node_record free_node : free_nodes) { /* drop nodes that belong to pages about to be deallocated */ + if (!free_node.page_ || !should_reclaim(free_node.page_)) { + kept_free_nodes.push_back(free_node); + } + } + free_nodes.swap(kept_free_nodes); + + size_type reclaimed_nodes = 0; + page_t** current = &global_page_list_; + while (*current) { /* unlink and deallocate reclaimable pages in place via a pointer-to-link walk */ + page_t* page = *current; + if (!should_reclaim(page)) { + current = &page->next_; + continue; + } + + *current = page->next_; + reclaimed_nodes += page->count_; + altraits_node_t::deallocate(*this, page->node_, page->count_); + altraits_page_t::deallocate(*this, page, 1); + } + + if
(!reclaimed_nodes) { + republish_free_nodes(); + return 0; + } + + republish_free_nodes(); + global_node_count_.fetch_sub(reclaimed_nodes, std::memory_order_acq_rel); + + return reclaimed_nodes; + } + DAKING_ALWAYS_INLINE thread_local_t* register_for(std::thread::id tid) { /* Already locked */ if (!global_thread_local_recycler_.empty()) { @@ -382,7 +604,8 @@ namespace daking { typename Ty, std::size_t ThreadLocalCapacity = 256, std::size_t Align = 64, /* std::hardware_destructive_interference_size */ - typename Alloc = std::allocator<Ty> + typename Alloc = std::allocator<Ty>, + typename MemoryPolicy = memory_policy::idle_reclaim > class MPSC_queue { public: @@ -395,9 +618,12 @@ namespace daking { using pointer = typename std::allocator_traits::pointer; using reference = Ty&; using const_reference = const Ty&; + using memory_policy_type = MemoryPolicy; static constexpr std::size_t thread_local_capacity = ThreadLocalCapacity; static constexpr std::size_t align = Align; + static constexpr bool uses_elastic_reclaim = + std::is_same_v<MemoryPolicy, memory_policy::elastic>; private: using node_t = detail::MPSC_node; @@ -420,6 +646,13 @@ namespace daking { "Alloc should have a template constructor like 'Alloc(const Alloc& alloc)' to meet internal conversion." ); + static_assert( + std::is_same_v<MemoryPolicy, memory_policy::stable> || + std::is_same_v<MemoryPolicy, memory_policy::idle_reclaim> || + std::is_same_v<MemoryPolicy, memory_policy::elastic>, + "MemoryPolicy must be one of daking::memory_policy::{stable, idle_reclaim, elastic}." + ); + friend thread_hook_t; friend manager_t; friend altraits_node_t; @@ -644,6 +877,45 @@ namespace daking { return global_manager_instance_ ?
_reserve_global_external(chunk_count) : false; } + DAKING_ALWAYS_INLINE static size_type reclaim_free_pages() { /* Elastic-only entry point: yields 0 for other policies or when no global manager instance exists; otherwise locks global_mutex_ and forwards to the manager's reclaim_free_pages(). */ + if constexpr (!uses_elastic_reclaim) { + return 0; + } + else { + if (!global_manager_instance_) { + return 0; + } + std::lock_guard lock(global_mutex_); + return _get_global_manager().reclaim_free_pages(); + } + } + + DAKING_ALWAYS_INLINE bool shrink_to_fit() { /* Returns false when the policy's can_shrink_to_fit is false, when empty() is false, or when global_instance_count_ is not 1. Otherwise calls _free_global(), reserves thread_local_capacity nodes, installs a freshly allocated dummy node as tail_/head_, and returns true. */ + if constexpr (!memory_policy_type::can_shrink_to_fit) { + return false; + } + + if (!empty()) { /* cheap early-out before taking the lock */ + return false; + } + + std::lock_guard lock(global_mutex_); + if (global_instance_count_.load(std::memory_order_acquire) != 1) { + return false; + } + + if (!empty()) { /* re-check now that global_mutex_ is held */ + return false; + } + + _free_global(); + _get_global_manager().reserve(thread_local_capacity); + node_t* dummy = _allocate(); + tail_ = dummy; + head_.store(dummy, std::memory_order_release); + return true; + } + private: DAKING_ALWAYS_INLINE static manager_t& _get_global_manager() noexcept { return *global_manager_instance_; diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp index 58988ff..e111e01 100644 --- a/tests/test_MPSC.cpp +++ b/tests/test_MPSC.cpp @@ -97,6 +97,136 @@ TEST(MPSCQueueMemoryTest, ReserveGlobalChunk) { EXPECT_EQ(Q::global_node_size_apprx(), reserved_size); } +TEST(MPSCQueueMemoryTest, ShrinkToFit_ReclaimsPagesWhenIdle) { /* NOTE(review): MPSC_queue is written without template arguments in the Q aliases of the ShrinkToFit tests; the 12 * 64 and 64 expectations suggest a thread-local capacity of 64 - confirm. */ + using Q = MPSC_queue; + Q q; + + Q::reserve_global_chunk(12); + size_t before = Q::global_node_size_apprx(); + EXPECT_GE(before, (size_t)12 * 64); + + int value = 0; + q.enqueue(7); + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_TRUE(q.empty()); + + EXPECT_TRUE(q.shrink_to_fit()); + EXPECT_TRUE(q.empty()); + EXPECT_EQ(Q::global_node_size_apprx(), (size_t)64); +} + +TEST(MPSCQueueMemoryTest, ShrinkToFit_FailsWhenQueueIsNotEmpty) { /* a queued item must make shrink_to_fit() return false and leave the item dequeuable */ + using Q = MPSC_queue; + Q q; + + q.enqueue(7); + EXPECT_FALSE(q.shrink_to_fit()); + + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 7); + EXPECT_TRUE(q.empty()); +} + +TEST(MPSCQueueMemoryTest, ShrinkToFit_FailsWhileAnotherInstanceLives)
{ /* shrink_to_fit() is expected to fail while `secondary` exists and to succeed once it is destroyed */ + using Q = MPSC_queue; + Q primary; + + { + Q secondary; + int value = 0; + + Q::reserve_global_chunk(4); + primary.enqueue(11); + EXPECT_TRUE(primary.try_dequeue(value)); + EXPECT_TRUE(primary.empty()); + EXPECT_FALSE(primary.shrink_to_fit()); + } + + EXPECT_TRUE(primary.shrink_to_fit()); + EXPECT_EQ(Q::global_node_size_apprx(), (size_t)64); +} + +TEST(MPSCQueueMemoryTest, ShrinkToFit_DisabledForStablePolicy) { /* NOTE(review): the alias below looks truncated (its leading template arguments are missing) - confirm against the original test file. */ + using StableQ = MPSC_queue, daking::memory_policy::stable>; + StableQ q; + + q.enqueue(9); + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_TRUE(q.empty()); + + EXPECT_FALSE(q.shrink_to_fit()); +} + +TEST(MPSCQueueMemoryTest, ElasticPolicy_ReclaimsFullyFreePages) { /* NOTE(review): the ElasticQ aliases in the elastic tests look truncated; the 5 * 8 and 8 expectations suggest a thread-local capacity of 8 - confirm. */ + using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; + ElasticQ q; + + ElasticQ::reserve_global_chunk(5); + const size_t before = ElasticQ::global_node_size_apprx(); + EXPECT_GE(before, (size_t)5 * 8); + + const size_t reclaimed = ElasticQ::reclaim_free_pages(); + const size_t after = ElasticQ::global_node_size_apprx(); + + EXPECT_GT(reclaimed, (size_t)0); + EXPECT_EQ(after + reclaimed, before); + EXPECT_GE(after, (size_t)8); + + q.enqueue(17); /* the queue must remain usable after a reclaim */ + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 17); + EXPECT_TRUE(q.empty()); +} + +TEST(MPSCQueueMemoryTest, ElasticPolicy_ReclaimsOnlyFullyFreePages) { /* reclaim runs while one item is still queued; that item must survive and be dequeued afterwards */ + using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; + ElasticQ q; + + ElasticQ::reserve_global_chunk(6); + const size_t before = ElasticQ::global_node_size_apprx(); + EXPECT_GE(before, (size_t)6 * 8); + + q.enqueue(31); + const size_t reclaimed = ElasticQ::reclaim_free_pages(); + const size_t after = ElasticQ::global_node_size_apprx(); + + EXPECT_GT(reclaimed, (size_t)0); + EXPECT_LT(after, before); + EXPECT_GE(after, (size_t)8); + + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 31); + EXPECT_TRUE(q.empty()); + + q.enqueue(32); + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 32); + EXPECT_TRUE(q.empty());
+} + +TEST(MPSCQueueMemoryTest, ElasticPolicy_DoesNotReclaimLivePage) { /* with no extra chunks reserved, reclaim is expected to release nothing both while an item is queued and after it is dequeued */ + using ElasticQ = MPSC_queue, daking::memory_policy::elastic>; + ElasticQ q; + + const size_t initial = ElasticQ::global_node_size_apprx(); + q.enqueue(23); + + EXPECT_EQ(ElasticQ::reclaim_free_pages(), (size_t)0); + EXPECT_EQ(ElasticQ::global_node_size_apprx(), initial); + + int value = 0; + EXPECT_TRUE(q.try_dequeue(value)); + EXPECT_EQ(value, 23); + EXPECT_TRUE(q.empty()); + + EXPECT_EQ(ElasticQ::reclaim_free_pages(), (size_t)0); + EXPECT_EQ(ElasticQ::global_node_size_apprx(), initial); +} + // ------------------------------------------------------------------------- // III. Bulk Operation Tests // -------------------------------------------------------------------------