基础概念

首先学习几组概念:

  • 并发(Concurrency)/并行(Parallelism)
    • 并发指的是任务在同一时间段内被交替执行,不一定是同时执行(取决于CPU的核数和CPU的调度)
    • 并行指的是任务在同一时间内真正同时执行,并行是并发的一种特例(并发是在不同的核上面同时执行的,因此对CPU有要求)
  • 进程(Process)/线程(Thread)
    • 进程是程序执行的实例,是系统分配资源的基本单位,拥有自己的代码区、堆区、栈区、全局变量等。进程之间相互独立,一个进程的崩溃不会影响其他进程,进程之间的通信开销较大,需要特殊的机制
    • 线程是进程中的执行单元,是CPU调度中的基本单位。一个进程默认会有一个主线程,主线程对指令依次执行,也可以拥有多个线程,每一个线程独立执行指令(单独的执行流)。线程有自己独立的调用栈(因此函数内的局部变量是线程私有的),与其他线程共享所属的进程中的代码区、堆区和其他资源(全局变量是线程共有的),创建和切换线程,以及线程之间的通信比进程开销更小,但是存在数据的竞争等问题
  • 同步(Synchronous)/异步(Asynchronous)
    • 同步是指任务按照顺序依次执行,每个任务在前一个任务完成后开始执行。在同步模式中,任务之间需要等待其他任务完成才能继续执行。
    • 异步是指任务可以独立于其他任务进行执行,它们的执行不会阻塞其他任务。在异步模式中,任务可以在后台执行,执行结果可能需要等待一段时间才能获得,但这不会影响其他任务的执行。

按照自己的理解,再举几个例子吧:

  • 关于并发和并行的例子:
    • 学生在课后并发地完成每一个科目的作业:一会写语文作业,一会又切换回写数学作业,但是学生不可能同时写两科作业,也就是不能并行
    • 人体的消化系统的任务和循环系统的任务在并行地执行:在同一时间内,肠道蠕动和血液循环在同时进行,不可能说心脏跳动时就暂停了肠道蠕动
  • 关于进程和线程的例子:
    • 一个可执行文件被执行,就会由系统创建一个进程,分配相应的内存和其他资源,将这个进程理解为一个办公室,里面有一个职员(主线程)正在办公,办公室里面的东西就是进程私有的资源,两个办公室之间有很强的独立性,办公室之间的通讯要通过走廊,因此进程的创建销毁和通讯开销都比较大。
    • 什么是线程?职员一个人只能按照顺序一步步干活,这时候他可以申请一个助手(线程),助手和他一起在同一个办公室工作,助手干自己的活(一个单独的执行流),助手和职员之间的沟通非常方便,他们共享数据资源(在同一个办公室),缺点就是相互之间可能造成干扰。
  • 关于同步和异步的例子:
    • 手洗衣服的流程是同步的:先将衣物放入盆中,再加入洗衣粉,然后开始洗衣,等到洗衣完成,最后取出衣物晾干或烘干,每一步的开始都依赖于前一步的完成。
    • 使用洗衣机洗衣服是异步的:将衣物放入洗衣机并启动,洗衣机开始运作,此时人不需要等待在洗衣机旁,可以干任何事,等到洗衣机完成后会发出通知,然后取出衣物晾晒即可

从CPU的角度来看:

  • 对于单核CPU,不管是同时开启多个进程还是一个程序使用多线程都会使用唯一的核进行执行,因此都只是并发而非并行的(按照时间切片,不同的执行流被轮流执行);
  • 对于多核CPU,它拥有多个处理核心,可以同时推进多个执行流,此时不同的进程可以被分配给不同的核,单个进程的多个线程也可以被分配给不同的核,实现真正的并行。(目前消费级的个人电脑都是多核CPU)

在科学计算中,可以使用并行来提高效率,多进程和多线程是实现并行计算的不同途径,它们各有优劣,例如OpenMP采用的是多线程,MPI采用的是多进程(还支持每一个进程位于不同的计算结点上,MPI负责在不同计算节点之间的通讯)

多线程的实现

C/C++多线程

