互斥锁

线程安全与互斥锁

在多线程编程中,由于多个线程存在共享的资源(例如全局变量等),因此可能导致相互之间产生干扰, 下面的例子可以展示这种问题(必须使用Debug模式编译,因为Release模式下可能直接优化了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <thread>
#include <vector>

const int num_threads = 5;
const int num_increments = 10000;
const int num_experiments = 20;

void increment_counter(int &counter) {
for (int i = 0; i < num_increments; ++i) {
counter++; // 多线程同时修改共享变量
}
}

void run_experiment(int experiment_number) {
int shared_counter = 0;
std::vector<std::thread> threads;
threads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, std::ref(shared_counter));
}

for (auto &t : threads) { t.join(); }

// 检查结果
if (shared_counter != num_threads * num_increments) {
std::cout << "Error in experiment " << experiment_number
<< ". Counter value: " << shared_counter << '\n';
}
}

int main() {
for (int i = 0; i < num_experiments; ++i) { run_experiment(i + 1); }

return 0;
}

程序运行的结果可能与期望值不符,每一次执行得到的结果是随机的,通常比期望值更小,例如

1
2
3
4
Error in experiment 4. Counter value: 48882
Error in experiment 9. Counter value: 46751
Error in experiment 14. Counter value: 49241
Error in experiment 16. Counter value: 48416

这是因为多个线程在并发地对共享变量 shared_counter 进行递增操作,假如线程A和线程B在同一时刻读取到shared_counter的值为50,然后分别执行自增,线程A将51写入内存,线程B也将51写入内存,最终结果是51而非52。

这一类线程安全问题的本质在于,对于共享变量shared_counter的读写操作不是一步完成的(不是原子操作),而是被分解为几个步骤, 这导致多线程在同时进行读写时会出现混乱。一类最常见的解决办法就是使用锁进行保护,最常见的就是使用互斥锁,在<mutex>中提供了若干个互斥锁模型,我们先关注最重要的std::mutex互斥锁。

std::mutex

互斥锁是一个全局变量,它可以保证同一时刻只有一个线程可以持有,具体的逻辑如下:

  • 只有一个线程在调用lock()时可以成功加锁(变成持有锁的状态)并继续执行,其他线程在调用lock()时无法成功加锁,并且会陷入等待(相当于进入一个等待队列),直到持有锁的线程调用unlock()解锁;
  • 如果互斥锁被解锁,另一个陷入lock()等待的线程就可以成功加锁并开始执行,执行完毕后该线程需要调用unlock()解锁。(如果一直不解锁,则会导致程序陷入死锁状态)

加上互斥锁的示例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int shared_counter = 0;
std::mutex mtx; // 定义互斥锁

void increment_counter() {
for (int i = 0; i < 10000; ++i) {
mtx.lock(); // 加锁
shared_counter++;
mtx.unlock(); // 解锁
}
}

int main() {
std::vector<std::thread> threads;
threads.reserve(5);

for (int i = 0; i < 5; ++i) { threads.emplace_back(increment_counter); }

for (auto &t : threads) { t.join(); }

std::cout << "Final shared counter value: " << shared_counter << '\n';
return 0;
}

程序运行的结果全部满足期望,不会出现由多线程导致的随机结果。

对互斥锁的直接加锁操作中,如果加锁失败会陷入阻塞状态,为了避免陷入阻塞,我们可以使用try_lock()来尝试加锁,无论成功或失败都不会陷入阻塞,具体逻辑为:

  • 如果当前线程加锁成功,则返回true
  • 如果当前线程加锁失败,则返回false,但是不会陷入阻塞状态。

使用例如

1
2
3
4
5
6
if (mtx.try_lock()) {
// 成功获得锁
}
else {
// 锁已被占用
}

使用try_lock()的完整例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void threadFunction(int threadId) {
// 尝试锁定互斥量
if (mtx.try_lock()) {
std::cout << "Thread " << threadId << " acquired the lock\n";
// 执行一些操作
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
std::cout << "Thread " << threadId << " released the lock\n";
}
else {
std::cout << "Thread " << threadId << " couldn't acquire the lock\n";
}
}

int main() {
std::thread t1(threadFunction, 1);

std::this_thread::sleep_for(std::chrono::seconds(1));

std::thread t2(threadFunction, 2);

t1.join();
t2.join();

return 0;
}

