条件变量
condition_variable
的使用及陷阱
最近看代码发现,在多线程中实现有关throttle和阻塞等有关的功能时,条件变量的使用是最常见的。
首先先对条件变量有个基本的认识
条件变量的基础知识
条件变量
std::condition_variable
定义在头文件
<condition_variable>
中。
条件变量用于阻塞一个或多个线程,直到某个线程修改线程间的共享变量,并通过
condition_variable
通知其余阻塞线程。从而使得已阻塞的线程可以继续处理后续的操作。
从条件变量的作用可以知道,在使用条件变量时,分为两个方面:
用于通知已阻塞线程,共享变量已改变
用于阻塞某一线程,直至该线程被唤醒
可以分为两步:
获取互斥量
std::mutex
, 这个操作通常使用
std::lock_guard
来完成
在持有锁的期间,在条件变量
std::condition_variable
上执行
notify_one
或者
notify_all
去唤醒阻塞线程。
这里列出相应的函数原型:
void notify_one() noexcept;
void notify_all() noexcept;
可以分为三步:
使用std::unique_lock<std::mutex>
来实现加锁操作,使得可以在相同的互斥量mutex
上(不同的线程)保护共享变量。
执行wait
,wait_for
或 wait_until
。该操作能够原子性的释放互斥量mutex
上的锁,并阻塞这个线程。
当条件变量condition_variable
被通知,超时,或虚假唤醒时,该线程结束阻塞状态,并自动的获取到互斥量mutex
上的锁。当然,这里应该检查是否为虚假唤醒。
这里列出相应的函数原型:
void wait (unique_lock<mutex>& lck);
template<class Pred>
void wait(unique_lock<mutex>& lock, Pred pred);
template<class Clock, class Duration>
cv_status wait_until(unique_lock<mutex>& lock, const chrono::time_point<Clock, Duration>& abs_time);
template<class Clock, class Duration, class Pred>
bool wait_until(unique_lock<mutex>& lock, const chrono::time_point<Clock, Duration>& abs_time, Pred pred);
template<class Rep, class Preiod>
cv_status wait_for(unique_lock<mutex>& lock, const chrono::duration<Rep, Period>& rel_time);
template<class Rep, class Preiod, class Pred>
bool wait_for(unique_lock<mutex>& lock, const chrono::duration<Rep, Period>& rel_time, Pred pred);
这里先列出基本使用模板
#include <condition_variable>
#include <mutex>
#include <thread>
std::mutex lock;
std::condition_variable condVar;
bool dataReady{false};
void waitingForWork() {
std::cout << "Waiting ..." << std::endl;
std::unique_lock<std::mutex> l(lock);
condVar.wait(l, []{return dataReady;}); // (4)
std::cout << "Running ..." << std::endl;
void setDataReady() {
std::lock_guard<std::mutex> l{lock};
dataReady = true;
std::cout << "Data prepared, notify one" << std::endl;
condVar.notify_one(); // (3)
int main() {
std::cout << "==========Begin==========" << std::endl;
std::thread t1(waitingForWork); // (1)
std::thread t2(setDataReady); // (2)
t1.join();
t2.join();
std::cout << "===========End===========" << std::endl;
这里同步工作是如何进行的呢?程序创建了两个线程t1
(1)和t2
(2),分别对应着waitingForWork
和setDataReady
。setDataReady
进行通知,通过条件变量condVar
来通知(3)它已经完成了前期的准备工作。而waitingForWork
则在持有锁的期间,等待通知(4)。
这里需要注意:收发方都需要同一把锁,对于发送着来说,使用std::lock_guard
已经足够了,因为它只调用一次lock
和unlock
,而对于接收着,必须使用std::unique_lock
,因为频繁多次的lock
和unlock
。
输出结果如下:
注: 编译时注意添加-pthread
选项,避免出现相关thread的错误。
==========Begin==========
Waiting ...
Data prepared, notify one
Running ...
===========End===========
那么这里就有疑问了,wait函数明明可以不加前置条件pred
也可以使用。为什么非要将工作流程写的这个复杂呢?
这里有一个基本的规则:无条件的等待可能错过唤醒,简单的唤醒却发现没有事可干。这意味这什么?条件变量可能是两个非常严重问题的受害者:唤醒丢失和虚假唤醒。
唤醒丢失和虚假唤醒
唤醒丢失: 唤醒丢失的现象是发送方在接收方进入等待状态之前发送通知。结果就是导致通知消失。C++标准以同时同步机制描述条件变量,“条件变量类是原始的,可同步的用于阻塞单个或多个线程,...”, 因此,当通知丢失后,接受方将一直处于等待状态。
虚假唤醒: 尽管没有发生通知,但接受者也有可能会被唤醒。
下面详细介绍下等待的工作流程:
等待工作流程
在等待的初始处理中,该线程锁定互斥锁,然后检查谓词[]{return dataReady;}
(谓词:在计算机语言的环境下,谓词是指条件表达式的求值返回真或假的过程。)
如果谓词被评估为:
true:
线程继续工作
false:
condVar.wait()
解锁互斥并将线程置于等待(阻塞)状态
如果条件变量condVar
处于等待状态并收到通知或被虚假唤醒,则会发生下面步骤:
线程被解除阻塞,并重新获得互斥锁
线程检查谓词
如果谓词被评估为:
true:
线程继续其工作
false:
condVar.wait()
解锁互斥并将线程置于等待(阻塞)状态
void waitingForWork() {
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Waiting ..." << std::endl;
std::unique_lock<std::mutex> l(lock);
condVar.wait(l); //(1)
std::cout << "Running ..." << std::endl;
void setDataReady() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Data prepared, notify one" << std::endl;
condVar.notify_one(); //(2)
int main() {
std::cout << "==========Begin==========" << std::endl;
std::thread t1(waitingForWork);
std::thread t2(setDataReady);
t1.join();
t2.join();
std::cout << "===========End===========" << std::endl;
现在,wait
的调用没有使用谓词,这样的同步看起来相当的简单。但是遗憾的是,这中情况会导致唤醒丢失。下面的结果展示了唤醒丢失导致了死锁。当然,这里为了100%必现唤醒丢失现象,我在两者间加了不同的延迟。对于不信任第一个模板的,也可以添加延时进行测试。
运行结果是什么呢?
==========Begin==========
Data prepared, notify one
Waiting ...
好吧,教训是艰难的,谓词是肯定的。难道没有别的简单的方式?
atomic
谓词
可能你已经注意到了,变量dataReady
仅仅只是一个布尔类型,那么使用atomic boolean
,去掉发送者的锁呢?
//conditionVariablesAtomic.cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <atomic>
std::mutex lock;
std::condition_variable condVar;
std::atomic<bool> dataReady{false};
void waitingForWork() {
std::cout << "Waiting ..." << std::endl;
std::unique_lock<std::mutex> l(lock);
condVar.wait(l, []{return dataReady.load();});
std::cout << "Running ..." << std::endl;
void setDataReady() {
dataReady = true;
std::cout << "Data prepared, notify one" << std::endl;
condVar.notify_one();
int main() {
std::cout << "==========Begin==========" << std::endl;
std::thread t1(waitingForWork);
std::thread t2(setDataReady);
t1.join();
t2.join();
std::cout << "===========End===========" << std::endl;
因为dataReady不用互斥量保护,相比第一个版本,相对来说比较简单了。但是这存在一种竞争情况,可能造成死锁。
wait表达式等价于下面四行:
std::unique_lock<std::mutex> l{lock}
while(![]{return dataReady.load();}) {
//time window(1)
condVar.wait(l);
即使将dataReady设为原子性,也应该在持有互斥锁的情况下对它加锁;如果不是,则可能会发生已通知对等待线程的更改,但是不能正确同步,这种竞争状况可能会导致死锁。
假设条件变量condVar
在等待表达式中但不在等待状态时发送通知。这意味着线程的执行位于注释时间窗口(1)所在的源代码片段,结果就是通知丢失,然后,线程返回等待状态,大概率情况下可能会永久休眠。(这种情况会出现的一种可能,虚假唤醒发生,进入判断条件,条件不满足,在进入等待状态前,通知发生,然后就导致通知丢失了)。
如果dataReady
受互斥量保护,则不会发生这种情况。由于与互斥锁同步,因此条件变量仅在接收方处于等待状态时才发送通知。换句话说,在dataReady
更改时,接受方只能处于等待状态,更改完成后,发送通知,接收方就可以继续执行了
C++ Core Guidelines: Be Aware of the Traps of Condition Variables
standard library header <condition_variable>
std::condition_variable