现在关注C/C++的多线程语法,按照平台和封装层次的不同,有几种常见的实现:

  1. 对于POSIX系统(Linux系统等),可以使用pthread库实现线程操作
  2. 对于Windows系统,同样提供了线程操作的API
  3. 对于Modern C++,可以使用std::thread进行跨平台统一的线程操作

值得注意的是,C++11引入了std::thread,但是这个线程类的设计有些缺陷(不是RAII的),后续填坑时为了不破坏兼容性,C++20又设计了一个新的名为std::jthread的线程类,仍然存放在<thread>头文件里面。 由于三家编译器的较新版本默认采用C++17标准,使用std::jthread时需要在编译选项中指明采用C++20标准,例如-std=c++20/std:c++20

下面使用C/C++最常见的三种实现,编写等价的多线程程序示例,可以发现在不同的实现中的操作都是类似的,前两者都是C语言风格的函数接口,区别仅仅是参数的格式和类型等细节,而std::thread则使用了类进行封装,使用更加简洁。

使用pthread的示例如下,只能在Linux系统上使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <pthread.h>

void *printMessage(void *ptr) {
char *message = (char *)ptr;
printf("%s\n", message);
pthread_exit(NULL);
}

int main() {
pthread_t thread1, thread2;
const char *msg1 = "Hello from Thread 1";
const char *msg2 = "Hello from Thread 2";

pthread_create(&thread1, NULL, printMessage, (void *)msg1);
pthread_create(&thread2, NULL, printMessage, (void *)msg2);

pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

return 0;
}

使用Windows API的示例如下,只能在Windows系统上使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <windows.h>

DWORD WINAPI printMessage(LPVOID ptr) {
char *message = (char *)ptr;
printf("%s\n", message);
return 0;
}

int main() {
HANDLE thread1, thread2;
const char *msg1 = "Hello from Thread 1";
const char *msg2 = "Hello from Thread 2";

thread1 = CreateThread(NULL, 0, printMessage, (LPVOID)msg1, 0, NULL);
thread2 = CreateThread(NULL, 0, printMessage, (LPVOID)msg2, 0, NULL);

WaitForSingleObject(thread1, INFINITE);
WaitForSingleObject(thread2, INFINITE);

CloseHandle(thread1);
CloseHandle(thread2);

return 0;
}

使用std::thread的示例如下,采用面向对象的方法管理线程,可以跨平台使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>

void printMessage(const char *message) {
std::cout << message;
}

int main() {
const char *msg1 = "Hello from Thread 1\n";
const char *msg2 = "Hello from Thread 2\n";

std::thread thread1(printMessage, msg1);
std::thread thread2(printMessage, msg2);

thread1.join();
thread2.join();

return 0;
}

Python多线程