输出如下,这里两个线程会先后获取到锁,因为两个线程的创建之间暂停了一段时间

1
2
3
4
Thread 1 acquired the lock
Thread 1 released the lock
Thread 2 acquired the lock
Thread 2 released the lock

如果注释该行来取消暂停时间,则两个线程中有一个就无法成功获取到锁,并进入else分支,输出如下

1
2
3
Thread Thread 2 acquired the lock
1 couldn't acquire the lock
Thread 2 released the lock

这里的输出比较混乱,但是仍然可以看出,线程2成功持有了锁,并进入if分支,而线程1进入了else分支。

更多互斥锁

<mutex>中实际上提供了四种互斥锁:

  • std::mutex: 基础的互斥锁
  • std::recursive_mutex: 支持递归的互斥锁
  • std::time_mutex: 支持定时的互斥锁
  • std::recursive_timed_mutex: 支持定时和递归的互斥锁

我们已经介绍了最基础的第一种互斥锁,后三种互斥锁其实只是对第一种互斥锁的补充:

  • std::mutex进行多次加锁会导致死锁,不适合在递归函数中使用,std::recursive_mutex允许持有的线程对其进行多次加锁,在解锁时也需要进行相同的多次解锁,适合在递归函数中使用;
  • std::mutex只有两种加锁方式:lock()try_lock(),在加锁失败时前者会陷入阻塞等待,后者则会立刻返回。在实际应用中,我们还需要一种中间策略:在加锁失败时尝试等待一段有限的时间,如果超时则失败返回。std::time_mutex提供的try_lock_for()方法接收一个时间参数,允许我们设置阻塞等待时间,如果互斥锁在这段时间内变得可用,则成功加锁并返回true,否则返回false

std::lock_guard

std::mutex的使用中,我们需要频繁地手动地加锁和开锁,忘记解锁更是会导致程序出错,C++提供了利用RAII的思想的互斥锁的守护对象std::lock_guard

std::lock_guard的核心功能实现很简单:构造时传入互斥锁对象来构造,在构造时自动调用lock(),在析构时自动调用unlock()

clang实现的源代码如下(删去了部分宏的细节)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <mutex>

template <class _Mutex>
class lock_guard {
public:
using mutex_type = _Mutex;

explicit lock_guard(mutex_type& m) : __m(m) {
__m.lock();
owns_lock = true;
}

lock_guard(mutex_type& m, std::adopt_lock_t) : __m(m), owns_lock(true) {}

~lock_guard() {
if (owns_lock) { __m.unlock(); }
}

lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;

private:
mutex_type& __m;
bool owns_lock = false;
};

std::lock_guard的使用示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int shared_counter = 0;
std::mutex mtx; // 定义互斥锁

void increment_counter() {
for (int i = 0; i < 10000; ++i) {
std::lock_guard<std::mutex> lck(mtx); // 使用互斥锁保护共享资源
shared_counter++;
}
}

int main() {
std::vector<std::thread> threads;
threads.reserve(5);

for (int i = 0; i < 5; ++i) { threads.emplace_back(increment_counter); }

for (auto &t : threads) { t.join(); }

std::cout << "Final shared counter value: " << shared_counter << '\n';
return 0;
}

std::unique_lock

std::unique_lock是一个比std::lock_guard更复杂,支持更灵活管理互斥锁的对象:它支持在构造时或者构造后持有锁,支持在作用域范围内可以手动加锁和解锁,作用域结束时如果正在持有锁则自动解锁,相当于std::lock_guard的升级版。

std::unique_lock可以使用更灵活的管理策略,源码实现也更加复杂,需要结合布尔成员变量owns来理解,owns的具体含义为当前线程在当前状态是否处于持有互斥锁的状态。

