线程安全与互斥锁

在多线程编程中,由于多个线程存在共享的资源(例如全局变量等),因此可能导致相互之间产生干扰, 下面的例子可以展示这种问题(必须使用Debug模式编译,因为Release模式下可能直接优化了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <thread>
#include <vector>

const int num_threads = 5;
const int num_increments = 10000;
const int num_experiments = 20;

void increment_counter(int &counter) {
for (int i = 0; i < num_increments; ++i) {
counter++; // 多线程同时修改共享变量
}
}

void run_experiment(int experiment_number) {
int shared_counter = 0;
std::vector<std::thread> threads;
threads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, std::ref(shared_counter));
}

for (auto &t : threads) { t.join(); }

// 检查结果
if (shared_counter != num_threads * num_increments) {
std::cout << "Error in experiment " << experiment_number
<< ". Counter value: " << shared_counter << std::endl;
}
}

int main() {
for (int i = 0; i < num_experiments; ++i) { run_experiment(i + 1); }

return 0;
}

程序运行的结果可能与期望值不符,每一次执行得到的结果是随机的,通常比期望值更小,例如

1
2
3
4
Error in experiment 4. Counter value: 48882
Error in experiment 9. Counter value: 46751
Error in experiment 14. Counter value: 49241
Error in experiment 16. Counter value: 48416

这是因为多个线程在并发地对共享变量 shared_counter 进行递增操作,假如线程A和线程B在同一时刻读取到shared_counter的值为50,然后分别执行自增,线程A将51写入内存,线程B也将51写入内存,最终结果是51而非52。

这一类线程安全问题的本质在于,对于共享变量shared_counter的读写操作不是一步完成的(不是原子操作),而是被分解为几个步骤, 这导致多线程在同时进行读写时会出现混乱。一类最常见的解决办法就是使用锁进行保护,最常见的就是使用互斥锁,在<mutex>中提供了若干个互斥锁模型,我们先关注最重要的std::mutex互斥锁。

std::mutex

互斥锁是一个全局变量,它可以保证同一时刻只有一个线程可以持有,具体的逻辑如下:

  • 只有一个线程在调用lock()时可以成功加锁(变成持有锁的状态)并继续执行,其他线程在调用lock()时无法成功加锁,并且会陷入等待(相当于进入一个等待队列),直到持有锁的线程调用unlock()解锁;
  • 如果互斥锁被解锁,另一个陷入lock()等待的线程就可以成功加锁并开始执行,执行完毕后该线程需要调用unlock()解锁。(如果一直不解锁,则会导致程序陷入死锁状态)

加上互斥锁的示例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int shared_counter = 0;
std::mutex mtx; // 定义互斥锁

void increment_counter() {
for (int i = 0; i < 10000; ++i) {
mtx.lock(); // 加锁
shared_counter++;
mtx.unlock(); // 解锁
}
}

int main() {
std::vector<std::thread> threads;
threads.reserve(5);

for (int i = 0; i < 5; ++i) { threads.emplace_back(increment_counter); }

for (auto &t : threads) { t.join(); }

std::cout << "Final shared counter value: " << shared_counter << std::endl;
return 0;
}

程序运行的结果全部满足期望,不会出现由多线程导致的随机结果。

对互斥锁的直接加锁操作中,如果加锁失败会陷入阻塞状态,为了避免陷入阻塞,我们可以使用try_lock()来尝试加锁,无论成功或失败都不会陷入阻塞,具体逻辑为:

  • 如果当前线程加锁成功,则返回true
  • 如果当前线程加锁失败,则返回false,但是不会陷入阻塞状态。

使用例如

1
2
3
4
5
6
if (mtx.try_lock()) {
// 成功获得锁
}
else {
// 锁已被占用
}

使用try_lock()的完整例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void threadFunction(int threadId) {
// 尝试锁定互斥量
if (mtx.try_lock()) {
std::cout << "Thread " << threadId << " acquired the lock" << std::endl;
// 执行一些操作
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
std::cout << "Thread " << threadId << " released the lock" << std::endl;
}
else {
std::cout << "Thread " << threadId << " couldn't acquire the lock"
<< std::endl;
}
}

int main() {
std::thread t1(threadFunction, 1);

std::this_thread::sleep_for(std::chrono::seconds(1));

std::thread t2(threadFunction, 2);

t1.join();
t2.join();

return 0;
}

