原子类型和原子操作

原子操作是指在多线程编程中对共享数据的操作是不可分割、不会被中断的操作, 这意味着操作不会被其他线程干扰,不会被调度切换,要么一次性执行完成,要么完全不执行,不存在第三种状态。 原子操作可以用于避免数据竞争和保证线程安全,不过显然我们需要为安全性付出额外的性能开销。

原子类型是一种特殊的数据类型,在底层保证对原子类型变量的相关操作是原子操作, 例如对原子类型变量的读取、写入、交换、递增、递减等。

这里我们再次重复前一篇使用的例子,只是改动了共享变量的定义:使用原子类型的整数变量shared_counter而非通常的整数变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

const int num_threads = 5;
const int num_increments = 10000;
const int num_experiments = 20;

void increment_counter(std::atomic<int> &counter) {
for (int i = 0; i < num_increments; ++i) {
counter++; // 多线程同时修改共享变量
}
}

void run_experiment(int experiment_number) {
std::atomic<int> shared_counter = 0;
std::vector<std::thread> threads;
threads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, std::ref(shared_counter));
}

for (auto &t : threads) { t.join(); }

// 检查结果
if (shared_counter != num_threads * num_increments) {
std::cout << "Error in experiment " << experiment_number
<< ". Counter value: " << shared_counter << std::endl;
}
}

int main() {
for (int i = 0; i < num_experiments; ++i) { run_experiment(i + 1); }

return 0;
}

程序运行的结果全部满足期望,不会出现由多线程导致的随机结果。

这个例子中,我们使用了原子类型的整数变量作为共享变量解决了多线程问题。 由此可见,原子类型和原子操作是除了互斥锁之外的另一种避免多线程混乱的解决办法,相比于手动控制互斥锁的加锁解锁,原子操作是一种更轻量级的做法。

std::atomic

<atomic>包括std::atomic<T>模板类和相关的原子操作函数,为多线程编程提供了支持。

std::atomic<T>可以接受常见的基本数据类型,并且对常见数据类型定义了别名

1
2
3
4
5
6
7
8
9
std::atomic<bool>
std::atomic<char>
std::atomic<int>
std::atomic<long>

typedef std::atomic<bool> atomic_bool;
typedef std::atomic<char> atomic_char;
typedef std::atomic<int> atomic_int;
typedef std::atomic<double> atomic_long;

注意,默认并没有提供floatdouble对应的原子类型,并且通常的CPU架构不支持对浮点数的原子操作。

对于std::atomic<T>接受的自定义类型T,要求是Trivially Copyable的类型,简单来说需要满足三个条件:

  • 连续的内存布局
  • 拷贝是逐比特拷贝
  • 没有虚函数

从代码实现的角度,T需要满足下面的5个条件

1
2
3
4
5
std::is_trivially_copyable<T>::value = true
std::is_copy_constructible<T>::value = true
std::is_move_constructible<T>::value = true
std::is_copy_assignable<T>::value = true
std::is_move_assignable<T>::value = true

原子类型支持很多原子操作,最基本的包括storeloadexchange

1
2
3
4
5
6
7
8
9
10
std::atomic<int> atomicInt(0);

// 原子地写入值
atomicInt.store(10);

// 原子地读取值,返回当前值
int value = atomicInt.load();

// 原子地交换值,返回旧值
int oldValue_exchanged = atomicInt.exchange(20);

还有一些基本算术运算:加、减、按位与、按位或、按位异或

1
2
3
4
5
int oldValue1 = atomicInt.fetch_add(5);
int oldValue2 = atomicInt.fetch_sub(5);
int oldValue3 = atomicInt.fetch_and(5);
int oldValue4 = atomicInt.fetch_or(5);
int oldValue4 = atomicInt.fetch_xor(5);

对于原子类型也支持基于运算符的简单运算,编译器通常会将其转换为上述接口的调用

1
2
3
a = 10;
a++;
a += 2;

除此之外,还有两个附带条件判断的原子操作:compare_exchange_weakcompare_exchange_strong,它们的参数和语义是基本一致的: 提供一个期待值和一个新值,如果当前值是期待值,就写入新值并返回true;如果当前值不是期待值,则不作修改并返回false

1
2
3
int expected = 10;
bool exchanged1 = atomicInt.compare_exchange_weak(expected, 20);
bool exchanged2 = atomicInt.compare_exchange_strong(expected, 30);

两者的区别源于硬件实现方式的不同,在使用效果上的区别是:

  • *_weak的执行效率更高,适合放在循环中;*_strong的执行效率更低,适合一次性的操作;
  • *_weak的可靠性较低,可能出现虚假的失败;*_strong则会保证可靠性。

std::atomic<T>只是保证对应的操作是原子操作,但是并不保证原子操作在底层是否是通过加锁来实现的(这取决于不同平台的处理器实现),加锁通常意味着效率偏低, std::atomic<T>提供is_lock_free()成员函数可以用来判断对于此类型的底层操作是否是无锁的。平台通常可以保证对字节数不超过sizeof(void*)的平凡数据类型,对应的原子操作是无锁的。

std::atomic_flag

std::atomic_flag 可以理解为原子布尔类型,但是它并不等于std::atomic<bool>,而是比它更简单的标记类型, 它只提供很少的几个操作,但是保证操作都是无锁的(通常意味着操作更加高效)。