构造函数如下:

  • 默认只传入一个互斥锁的引用,构造时会自动加锁(调用lock()并设置owns=true
  • 传入互斥锁和一个策略参数:
    • 延迟加锁策略:传入std::defer_lock,构造初始化owns=false,此后可以调用lock()加锁
    • 尝试加锁策略:传入std::try_to_lock,构造时会调用owns=try_lock(),尝试加锁但是不会陷入等待
    • 持有锁托管策略:传入std::adopt_lock,构造初始化owns=true,假定当前的线程已经持有互斥锁,只是托管给当前对象

析构的逻辑为:如果owns=true,意味着析构时仍然处于持有锁的状态,因此调用unlock()来解锁。

忽略涉及时间的接口,主要有如下的方法:

  • owns_lock():获取owns的值,即当前是否持有锁
  • lock():加锁,将owns从false切换为true
  • unlock():解锁,将owns从true切换为false
  • try_lock():尝试加锁,结果可以通过owns_lock()获取
  • mutex()release():获取管理的互斥锁,release()还会放弃自身对互斥锁的管理

std::unique_lock的clang实现如下(删去了部分的宏的细节,省略了涉及时间控制的接口try_lock_fortry_lock_until

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <mutex>
#include <system_error>

template <class _Mutex>
class unique_lock {
public:
using mutex_type = _Mutex;

// 默认构造,此时与lock_guard等价
explicit unique_lock(mutex_type& m) : __m_(std::addressof(m)), __owns_(true) {
__m_->lock();
}

// 构造函数,使用延迟加锁策略
unique_lock(mutex_type& m, std::defer_lock_t) noexcept
: __m_(std::addressof(m)), __owns_(false) {}


// 构造函数,使用尝试加锁策略
unique_lock(mutex_type& m, std::try_to_lock_t)
: __m_(std::addressof(m)), __owns_(__m_->try_lock()) {}

// 构造函数,使用持有锁托管策略
unique_lock(mutex_type& m, std::adopt_lock_t)
: __m_(std::addressof(m)), __owns_(true) {}

// 析构函数
~unique_lock() {
if (__owns_){ __m_->unlock(); }
}

// 移动构造函数
unique_lock(unique_lock&& u) noexcept
: __m_(u.__m_), __owns_(u.__owns_) {
u.__m_ = nullptr;
u.__owns_ = false;
}

// 赋值
unique_lock& operator=(unique_lock&& u) noexcept {
if (__owns_){ __m_->unlock(); }

__m_ = u.__m_;
__owns_ = u.__owns_;
u.__m_ = nullptr;
u.__owns_ = false;
return *this;
}

// 手动加锁
void lock() {
if (__m_ == nullptr)
__throw_system_error(EPERM, "unique_lock::lock: references null mutex");
if (__owns_)
__throw_system_error(EDEADLK, "unique_lock::lock: already locked");
__m_->lock();
__owns_ = true;
}

// 尝试手动加锁
bool try_lock() {
if (__m_ == nullptr)
__throw_system_error(EPERM, "unique_lock::try_lock: references null mutex");
if (__owns_)
__throw_system_error(EDEADLK, "unique_lock::try_lock: already locked");
__owns_ = __m_->try_lock();
return __owns_;
}

// 手动解锁
void unlock() {
if (!__owns_)
__throw_system_error(EPERM, "unique_lock::unlock: not locked");
__m_->unlock();
__owns_ = false;
}

// 获取互斥锁指针
mutex_type* mutex() const noexcept {
return __m_;
}

// 获取互斥锁指针,并且放弃对互斥锁的管理(析构时)
mutex_type* release() noexcept {
mutex_type* m = __m_;
__m_ = nullptr;
__owns_ = false;
return m;
}

// 判断owns的状态
bool owns_lock() const noexcept {
return __owns_;
}

explicit operator bool() const noexcept {
return __owns_;
}

void swap(unique_lock& u) noexcept {
std::swap(__m_, u.__m_);
std::swap(__owns_, u.__owns_);
}

private:
mutex_type* __m_;
bool __owns_;
};

不指定策略时,std::unique_lock的用法基本类似于std::lock_guard,因此无需举例。使用延迟加锁策略的例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void func() {
std::unique_lock<std::mutex> lck(mtx, std::defer_lock);
// ... do some work before locking

// Locking the mutex when needed
lck.lock();
// ... critical section
std::cout << "Inside critical section\n";
lck.unlock();
// ... do some other work
}

int main() {
std::thread t(func);
t.join();
return 0;
}

除此之外,C++还提供了std::shared_lock,它和std::unique_lock的区别在于,它允许多个线程同时读取共享资源, 但是如果有线程尝试写入资源,则会导致其它线程阻塞,直到写入操作完成。

原子操作

原子类型和原子操作

原子操作是指在多线程编程中对共享数据的操作是不可分割、不会被中断的操作, 这意味着操作不会被其他线程干扰,不会被调度切换,要么一次性执行完成,要么完全不执行,不存在第三种状态。 原子操作可以用于避免数据竞争和保证线程安全,不过显然我们需要为安全性付出额外的性能开销。

原子类型是一种特殊的数据类型,在底层保证对原子类型变量的相关操作是原子操作, 例如对原子类型变量的读取、写入、交换、递增、递减等。

这里我们再次重复前一篇使用的例子,只是改动了共享变量的定义:使用原子类型的整数变量shared_counter而非通常的整数变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

const int num_threads = 5;
const int num_increments = 10000;
const int num_experiments = 20;

void increment_counter(std::atomic<int> &counter) {
for (int i = 0; i < num_increments; ++i) {
counter++; // 多线程同时修改共享变量
}
}

void run_experiment(int experiment_number) {
std::atomic<int> shared_counter = 0;
std::vector<std::thread> threads;
threads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, std::ref(shared_counter));
}

for (auto &t : threads) { t.join(); }

// 检查结果
if (shared_counter != num_threads * num_increments) {
std::cout << "Error in experiment " << experiment_number
<< ". Counter value: " << shared_counter << '\n';
}
}

