From 886cc48afd32049dabf9e2a577f00f464d40e648 Mon Sep 17 00:00:00 2001 From: "yannan.wyn" Date: Wed, 15 Apr 2026 10:38:03 +0800 Subject: [PATCH] fix(bthread): refactor sharded priority queue with per-ED shard Each EventDispatcher gets its own WorkStealingQueue, making concurrent push from multiple EDs naturally SPMC-safe without spinlocks. --- src/brpc/event_dispatcher.cpp | 2 + src/brpc/event_dispatcher.h | 4 + src/brpc/event_dispatcher_epoll.cpp | 8 +- src/bthread/task_control.cpp | 40 +++- src/bthread/task_control.h | 13 +- src/bthread/task_group.cpp | 4 +- src/bthread/task_meta.h | 2 + test/BUILD.bazel | 13 ++ test/bthread_priority_queue_unittest.cpp | 225 +++++++++++++++++++++++ 9 files changed, 299 insertions(+), 12 deletions(-) create mode 100644 test/bthread_priority_queue_unittest.cpp diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index a5265b8ccd..c8e6b80240 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -23,6 +23,7 @@ #include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32 #include "bvar/latency_recorder.h" // bvar::LatencyRecorder #include "bthread/bthread.h" // bthread_start_background +#include "bthread/task_group.h" // TaskGroup::address_meta #include "brpc/event_dispatcher.h" DECLARE_int32(task_group_ntags); @@ -68,6 +69,7 @@ void InitializeGlobalDispatchers() { bthread_attr_t attr = FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags; + g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j); CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr)); } } diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index 3fdc9f17b9..d95243ac2d 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -114,6 +114,8 @@ template friend class IOEvent; // Stop bthread of this dispatcher. 
void Stop(); + void set_priority_index(int idx) { _priority_index = idx; } + // Suspend calling thread until bthread of this dispatcher stops. void Join(); @@ -188,6 +190,8 @@ template friend class IOEvent; // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit int _wakeup_fds[2]; + + int _priority_index{-1}; }; EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag); diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 5a6c23b0e5..31f60aace3 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) { } void* EventDispatcher::RunThis(void* arg) { - ((EventDispatcher*)arg)->Run(); + EventDispatcher* ed = (EventDispatcher*)arg; + if (ed->_priority_index >= 0) { + bthread::TaskMeta* meta = + bthread::TaskGroup::address_meta(bthread_self()); + meta->priority_index = ed->_priority_index; + } + ed->Run(); return NULL; } diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index ba067e3976..d4b45b55da 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "", "Set of CPUs to which cores are bound. 
" "for example, 0-3,5,7; default: disable"); +namespace brpc { DECLARE_int32(event_dispatcher_num); } + namespace bthread { DEFINE_bool(parking_lot_no_signal_when_no_waiter, false, @@ -205,11 +207,28 @@ TaskControl::TaskControl() , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") , _enable_priority_queue(FLAGS_enable_bthread_priority_queue) - , _priority_queues(FLAGS_task_group_ntags) + , _pq_num_of_each_tag(brpc::FLAGS_event_dispatcher_num) + , _priority_queues(FLAGS_task_group_ntags * brpc::FLAGS_event_dispatcher_num) , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) , _tagged_pl(FLAGS_task_group_ntags) {} +int TaskControl::init_priority_queues() { + if (!_enable_priority_queue) { + return 0; + } + for (int i = 0; i < FLAGS_task_group_ntags; ++i) { + for (int j = 0; j < _pq_num_of_each_tag; ++j) { + if (priority_queue(i, j).init(BTHREAD_MAX_CONCURRENCY) != 0) { + LOG(ERROR) << "Fail to init priority queue for tag=" << i + << " ed=" << j; + return -1; + } + } + } + return 0; +} + int TaskControl::init(int concurrency) { if (_concurrency != 0) { LOG(ERROR) << "Already initialized"; @@ -238,10 +257,10 @@ int TaskControl::init(int concurrency) { _tagged_worker_usage_second.push_back(new bvar::PerSecond>( "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); _tagged_nbthreads.push_back(new bvar::Adder("bthread_count", tag_str)); - if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { - LOG(ERROR) << "Fail to init _priority_q"; - return -1; - } + } + + if (init_priority_queues() != 0) { + return -1; } // Make sure TimerThread is ready. 
@@ -445,7 +464,7 @@ TaskControl::~TaskControl() { _switch_per_second.hide(); _signal_per_second.hide(); _status.hide(); - + stop_and_join(); } @@ -528,8 +547,12 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); - if (_priority_queues[tag].steal(tid)) { - return true; + if (_enable_priority_queue) { + for (int i = 0; i < _pq_num_of_each_tag; ++i) { + if (priority_queue(tag, i).steal(tid)) { + return true; + } + } } // 1: Acquiring fence is paired with releasing fence in _add_group to @@ -689,4 +712,5 @@ std::vector TaskControl::get_living_bthreads() { return living_bthread_ids; } + } // namespace bthread diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 4480daa677..89b3cb5912 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -101,11 +101,12 @@ friend bthread_t init_for_pthread_stack_trace(); std::string stack_trace(bthread_t tid); #endif // BRPC_BTHREAD_TRACER - void push_priority_queue(bthread_tag_t tag, bthread_t tid) { - _priority_queues[tag].push(tid); + void push_priority_queue(bthread_tag_t tag, int priority_index, bthread_t tid) { + priority_queue(tag, priority_index).push(tid); } std::vector get_living_bthreads(); + private: typedef std::array TaggedGroups; typedef std::array TaggedParkingLot; @@ -123,6 +124,13 @@ friend bthread_t init_for_pthread_stack_trace(); // Tag parking slot TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; } + // Priority queue for a specific ED within a tag + WorkStealingQueue& priority_queue(bthread_tag_t tag, int index) { + return _priority_queues[tag * _pq_num_of_each_tag + index]; + } + + int init_priority_queues(); + static void delete_task_group(void* arg); static void* worker_thread(void* task_control); @@ -164,6 +172,7 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector*> _tagged_nbthreads; bool _enable_priority_queue; + int 
_pq_num_of_each_tag; std::vector> _priority_queues; size_t _pl_num_of_each_tag; diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 877a5d406e..604f545c8a 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -640,6 +640,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) { TaskGroup* g = *pg; bthread_t next_tid = 0; // Find next task to run, if none, switch to idle thread of the group. + #ifndef BTHREAD_FAIR_WSQ // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9% @@ -908,7 +909,8 @@ void TaskGroup::priority_to_run(void* args_in) { tls_task_group->_control->_task_tracer.set_status( TASK_STATUS_READY, args->meta); #endif // BRPC_BTHREAD_TRACER - return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid); + return tls_task_group->control()->push_priority_queue( + args->tag, args->meta->priority_index, args->meta->tid); } struct SleepArgs { diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h index 1b77c0b601..c2c9a8510a 100644 --- a/src/bthread/task_meta.h +++ b/src/bthread/task_meta.h @@ -88,6 +88,8 @@ struct TaskMeta { // simplified if they can get tid from TaskMeta. 
bthread_t tid{INVALID_BTHREAD}; + int priority_index{-1}; + // User function and argument void* (*fn)(void*){NULL}; void* arg{NULL}; diff --git a/test/BUILD.bazel b/test/BUILD.bazel index b68b3fa08a..c90499b29f 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -240,6 +240,8 @@ cc_test( "bthread_butex_multi_tag_unittest.cpp", "bthread_rwlock_unittest.cpp", "bthread_semaphore_unittest.cpp", + # Have custom main() that conflicts with gtest_main + "bthread_priority_queue_unittest.cpp", ], ), copts = COPTS, @@ -252,6 +254,17 @@ cc_test( ], ) +cc_test( + name = "bthread_priority_queue_test", + srcs = ["bthread_priority_queue_unittest.cpp"], + copts = COPTS, + deps = [ + ":sstream_workaround", + "//:brpc", + "@com_google_googletest//:gtest", + ], +) + cc_test( name = "brpc_prometheus_test", srcs = glob( diff --git a/test/bthread_priority_queue_unittest.cpp b/test/bthread_priority_queue_unittest.cpp new file mode 100644 index 0000000000..faa23f3078 --- /dev/null +++ b/test/bthread_priority_queue_unittest.cpp @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+// NOTE(review): angle-bracket tokens ("<...>") were stripped from this patch in
+// transit; header names and template arguments below are reconstructed from the
+// usage in this file — confirm against the original commit.
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <atomic>
+#include <mutex>
+#include <set>
+#include <string>
+#include <vector>
+#include "bthread/bthread.h"
+#include "bthread/task_group.h"
+
+namespace {
+
+// Counter incremented by priority bthreads to verify execution
+std::atomic<int> g_priority_count(0);
+// Mutex + set for collecting executed tids to verify no loss
+std::mutex g_tid_mutex;
+std::set<int> g_executed_ids;
+
+void reset_globals() {
+    g_priority_count.store(0);
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    g_executed_ids.clear();
+}
+
+struct TaskArg {
+    int id;
+};
+
+void* priority_task_fn(void* arg) {
+    TaskArg* ta = static_cast<TaskArg*>(arg);
+    g_priority_count.fetch_add(1, std::memory_order_relaxed);
+    {
+        std::lock_guard<std::mutex> lk(g_tid_mutex);
+        g_executed_ids.insert(ta->id);
+    }
+    delete ta;
+    return NULL;
+}
+
+void* normal_task_fn(void* /*arg*/) {
+    // Just a normal task that does nothing, used as a filler
+    bthread_usleep(1000);
+    return NULL;
+}
+
+class PriorityQueueTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        reset_globals();
+    }
+};
+
+// Test 1: End-to-end priority task submission and execution.
+// Multiple producers submit priority tasks, verify all tasks are executed.
+TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) {
+    const int N = 200;
+
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids(N);
+    for (int i = 0; i < N; ++i) {
+        TaskArg* arg = new TaskArg{i};
+        ASSERT_EQ(0, bthread_start_background(&tids[i], &attr,
+                                              priority_task_fn, arg));
+    }
+
+    for (int i = 0; i < N; ++i) {
+        bthread_join(tids[i], NULL);
+    }
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+    for (int i = 0; i < N; ++i) {
+        ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i;
+    }
+}
+
+// Test 2: Mix of priority and normal tasks, all complete correctly.
+TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) {
+    const int N_PRIORITY = 100;
+    const int N_NORMAL = 100;
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids;
+    tids.reserve(N_PRIORITY + N_NORMAL);
+
+    for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) {
+        bthread_t tid;
+        if (i % 2 == 0 && (i / 2) < N_PRIORITY) {
+            TaskArg* arg = new TaskArg{i / 2};
+            ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr,
+                                                  priority_task_fn, arg));
+        } else {
+            ASSERT_EQ(0, bthread_start_background(&tid, NULL,
+                                                  normal_task_fn, NULL));
+        }
+        tids.push_back(tid);
+    }
+
+    for (auto tid : tids) {
+        bthread_join(tid, NULL);
+    }
+
+    ASSERT_EQ(N_PRIORITY, g_priority_count.load());
+}
+
+// Test 3: start_foreground (bthread_start_urgent) with GLOBAL_PRIORITY.
+// Simulates ED calling StartInputEvent: ED bthread calls start_urgent,
+// gets preempted into PQ via priority_to_run, child runs and ends,
+// ending_sched steals ED from PQ to resume.
+TEST_F(PriorityQueueTest, start_foreground_priority_to_run) {
+    const int N = 200;
+
+    struct EDSimArg {
+        int n_tasks;
+    };
+    EDSimArg ed_arg{N};
+
+    auto ed_fn = [](void* arg) -> void* {
+        EDSimArg* ea = static_cast<EDSimArg*>(arg);
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = 0;
+
+        for (int i = 0; i < ea->n_tasks; ++i) {
+            TaskArg* ta = new TaskArg{i};
+            bthread_t child;
+            bthread_start_urgent(&child, NULL, priority_task_fn, ta);
+        }
+        return NULL;
+    };
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    bthread_t ed_tid;
+    ASSERT_EQ(0, bthread_start_background(&ed_tid, &priority_attr,
+                                          ed_fn, &ed_arg));
+    bthread_join(ed_tid, NULL);
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+}
+
+// Test 4: Multiple ED-like bthreads concurrently calling start_urgent.
+// Verifies PQ correctness under concurrent preemption from multiple EDs.
+TEST_F(PriorityQueueTest, multiple_eds_concurrent_preempt) {
+    const int NUM_EDS = 4;
+    const int TASKS_PER_ED = 50;
+    const int TOTAL = NUM_EDS * TASKS_PER_ED;
+    std::atomic<int> resume_count(0);
+
+    struct EDArg {
+        int ed_index;
+        int n_children;
+        std::atomic<int>* resume_count;
+    };
+
+    auto ed_fn = [](void* arg) -> void* {
+        EDArg* ea = static_cast<EDArg*>(arg);
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = ea->ed_index;
+
+        for (int i = 0; i < ea->n_children; ++i) {
+            int id = ea->ed_index * ea->n_children + i;
+            TaskArg* ta = new TaskArg{id};
+            bthread_t child;
+            bthread_start_urgent(&child, NULL, priority_task_fn, ta);
+            ea->resume_count->fetch_add(1, std::memory_order_relaxed);
+        }
+        return NULL;
+    };
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<EDArg> ed_args(NUM_EDS);
+    std::vector<bthread_t> ed_tids(NUM_EDS);
+    for (int i = 0; i < NUM_EDS; ++i) {
+        ed_args[i] = {i, TASKS_PER_ED, &resume_count};
+        ASSERT_EQ(0, bthread_start_background(&ed_tids[i], &priority_attr,
+                                              ed_fn, &ed_args[i]));
+    }
+
+    for (int i = 0; i < NUM_EDS; ++i) {
+        bthread_join(ed_tids[i], NULL);
+    }
+
+    ASSERT_EQ(TOTAL, g_priority_count.load());
+    ASSERT_EQ(TOTAL, resume_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)TOTAL, g_executed_ids.size());
+}
+
+} // namespace
+
+int main(int argc, char* argv[]) {
+    testing::InitGoogleTest(&argc, argv);
+    google::SetCommandLineOption("enable_bthread_priority_queue", "true");
+    google::SetCommandLineOption("event_dispatcher_num", "4");
+    return RUN_ALL_TESTS();
+}