Cpp 多线程学习笔记——4. 条件变量
介绍
在C++中通过<condition_variable>
头文件提供了条件变量类型std::condition_variable
和std::condition_variable_any
,
条件变量提供了一种线程间的同步机制:允许一个或多个线程等待某个条件变为真,同时另一个线程可以改变这个条件并通知等待的线程。
条件变量通常需要和同步锁结合使用。
std::condition_variable_any
与std::condition_variable
非常类似,
区别仅仅是std::condition_variable
的wait
函数只能接受std::unique_lock<std::mutex>
类型的参数,
而std::condition_variable_any
可以接受任何lockable
参数,相应地需要付出额外开销。
除此以外,两者的使用几乎完全一样。
下文中只讨论std::condition_variable
的使用,一般也不推荐使用std::condition_variable_any
。
使用方法
std::condition_variable
提供了几种方法来支持线程间的同步,主要包括等待和通知两类方法:
等待方法有如下几种:
wait(lock)
:使当前线程进入等待状态,直到被通知或被中断- 这里的
lock
是一个std::unique_lock<std::mutex>
类型变量,代表一个已经锁定的互斥锁,用于保护共享资源 - 在调用
wait
后,当前线程执行流暂停,拥有的互斥锁会被解锁,等待期间不会持有锁。只有当线程被唤醒时,才会自动重新锁定该互斥锁,然后继续执行
- 这里的
wait_for(lock, duration)
:在wait(lock)
的基础上加上超时机制- 线程会等待一段持续时间,在等待过程中放弃互斥锁
- 如果线程在持续时间内被唤醒,则尝试重写加锁,返回
std::cv_status::no_timeout
- 如果线程等待超时,仍然会主动唤醒自身,并尝试重新加锁,返回
std::cv_status::timeout
wait_until(lock, time_point)
:和wait_for(lock, duration)
基本一致,只是把等待持续时间改成了等待到固定时刻。
通知方法有如下两种:
notify_one()
:唤醒等待在该条件变量上的一个线程(如果有线程在等待)。被唤醒的线程将重新获得互斥锁的控制权,并继续执行。notify_all()
:唤醒所有等待在该条件变量上的线程。所有线程都会重新尝试获取互斥锁,这可能导致优先级反转等问题,需要谨慎使用。
关于条件变量的唤醒其实没这么简单,还会存在虚假唤醒等问题,这里不做讨论。
wait
方法的默认参数形式如下 1
void wait(std::unique_lock<std::mutex>& lock); // lock是一个已经获取并加锁的对象
除此之外,还可以支持加上一个可调用对象参数以支持更丰富的功能
1
2template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
它的含义是:当前线程陷入等待,被唤醒时会检查pred
,如果pred
返回true
则成功唤醒,如果返回false
则继续陷入等待。
对于wait_for
和wait_until
也有类似的重载版本。
示例代码
我们直接以示例代码来说明条件变量的使用。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
std::mutex mtx;
std::condition_variable cv;
bool ready = false; // 全局标志位
void printId(int index_id) {
std::unique_lock<std::mutex> lck(mtx); // 获取并加锁
// 第一阶段,标志位为false,进入while循环中,条件变量cv执行一次wait并陷入等待,释放锁
// 第二阶段,标志位在start()函数中被设置为true
// 第三阶段,通过条件变量cv唤醒线程,重新获取锁,跳出while循环
while (!ready) { cv.wait(lck); }
std::cout << "thread_id: " << std::this_thread::get_id()
<< " index_id: " << index_id << "\n";
}
void start() {
std::unique_lock<std::mutex> lck(mtx); // 获取并加锁
ready = true; // 改变全局标志位
cv.notify_all(); // 唤醒所有的等待线程
}
int main() {
std::vector<std::thread> threads;
threads.reserve(10);
for (size_t i = 0; i < 10; ++i) { threads.emplace_back(printId, i); }
std::cout << "create done.\n";
start();
for (auto &t : threads) { t.join(); }
std::cout << "process done.\n";
return 0;
}
运行结果如下(顺序随机) 1
2
3
4
5
6
7
8
9
10
11
12create done.
thread_id: 140041002432064 index_id: 9
thread_id: 140041069573696 index_id: 1
thread_id: 140041061180992 index_id: 2
thread_id: 140041052788288 index_id: 3
thread_id: 140041044395584 index_id: 4
thread_id: 140041036002880 index_id: 5
thread_id: 140041027610176 index_id: 6
thread_id: 140041019217472 index_id: 7
thread_id: 140041010824768 index_id: 8
thread_id: 140041077966400 index_id: 0
process done.
代码执行过程的详细解读如下:
- 初始化:互斥锁
mtx
、条件变量cv
和标志位ready
被初始化。 - 创建线程:在主函数中创建一个线程向量 threads
并预分配空间。启动10个线程,每个线程执行
printId
函数,并传入一个唯一的ID。 - 所有线程就绪:每一个线程依次开始执行
printId
函数,并陷入等待,具体情况为:- 某一个线程成功获取到互斥锁
mtx
并继续执行(其它线程阻塞式地等待加锁) - 由于
ready
初始值为false
,线程进入while
循环并执行一次cv.wait(lck)
进入等待状态,释放互斥锁mtx
。 - 其它线程依次获取到互斥锁
mtx
,同样地进入等待状态,释放互斥锁mtx
- 某一个线程成功获取到互斥锁
- 主线程在主函数中调用
start
函数:- 主线程获取到互斥锁
mtx
并加锁,这也意味着前面所有的子线程都已经等待就绪 - 修改全局标志位
ready
为true
,所有子线程的while
循环将失效 - 调用
cv.notify_all()
唤醒所有等待线程。
- 主线程获取到互斥锁
- 所有线程被唤醒并执行:
- 虽然所有子线程都被唤醒,但是只有一个线程可以立刻获取到互斥锁
mtx
(其它线程阻塞式地等待加锁) - 获取到互斥锁的线程继续执行,打印信息,执行完成并释放互斥锁
- 其它线程依次获取到互斥锁
mtx
,然后打印信息,执行完成
- 虽然所有子线程都被唤醒,但是只有一个线程可以立刻获取到互斥锁
- 结束部分:主线程等待所有子线程
join
,然后程序正常退出
需要注意的是,代码中使用了while
循环而非if
来检查全局标志位,两种做法的对应代码如下
1
2
3while (!ready) { cv.wait(lck); }
if (!ready) { cv.wait(lck); }
两者在使用效果上是几乎一样,但是因为条件变量可能存在虚假唤醒的问题,使用while
循环结合全局标志位可以有效地避免这种错误情况,因此普通的做法是放在while
循环之中。