相关文章推荐
读研的山楂  ·  Apache bRPC 1.4.0 ...·  3 月前    · 

本文主要介绍brpc的线程模型,以及网络请求处理的整个流程。

1.1 brpc线程模型简介

bthread是brpc使用的M:N线程库,目的是在提高程序的并发度的同时,降低编码难度,并在核数日益增多的CPU上提供更好的scalability和cache locality。
”M:N“是指M个bthread会映射至N个pthread,一般M远大于N。由于linux当下的pthread实现(NPTL)是1:1的,M个bthread也相当于映射至N个LWP。
bthread的前身是Distributed Process(DP)中的fiber,一个N:1的合作式线程库,等价于event-loop库,但写的是同步代码。

bthread的设计目标:

  • 用户可以延续同步的编程模式,能在数百纳秒内建立bthread,可以使用多种原语进行同步;
  • bthread所有接口可在pthread中被调用并有合理的行为,使用bthread的代码可以在pthread中正常执行;
  • 能充分利用多核;
  • better cache locality, supporting NUMA is a plus。

目前来看,当时的设计目标基本上都达到了,下图是请求处理bthread在线程中的切换流程。
brpc线程切换图

1.2 bprc网络模型

brpc的网络模型是event_dispatcher+worker的non-blocking IO模型,和常规的event_dispatcher+worker模型不太一样的是,dispatcher和worker可以同时在不同的核上运行,内核不用频繁的切换就能完成有效的工作。线程总量也不用很多,所以对thread-local的使用也比较充分。

2. bthread实现原理

2.1 TaskGroup —— 1:N的 pthread调度器

1个TaskGroup对应着1个pthread,每个TaskGroup都有一个run_queue和remote_run_queue,里面放着待执行的bthread。
这两个queue的区别如下:

  • run_queue:里面放着在brpc框架内提交的bthread,是一个SPMC(单生产者多消费者)队列;
  • remote_queue:放着在brpc框架外的pthread提交bthread,是一个MPMC(多生产者多消费者)队列;
  • 调度方式:都是FIFO调度,但是优先级不一样,优先级顺序为:run_queue > remote_run_queue > 其他TaskGroup的run_queue > 其他TaskGroup的remote_run_queue。

TaskGroup结构说明:

// source code: src/bthread/task_group.h
class TaskGroup {
public:
    // 在前台创建一个bthread,并且立即执行,对应于外部接扣bthread_start_urgent接口
    // 将当前的bthread压入待执行队列,立即创建一个新的bthread执行对应的逻辑(fn和参数)
    static int start_foreground(TaskGroup** pg, 
                               bthread_t* __restrict tid,
                               const bthread_attr_t* __restrict attr,
                               void* (*fn)(void*),
                               void* __restrict arg);
    // 创建一个bthread,对应外部接口为bthread_start_background,将对应
    // 的新创建的bthread的tid push 到当前TaskGroup的 run-queue(_rq)
    // 并且继续执行当前执行体的逻辑
    // REMOTE是用来表示该bthread放入_rq还是_remote_rq
    template <bool REMOTE>
    int start_background(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void* (*fn)(void*),
                         void* __restrict arg);
    // 调度下一个bthread
    static void sched(TaskGroup** pg);
    static void ending_sched(TaskGroup** pg);
    static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
    static void sched_to(TaskGroup** pg, bthread_t next_tid);
    static void exchange(TaskGroup** pg, bthread_t next_tid);
    // bthread调度原语
    static void join(bthread_t tid, void** return_value);
    static void yield(TaskGroup** pg);  
    static void usleep(TasKGroup** pg, uint64_t timeout_us);
    static void set_stopped(bthread_t tid);
    static bool is_stopped(bthread_t tid);
    // TaskGroup执行loop
    // 通过在wait_task中steal_task从queue中获取任务
    void run_main_task();
private:
    bool wait_task(bthread_t* tid);
    bool steal_task(bthread_t* tid);
    TaskMeta*  _cur_meta;             // 当前执行过程中的bthread对应的TaskMeta
    TaskControl* _control;            // 当前Pthread(TaskGroup)所属的TaskControl(管理TaskGroup)
    int _num_signal;                  // push任务到自身队列,未signal唤醒其他TaskGroup的次数
    int _nsignaled;                   // push任务到自身队列,signal其他TaskGroup的次数(一旦signal置0)
    int64_t _last_run_ns;             // 上一次调度执行的时间
    int64_t _cumulated_cputime_ns;    // 总共累计的执行时间
    size_t  _nswitch;                 // bthread之间的切换次数
    // 在执行下一个任务(task)之前调用的函数,比如前一个bthread切入到新的bthread之后
    // 需要把前一个任务的btread_id push到runqueue以备下次调用
    RemainedFn _last_context_remained;       // 执行函数
    void*      _last_context_remained_args;  // 执行参数
    // 没有任务在的时候停在停车场等待唤醒,主要为了在闲时降低CPU占用
    ParkingLot* _pl;
    ParkingLot::State _last_pl_state;        // 停车场的上一次状态
    // 当前TaskGroup(pthread)从其他TaskGroup抢占任务的随机因子信息(work-steal)
    size_t  _steal_seed;        
    size_t  _steal_offset;
    ContexualStack* _main_stack;             // 初始化执行该TaskGroup的pthread的初始_main_stack
    bthread_t _main_tid;                     // 对应的main thread的pthread信息 
    WorkStrealingQueue<bthread_t> _rq;       // 本taskGroup的run_queue
    RemoteTaskQueue _remote_rq;              // 用于存放外围pthread线程创建的bthread,也就是上面说的remote_run_queue
                                             // 外围的pthread直接调用bthread库接口创建的bthread
    int     _remote_num_nosignal;   
    int     _remote_nsignaled;

TaskGroup的调度循环:

// source code: src/bthread/task_group.cpp
// 由TaskControl创建的线程(pthread) 通过调用如下函数进入调度所有bthread任务的主循环
TaskGroup::run_main_task() {
    // 启动时候任务队列中没有任何(或者说任务做完之后),需要等待or去其他TaskGroup偷bthread
    // 任务可能是已经放入自身的remote_queue,也有可能从别的TaskGroup中steal来的
    TaskGroup *dummy = this;
    while(wait_task(&tid)) { 
        // 获取到任务进行持续调度,这里会在设置完tid对应的task-meta之后
        // 调用TaskGroup::task_runner函数对当前任务进行处理,然后持续调用
        // ending_shced 从自身的run_queue和remote_queue中或者steal的方式调度任务
        TaskGroup::sched_to(&dummy, tid); 
    // 程序退出

2.2 TaskControl —— 管理线程池以及任务调度

TaskControl主要用来做管理TaskGroup,全局唯一,以及管理任务调度的(work-steal)。

// source code: src/bthread/task_control.h
class TaskControl {
public:
    // 初始化线程池,启动nconcurrency个线程,cpu_set为二次开发的绑核功能
    int init(int nconcurrency, const std::string &cpu_set="");
    // 创建一个TaskGroup(pthread)
    TaskGroup* create_group();
    // 在TaskGroup之间steal bthread
    bool steal_task(bthread_t* tid, size_t* seed, size_t offset);
    // 用于唤醒在停车场等待的TaskGroup
    void signal_task(int num_task);
    // 停止TaskControl
    void stop_and_join();
    // [线程不安全]
    // 在init之后,动态增加TaskGroup
    // 比如在启动server时,设定了比init的时候多的线程数
    int add_workers(int num);
    // 从TaskControl中随机选取一个TaskGroup
    // 主要用于外围pthread提交bthread等
    TaskGroup* choose_one_group();
private:
    // TaskGroup管理接口
    int _add_group(TaskGroup*);
    int _destroy_group(TaskGroup*);
    static void delete_task_group(void* arg);
    // 线程池中线程执行的函数:
    // 创建一个TaskGroup,然后执行TaskGroup的run_main_task函数进行loop
    static void* worker_thread(void* task_control);
	// TaskGroup的数量以及指针
    butil::atomic<size_t> _ngroup;
    TaskGroup** _groups;
    butil::Mutex _modify_group_mutex;
    bool _stop;                       // TaskControl退出标识
    butil::atomic<int> _concurrency;  // pthread个数
    std::vector<pthread_t> _workers;  // 线程池中tid
    // 停车场,用于TaskGroup没有任务的时候在这里进行停车,来任务了会进行唤醒
    static const int PARKING_LOT_NUM = 4;
    ParkingLot _pl[PARKING_LOT_NUM];

work-steal逻辑较为简单,使用随机的方式找一个TaskGroup去偷一个bthread,没有偷到,就换个TaskGroup继续偷。
代码逻辑如下:

// source code: src/bthread/task_control.cpp
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
    if (0 == ngroup) {return false; }
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
    *seed = s;
    return stolen;

2.3 bthread.h —— 对外api

src/bthread/bthread.h是brpc bthread对外暴露的接口,同时支持C++和C的调用。
接口的具体实现都在TaskGroup中,这里简单介绍其功能:

  • start_foreground:set_mained(ready_to_run(current_bth)) + sched_to(new_bth)
  • start_background:ready_to_run<_remote>(new_bth)
  • yield:set_mained(ready_to_run(current_bth)) + sched
  • usleep:add timer(ready_to_run_remote(current_bth)) + sched
  • join:wait on join butex until bthread quits

bthread还有线程调度的其他接口,比如:bthread_mutex_t、bthread_cond_t、bthread_rwlock_t等组件接口。

2.4 WorkStealQueue —— 单生产者多消费者环状队列

WorkStealQueue用在TaskGroup的run_queue,该queue是SPMC:

  • 生产者:生产者为当前TaskGroup中正在执行的bthread,通过start_foreground/start_background将任务入队
  • 消费者:消费者为当前TaskGroup和其他TaskGroup,它们通过steal获取bthread

WorkStealQueue是一个lock-free的环状队列,下面是WorkStealQueue的主要逻辑的代码:

// source code: src/bthread/work_stealing_queue.h
// WorkStealingQueue支持模版,但是brpc中仅在TaskGroup的_rq中用它管理bthread
// 所有后续的代码说明中都是用了bthread
template <typename T>
class WorkStealingQueue {
public:
    WorkStealingQueue() : _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}
    // WorkStealingQueue的初始化
    // capacity要求必须是2的幂
    int init(size_t capacity) {
        _buffer = new(std::nothrow) T[capacity];
        _capacity = capacity;
        return 0;
    // 将bthread入队,生产者为当前TaskGroup的bthread
    // Push an item into the queue.
    // Returns true on pushed.
    // May run in parallel with steal().
    // Never run in parallel with pop() or another push().
    bool push(const T& x) {
        const size_t b = _bottom.load(butil::memory_order_relaxed);
        const size_t t = _top.load(butil::memory_order_acquire);
        if (b >= t + _capacity) { // Full queue.
            return false;
        _buffer[b & (_capacity - 1)] = x;
        _bottom.store(b + 1, butil::memory_order_release);
        return true;
    // 从queue中取出一个bthread
    // 在开启BTHREAD_FAIR_WSQ时(default:off),当前TaskGroup会通过pop获取任务
    // 在不开启BTHREAD_FAIR_WSQ时,当前和其他的TaskGroup会通过下面的steal函数获取任务
    // Pop an item from the queue.
    // Returns true on popped and the item is written to `val'.
    // May run in parallel with steal().
    // Never run in parallel with push() or another pop().
    bool pop(T* val) {
        const size_t b = _bottom.load(butil::memory_order_relaxed);
        size_t t = _top.load(butil::memory_order_relaxed);
        if (t >= b) {
            // fast check since we call pop() in each sched.
            // Stale _top which is smaller should not enter this branch.
            return false;
        const size_t newb = b - 1;
        _bottom.store(newb, butil::memory_order_relaxed);
        butil::atomic_thread_fence(butil::memory_order_seq_cst);
        t = _top.load(butil::memory_order_relaxed);
        if (t > newb) {
            _bottom.store(b, butil::memory_order_relaxed);
            return false;
        *val = _buffer[newb & (_capacity - 1)];
        if (t != newb) {
            return true;
        // Single last element, compete with steal()
        const bool popped = _top.compare_exchange_strong(
            t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
        _bottom.store(b, butil::memory_order_relaxed);
        return popped;
    // 从queue中取出一个bthread
    // Pop an item from the queue.
    // Returns true on popped and the item is written to `val'.
    // May run in parallel with steal().
    // Never run in parallel with push() or another pop().
    bool steal(T* val) {
        size_t t = _top.load(butil::memory_order_acquire);
        size_t b = _bottom.load(butil::memory_order_acquire);
        if (t >= b) {
            // Permit false negative for performance considerations.
            return false;
        do {
            butil::atomic_thread_fence(butil::memory_order_seq_cst);
            b = _bottom.load(butil::memory_order_acquire);
            if (t >= b) {
                return false;
            *val = _buffer[t & (_capacity - 1)];
        } while (!_top.compare_exchange_strong(t, t + 1,
                                               butil::memory_order_seq_cst,
                                               butil::memory_order_relaxed));
        return true;
    // 获取当前队列中堆积的bthread个数
    size_t volatile_size() const {
        const size_t b = _bottom.load(butil::memory_order_relaxed);
        const size_t t = _top.load(butil::memory_order_relaxed);
        return (b <= t ? 0 : (b - t));
    size_t capacity() const { return _capacity; }
private:
    // Copying a concurrent structure makes no sense.
    DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);
    butil::atomic<size_t> _bottom;
    size_t _capacity;
    T* _buffer;
    butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top;

默认情况下brpc都是通过bthread的方式进行多线程任务处理,其实brpc还支持usercode_in_pthread,就是用户的CallMethod和Done中的回调都在pthread中执行,但是该功能性能并不好,不建议长期使用。

2.5 RemoteTaskQueue —— 在外围pthread中启动bthread

在外围pthread中启动bthread会放入到_remote_rq中,类型是RemoteTaskQueue。
RemoteTaskQueue是MPMC的队列,入队和出队都会用mutex加锁,相比于WorkStealQueue,性能不好。

下面是入队的过程:

// source code: src/bthread/task_group.cpp
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    if (nosignal) {
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled += 1 + additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal);

2.5 usercode_in_pthread —— 在pthread中执行用户代码

brpc为了方便业务快速接入brpc,支持了usercode_in_pthread选项(default:off),开启该功能,性能并不好,原因和RemoteTaskQueue类似。
在负载较大的情况下,上下文切换会占用较多的CPU资源,导致iops和latency等性能指标严重影响,不建议使用该功能。

开始介绍之前,先说一下usercode:

  • 服务端的CallMethod:用户实现的rpc接口,比如:Echo(…)
  • 客户端的done:用户指定的异步回调函数,比如:OnRPCDone(…)

usercode_in_pthread的原理也比较简单,就是典型的多生产者多消费者(MPMC)模型:

  • 一个queue,由全局mutex保护
  • 多个bthread在PrecessRequest中提交任务到queue中
  • 多个usercode线程消费queue中的任务

一个依赖MPMC队列的程序很难有很好的多核扩展性,因为这个队列的极限吞吐取决于同步cache的延时,而不是核心的个数。
最好是用多个SPMC或多个MPSC队列,甚至多个SPSC队列代替,在源头就规避掉竞争。

bthread就是使用SPMC队列,所以它具有更好的性能表现:

  • bthread会尽量减少上下文切换,在高并发情况下能够降低延迟;
  • 每个bthread线程都有一个任务执行队列(_rq和_remote_rq),在多线程中可以做到无锁;
  • usercode_in_pthread多个线程共用一个执行队列,通过mutex进行线程安全保护,上下文切换严重,效率较低;
  • bthread支持work-steal,能尽量减少长尾请求,usercode_in_pthread不支持work-steal。

主要代码逻辑如下:

// source code: src/bprc/policy/baidu_rpc_protocol.cpp
void ProcessRpcRequest(InputMessageBase* msg_base) {
    // 省略请求的pb解析、检查、预处理、错误处理等逻辑,直接看CallMethod的流程
    // 如果未开启usercode_in_pthread,直接原地调用CallMethod(比如:Echo(...))
    if (!FLAGS_usercode_in_pthread) {
        return svc->CallMethod(method, cntl.release(), 
                               req.release(), res.release(), done);
    // usercode_in_pthread的请求处理流程
    if (BeginRunningUserCode()) { // 如果inplace执行的任务不多,则inplace执行usercode
        svc->CallMethod(method, cntl.release(), 
                        req.release(), res.release(), done);
        return EndRunningUserCodeInPlace();
    } else { // 如果inplace执行的任务太多,则任务入队列,由usercode后台线程执行
        return EndRunningCallMethodInPool(
            svc, method, cntl.release(),
            req.release(), res.release(), done);
// source code: src/brpc/detail/usercode_backup_pool.cpp
// usercode任务入队流程
void EndRunningUserCodeInPool(void (*fn)(void*), void* arg) {
    InitUserCodeBackupPoolOnceOrDie();
    g_usercode_inplace.fetch_sub(1, butil::memory_order_relaxed);
    const UserCode usercode = { fn, arg };
    // 获取锁,并将任务入队列
    pthread_mutex_lock(&s_usercode_mutex);
    s_usercode_pool->queue.push_back(usercode);
    // 做过载检查
    if ((int)s_usercode_pool->queue.size() >=
        (FLAGS_usercode_backup_threads *
         FLAGS_max_pending_in_each_backup_thread)) {
        g_too_many_usercode = true;
    // 释放锁,并通知后台线程处理
    pthread_mutex_unlock(&s_usercode_mutex);
    pthread_cond_signal(&s_usercode_cond);
// source code: src/brpc/detail/usercode_backup_pool.cpp
// usercode任务处理流程
void UserCodeBackupPool::UserCodeRunningLoop() {
    while (true) {
        bool blocked = false;
        UserCode usercode = { NULL, NULL };
            // 等待通知
            BAIDU_SCOPED_LOCK(s_usercode_mutex);
            while (queue.empty()) {
                pthread_cond_wait(&s_usercode_cond, &s_usercode_mutex);
                blocked = true;
            // 从queue中拿到一个任务
            usercode = queue.front();
            queue.pop_front();
            // 过载检查
            if (g_too_many_usercode &&
                (int)queue.size() <= FLAGS_usercode_backup_threads) {
                g_too_many_usercode = false;
        // 处理任务
        usercode.fn(usercode.arg);

由上面的逻辑可以看出,usercode_in_pthread的瓶颈在具有全局锁的MPMC队列,在高负载情况下,不仅性能下降的厉害(折半),并且多核扩展性不好,故不建议在生产环境使用该功能。

2.6 brpc线程分类

brpc线程主要有这几类:

  • 1*主线程:一般用来启动server,然后监听中断信号(SIGINT、SIGTERM…),等待退出;
  • n*worker线程:由TaskControl管理,dispatcher和请求的处理都是由该组线程处理的,可以通过-bthread_concurrency来指定;
  • 1*timer线程:处理定时任务线程,比如bthread_usleep、control的超时就是通过timer来实现的,可以通过bthread_timer_add提交计划任务;
  • 1*sampler线程:主要是指标采样的线程。

3. brpc网络模型

服务端对网络请求的处理大致分这三个阶段:

  • dispatcher:接收到pollin事件;
  • ProcessEvent:处理事件,主要是读取请求数据,做内置协议的解析,为后续调用协议的处理逻辑做准备;
  • ProcessInputMessage:处理请求,调用usercode,即时:CallMethod。

3.1 EventDispatcher —— 事件分发器

brpc支持多个EventDispatcher(简写为EDISP),具体由-event_dispatcher_num参数决定,默认数量是1,每个EDISP负责一部分fd的监听处理(通过对fd哈希取模确定EDISP)。

在客户端连接不多的情况下,设置多个EDISP没有效果(TODO:后续需要测试多链接情况下,多个EDISP的性能)。

如下是brpc的EDISP的主要结构:

// source code: src/brpc/event_dispatcher.h
namespace brpc {
class EventDispatcher {
public:
    // 在全局初始化的时候调用Start启动EDISP
    // 创建一个bthread在TaskGroup里面执行EventLoop,也就是后面的 RunThis 函数
    virtual int Start(const bthread_attr_t* consumer_thread_attr);
    // EDISP管理接口
    bool Running() const;
    void Stop();
    void Join();
    // 向EventLoop中添加事件:EPOLLIN | EPOLLET
    // fd和socket_id一一对应
    // 当fd上的事件触发之后,就会调用对应socket的on_edge_triggered_events函数:
    // server accptor on_edge_triggered_events: Acceptor::OnNewConnections
    // 在OnNewConnections中,对创建好的链接,设置on_edge_triggered_events:InputMessenger::OnNewMessages
    int AddConsumer(SocketId socket_id, int fd);
    // Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
    // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
    // be used instead of EPOLL_CTL_ADD. When event arrives,
    // `Socket::HandleEpollOut' will be called with `socket_id'
    // Returns 0 on success, -1 otherwise and errno is set
    // 向EventLoop中添加事件:EPOLLIN | EPOLLET
    int AddEpollOut(SocketId socket_id, int fd, bool pollin);
    // Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
    // will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
    // Returns 0 on success, -1 otherwise and errno is set
    int RemoveEpollOut(SocketId socket_id, int fd, bool pollin);
private:
    DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
    // EventLoop
    // 会调用下面的Run函数进行loop
    static void* RunThis(void* arg);
    // Thread entry.
    void Run();
    // Remove the file descriptor `fd' from epoll.
    int RemoveConsumer(int fd);
    // The epoll to watch events.
    int _epfd;
    // false unless Stop() is called.
    volatile bool _stop;
    // identifier of hosting bthread
    bthread_t _tid;
    // The attribute of bthreads calling user callbacks.
    bthread_attr_t _consumer_thread_attr;
    // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
    int _wakeup_fds[2];
// 当有多个EDISP,根据fd找对应的EDISP:对fd进行hash取模。
EventDispatcher& GetGlobalEventDispatcher(int fd);
} // namespace brpc

以上是EDISP主要的结构,如下是其EDISP的EventLoop:

// source code: src/brpc/event_dispatcher.cpp
void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event e[32];
        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
        if (_stop) {
            break;
        if (n < 0) {
            if (EINTR == errno) {
                // We've checked _stop, no wake-up will be missed.
                continue;
            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
            break;
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                Socket::StartInputEvent(e[i].data.u64, e[i].events,
                                        _consumer_thread_attr);
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                Socket::HandleEpollOut(e[i].data.u64);
// source code: src/brpc/socket.cpp
// 开始处理input事件
int Socket::StartInputEvent(SocketId id, uint32_t events,
                            const bthread_attr_t& thread_attr) {
    SocketUniquePtr s;
    if (Address(id, &s) < 0) {
        return -1;
    // s->_nevent主要用于将同一个fd上的多个input事件放在一个bthread中处理
    // 在开始s->_nevent等于0时,创建一个bthread,后续尽量复用该bthread
    // 这里也对socket做了读保护:保证单线程读取数据
    if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
        bthread_t tid;
        Socket* const p = s.release();
        bthread_attr_t attr = thread_attr;
        attr.keytable_pool = p->_keytable_pool;
        // 在前台启动一个bthread处理input事件:
        // 1. 将当前的bthread放入_rq中
        // 2. 当前线程调度执行新创建的bthread:ProcessEvent
        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
            LOG(FATAL) << "Fail to start ProcessEvent";
            ProcessEvent(p);
    return 0;
// 处理event
void* Socket::ProcessEvent(void* arg) {
    SocketUniquePtr s(static_cast<Socket*>(arg));
    // 调用socket上注册的回调函数
    // Input事件有两类:
    // 1. server的acceptor上注册的是:Acceptor::OnNewConnections
    // 2. 建立好连接的socket上注册的是:InputMessenger::OnNewMessages
    s->_on_edge_triggered_events(s.get());
    return NULL;

3.2 Acceptor::OnNewConnections —— 建立连接

建立连接的逻辑相对比较简单,主要就是在src/brpc/acceptor.cpp中的Acceptor::OnNewConnections函数,通过调用Socket::Create创建一个O_NONBLOCK、TCP_NODELAY的socket连接。

3.3 InputMessenger::OnNewMessages —— 读取rpc请求

接收读事件的逻辑相对复杂点,主要包括如下几个逻辑:

  • 从socket中读取数据,直到EOF
  • 对数据做请求切分,此处涉及到baidu-rpc协议的解析,baidu-rpc协议比较简单,总共12个字节:“PRPC” + body_size(uint32_t) + meta_size(uint32_t)
  • 启动bthread处理用户请求

下面是其主要的逻辑:

// source code: src/brpc/input_messenger.cpp
void InputMessenger::OnNewMessages(Socket* m) {
    // 如果socket中只有一个请求,那么这个请求的解析和处理都是在当前bthread,主要是为了减少上下文切换
    // 如果socket中有多个请求,所有的请求都会在这里解析,除了最后一个请求,其他请求都会生成一个bthread
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;
    // 这个是自定义了资源释放函数的智能指针
    // 在智能指针析构时,会调用自定义的释放资源函数
    // 这里主要用于原地处理最后一个请求:RunLastMessage
    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        // 获取当前时间,主要是为了避免socket因为idle_timeout_s被关闭
        const int64_t received_us = butil::cpuwide_time_us();
        // 计算本次从socket中读取的数据长度
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
            once_read = MIN_ONCE_READ;  // min: 4KB
        } else if (once_read > MAX_ONCE_READ) {
            once_read = MAX_ONCE_READ;  // max: 512KB
        // 将数据从socket中读取到socket中IOBuf
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
            if (0 == nr) {
                // 读完了
                LOG_IF(WARNING, FLAGS_log_connection_close) << *m << " was closed by remote side";
                read_eof = true;                
            } else if (errno != EAGAIN) {
                if (errno == EINTR) {
                    continue;  // just retry
                // 错误处理
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to read from " << *m;
                m->SetFailed(saved_errno, "Fail to read from %s: %s",
                             m->description().c_str(), berror(saved_errno));
                // 注意:此处代码虽然是直接返回,但是在返回之前会处理last_msg
                return;
            } else if (!m->MoreReadEvents(&progress)) {
                // 该socket上没有新入的读事件,就返回
                // 注意:此处代码虽然是直接返回,但是在返回之前会处理last_msg
                return;
            } else {
                // 有新事件到达,继续处理
                continue;
        // 指标统计
        m->AddInputBytes(nr);
        // 避免socket因为idle_timeout_s被关闭
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);
        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
            size_t index = 8888;
            // 做请求切分
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            if (!pr.is_ok()) {
                if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
                    // 读取的请求msg不完整,需要从socket中再次读取
                    m->_last_msg_size += (last_size - m->_read_buf.length());
                    break;
                } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
                    // 错误处理
                    LOG(WARNING)
                        << "Close " << *m << " due to unknown message: "
                        << butil::ToPrintable(m->_read_buf);
                    m->SetFailed(EINVAL, "Close %s due to unknown message",
                                 m->description().c_str());
                    return;
                } else {
                    // 错误处理
                    LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
                    m->SetFailed(EINVAL, "Close %s: %s",
                                 m->description().c_str(), pr.error_str());
                    return;
            // 指标统计
            m->AddInputMessages(1);
            // 计算请求msg的平均大小
            const size_t cur_size = m->_read_buf.length();
            if (cur_size == 0) {
                // 如果socket中的有效数据size为空
                // 那么就将该buf中cache的block归还给TLS(ThreadLocalStorage)
                m->_read_buf.return_cached_blocks();
            m->_last_msg_size += (last_size - cur_size);
            last_size = cur_size;
            const size_t old_avg = m->_avg_msg_size;
            if (old_avg != 0) {
                m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
                / MSG_SIZE_WINDOW;
            } else {
                m->_avg_msg_size = m->_last_msg_size;
            m->_last_msg_size = 0;
            if (pr.message() == NULL) { // the Process() step can be skipped.
                continue;
            pr.message()->_received_us = received_us;
            pr.message()->_base_real_us = base_realtime;
            // 启动一个bthread处理上一个请求:msg->_process(msg)
            // 在第一次循环时由于last_msg为空,所以不会创建bthread
            // 在第二次之后的每个循环都会启动一个bthread处理上一个请求
            DestroyingPtr<InputMessageBase> msg(pr.message());
            QueueMessage(last_msg.release(), &num_bthread_created,
                             m->_keytable_pool);
            if (handlers[index].process == NULL) {
                LOG(ERROR) << "process of index=" << index << " is NULL";
                continue;
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;
            if (!m->is_read_progressive()) {
                // Transfer ownership to last_msg
                last_msg.reset(msg.release());
            } else {
                // brpc走的是上一个分支
                // 在http rpc短连接中应该是走这个分支
                QueueMessage(msg.release(), &num_bthread_created,
                                 m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
        // 一个活跃的TaskGroup会立即处理,无需通知
        // 在比较差的情况下,TaskGroup线程正在休眠,那么就需要通知线程处理bthread
        if (num_bthread_created) {
            bthread_flush();
    if (read_eof) {
        m->SetEOF();
// 为last_msg启动bthread处理请求
static void QueueMessage(InputMessageBase* to_run_msg,
                         int* num_bthread_created,
                         bthread_keytable_pool_t* keytable_pool) {
    if (!to_run_msg) {
        return;
    bthread_t th;
    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
                          BTHREAD_ATTR_PTHREAD :
                          BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
    tmp.keytable_pool = keytable_pool;
    if (bthread_start_background(
            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
        ++*num_bthread_created;
    } else {
        ProcessInputMessage(to_run_msg);
void* ProcessInputMessage(void* void_arg) {
    InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
    // 如果是baidu_rpc协议的话,msg->_process为:
    // baidu_rpc_protocal.h: ProcessRpcRequest
    msg->_process(msg);
    return NULL;

3.5 ProcessInputMessage —— 处理rpc请求

在读取rpc请求之后,已经确认了使用的是何种协议,我们使用的是baidu-rpc协议,
baidu-rpc协议的消息格式可以参看3.6 baidu-rpc协议
这里主要说明处理rpc请求的流程:

  • 解析出RpcMeta,主要包括这些字段:service_name、method_name、compress_type、attachment_size等(baidu_rpc_meta.proto)
  • 根据service_name、method_name找到对应的service和method
  • 解析业务定义的pb,比如:extent_io.proto等
  • 做一些初始化的操作,比如构建Closure
  • CallMethod就是我们实现的rpc接口,比如:rpc Write(WriteRequest) returns (WriteResponse)

具体逻辑见:

// source code: src/brpc/policy/baidu_rpc_policy.cpp
void ProcessRpcRequest(InputMessageBase* msg_base) {
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    SocketUniquePtr socket_guard(msg->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg_base->arg());
    ScopedNonServiceError non_service_error(server);
    // 解析RpcMeta
    // 这个是baidu-rpc协议定义的格式:baidu_rpc_meta.proto
    RpcMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;
        socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",
                          socket->description().c_str());
        return;
    const RpcRequestMeta &request_meta = meta.request();
    // 做一些请求初始化工作
    std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
    std::unique_ptr<google::protobuf::Message> req;
    std::unique_ptr<google::protobuf::Message> res;
    ServerPrivateAccessor server_accessor(server);
    ControllerPrivateAccessor accessor(cntl.get());
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    if (request_meta.has_log_id()) {
        cntl->set_log_id(request_meta.log_id());
    cntl->set_request_compress_type((CompressType)meta.compress_type());
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(socket->remote_side())
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(PROTOCOL_BAIDU_STD)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);
    MethodStatus* method_status = NULL;
    do {
        // 检查server状态
        if (!server->IsRunning()) {
            cntl->SetFailed(ELOGOFF, "Server is stopping");
            break;
        // 检查网络拥塞、检查并发
        if (socket->is_overcrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            break;
        if (!server_accessor.AddConcurrency(cntl.get())) {
            cntl->SetFailed(
                ELIMIT, "Reached server's max_concurrency=%d",
                server->options().max_concurrency);
            break;
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            break;
        // 根据RpcMeta找到具体的service和method
        // 并初始化request和response
        butil::StringPiece svc_name(request_meta.service_name());
        if (svc_name.find('.') == butil::StringPiece::npos) {
            const Server::ServiceProperty* sp =
                server_accessor.FindServicePropertyByName(svc_name);
            if (NULL == sp) {
                cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
                                request_meta.service_name().c_str());
                break;
            svc_name = sp->service->GetDescriptor()->full_name();
        const Server::MethodProperty* mp =
            server_accessor.FindMethodPropertyByFullName(
                svc_name, request_meta.method_name());
        if (NULL == mp) {
            cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
                            request_meta.service_name().c_str(),
                            request_meta.method_name().c_str());
            break;
        } else if (mp->service->GetDescriptor()
                   == BadMethodService::descriptor()) {
            BadMethodRequest breq;
            BadMethodResponse bres;
            breq.set_service_name(request_meta.service_name());
            mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
            break;
        // Switch to service-specific error.
        non_service_error.release();
        method_status = mp->status;
        if (method_status) {
            int rejected_cc = 0;
            if (!method_status->OnRequested(&rejected_cc)) {
                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                mp->method->full_name().c_str(), rejected_cc);
                break;
        google::protobuf::Service* svc = mp->service;
        const google::protobuf::MethodDescriptor* method = mp->method;
        accessor.set_method(method);
        if (span) {
            span->ResetServerSpanName(method->full_name());
        const int reqsize = static_cast<int>(msg->payload.size());
        butil::IOBuf req_buf;
        butil::IOBuf* req_buf_ptr = &msg->payload;
        if (meta.has_attachment_size()) {
            if (reqsize < meta.attachment_size()) {
                cntl->SetFailed(EREQUEST,
                    "attachment_size=%d is larger than request_size=%d",
                     meta.attachment_size(), reqsize);
                break;
            int att_size = reqsize - meta.attachment_size();
            msg->payload.cutn(&req_buf, att_size);
            req_buf_ptr = &req_buf;
            cntl->request_attachment().swap(msg->payload);
        CompressType req_cmp_type = (CompressType)meta.compress_type();
        req.reset(svc->GetRequestPrototype(method).New());
        if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
            cntl->SetFailed(EREQUEST, "Fail to parse request message, "
                            "CompressType=%s, request_size=%d", 
                            CompressTypeToCStr(req_cmp_type), reqsize);
            break;
        res.reset(svc->GetResponsePrototype(method).New());
        // 构建closure,主要是在CallMethod之后,
        // 调用SendRpcResponse发送响应
        google::protobuf::Closure* done = ::brpc::NewCallback<
            int64_t, Controller*, const google::protobuf::Message*,
            const google::protobuf::Message*, const Server*,
            MethodStatus*, int64_t>(
                &SendRpcResponse, meta.correlation_id(), cntl.get(), 
                req.get(), res.get(), server,
                method_status, msg->received_us());
        // optional, just release resourse ASAP
        msg.reset();
        req_buf.clear();
        // 调用CallMethod
        if (!FLAGS_usercode_in_pthread) {
            // 正常就在原地调用CallMethod
            return svc->CallMethod(method, cntl.release(), 
                                   req.release(), res.release(), done);
        // 在开启usercode_in_pthread选项的CallMethod流程
        if (BeginRunningUserCode()) {
            // 在并发不大的情况下,原地调用CallMethod
            svc->CallMethod(method, cntl.release(), 
                            req.release(), res.release(), done);
            return EndRunningUserCodeInPlace();
        } else {
            // 在并发比较大的情况下,将CallMethod入队
            // 由usercode后台线程处理CallMethod
            return EndRunningCallMethodInPool(
                svc, method, cntl.release(),
                req.release(), res.release(), done);
    } while (false);
    // 在出错的情况下,发送异常响应
    SendRpcResponse(meta.correlation_id(), cntl.release(), 
                    req.release(), res.release(), server,
                    method_status, msg->received_us());

3.6 baidu-rpc协议

如下是baidu-rpc整体的协议格式:
baidu-rpc协议格式

RPC Header表明RPC协议类型,以及meta和body的长度,用于message的切分。该字段的解析是在接收请求的阶段,序列化是在SendResponse阶段。

RPC Meta包含compress_type、attachment_size等信息,如果该message是请求,那么里面包含了service-name、method-name等信息;如果message是响应,那么里面包含了错误码信息。

Request Body是用户定义的协议数据以及attachment数据(可选),比如:echo.proto等。

关于协议格式的定义以及实现见:

  • 协议格式:src/brpc/policy/baidu_rpc_meta.proto;
  • 协议实现:src/brpc/policy/baidu_rpc_protocol.cpp。

4. Socket通信

Socket连接是在监听fd触发了Pollin事件时创建,具体实现见:Socket::Create(src/brpc/socket.cpp)。
从Socket上读写数据都做了并发控制,也就是说同一时间仅有一个bthread从Socket上读或者写数据。

4.1 读数据

从Socket中读取数据做了并发控制,同一时间仅有一个bthread从Socket中读取数据。这个并发控制的逻辑是在Socket::StartInputEvent通过一个原子变量做的:

// source code: src/brpc/socket.cpp
int Socket::StartInputEvent(SocketId id, uint32_t events,
                            const bthread_attr_t& thread_attr) {
    SocketUniquePtr s;
    if (Address(id, &s) < 0) {
        return -1;
	// 通过原子变量s->_nevent做了读的并发控制
	// 后来的读事件fetch_add之后直接返回,最先到的读事件会创建bthread,处理这批读事件。
    if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
        // According to the stats, above fetch_add is very effective. In a
        // server processing 1 million requests per second, this counter
        // is just 1500~1700/s
        g_vars->neventthread << 1;
        bthread_t tid;
        // transfer ownership as well, don't use s anymore!
        Socket* const p = s.release();
        bthread_attr_t attr = thread_attr;
        attr.keytable_pool = p->_keytable_pool;
        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
            LOG(FATAL) << "Fail to start ProcessEvent";
            ProcessEvent(p);
    return 0;

从Socket中读取数据是在Socket::DoRead,主要读取的逻辑其实是在IOBuf中:

// sorce code: src/brpc/socket.cpp
ssize_t Socket::DoRead(size_t size_hint) {
	// 忽略ssl的逻辑...
    return _read_buf.append_from_file_descriptor(fd(), size_hint);
// source code: src/butil/iobuf_inl.h
inline ssize_t IOPortal::append_from_file_descriptor(int fd, size_t max_count) {
    return pappend_from_file_descriptor(fd, -1, max_count);
// source code: src/butil/iobuf.cpp
ssize_t IOPortal::pappend_from_file_descriptor(
    int fd, off_t offset, size_t max_count) {
    iovec vec[MAX_APPEND_IOVEC];
    int nvec = 0;
    size_t space = 0;
    Block* prev_p = NULL;
    Block* p = _block;
    do {
    	// 根据要读的数据max_count,申请足够的block空间
        if (p == NULL) {
        	// 优先从tls的block链表中获取没用完的block,
        	// 如果从tls中没获取到,则创建一个新的block。
            p = iobuf::acquire_tls_block();
            if (BAIDU_UNLIKELY(!p)) {
                errno = ENOMEM;
                return -1;
            if (prev_p != NULL) {
                prev_p->portal_next = p;
            } else {
                _block = p;
        // 构造iovector
        vec[nvec].iov_base = p->data + p->size;
        vec[nvec].iov_len = std::min(p->left_space(), max_count - space);
        space += vec[nvec].iov_len;
        ++nvec;
        // MAX_APPEND_IOVEC为64
        if (space >= max_count || nvec >= MAX_APPEND_IOVEC) {
            break;
        prev_p = p;
        p = p->portal_next;
    } while (1);
	// 从socket的fd中读取数据
	// 并没有保证一定能读取到指定大小的数据
    ssize_t nr = readv(fd, vec, nvec);
    if (nr <= 0) {  // -1 or 0
        if (empty()) {
        	// 释放block,优先放到tls链表中,
        	// 如果tls链表长度=8,则释放。
            return_cached_blocks();
        return nr;
    size_t total_len = nr;
    do { // 将相关的block构建好ref,放到IOBuf中
        const size_t len = std::min(total_len, _block->left_space());
        total_len -= len;
        const IOBuf::BlockRef r = { _block->size, (uint32_t)len, _block };
        // 增加block的ref次数,增加之后ref次数>1
        _push_back_ref(r); 
        _block->size += len;
        if (_block->full()) { // 如果该block没有可用空间了,
            Block* const saved_next = _block->portal_next;
            _block->dec_ref();  // 则,降低ref次数,当ref变成0时,就施放资源。
            _block = saved_next;
    } while (total_len);
    return nr;

4.2 写数据

Server端的写数据主要是在CallMethod执行完成后,由Closure自动调用,主要实现就在SendRpcResponse中,SendRpcResponse会根据compress-type以及协议类型做序列化,之后调用Socket::Write接口将响应发给客户端。

由于CallMethod可能在不同的TaskGroup执行,也就是说存在多个线程同时往一个Socket fd发送消息的情况,而fd的写又不是原子的,所以如何高效率地排队不同线程写出的数据包是这里的关键。

brpc使用一种wait-free MPSC链表来实现这个功能。所有待写出的数据都放在一个单链表节点中,next指针初始化为一个特殊值(Socket::WriteRequest::UNCONNECTED)。当一个线程想写出数据前,它先尝试和对应的链表头(Socket::_write_head)做原子交换,返回值是交换前的链表头。如果返回值为空,说明它获得了写出的权利,它会在原地写一次数据。否则说明有另一个线程在写,它把next指针指向返回的头以让链表连通。正在写的线程之后会看到新的头并写出这块数据。

代码逻辑如下:

// source code: src/brpc/socket.cpp
// SendRpcResponse调用Socket::Write发送数据
// 该接口是线程安全的
int Socket::Write(SocketMessagePtr<>& msg, const WriteOptions* options_in) {
    WriteOptions opt = *options_in;
    // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
    // wait until it points to a valid WriteRequest or NULL.
    req->next = WriteRequest::UNCONNECTED;
    req->id_wait = opt.id_wait;
    req->set_pipelined_count_and_user_message(opt.pipelined_count, msg.release(), opt.with_auth);
    return StartWrite(req, opt);
int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
	// _write_head是一个存放写请求的MPSC的链表的head,使用原子变量
    WriteRequest* const prev_head =
        _write_head.exchange(req, butil::memory_order_release);
    if (prev_head != NULL) {
    	// 已有其他线程在该Socket上做写操作了
    	// 那么就将当前的req通过“头插”的方式放到链表的前面
    	// 在其他线程完成之前的写操作后,会获取_write_head指针,处理后续的写
        req->next = prev_head;
        return 0;
	// 当prev_head==NULL,意味着没有其他线程在该Socket写
	// 那么,当前线程获得了该Socket的写权限
    int saved_errno = 0;
    bthread_t th;
    SocketUniquePtr ptr_for_keep_write;
    ssize_t nw = 0;
    req->next = NULL;
    // NOTE: Setup() MUST be called after Connect which may call app_connect,
    // which is assumed to run before any SocketMessage.AppendAndDestroySelf()
    // in some protocols(namely RTMP).
    req->Setup(this);
    // 执行一次写操作
    nw = req->data.cut_into_file_descriptor(fd());
    if (nw < 0) {
        // RTMP may return EOVERCROWDED
        if (errno != EAGAIN && errno != EOVERCROWDED) {
            saved_errno = errno;
            // EPIPE is common in pooled connections + backup requests.
            PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
            SetFailed(saved_errno, "Fail to write into %s: %s", 
                      description().c_str(), berror(saved_errno));
            goto FAIL_TO_WRITE;
    } else {
        AddOutputBytes(nw);
	// 如果req没有处理完,或者req->next有新的写请求
	// 那么后面会启动一个bthread处理
    if (IsWriteComplete(req, true, NULL)) {
        ReturnSuccessfulWriteRequest(req);
        return 0;
KEEPWRITE_IN_BACKGROUND:
    ReAddress(&ptr_for_keep_write);
    req->socket = ptr_for_keep_write.release();
    if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
                                 KeepWrite, req) != 0) {
        LOG(FATAL) << "Fail to start KeepWrite";
        KeepWrite(req);
    return 0;
// 异常处理。
FAIL_TO_WRITE:
    // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
    // `on_reset' callback inside the id object) so that we immediately
    // know this socket has failed inside the `on_reset' callback
    ReleaseAllFailedWriteRequests(req);
    errno = saved_errno;
    return -1;

5. bthread FAQ

1. bthread是协程(coroutine)吗?

严格意义上来说,bthread不是协程。我们常说的协程特指N:1线程库,即所有的协程运行于一个系统线程中,计算能力和各类eventloop库等价。

由于协程不跨线程,协程之间的切换不需要系统调用,可以非常快(100ns-200ns),受cache一致性的影响也小。

但代价是协程无法高效地利用多核,代码必须非阻塞,否则所有的协程都被卡住,对开发者要求苛刻。

协程的这个特点使其适合写运行时间确定的IO服务器,典型如http server,在一些精心调试的场景中,可以达到非常高的吞吐。

bthread是一个M:N线程库,一个bthread被卡住不会影响其他bthread。
有两个关键技术:

  • work stealing调度:让bthread更快地被调度到更多的核心上
  • butex:让bthread和pthread可以相互等待和唤醒。

2. 我应该在程序中多使用bthread吗?

不应该。除非你需要在一次RPC过程中让一些代码并发运行,你不应该直接调用bthread函数,把这些留给brpc做更好。

3. bthread和pthread worker如何对应?

pthread worker在任何时间只会运行一个bthread,当前bthread挂起时,pthread worker先尝试从本地runqueue弹出一个待运行的bthread,若没有,则随机偷另一个worker的待运行bthread,仍然没有才睡眠并会在有新的待运行bthread时被唤醒。

4. bthread中能调用阻塞的pthread或系统函数吗?

可以,只阻塞当前pthread worker。其他pthread worker不受影响。

5. 一个bthread阻塞会影响其他bthread吗?

不影响。若bthread因bthread API而阻塞,它会把当前pthread worker让给其他bthread。若bthread因pthread API或系统函数而阻塞,当前pthread worker上待运行的bthread会被其他空闲的pthread worker偷过去运行。

6. pthread中可以调用bthread API吗?

可以。bthread API在bthread中被调用时影响的是当前bthread,在pthread中被调用时影响的是当前pthread。使用bthread API的代码可以直接运行在pthread中。

7. 若有大量的bthread调用了阻塞的pthread或系统函数,会影响RPC运行么?

会。比如有8个pthread worker,当有8个bthread都调用了系统usleep()后,处理网络收发的RPC代码就暂时无法运行了。只要阻塞时间不太长, 这一般没什么影响,毕竟worker都用完了,除了排队也没有什么好方法。在brpc中用户可以选择调大worker数来缓解问题,在server端可设置ServerOptions.num_threads或-bthread_concurrency,在client端可设置-bthread_concurrency。

8. bthread会有Channel吗?

不会。channel代表的是两点间的关系,而很多现实问题是多点的,这个时候使用channel最自然的解决方案就是:有一个角色负责操作某件事情或某个资源,其他线程都通过channel向这个角色发号施令。如果我们在程序中设置N个角色,让它们各司其职,那么程序就能分类有序地运转下去。所以使用channel的潜台词就是把程序划分为不同的角色。channel固然直观,但是有代价:额外的上下文切换。做成任何事情都得等到被调用处被调度,处理,回复,调用处才能继续。这个再怎么优化,再怎么尊重cache locality,也是有明显开销的。另外一个现实是:用channel的代码也不好写。由于业务一致性的限制,一些资源往往被绑定在一起,所以一个角色很可能身兼数职,但它做一件事情时便无法做另一件事情,而事情又有优先级。各种打断、跳出、继续形成的最终代码异常复杂。

  • https://github.com/apache/incubator-brpc
  • https://zhuanlan.zhihu.com/p/113427004
从一个server的启动过程谈起,我们这里以echo server为例: int main(int argc, char* argv[]) { // gflags介绍:https://blog.csdn.net/lezardfu/article/details/23753741 // Parse gflags. We recommend you to use gflags as well. GFLAGS_NS::ParseCommandLineFlags(& 理解;同个账号下你能用多少个cmd窗口登录mysql的最大数量。 这里要注意navicat等客户端工具,虽然只需要登录一次,但不代表只是打开了一个cmd窗口,具体数量视你的操作而定。(之所以不用线程来解释,是因为本渣目前对线程这个概念理解不够清晰。) localhost:root这个用户的实际最大值=max_user_c... 当执行hdfs dfs -ls /user/*/*/*/*命令时上报OutOfMemoryError,您可以执行类似的命令来获取目录。查看集群内每台节点中的“/etc/hosts”文件中是否加入了客户端节点的IP和主机名。检查ZooKeeper故障节点的“/etc/hosts”文件中,IP和主机名是否正确,是否有一个IP对应多个主机名,或者一个主机名对应多个IP的情况。建议在执行Spark开发程序之前,应先根据实际数据量,估算shuffle过程的数据的大小,配置足够的磁盘空间再提交应用程序。 Bthread是brpc用到的一个线程库,也是brpc的核心之一,默认情况下,包括用户代码在内的绝大部分代码都是运行在bthread里的,bthread也是brpc实现高性能的基石。 bhtread官方文档定义如下: bthread是baidu-rpc使用的M:N线程库,目的是在提高程序的并发度的同时,降低编码难度,并在核数日益增多的CPU上提供更好的scalability, cache loca... 服务器环境nginx+php5.7+mysql5.5程序莫名其妙502查看nginx日志发现报错:connect() failed (111: Connection refused) while connecting to upstreamnginx运行恒昌,重启nginx错误依然没解决,网上查找说这个错误的原因,一般情况下upstream都是PHP 造成的,查看php-fpm.log发现警告:W... bthread是一个M:N线程库,是brpc的核心组件。bthread实现了用户态上下文切换,主要有2个目标,一是降低编码难度,业务层可使用同步编程模式;二是在多核系统上取得更好的扩展性和局部性,提供更低的延时,更高的cpu利用率。 bthread和协程 谈到bthread,就不得不提协程。首先要强调的一点是,bthread不是协程。一般上我们说的协程的概念是M:1线程库,多个协程跑在一个底层pthread上,一个协程阻塞就会导致协程所在的pthread阻塞,进而该pthread上的其他协程都无法被调度 转载:https://blog.csdn.net/okiwilldoit/article/details/82755526 bthread是brpc使用的M:N线程库,目的是在提高程序的并发度的同时,降低编码难度,并在核数日益增多的CPU上提供更好的scalabilitycache locality。 ”M:N“是指M个bthread会映射至N个pthread,一般M远大于N。 由于linux当下的pthread实现(NPTL)是1:1的,M个bthread也相当于映射至N个LWP。bthread的 正如标题所说,brpc的精华全部都在bthread上,而bthread就是我们brpc开发的一套“协程”。而进程,线程,和bthread的关系是什么样的呢?一个进程里面可以开辟多个线程,而线程和协程的关系呢。在微信开源的libco上,线程 :协程 = 1 :N。而在bthread线程 :协程 = M :N,而bthread实现的关键就是工作窃取算法。后续会展开描述。 bthread有三大件,TaskControl(进程内唯一),TaskGroup(线程内唯一),TaskMeta(bthread上下文)。 作为rpc服务器,在启动过后,最主要的一个过程就是收到请求后的处理,而这就牵涉到一个网络编程相关最基本的部分:如何有效地处理socket传过来地数据。最典型的实现就是区分I/O线程和工作线程,一个或多个I/O线程负责从socket读取数据放入一个队列,然后一堆worker线程来从队列里取数据并处理,或者I/O线程读完数据直接交给worker,此类严格区分I/O线程和worker线程的机制会有几种典...