int main() {
for (int i = 0; i < num_experiments; ++i) { run_experiment(i + 1); }

return 0;
}

程序运行的结果全部满足期望,不会出现由多线程导致的随机结果。

这个例子中,我们使用了原子类型的整数变量作为共享变量解决了多线程问题。 由此可见,原子类型和原子操作是除了互斥锁之外的另一种避免多线程混乱的解决办法,相比于手动控制互斥锁的加锁解锁,原子操作是一种更轻量级的做法。

std::atomic

<atomic>包括std::atomic<T>模板类和相关的原子操作函数,为多线程编程提供了支持。

std::atomic<T>可以接受常见的基本数据类型,并且对常见数据类型定义了别名

1
2
3
4
5
6
7
8
9
std::atomic<bool>
std::atomic<char>
std::atomic<int>
std::atomic<long>

typedef std::atomic<bool> atomic_bool;
typedef std::atomic<char> atomic_char;
typedef std::atomic<int> atomic_int;
typedef std::atomic<double> atomic_long;

注意,默认并没有提供floatdouble对应的原子类型,并且通常的CPU架构不支持对浮点数的原子操作。

对于std::atomic<T>接受的自定义类型T,要求是Trivially Copyable的类型,简单来说需要满足三个条件:

  • 连续的内存布局
  • 拷贝是逐比特拷贝
  • 没有虚函数

从代码实现的角度,T需要满足下面的5个条件

1
2
3
4
5
std::is_trivially_copyable<T>::value = true
std::is_copy_constructible<T>::value = true
std::is_move_constructible<T>::value = true
std::is_copy_assignable<T>::value = true
std::is_move_assignable<T>::value = true

原子类型支持很多原子操作,最基本的包括storeloadexchange

1
2
3
4
5
6
7
8
9
10
std::atomic<int> atomicInt(0);

// 原子地写入值
atomicInt.store(10);

// 原子地读取值,返回当前值
int value = atomicInt.load();

// 原子地交换值,返回旧值
int oldValue_exchanged = atomicInt.exchange(20);

还有一些基本算术运算:加、减、按位与、按位或、按位异或

1
2
3
4
5
int oldValue1 = atomicInt.fetch_add(5);
int oldValue2 = atomicInt.fetch_sub(5);
int oldValue3 = atomicInt.fetch_and(5);
int oldValue4 = atomicInt.fetch_or(5);
int oldValue4 = atomicInt.fetch_xor(5);

对于原子类型也支持基于运算符的简单运算,编译器通常会将其转换为上述接口的调用

1
2
3
a = 10;
a++;
a += 2;

除此之外,还有两个附带条件判断的原子操作:compare_exchange_weakcompare_exchange_strong,它们的参数和语义是基本一致的: 提供一个期待值和一个新值,如果当前值是期待值,就写入新值并返回true;如果当前值不是期待值,则不作修改并返回false