输出如下,这里两个线程会先后获取到锁,因为两个线程的创建之间暂停了一段时间

1
2
3
4
Thread 1 acquired the lock
Thread 1 released the lock
Thread 2 acquired the lock
Thread 2 released the lock

如果注释该行来取消暂停时间,则两个线程中有一个就无法成功获取到锁,并进入else分支,输出如下

1
2
3
Thread Thread 2 acquired the lock
1 couldn't acquire the lock
Thread 2 released the lock

这里的输出比较混乱,但是仍然可以看出,线程2成功持有了锁,并进入if分支,而线程1进入了else分支。

其它互斥锁

<mutex>中实际上提供了四种互斥锁:

  • std::mutex: 基础的互斥锁
  • std::recursive_mutex: 支持递归的互斥锁
  • std::time_mutex: 支持定时的互斥锁
  • std::recursive_timed_mutex: 支持定时和递归的互斥锁

我们已经介绍了最基础的第一种互斥锁,后三种互斥锁其实只是对第一种互斥锁的补充:

  • std::mutex进行多次加锁会导致死锁,不适合在递归函数中使用,std::recursive_mutex允许持有的线程对其进行多次加锁,在解锁时也需要进行相同的多次解锁,适合在递归函数中使用;
  • std::mutex只有两种加锁方式:lock()try_lock(),在加锁失败时前者会陷入阻塞等待,后者则会立刻返回。在实际应用中,我们还需要一种中间策略:在加锁失败时尝试等待一段有限的时间,如果超时则失败返回。std::time_mutex提供的try_lock_for()方法接收一个时间参数,允许我们设置阻塞等待时间,如果互斥锁在这段时间内变得可用,则成功加锁并返回true,否则返回false

std::lock_guard

std::mutex的使用中,我们需要频繁地手动地加锁和开锁,忘记解锁更是会导致程序出错,C++提供了利用RAII的思想的互斥锁的守护对象std::lock_guard

介绍与实现

std::lock_guard的核心功能实现很简单:构造时传入互斥锁对象来构造,在构造时自动调用lock(),在析构时自动调用unlock()

clang实现的源代码如下(删去了部分宏的细节)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <mutex>

template <class _Mutex>
class lock_guard {
public:
using mutex_type = _Mutex;

explicit lock_guard(mutex_type& m) : __m(m) {
__m.lock();
owns_lock = true;
}

lock_guard(mutex_type& m, std::adopt_lock_t) : __m(m), owns_lock(true) {}

~lock_guard() {
if (owns_lock) { __m.unlock(); }
}

lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;

private:
mutex_type& __m;
bool owns_lock = false;
};

用法

std::lock_guard的使用示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int shared_counter = 0;
std::mutex mtx; // 定义互斥锁

void increment_counter() {
for (int i = 0; i < 10000; ++i) {
std::lock_guard<std::mutex> lck(mtx); // 使用互斥锁保护共享资源
shared_counter++;
}
}

int main() {
std::vector<std::thread> threads;
threads.reserve(5);

for (int i = 0; i < 5; ++i) { threads.emplace_back(increment_counter); }

for (auto &t : threads) { t.join(); }

std::cout << "Final shared counter value: " << shared_counter << std::endl;
return 0;
}

std::unique_lock

std::unique_lock是一个比std::lock_guard更复杂,支持更灵活管理互斥锁的对象:它支持在构造时或者构造后持有锁,支持在作用域范围内可以手动加锁和解锁,作用域结束时如果正在持有锁则自动解锁,相当于std::lock_guard的升级版。

介绍与实现

std::unique_lock可以使用更灵活的管理策略,源码实现也更加复杂,需要结合布尔成员变量owns来理解,owns的具体含义为当前线程在当前状态是否处于持有互斥锁的状态。