与之类似的,Python也有多线程的模块,常见有两个多线程模块

  • _thread:一个低级模块,提供与底层线程相关的一些基本函数,类似于pthread(在Python2的名称为thread,在Python3改名为_thread
  • threading:一个高级模块,将线程的操作封装为一个线程类,建议使用,类似于std::thread

下面使用Python的两个线程模块,编写等价的多线程程序示例:

使用_thread的示例如下,注意_thread模块没有提供join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import _thread
import time


def print_message(message):
print(message, end="")


msg1 = "Hello from Thread 1\n"
msg2 = "Hello from Thread 2\n"

try:
t1 = _thread.start_new_thread(print_message, (msg1,))
t2 = _thread.start_new_thread(print_message, (msg2,))
except:
print("Error: Unable to start thread")

time.sleep(1)

使用threading的示例如下,采用面向对象的方法来管理线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading


def print_message(message):
print(message, end="")


msg1 = "Hello from Thread 1\n"
msg2 = "Hello from Thread 2\n"

thread1 = threading.Thread(target=print_message, args=(msg1,))
thread2 = threading.Thread(target=print_message, args=(msg2,))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

接下来只关注C++的多线程库std::thread的用法,其他的多线程使用都是类似的,因为都是对系统底层提供的线程操作接口的封装而已。

我们还需要明确的是,正因为std::thread只是对系统底层接口的封装,可以保证接口的一致性, 但是由于不同平台的底层实现是必然不同的,因此相同的代码可能产生不同的运行结果。

创建 std::thread

在创建thread对象时需要传入一个可调用对象(函数,仿函数,lambda表达式等),还可以附带一些参数,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>

void call_func(const char *message) { std::cout << message; }

class call_class {
public:
void operator()(const char *message) { std::cout << message; }
};

int main() {
std::thread thread1(call_func, "Hello from Thread (call func)\n");
std::thread thread2(call_class{}, "Hello from Thread (call class)\n");
std::thread thread3([](const char *message) { std::cout << message; },
"Hello from Thread (call lambda)\n");

thread1.join();
thread2.join();
thread3.join();

return 0;
}

thread对象被创建之后就会产生一个与之关联的线程(线程和thread对象不一样),新的线程进入可调用对象的入口开始执行。 不论是哪一种线程的实现,在创建线程时都是需要指定一个入口的,通常是函数入口(或者其他等价的可调用对象),从这个角度也可以把主线程理解为一个以main函数为入口的线程。

join和detach

接下来的问题是,新线程和当前线程之间如何产生联系?通常有两种选择,都是基于thread对象实现的:

  • join(汇合):用于将当前线程(通常是主线程)阻塞,等待并确保被调用的thread对象关联的线程执行完毕,然后当前线程才会继续执行
  • detach(分离):用于将被调用的thread对象关联的线程和当前线程分离,同时thread对象也和关联的线程分离,不再负责管理对应线程。分离的线程又被称为后台线程或守护线程。
    • 分离后的线程在后台运行,和当前线程不再与其有关联,即使当前线程中的thread对象被析构,那个线程也会继续运行。
    • 当前线程不会等待分离的线程执行完毕,被分离的线程结束时,相应的资源会自动释放。
    • 整个进程退出时因为所有资源都被系统回收,所以分离的线程也会被销毁,无论是否完成。

thread对象有一个布尔属性来记录关联线程的状态:

  • 状态为true,称为joinable;状态为false,称为not joinable
  • 状态可以通过thread.joinable()获取
  • 含参数构造的thread对象默认为joinable,不含参数构造的thread对象默认为not joinable
  • 如果调用了一次thread.join()thread.detach(),则属性会被标记为false,从joinable切换为not joinable
  • 只有处于joinable状态的thread对象,才可以调用thread.join()thread.detach(),否则报错,这意味着不可以再次调用这两个方法,否则在运行时会报错
  • thread对象执行析构时,它不应该还处于joinable的状态,否则会调用std::terminate()立刻退出这个程序

习惯上,可以在joindetach之前判断一下状态,例如

1
2
3
4
if (mythread.joinable()){
mythread.join();
// or mythread.detach();
}

推荐的做法是在主线程的最后join所有的线程,并且慎用detach,因为分离后的线程与主线程无关联,在主线程结束后,分离的线程可能会无法完成任务而被系统强制结束,这可能导致资源泄漏或数据不一致。

join实验验证

我们使用如下的几个小实验来验证join的行为,这里会先后创建一个睡眠5秒和睡眠2秒的新线程,观察总的耗时。

如果在创建join第一个线程之后,再创建第二个线程并join,程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <chrono>
#include <iostream>
#include <thread>

void func(const char *message, int n) {
std::this_thread::sleep_for(std::chrono::seconds(n));
std::cout << message << std::endl;
}

int main() {
auto start = std::chrono::system_clock::now();

//------------------------------------------------------------
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);
thread1.join();

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);
thread2.join();
//------------------------------------------------------------

auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - start)
.count();
std::cout << "done! elapsed " << elapsed << " seconds.";

return 0;
}

运行结果如下,总耗时7秒

1
2
3
4
5
starting first thread...
first thread finished.
starting second thread...
second thread finished.
done! elapsed 7 seconds.

等待第一个线程耗费5秒,等待第二个线程耗费2秒。

如果先创建两个线程,然后join第一个线程,最后join第二个线程,即

1
2
3
4
5
6
7
8
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);

thread1.join();
thread2.join();

运行结果如下,总耗时5秒

1
2
3
4
5
starting first thread...
starting second thread...
second thread finished.
first thread finished.
done! elapsed 5 seconds.

因为第一个线程完成之后,第二个线程已经完成了,因此等待时间就是第一个线程的睡眠时间。