1
2
3
int expected = 10;
bool exchanged1 = atomicInt.compare_exchange_weak(expected, 20);
bool exchanged2 = atomicInt.compare_exchange_strong(expected, 30);

两者的区别源于硬件实现方式的不同,在使用效果上的区别是:

  • *_weak的执行效率更高,适合放在循环中;*_strong的执行效率更低,适合一次性的操作;
  • *_weak的可靠性较低,可能出现虚假的失败;*_strong则会保证可靠性。

std::atomic<T>只是保证对应的操作是原子操作,但是并不保证原子操作在底层是否是通过加锁来实现的(这取决于不同平台的处理器实现),加锁通常意味着效率偏低, std::atomic<T>提供is_lock_free()成员函数可以用来判断对于此类型的底层操作是否是无锁的。平台通常可以保证对字节数不超过sizeof(void*)的平凡数据类型,对应的原子操作是无锁的。

std::atomic_flag

std::atomic_flag 可以理解为原子布尔类型,但是它并不等于std::atomic<bool>,而是比它更简单的标记类型, 它只提供很少的几个操作,但是保证操作都是无锁的(通常意味着操作更加高效)。

std::atomic_flag只支持test_and_set()以及clear()两个成员函数:

  • test_and_set(): 检查并修改std::atomic_flag的内部标志
    • 如果std::atomic_flag没有被标记,则进行标记,并返回false
    • 如果std::atomic_flag已被标记,则不做修改,并返回true
  • clear():清除std::atomic_flag的内部标记,保证下一次调用test_and_set()时会返回false

std::atomic_flag类型的变量通常使用宏ATOMIC_FLAG_INIT初始化,并且初始化之后的状态是未标记的。

std::atomic_flag 最主要的应用就是实现一个自旋锁(spin lock),见下文。

内存顺序模型

原子操作内存顺序模型是原子操作中一个核心概念,它定义了原子操作之间以及原子操作与非原子操作之间内存访问的顺序规则,从而确保在多线程环境下数据的一致性和可见性。

通常包括如下几种模型:(以枚举类std::memory_order提供)

  • std::memory_order_relaxed:最弱的内存顺序,不对其他内存操作施加顺序约束,只保证当前原子操作本身的原子性。
  • std::memory_order_consume:类似于std::memory_order_acquire,但更弱,主要用于依赖关系传递。
  • std::memory_order_acquire:修饰一个载入操作,表示在本线程中,所有后续的关于此变量的内存操作都必须在本条原子操作完成后执行,不会将后面的操作重排到前面。
  • std::memory_order_release:修饰一个存储操作,表示在本线程中,所有之前的针对该变量的内存操作完成后才能执行本条原子操作,不会将前面的操作重排到后面。
  • std::memory_order_acq_rel:结合了std::memory_order_acquirestd::memory_order_release的效果,适合修饰“读-改-写”操作。
  • std::memory_order_seq_cst:最强的内存顺序,所有线程的所有读写操作按全局顺序一致进行。(这是所有原子操作的默认方式)

对于前文中的函数调用方式的原子操作,我们都可以向其中传入内存顺序参数,分成三类:

  • 存储(store)操作,可选用的内存顺序有std::memory_order_relaxedstd::memory_order_releasestd::memory_order_seq_cst

  • 载入(load)操作,可选用的内存顺序有std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_seq_cst

  • “读-改-写”(read-modify-write)操作,可选用的参数为全部六种。

所有原子操作默认使用的内存顺序参数都是最强的std::memory_order_seq_cst

这部分内容不是很懂,因为已经涉及到CPU层面的很多细节了,通常编程中也不需要关注。

自旋锁

前文中我们使用互斥锁在确保线程同步,还有一种自旋锁也可以达到类似的目的,我们通过对比的方式介绍自旋锁:

  • 自旋锁 (Spinlock):
    • 实现相对简单,通常使用原子操作(如std::atomic_flag)来实现。
    • 当一个线程尝试获取锁但失败时,它会不断地检查锁的状态(自旋等待),直到锁可用。
    • 适用于锁持有时间非常短的情况,因为自旋等待会占用 CPU 资源。
  • 互斥锁 (Mutex):
    • 实现相对复杂,通常依赖于操作系统提供的同步原语。
    • 当一个线程尝试获取锁但失败时,它会被挂起,进入睡眠状态,直到锁可用。
    • 适用于锁持有时间较长的情况,因为线程挂起不会占用 CPU 资源。