构造函数如下:

  • 默认只传入一个互斥锁的引用,构造时会自动加锁(调用lock()并设置owns=true
  • 传入互斥锁和一个策略参数:
    • 延迟加锁策略:传入std::defer_lock,构造初始化owns=false,此后可以调用lock()加锁
    • 尝试加锁策略:传入std::try_to_lock,构造时会调用owns=try_lock(),尝试加锁但是不会陷入等待
    • 持有锁托管策略:传入std::adopt_lock,构造初始化owns=true,假定当前的线程已经持有互斥锁,只是托管给当前对象

析构的逻辑为:如果owns=true,意味着析构时仍然处于持有锁的状态,因此调用unlock()来解锁。

忽略涉及时间的接口,主要有如下的方法:

  • owns_lock():获取owns的值,即当前是否持有锁
  • lock():加锁,将owns从false切换为true
  • unlock():解锁,将owns从true切换为false
  • try_lock():尝试加锁,结果可以通过owns_lock()获取
  • mutex()release():获取管理的互斥锁,release()还会放弃自身对互斥锁的管理

std::unique_lock的clang实现如下(删去了部分的宏的细节,省略了涉及时间控制的接口try_lock_fortry_lock_until

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <mutex>
#include <system_error>

template <class _Mutex>
class unique_lock {
public:
using mutex_type = _Mutex;

// 默认构造,此时与lock_guard等价
explicit unique_lock(mutex_type& m) : __m_(std::addressof(m)), __owns_(true) {
__m_->lock();
}

// 构造函数,使用延迟加锁策略
unique_lock(mutex_type& m, std::defer_lock_t) noexcept
: __m_(std::addressof(m)), __owns_(false) {}


// 构造函数,使用尝试加锁策略
unique_lock(mutex_type& m, std::try_to_lock_t)
: __m_(std::addressof(m)), __owns_(__m_->try_lock()) {}

// 构造函数,使用持有锁托管策略
unique_lock(mutex_type& m, std::adopt_lock_t)
: __m_(std::addressof(m)), __owns_(true) {}

// 析构函数
~unique_lock() {
if (__owns_){ __m_->unlock(); }
}

// 移动构造函数
unique_lock(unique_lock&& u) noexcept
: __m_(u.__m_), __owns_(u.__owns_) {
u.__m_ = nullptr;
u.__owns_ = false;
}

// 赋值
unique_lock& operator=(unique_lock&& u) noexcept {
if (__owns_){ __m_->unlock(); }

__m_ = u.__m_;
__owns_ = u.__owns_;
u.__m_ = nullptr;
u.__owns_ = false;
return *this;
}

// 手动加锁
void lock() {
if (__m_ == nullptr)
__throw_system_error(EPERM, "unique_lock::lock: references null mutex");
if (__owns_)
__throw_system_error(EDEADLK, "unique_lock::lock: already locked");
__m_->lock();
__owns_ = true;
}

// 尝试手动加锁
bool try_lock() {
if (__m_ == nullptr)
__throw_system_error(EPERM, "unique_lock::try_lock: references null mutex");
if (__owns_)
__throw_system_error(EDEADLK, "unique_lock::try_lock: already locked");
__owns_ = __m_->try_lock();
return __owns_;
}

// 手动解锁
void unlock() {
if (!__owns_)
__throw_system_error(EPERM, "unique_lock::unlock: not locked");
__m_->unlock();
__owns_ = false;
}

// 获取互斥锁指针
mutex_type* mutex() const noexcept {
return __m_;
}

// 获取互斥锁指针,并且放弃对互斥锁的管理(析构时)
mutex_type* release() noexcept {
mutex_type* m = __m_;
__m_ = nullptr;
__owns_ = false;
return m;
}

// 判断owns的状态
bool owns_lock() const noexcept {
return __owns_;
}

explicit operator bool() const noexcept {
return __owns_;
}

void swap(unique_lock& u) noexcept {
std::swap(__m_, u.__m_);
std::swap(__owns_, u.__owns_);
}

private:
mutex_type* __m_;
bool __owns_;
};

用法

不指定策略时,用法基本类似于std::lock_guard,因此无需举例。

使用延迟加锁策略的例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void func() {
std::unique_lock<std::mutex> lck(mtx, std::defer_lock);
// ... do some work before locking

// Locking the mutex when needed
lck.lock();
// ... critical section
std::cout << "Inside critical section" << std::endl;
lck.unlock();
// ... do some other work
}

int main() {
std::thread t(func);
t.join();
return 0;
}

除此之外,C++还提供了std::shared_lock,它和std::unique_lock的区别在于,它允许多个线程同时读取共享资源, 但是如果有线程尝试写入资源,则会导致其它线程阻塞,直到写入操作完成。