调换join的顺序呢,即

1
2
3
4
5
6
7
8
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);

thread2.join();
thread1.join();

运行结果如下,总耗时5秒,和上面的一样

1
2
3
4
5
starting first thread...
starting second thread...
second thread finished.
first thread finished.
done! elapsed 5 seconds.

detach实验验证

我们还可以实验detach的行为,程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <chrono>
#include <iostream>
#include <thread>

void func(const char *message, int n) {
std::this_thread::sleep_for(std::chrono::seconds(n));
std::cout << message << std::endl;
}

int main() {
auto start = std::chrono::system_clock::now();

//------------------------------------------------------------
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);
thread1.detach();

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);
thread2.detach();
//------------------------------------------------------------

auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - start)
.count();
std::cout << "done! elapsed " << elapsed << " seconds.";

return 0;
}

运行结果如下

1
2
3
starting first thread...
starting second thread...
done! elapsed 0 seconds.

现在主线程并没有进行任何的等待,耗时几乎为0。

可以发现这里两个线程并没有机会输出信息,这并不是因为主线程的结束而终止,而是因为主线程结束导致了控制台的关闭,因此仍然在执行的线程无法把信息输出到控制台。这个例子说明detach后的线程可能因为主线程的结束而无法正常完成任务。

补充

线程id

在多线程中还有一个很重要的需求是知道当前线程是谁?C++使用std::thread::id表示线程标识符的类型(正整数), 对每个线程都分配一个唯一的标识符,用来区分不同的线程。

  • 通过使用thread对象的方法thread.get_id()获取thread对象关联的线程的id
  • 通过std::this_thread::get_id()则可以获取当前线程自身的id

示例代码如下,这里主线程的睡眠是保证输出不会混乱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <chrono>
#include <iostream>
#include <thread>

void threadFunction() {
std::cout << "Thread ID: " << std::this_thread::get_id() << std::endl;
}

int main() {
std::thread t1(threadFunction);
std::this_thread::sleep_for(std::chrono::seconds(1));

std::thread t2(threadFunction);
std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;
std::cout << "t1 Thread ID: " << t1.get_id() << std::endl;
std::cout << "t2 Thread ID: " << t2.get_id() << std::endl;

t1.join();
t2.join();

return 0;
}

输出如下

1
2
3
4
5
Thread ID: 37428
Thread ID: 7352
Main thread ID: 21260
t1 Thread ID: 37428
t2 Thread ID: 7352

可以发现线程的id是一个正整数,不同的线程具有不同的id:在t1线程内部可以获取自身的id,在主线程中也可以通过t1对象获取对应的线程id。

线程睡眠

有两个接口可以实现当前线程的睡眠,使得执行流暂停一段时间再继续执行:

  • std::this_thread::sleep_for:暂停一段时间(前面已经使用过)
  • std::this_thread::sleep_until:暂停到指定的时刻

例如,线程睡眠直到当前时刻加500毫秒,可以通过第一种实现

1
std::this_thread::sleep_for(std::chrono::milliseconds(500));

也可以通过第二种实现

1
2
auto wakeup_time = std::chrono::system_clock::now() + std::chrono::milliseconds(500);
std::this_thread::sleep_until(wakeup_time);

这里还搜集并整理一下C/C++中常见的实现程序暂停功能的函数:

  • Linux系统可以使用sleep函数,传入秒数
  • Windows API提供了Sleep函数,传入毫秒数
  • Cpp提供的睡眠函数如前文,传入任意的chrono时间对象即可

这几种函数接口的名称和时间单位都不一样,而且需要注意的是,无论是Linux系统还是Windows系统都无法给用户提供准确的计时行为,这是系统本身所决定的。

Linux实现的暂停如下

1
2
3
4
5
6
#include <unistd.h>

int main(){
sleep(3); // sleep 3 seconds
return 0;
}

Windows API实现的暂停如下

1
2
3
4
5
6
#include <Windows.h>

int main(){
Sleep(3000); // sleep 3000 milliseconds
return 0;
}

std::thread实现的暂停如下

1
2
3
4
5
6
7
8
#include <iostream>
#include <thread>
#include <chrono>

int main() {
std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep 3 seconds
return 0;
}