相关文章推荐

条件变量 condition_variable 的使用及陷阱

最近看代码发现,在多线程中实现有关throttle和阻塞等有关的功能时,条件变量的使用是最常见的。

首先先对条件变量有个基本的认识

条件变量的基础知识

条件变量 std::condition_variable 定义在头文件 <condition_variable> 中。

条件变量用于阻塞一个或多个线程,直到某个线程修改线程间的共享变量,并通过 condition_variable 通知其余阻塞线程。从而使得已阻塞的线程可以继续处理后续的操作。

从条件变量的作用可以知道,在使用条件变量时,分为两个方面:

  • 用于通知已阻塞线程,共享变量已改变
  • 用于阻塞某一线程,直至该线程被唤醒
  • 可以分为两步:

  • 获取互斥量 std::mutex , 这个操作通常使用 std::lock_guard 来完成
  • 在持有锁的期间,在条件变量 std::condition_variable 上执行 notify_one 或者 notify_all 去唤醒阻塞线程。
  • 这里列出相应的函数原型:

    void notify_one() noexcept;
    void notify_all() noexcept;
    

    可以分为三步:

  • 使用std::unique_lock<std::mutex>来实现加锁操作,使得可以在相同的互斥量mutex上(不同的线程)保护共享变量。
  • 执行wait,wait_forwait_until。该操作能够原子性的释放互斥量mutex上的锁,并阻塞这个线程。
  • 当条件变量condition_variable被通知,超时,或虚假唤醒时,该线程结束阻塞状态,并自动的获取到互斥量mutex上的锁。当然,这里应该检查是否为虚假唤醒。
  • 这里列出相应的函数原型:

    void wait (unique_lock<mutex>& lck);
    template<class Pred> 
        void wait(unique_lock<mutex>& lock, Pred pred);
    template<class Clock, class Duration> 
        cv_status wait_until(unique_lock<mutex>& lock, const chrono::time_point<Clock, Duration>& abs_time);
    template<class Clock, class Duration, class Pred> 
        bool wait_until(unique_lock<mutex>& lock, const chrono::time_point<Clock, Duration>& abs_time, Pred pred);
    template<class Rep, class Preiod>
        cv_status wait_for(unique_lock<mutex>& lock, const chrono::duration<Rep, Period>& rel_time);
    template<class Rep, class Preiod, class Pred>
        bool wait_for(unique_lock<mutex>& lock, const chrono::duration<Rep, Period>& rel_time, Pred pred);
    

    这里先列出基本使用模板

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    std::mutex lock;
    std::condition_variable condVar;
    bool dataReady{false};
    void waitingForWork() {
        std::cout << "Waiting ..." << std::endl;
        std::unique_lock<std::mutex> l(lock);
        condVar.wait(l, []{return dataReady;});           // (4)
        std::cout << "Running ..." << std::endl;
    void setDataReady() {
            std::lock_guard<std::mutex> l{lock};
            dataReady = true;
        std::cout << "Data prepared, notify one" << std::endl;
        condVar.notify_one();                             // (3)
    int main() {
        std::cout << "==========Begin==========" << std::endl;
        std::thread t1(waitingForWork);                    // (1)
        std::thread t2(setDataReady);                      // (2)
        t1.join();
        t2.join();
        std::cout << "===========End===========" << std::endl;
    

    这里同步工作是如何进行的呢?程序创建了两个线程t1(1)和t2(2),分别对应着waitingForWorksetDataReadysetDataReady进行通知,通过条件变量condVar来通知(3)它已经完成了前期的准备工作。而waitingForWork则在持有锁的期间,等待通知(4)。

    这里需要注意:收发方都需要同一把锁,对于发送着来说,使用std::lock_guard已经足够了,因为它只调用一次lockunlock,而对于接收着,必须使用std::unique_lock,因为频繁多次的lockunlock

    输出结果如下:

    注: 编译时注意添加-pthread选项,避免出现相关thread的错误。

    ==========Begin==========
    Waiting ...
    Data prepared, notify one
    Running ...
    ===========End===========
    

    那么这里就有疑问了,wait函数明明可以不加前置条件pred也可以使用。为什么非要将工作流程写的这个复杂呢?

    这里有一个基本的规则:无条件的等待可能错过唤醒,简单的唤醒却发现没有事可干。这意味这什么?条件变量可能是两个非常严重问题的受害者:唤醒丢失和虚假唤醒。

    唤醒丢失和虚假唤醒

  • 唤醒丢失: 唤醒丢失的现象是发送方在接收方进入等待状态之前发送通知。结果就是导致通知消失。C++标准以同时同步机制描述条件变量,“条件变量类是原始的,可同步的用于阻塞单个或多个线程,...”, 因此,当通知丢失后,接受方将一直处于等待状态。
  • 虚假唤醒: 尽管没有发生通知,但接受者也有可能会被唤醒。
  • 下面详细介绍下等待的工作流程:

    等待工作流程

    在等待的初始处理中,该线程锁定互斥锁,然后检查谓词[]{return dataReady;}(谓词:在计算机语言的环境下,谓词是指条件表达式的求值返回真或假的过程。)

  • 如果谓词被评估为:
  • true: 线程继续工作
  • false: condVar.wait()解锁互斥并将线程置于等待(阻塞)状态
  • 如果条件变量condVar处于等待状态并收到通知或被虚假唤醒,则会发生下面步骤:
  • 线程被解除阻塞,并重新获得互斥锁
  • 线程检查谓词
  • 如果谓词被评估为:
  • true: 线程继续其工作
  • false: condVar.wait()解锁互斥并将线程置于等待(阻塞)状态
  • void waitingForWork() { std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "Waiting ..." << std::endl; std::unique_lock<std::mutex> l(lock); condVar.wait(l); //(1) std::cout << "Running ..." << std::endl; void setDataReady() { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "Data prepared, notify one" << std::endl; condVar.notify_one(); //(2) int main() { std::cout << "==========Begin==========" << std::endl; std::thread t1(waitingForWork); std::thread t2(setDataReady); t1.join(); t2.join(); std::cout << "===========End===========" << std::endl;

    现在,wait的调用没有使用谓词,这样的同步看起来相当的简单。但是遗憾的是,这中情况会导致唤醒丢失。下面的结果展示了唤醒丢失导致了死锁。当然,这里为了100%必现唤醒丢失现象,我在两者间加了不同的延迟。对于不信任第一个模板的,也可以添加延时进行测试。

    运行结果是什么呢?

    ==========Begin==========
    Data prepared, notify one
    Waiting ...
    

    好吧,教训是艰难的,谓词是肯定的。难道没有别的简单的方式?

    atomic谓词

    可能你已经注意到了,变量dataReady仅仅只是一个布尔类型,那么使用atomic boolean,去掉发送者的锁呢?

    //conditionVariablesAtomic.cpp
    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <atomic>
    std::mutex lock;
    std::condition_variable condVar;
    std::atomic<bool> dataReady{false};
    void waitingForWork() {
        std::cout << "Waiting ..." << std::endl;
        std::unique_lock<std::mutex> l(lock);
        condVar.wait(l, []{return dataReady.load();});
        std::cout << "Running ..." << std::endl;
    void setDataReady() {
        dataReady = true;
        std::cout << "Data prepared, notify one" << std::endl;
        condVar.notify_one();
    int main() {
        std::cout << "==========Begin==========" << std::endl;
        std::thread t1(waitingForWork);
        std::thread t2(setDataReady);
        t1.join();
        t2.join();
        std::cout << "===========End===========" << std::endl;
    

    因为dataReady不用互斥量保护,相比第一个版本,相对来说比较简单了。但是这存在一种竞争情况,可能造成死锁。

    wait表达式等价于下面四行:

    std::unique_lock<std::mutex> l{lock}
    while(![]{return dataReady.load();}) {
        //time window(1)
        condVar.wait(l);
    

    即使将dataReady设为原子性,也应该在持有互斥锁的情况下对它加锁;如果不是,则可能会发生已通知对等待线程的更改,但是不能正确同步,这种竞争状况可能会导致死锁。

    假设条件变量condVar在等待表达式中但不在等待状态时发送通知。这意味着线程的执行位于注释时间窗口(1)所在的源代码片段,结果就是通知丢失,然后,线程返回等待状态,大概率情况下可能会永久休眠。(这种情况会出现的一种可能,虚假唤醒发生,进入判断条件,条件不满足,在进入等待状态前,通知发生,然后就导致通知丢失了)。

    如果dataReady受互斥量保护,则不会发生这种情况。由于与互斥锁同步,因此条件变量仅在接收方处于等待状态时才发送通知。换句话说,在dataReady更改时,接受方只能处于等待状态,更改完成后,发送通知,接收方就可以继续执行了

    C++ Core Guidelines: Be Aware of the Traps of Condition Variables

    standard library header <condition_variable>

    std::condition_variable

     
    推荐文章