std::atomic_flag只支持test_and_set()以及clear()两个成员函数:

  • test_and_set(): 检查并修改std::atomic_flag的内部标志
    • 如果std::atomic_flag没有被标记,则进行标记,并返回false
    • 如果std::atomic_flag已被标记,则不做修改,并返回true
  • clear():清除std::atomic_flag的内部标记,保证下一次调用test_and_set()时会返回false

std::atomic_flag类型的变量通常使用宏ATOMIC_FLAG_INIT初始化,并且初始化之后的状态是未标记的。

std::atomic_flag 最主要的应用就是实现一个自旋锁(spin lock),见下文。

内存顺序模型

原子操作内存顺序模型是原子操作中一个核心概念,它定义了原子操作之间以及原子操作与非原子操作之间内存访问的顺序规则,从而确保在多线程环境下数据的一致性和可见性。

通常包括如下几种模型:(以枚举类std::memory_order提供)

  • std::memory_order_relaxed:最弱的内存顺序,不对其他内存操作施加顺序约束,只保证当前原子操作本身的原子性。
  • std::memory_order_consume:类似于std::memory_order_acquire,但更弱,主要用于依赖关系传递。
  • std::memory_order_acquire:修饰一个载入操作,表示在本线程中,所有后续的关于此变量的内存操作都必须在本条原子操作完成后执行,不会将后面的操作重排到前面。
  • std::memory_order_release:修饰一个存储操作,表示在本线程中,所有之前的针对该变量的内存操作完成后才能执行本条原子操作,不会将前面的操作重排到后面。
  • std::memory_order_acq_rel:结合了std::memory_order_acquirestd::memory_order_release的效果,适合修饰“读-改-写”操作。
  • std::memory_order_seq_cst:最强的内存顺序,所有线程的所有读写操作按全局顺序一致进行。(这是所有原子操作的默认方式)

对于前文中的函数调用方式的原子操作,我们都可以向其中传入内存顺序参数,分成三类:

  • 存储(store)操作,可选用的内存顺序有std::memory_order_relaxedstd::memory_order_releasestd::memory_order_seq_cst

  • 载入(load)操作,可选用的内存顺序有std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_seq_cst

  • “读-改-写”(read-modify-write)操作,可选用的参数为全部六种。

所有原子操作默认使用的内存顺序参数都是最强的std::memory_order_seq_cst

这部分内容不是很懂,因为已经涉及到CPU层面的很多细节了,通常编程中也不需要关注。

自旋锁

前文中我们使用互斥锁在确保线程同步,还有一种自旋锁也可以达到类似的目的,我们通过对比的方式介绍自旋锁:

  • 自旋锁 (Spinlock):
    • 实现相对简单,通常使用原子操作(如std::atomic_flag)来实现。
    • 当一个线程尝试获取锁但失败时,它会不断地检查锁的状态(自旋等待),直到锁可用。
    • 适用于锁持有时间非常短的情况,因为自旋等待会占用 CPU 资源。
  • 互斥锁 (Mutex):
    • 实现相对复杂,通常依赖于操作系统提供的同步原语。
    • 当一个线程尝试获取锁但失败时,它会被挂起,进入睡眠状态,直到锁可用。
    • 适用于锁持有时间较长的情况,因为线程挂起不会占用 CPU 资源。

简单来说,自旋锁更适合锁的持有时间较短的轻量级情景,它选择持续占用CPU而不是让系统将线程挂起再进行恢复。 即使如此,自旋锁仍然可以提示操作系统:当前线程愿意主动放弃其当前正在执行的 CPU 时间片,从而允许其他线程获取 CPU 执行时间, 在C++中调用std::this_thread::yield()可以达到这个目的,这个函数通常在自旋锁实现的while循环中使用。

基于std::atomic_flag实现自旋锁的代码比较简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <atomic>

class Spinlock {
public:
Spinlock() = default;

void lock() {
while (m_flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); // 自旋等待时让出 CPU
}
}

void unlock() { m_flag.clear(std::memory_order_release); }

private:
std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
};

使用自旋锁代替互斥锁,重复之前的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

class Spinlock {
public:
Spinlock() = default;

void lock() {
while (m_flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); // 自旋等待时让出 CPU
}
}

void unlock() { m_flag.clear(std::memory_order_release); }

private:
std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
};

Spinlock spinlock;

const int num_threads = 5;
const int num_increments = 10000;
const int num_experiments = 20;

void increment_counter(int &counter) {
for (int i = 0; i < num_increments; ++i) {
spinlock.lock();
counter++; // 多线程同时修改共享变量
spinlock.unlock();
}
}

void run_experiment(int experiment_number) {
int shared_counter = 0;
std::vector<std::thread> threads;
threads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, std::ref(shared_counter));
}

for (auto &t : threads) { t.join(); }

// 检查结果
if (shared_counter != num_threads * num_increments) {
std::cout << "Error in experiment " << experiment_number
<< ". Counter value: " << shared_counter << std::endl;
}
}

int main() {
for (int i = 0; i < num_experiments; ++i) { run_experiment(i + 1); }

return 0;
}

程序运行的结果全部满足期望,不会出现由多线程导致的随机结果。