简单来说,自旋锁更适合锁的持有时间较短的轻量级情景,它选择持续占用CPU而不是让系统将线程挂起再进行恢复。 即使如此,自旋锁仍然可以提示操作系统:当前线程愿意主动放弃其当前正在执行的 CPU 时间片,从而允许其他线程获取 CPU 执行时间, 在C++中调用std::this_thread::yield()可以达到这个目的,这个函数通常在自旋锁实现的while循环中使用。

基于std::atomic_flag实现自旋锁的代码比较简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <atomic>

class Spinlock {
public:
Spinlock() = default;

void lock() {
while (m_flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); // 自旋等待时让出 CPU
}
}

void unlock() { m_flag.clear(std::memory_order_release); }

private:
std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
};

使用自旋锁代替互斥锁,重复之前的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

class Spinlock {
public:
Spinlock() = default;

void lock() {
while (m_flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); // 自旋等待时让出 CPU
}
}

void unlock() { m_flag.clear(std::memory_order_release); }

private:
std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
};

Spinlock spinlock;

const int num_threads = 5;
const int num_increments = 10000;
const int num_experiments = 20;

void increment_counter(int &counter) {
for (int i = 0; i < num_increments; ++i) {
spinlock.lock();
counter++; // 多线程同时修改共享变量
spinlock.unlock();
}
}

void run_experiment(int experiment_number) {
int shared_counter = 0;
std::vector<std::thread> threads;
threads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, std::ref(shared_counter));
}

for (auto &t : threads) { t.join(); }

// 检查结果
if (shared_counter != num_threads * num_increments) {
std::cout << "Error in experiment " << experiment_number
<< ". Counter value: " << shared_counter << '\n';
}
}

int main() {
for (int i = 0; i < num_experiments; ++i) { run_experiment(i + 1); }

return 0;
}

程序运行的结果全部满足期望,不会出现由多线程导致的随机结果。

条件变量

基本概念

在C++中通过<condition_variable>头文件提供了条件变量类型std::condition_variablestd::condition_variable_any, 条件变量提供了一种线程间的同步机制:允许一个或多个线程等待某个条件变为真,同时另一个线程可以改变这个条件并通知等待的线程。 条件变量通常需要和同步锁结合使用。

std::condition_variable_anystd::condition_variable非常类似, 区别仅仅是std::condition_variablewait函数只能接受std::unique_lock<std::mutex>类型的参数, 而std::condition_variable_any可以接受任何lockable参数,相应地需要付出额外开销。 除此以外,两者的使用几乎完全一样。

下文中只讨论std::condition_variable的使用,一般也不推荐使用std::condition_variable_any

使用方法

std::condition_variable提供了几种方法来支持线程间的同步,主要包括等待和通知两类方法:

等待方法有如下几种:

  • wait(lock):使当前线程进入等待状态,直到被通知或被中断
    • 这里的lock是一个std::unique_lock<std::mutex>类型变量,代表一个已经锁定的互斥锁,用于保护共享资源
    • 在调用wait后,当前线程执行流暂停,拥有的互斥锁会被解锁,等待期间不会持有锁。只有当线程被唤醒时,才会自动重新锁定该互斥锁,然后继续执行
  • wait_for(lock, duration):在wait(lock)的基础上加上超时机制
    • 线程会等待一段持续时间,在等待过程中放弃互斥锁
    • 如果线程在持续时间内被唤醒,则尝试重写加锁,返回std::cv_status::no_timeout
    • 如果线程等待超时,仍然会主动唤醒自身,并尝试重新加锁,返回std::cv_status::timeout
  • wait_until(lock, time_point):和wait_for(lock, duration)基本一致,只是把等待持续时间改成了等待到固定时刻。

通知方法有如下两种:

  • notify_one():唤醒等待在该条件变量上的一个线程(如果有线程在等待)。被唤醒的线程将重新获得互斥锁的控制权,并继续执行。
  • notify_all():唤醒所有等待在该条件变量上的线程。所有线程都会重新尝试获取互斥锁,这可能导致优先级反转等问题,需要谨慎使用。

