介绍

在C++中通过<condition_variable>头文件提供了条件变量类型std::condition_variablestd::condition_variable_any, 条件变量提供了一种线程间的同步机制:允许一个或多个线程等待某个条件变为真,同时另一个线程可以改变这个条件并通知等待的线程。 条件变量通常需要和同步锁结合使用。

std::condition_variable_anystd::condition_variable非常类似, 区别仅仅是std::condition_variablewait函数只能接受std::unique_lock<std::mutex>类型的参数, 而std::condition_variable_any可以接受任何lockable参数,相应地需要付出额外开销。 除此以外,两者的使用几乎完全一样。

下文中只讨论std::condition_variable的使用,一般也不推荐使用std::condition_variable_any

使用方法

std::condition_variable提供了几种方法来支持线程间的同步,主要包括等待和通知两类方法:

等待方法有如下几种:

  • wait(lock):使当前线程进入等待状态,直到被通知或被中断
    • 这里的lock是一个std::unique_lock<std::mutex>类型变量,代表一个已经锁定的互斥锁,用于保护共享资源
    • 在调用wait后,当前线程执行流暂停,拥有的互斥锁会被解锁,等待期间不会持有锁。只有当线程被唤醒时,才会自动重新锁定该互斥锁,然后继续执行
  • wait_for(lock, duration):在wait(lock)的基础上加上超时机制
    • 线程会等待一段持续时间,在等待过程中放弃互斥锁
    • 如果线程在持续时间内被唤醒,则尝试重写加锁,返回std::cv_status::no_timeout
    • 如果线程等待超时,仍然会主动唤醒自身,并尝试重新加锁,返回std::cv_status::timeout
  • wait_until(lock, time_point):和wait_for(lock, duration)基本一致,只是把等待持续时间改成了等待到固定时刻。

通知方法有如下两种:

  • notify_one():唤醒等待在该条件变量上的一个线程(如果有线程在等待)。被唤醒的线程将重新获得互斥锁的控制权,并继续执行。
  • notify_all():唤醒所有等待在该条件变量上的线程。所有线程都会重新尝试获取互斥锁,这可能导致优先级反转等问题,需要谨慎使用。

关于条件变量的唤醒其实没这么简单,还会存在虚假唤醒等问题,这里不做讨论。

wait方法的默认参数形式如下

1
void wait(std::unique_lock<std::mutex>& lock); // lock是一个已经获取并加锁的对象

除此之外,还可以支持加上一个可调用对象参数以支持更丰富的功能

1
2
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

它的含义是:当前线程陷入等待,被唤醒时会检查pred,如果pred返回true则成功唤醒,如果返回false则继续陷入等待。 对于wait_forwait_until也有类似的重载版本。

示例代码

我们直接以示例代码来说明条件变量的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
bool ready = false; // 全局标志位

void printId(int index_id) {
std::unique_lock<std::mutex> lck(mtx); // 获取并加锁

// 第一阶段,标志位为false,进入while循环中,条件变量cv执行一次wait并陷入等待,释放锁
// 第二阶段,标志位在start()函数中被设置为true
// 第三阶段,通过条件变量cv唤醒线程,重新获取锁,跳出while循环
while (!ready) { cv.wait(lck); }

std::cout << "thread_id: " << std::this_thread::get_id()
<< " index_id: " << index_id << "\n";
}

void start() {
std::unique_lock<std::mutex> lck(mtx); // 获取并加锁
ready = true; // 改变全局标志位
cv.notify_all(); // 唤醒所有的等待线程
}

int main() {
std::vector<std::thread> threads;

threads.reserve(10);
for (size_t i = 0; i < 10; ++i) { threads.emplace_back(printId, i); }
std::cout << "create done.\n";

start();

for (auto &t : threads) { t.join(); }

std::cout << "process done.\n";
return 0;
}

运行结果如下(顺序随机)

1
2
3
4
5
6
7
8
9
10
11
12
create done.
thread_id: 140041002432064 index_id: 9
thread_id: 140041069573696 index_id: 1
thread_id: 140041061180992 index_id: 2
thread_id: 140041052788288 index_id: 3
thread_id: 140041044395584 index_id: 4
thread_id: 140041036002880 index_id: 5
thread_id: 140041027610176 index_id: 6
thread_id: 140041019217472 index_id: 7
thread_id: 140041010824768 index_id: 8
thread_id: 140041077966400 index_id: 0
process done.

代码执行过程的详细解读如下:

  • 初始化:互斥锁 mtx、条件变量 cv 和标志位 ready 被初始化。
  • 创建线程:在主函数中创建一个线程向量 threads 并预分配空间。启动10个线程,每个线程执行 printId 函数,并传入一个唯一的ID。
  • 所有线程就绪:每一个线程依次开始执行 printId 函数,并陷入等待,具体情况为:
    • 某一个线程成功获取到互斥锁mtx并继续执行(其它线程阻塞式地等待加锁)
    • 由于ready初始值为false,线程进入while循环并执行一次cv.wait(lck)进入等待状态,释放互斥锁mtx
    • 其它线程依次获取到互斥锁mtx,同样地进入等待状态,释放互斥锁mtx
  • 主线程在主函数中调用start函数:
    • 主线程获取到互斥锁mtx并加锁,这也意味着前面所有的子线程都已经等待就绪
    • 修改全局标志位readytrue,所有子线程的while循环将失效
    • 调用cv.notify_all()唤醒所有等待线程。
  • 所有线程被唤醒并执行:
    • 虽然所有子线程都被唤醒,但是只有一个线程可以立刻获取到互斥锁mtx(其它线程阻塞式地等待加锁)
    • 获取到互斥锁的线程继续执行,打印信息,执行完成并释放互斥锁
    • 其它线程依次获取到互斥锁mtx,然后打印信息,执行完成
  • 结束部分:主线程等待所有子线程join,然后程序正常退出

需要注意的是,代码中使用了while循环而非if来检查全局标志位,两种做法的对应代码如下

1
2
3
while (!ready) { cv.wait(lck); }

if (!ready) { cv.wait(lck); }

两者在使用效果上是几乎一样,但是因为条件变量可能存在虚假唤醒的问题,使用while循环结合全局标志位可以有效地避免这种错误情况,因此普通的做法是放在while循环之中。