Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
#include "bvar/latency_recorder.h" // bvar::LatencyRecorder
#include "bthread/bthread.h" // bthread_start_background
#include "bthread/task_group.h" // TaskGroup::address_meta
#include "brpc/event_dispatcher.h"

DECLARE_int32(task_group_ntags);
Expand Down Expand Up @@ -68,6 +69,7 @@ void InitializeGlobalDispatchers() {
bthread_attr_t attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j);
CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ template <typename T> friend class IOEvent;
// Stop bthread of this dispatcher.
void Stop();

void set_priority_index(int idx) { _priority_index = idx; }

// Suspend calling thread until bthread of this dispatcher stops.
void Join();

Expand Down Expand Up @@ -188,6 +190,8 @@ template <typename T> friend class IOEvent;

// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];

int _priority_index{-1};
};

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
Expand Down
8 changes: 7 additions & 1 deletion src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) {
}

void* EventDispatcher::RunThis(void* arg) {
((EventDispatcher*)arg)->Run();
EventDispatcher* ed = (EventDispatcher*)arg;
if (ed->_priority_index >= 0) {
bthread::TaskMeta* meta =
bthread::TaskGroup::address_meta(bthread_self());
meta->priority_index = ed->_priority_index;
}
ed->Run();
return NULL;
}

Expand Down
40 changes: 32 additions & 8 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "",
"Set of CPUs to which cores are bound. "
"for example, 0-3,5,7; default: disable");

namespace brpc { DECLARE_int32(event_dispatcher_num); }

namespace bthread {

DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
Expand Down Expand Up @@ -205,11 +207,28 @@ TaskControl::TaskControl()
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
, _priority_queues(FLAGS_task_group_ntags)
, _pq_num_of_each_tag(brpc::FLAGS_event_dispatcher_num)
, _priority_queues(FLAGS_task_group_ntags * brpc::FLAGS_event_dispatcher_num)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
{}

int TaskControl::init_priority_queues() {
if (!_enable_priority_queue) {
return 0;
}
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
for (int j = 0; j < _pq_num_of_each_tag; ++j) {
if (priority_queue(i, j).init(BTHREAD_MAX_CONCURRENCY) != 0) {
LOG(ERROR) << "Fail to init priority queue for tag=" << i
<< " ed=" << j;
return -1;
}
}
}
return 0;
}

int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
Expand Down Expand Up @@ -238,10 +257,10 @@ int TaskControl::init(int concurrency) {
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
LOG(ERROR) << "Fail to init _priority_q";
return -1;
}
}

if (init_priority_queues() != 0) {
return -1;
}

// Make sure TimerThread is ready.
Expand Down Expand Up @@ -445,7 +464,7 @@ TaskControl::~TaskControl() {
_switch_per_second.hide();
_signal_per_second.hide();
_status.hide();

stop_and_join();
}

Expand Down Expand Up @@ -528,8 +547,12 @@ int TaskControl::_destroy_group(TaskGroup* g) {
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();

if (_priority_queues[tag].steal(tid)) {
return true;
if (_enable_priority_queue) {
for (int i = 0; i < _pq_num_of_each_tag; ++i) {
if (priority_queue(tag, i).steal(tid)) {
return true;
}
}
}

// 1: Acquiring fence is paired with releasing fence in _add_group to
Expand Down Expand Up @@ -689,4 +712,5 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() {
return living_bthread_ids;
}


} // namespace bthread
13 changes: 11 additions & 2 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ friend bthread_t init_for_pthread_stack_trace();
std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACER

void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
_priority_queues[tag].push(tid);
void push_priority_queue(bthread_tag_t tag, int priority_index, bthread_t tid) {
priority_queue(tag, priority_index).push(tid);
}

std::vector<bthread_t> get_living_bthreads();

private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
Expand All @@ -123,6 +124,13 @@ friend bthread_t init_for_pthread_stack_trace();
// Tag parking slot
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }

// Priority queue for a specific ED within a tag
WorkStealingQueue<bthread_t>& priority_queue(bthread_tag_t tag, int index) {
return _priority_queues[tag * _pq_num_of_each_tag + index];
}

int init_priority_queues();

static void delete_task_group(void* arg);

static void* worker_thread(void* task_control);
Expand Down Expand Up @@ -164,6 +172,7 @@ friend bthread_t init_for_pthread_stack_trace();
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;

bool _enable_priority_queue;
int _pq_num_of_each_tag;
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;

size_t _pl_num_of_each_tag;
Expand Down
4 changes: 3 additions & 1 deletion src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find next task to run, if none, switch to idle thread of the group.

#ifndef BTHREAD_FAIR_WSQ
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
Expand Down Expand Up @@ -908,7 +909,8 @@ void TaskGroup::priority_to_run(void* args_in) {
tls_task_group->_control->_task_tracer.set_status(
TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
return tls_task_group->control()->push_priority_queue(
args->tag, args->meta->priority_index, args->meta->tid);
}

struct SleepArgs {
Expand Down
2 changes: 2 additions & 0 deletions src/bthread/task_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ struct TaskMeta {
// simplified if they can get tid from TaskMeta.
bthread_t tid{INVALID_BTHREAD};

int priority_index{-1};

// User function and argument
void* (*fn)(void*){NULL};
void* arg{NULL};
Expand Down
13 changes: 13 additions & 0 deletions test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ cc_test(
"bthread_butex_multi_tag_unittest.cpp",
"bthread_rwlock_unittest.cpp",
"bthread_semaphore_unittest.cpp",
# Have custom main() that conflicts with gtest_main
"bthread_priority_queue_unittest.cpp",
],
),
copts = COPTS,
Expand All @@ -252,6 +254,17 @@ cc_test(
],
)

cc_test(
name = "bthread_priority_queue_test",
srcs = ["bthread_priority_queue_unittest.cpp"],
copts = COPTS,
deps = [
":sstream_workaround",
"//:brpc",
"@com_google_googletest//:gtest",
],
)

cc_test(
name = "brpc_prometheus_test",
srcs = glob(
Expand Down
Loading
Loading