关于条件变量的唤醒其实没这么简单,还会存在虚假唤醒等问题,这里不做讨论。

wait方法的默认参数形式如下

1
void wait(std::unique_lock<std::mutex>& lock); // lock是一个已经获取并加锁的对象

除此之外,还可以支持加上一个可调用对象参数以支持更丰富的功能

1
2
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

它的含义是:当前线程陷入等待,被唤醒时会检查pred,如果pred返回true则成功唤醒,如果返回false则继续陷入等待。 对于wait_forwait_until也有类似的重载版本。

示例代码

我们直接以示例代码来说明条件变量的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
bool ready = false; // 全局标志位

void printId(int index_id) {
std::unique_lock<std::mutex> lck(mtx); // 获取并加锁

// 第一阶段,标志位为false,进入while循环中,条件变量cv执行一次wait并陷入等待,释放锁
// 第二阶段,标志位在start()函数中被设置为true
// 第三阶段,通过条件变量cv唤醒线程,重新获取锁,跳出while循环
while (!ready) { cv.wait(lck); }

std::cout << "thread_id: " << std::this_thread::get_id()
<< " index_id: " << index_id << "\n";
}

void start() {
std::unique_lock<std::mutex> lck(mtx); // 获取并加锁
ready = true; // 改变全局标志位
cv.notify_all(); // 唤醒所有的等待线程
}

int main() {
std::vector<std::thread> threads;

threads.reserve(10);
for (size_t i = 0; i < 10; ++i) { threads.emplace_back(printId, i); }
std::cout << "create done.\n";

start();

for (auto &t : threads) { t.join(); }

std::cout << "process done.\n";
return 0;
}

运行结果如下(顺序随机)

1
2
3
4
5
6
7
8
9
10
11
12
create done.
thread_id: 140041002432064 index_id: 9
thread_id: 140041069573696 index_id: 1
thread_id: 140041061180992 index_id: 2
thread_id: 140041052788288 index_id: 3
thread_id: 140041044395584 index_id: 4
thread_id: 140041036002880 index_id: 5
thread_id: 140041027610176 index_id: 6
thread_id: 140041019217472 index_id: 7
thread_id: 140041010824768 index_id: 8
thread_id: 140041077966400 index_id: 0
process done.

代码执行过程的详细解读如下:

  • 初始化:互斥锁 mtx、条件变量 cv 和标志位 ready 被初始化。
  • 创建线程:在主函数中创建一个线程向量 threads 并预分配空间。启动10个线程,每个线程执行 printId 函数,并传入一个唯一的ID。
  • 所有线程就绪:每一个线程依次开始执行 printId 函数,并陷入等待,具体情况为:
    • 某一个线程成功获取到互斥锁mtx并继续执行(其它线程阻塞式地等待加锁)
    • 由于ready初始值为false,线程进入while循环并执行一次cv.wait(lck)进入等待状态,释放互斥锁mtx
    • 其它线程依次获取到互斥锁mtx,同样地进入等待状态,释放互斥锁mtx
  • 主线程在主函数中调用start函数:
    • 主线程获取到互斥锁mtx并加锁,这也意味着前面所有的子线程都已经等待就绪
    • 修改全局标志位readytrue,所有子线程的while循环将失效
    • 调用cv.notify_all()唤醒所有等待线程。
  • 所有线程被唤醒并执行:
    • 虽然所有子线程都被唤醒,但是只有一个线程可以立刻获取到互斥锁mtx(其它线程阻塞式地等待加锁)
    • 获取到互斥锁的线程继续执行,打印信息,执行完成并释放互斥锁
    • 其它线程依次获取到互斥锁mtx,然后打印信息,执行完成
  • 结束部分:主线程等待所有子线程join,然后程序正常退出

需要注意的是,代码中使用了while循环而非if来检查全局标志位,两种做法的对应代码如下

1
2
3
while (!ready) { cv.wait(lck); }

if (!ready) { cv.wait(lck); }

两者在使用效果上是几乎一样,但是因为条件变量可能存在虚假唤醒的问题,使用while循环结合全局标志位可以有效地避免这种错误情况,因此普通的做法是放在while循环之中。