From e4c203c609b08f00b7e2a17effb1166dfcd89163 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Fri, 13 Mar 2026 17:02:54 +0800 Subject: [PATCH 1/2] fix ann omp thread budget and add concurrency test --- be/src/storage/index/ann/faiss_ann_index.cpp | 9 ++- .../index/ann/faiss_vector_index_test.cpp | 62 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/be/src/storage/index/ann/faiss_ann_index.cpp b/be/src/storage/index/ann/faiss_ann_index.cpp index 77b865d82bf7b3..70a2ed01bc9e71 100644 --- a/be/src/storage/index/ann/faiss_ann_index.cpp +++ b/be/src/storage/index/ann/faiss_ann_index.cpp @@ -62,6 +62,7 @@ namespace doris::segment_v2 { namespace { std::mutex g_omp_thread_mutex; +std::condition_variable g_omp_thread_cv; int g_index_threads_in_use = 0; // Guard that ensures the total OpenMP threads used by concurrent index builds @@ -71,7 +72,11 @@ class ScopedOmpThreadBudget { // For each index build, reserve at most half of the remaining threads, at least 1 thread. ScopedOmpThreadBudget() { std::unique_lock lock(g_omp_thread_mutex); - auto thread_cap = config::omp_threads_limit - g_index_threads_in_use; + auto omp_threads_limit = get_omp_threads_limit(); + // Block until there is at least one OpenMP slot available under the global cap. + g_omp_thread_cv.wait(lock, [&] { return g_index_threads_in_use < omp_threads_limit; }); + auto thread_cap = omp_threads_limit - g_index_threads_in_use; + // Keep headroom for other concurrent index builds: take up to half of remaining budget. _reserved_threads = std::max(1, thread_cap / 2); g_index_threads_in_use += _reserved_threads; DorisMetrics::instance()->ann_index_build_index_threads->increment(_reserved_threads); @@ -88,6 +93,8 @@ class ScopedOmpThreadBudget { if (g_index_threads_in_use < 0) { g_index_threads_in_use = 0; } + // Wake waiting index builders so they can compete for the released OpenMP budget. + g_omp_thread_cv.notify_all(); VLOG_DEBUG << fmt::format( "ScopedOmpThreadBudget release threads reserved={}, remaining_in_use={}, limit={}", _reserved_threads, g_index_threads_in_use, get_omp_threads_limit()); diff --git a/be/test/storage/index/ann/faiss_vector_index_test.cpp b/be/test/storage/index/ann/faiss_vector_index_test.cpp index 9b4016bbcb8df7..3a2b7020fd5117 100644 --- a/be/test/storage/index/ann/faiss_vector_index_test.cpp +++ b/be/test/storage/index/ann/faiss_vector_index_test.cpp @@ -21,13 +21,18 @@ #include #include +#include +#include #include #include #include #include #include +#include #include +#include "common/config.h" +#include "common/metrics/doris_metrics.h" #include "storage/index/ann/ann_index.h" #include "storage/index/ann/ann_search_params.h" #include "storage/index/ann/faiss_ann_index.h" @@ -233,6 +238,63 @@ TEST_F(VectorSearchTest, UpdateRoaring) { } } +TEST_F(VectorSearchTest, OmpThreadBudgetNeverExceedsLimit) { + constexpr int kWorkers = 4; + constexpr int kDim = 64; + constexpr int kNumVectors = 20000; + + const auto old_omp_threads_limit = config::omp_threads_limit; + config::omp_threads_limit = 1; + + auto* budget_metric = DorisMetrics::instance()->ann_index_build_index_threads; + std::atomic start {false}; + std::atomic finished {0}; + std::vector workers; + workers.reserve(kWorkers); + + for (int worker_id = 0; worker_id < kWorkers; ++worker_id) { + workers.emplace_back([&start, &finished, worker_id]() { + auto index = std::make_unique(); + FaissBuildParameter params; + params.dim = kDim; + params.max_degree = 32; + params.index_type = FaissBuildParameter::IndexType::HNSW; + index->build(params); + + std::vector vectors(static_cast(kNumVectors) * kDim, + static_cast(worker_id + 1)); + while (!start.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + + auto st = index->add(kNumVectors, vectors.data()); + EXPECT_TRUE(st.ok()) << st.to_string(); + finished.fetch_add(1, std::memory_order_acq_rel); + }); + } + + start.store(true, std::memory_order_release); + + int64_t observed_peak = 0; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(20); + while (finished.load(std::memory_order_acquire) < kWorkers && + std::chrono::steady_clock::now() < deadline) { + observed_peak = std::max(observed_peak, budget_metric->value()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + for (auto& worker : workers) { + worker.join(); + } + + observed_peak = std::max(observed_peak, budget_metric->value()); + EXPECT_EQ(finished.load(std::memory_order_acquire), kWorkers); + EXPECT_LE(observed_peak, 1); + EXPECT_EQ(budget_metric->value(), 0); + + config::omp_threads_limit = old_omp_threads_limit; +} + TEST_F(VectorSearchTest, CompareResultWithNativeFaiss1) { const size_t iterations = 3; // Create random number generator From b7e3b20d5de34d09070ed5f7aabb169163408bb0 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Tue, 17 Mar 2026 21:03:10 +0800 Subject: [PATCH 2/2] fix compile --- .../storage/index/ann/faiss_vector_index_test.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/be/test/storage/index/ann/faiss_vector_index_test.cpp b/be/test/storage/index/ann/faiss_vector_index_test.cpp index 3a2b7020fd5117..d9a61adcaebfde 100644 --- a/be/test/storage/index/ann/faiss_vector_index_test.cpp +++ b/be/test/storage/index/ann/faiss_vector_index_test.cpp @@ -38,6 +38,7 @@ #include "storage/index/ann/faiss_ann_index.h" // metrics.h not used directly here #include "storage/index/ann/vector_search_utils.h" +#include "util/defer_op.h" using namespace doris::segment_v2; @@ -239,12 +240,15 @@ TEST_F(VectorSearchTest, UpdateRoaring) { } TEST_F(VectorSearchTest, OmpThreadBudgetNeverExceedsLimit) { - constexpr int kWorkers = 4; + constexpr int kWorkers = 2; constexpr int kDim = 64; - constexpr int kNumVectors = 20000; + // Keep this workload small to avoid long-running BE UT under ASAN. + constexpr int kNumVectors = 500; const auto old_omp_threads_limit = config::omp_threads_limit; config::omp_threads_limit = 1; + Defer reset_omp_threads_limit( + [&old_omp_threads_limit]() { config::omp_threads_limit = old_omp_threads_limit; }); auto* budget_metric = DorisMetrics::instance()->ann_index_build_index_threads; std::atomic start {false}; @@ -257,7 +261,8 @@ TEST_F(VectorSearchTest, OmpThreadBudgetNeverExceedsLimit) { auto index = std::make_unique(); FaissBuildParameter params; params.dim = kDim; - params.max_degree = 32; + params.max_degree = 8; + params.ef_construction = 20; params.index_type = FaissBuildParameter::IndexType::HNSW; index->build(params); @@ -291,8 +296,6 @@ TEST_F(VectorSearchTest, OmpThreadBudgetNeverExceedsLimit) { EXPECT_EQ(finished.load(std::memory_order_acquire), kWorkers); EXPECT_LE(observed_peak, 1); EXPECT_EQ(budget_metric->value(), 0); - - config::omp_threads_limit = old_omp_threads_limit; } TEST_F(VectorSearchTest, CompareResultWithNativeFaiss1) {