多线程的实现

C/C++多线程

我们关注C/C++的多线程语法,按照平台和封装层次的不同,有几种常见的实现:

  1. 对于POSIX系统(Linux系统等),可以使用pthread库实现线程操作
  2. 对于Windows系统,同样提供了线程操作的API
  3. 对于Modern C++,可以使用std::thread进行跨平台统一的线程操作

值得注意的是,C++11引入了std::thread,但是这个线程类的设计有些缺陷(不是RAII的),后续填坑时为了不破坏兼容性,C++20又设计了一个新的名为std::jthread的线程类,仍然存放在<thread>头文件里面。 由于三家编译器的较新版本默认采用C++17标准,使用std::jthread时需要在编译选项中指明采用C++20标准,例如-std=c++20/std:c++20

下面使用C/C++最常见的三种实现,编写等价的多线程程序示例,可以发现在不同的实现中的操作都是类似的,前两者都是C语言风格的函数接口,区别仅仅是参数的格式和类型等细节,而std::thread则使用了类进行封装,使用更加简洁。

使用pthread的示例如下,只能在Linux系统上使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <pthread.h>

void *printMessage(void *ptr) {
char *message = (char *)ptr;
printf("%s\n", message);
pthread_exit(NULL);
}

int main() {
pthread_t thread1, thread2;
const char *msg1 = "Hello from Thread 1";
const char *msg2 = "Hello from Thread 2";

pthread_create(&thread1, NULL, printMessage, (void *)msg1);
pthread_create(&thread2, NULL, printMessage, (void *)msg2);

pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

return 0;
}

使用Windows API的示例如下,只能在Windows系统上使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <windows.h>

DWORD WINAPI printMessage(LPVOID ptr) {
char *message = (char *)ptr;
printf("%s\n", message);
return 0;
}

int main() {
HANDLE thread1, thread2;
const char *msg1 = "Hello from Thread 1";
const char *msg2 = "Hello from Thread 2";

thread1 = CreateThread(NULL, 0, printMessage, (LPVOID)msg1, 0, NULL);
thread2 = CreateThread(NULL, 0, printMessage, (LPVOID)msg2, 0, NULL);

WaitForSingleObject(thread1, INFINITE);
WaitForSingleObject(thread2, INFINITE);

CloseHandle(thread1);
CloseHandle(thread2);

return 0;
}

使用std::thread的示例如下,采用面向对象的方法管理线程,可以跨平台使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>

void printMessage(const char *message) {
std::cout << message;
}

int main() {
const char *msg1 = "Hello from Thread 1\n";
const char *msg2 = "Hello from Thread 2\n";

std::thread thread1(printMessage, msg1);
std::thread thread2(printMessage, msg2);

thread1.join();
thread2.join();

return 0;
}

在Linux上使用pthread库显然需要手动添加链接选项-pthread,虽然std::thread只是对pthread的封装,(通过CMake)直接使用std::thread时可能不需要额外的链接操作,这可能与具体的版本和平台有关。

Python多线程(补充)

与之类似的,Python也有多线程的模块,常见有两个多线程模块

  • _thread:一个低级模块,提供与底层线程相关的一些基本函数,类似于pthread(在Python2的名称为thread,在Python3改名为_thread
  • threading:一个高级模块,将线程的操作封装为一个线程类,建议使用,类似于std::thread

下面使用Python的两个线程模块,编写等价的多线程程序示例:

使用_thread的示例如下,注意_thread模块没有提供join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import _thread
import time


def print_message(message):
print(message, end="")


msg1 = "Hello from Thread 1\n"
msg2 = "Hello from Thread 2\n"

try:
t1 = _thread.start_new_thread(print_message, (msg1,))
t2 = _thread.start_new_thread(print_message, (msg2,))
except:
print("Error: Unable to start thread")

time.sleep(1)

使用threading的示例如下,采用面向对象的方法来管理线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading


def print_message(message):
print(message, end="")


msg1 = "Hello from Thread 1\n"
msg2 = "Hello from Thread 2\n"

thread1 = threading.Thread(target=print_message, args=(msg1,))
thread2 = threading.Thread(target=print_message, args=(msg2,))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

接下来只关注C++的多线程库std::thread的用法,其他的多线程使用都是类似的,因为都是对系统底层提供的线程操作接口的封装而已。

我们还需要明确的是,正因为std::thread只是对系统底层接口的封装,可以保证接口的一致性, 但是由于不同平台的底层实现是必然不同的,因此相同的代码可能产生不同的运行结果。

创建 std::thread

在创建thread对象时需要传入一个可调用对象(函数,仿函数,lambda表达式等),还可以附带一些参数,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>

void call_func(const char *message) { std::cout << message; }

class call_class {
public:
void operator()(const char *message) { std::cout << message; }
};

int main() {
std::thread thread1(call_func, "Hello from Thread (call func)\n");
std::thread thread2(call_class{}, "Hello from Thread (call class)\n");
std::thread thread3([](const char *message) { std::cout << message; },
"Hello from Thread (call lambda)\n");

thread1.join();
thread2.join();
thread3.join();

return 0;
}

thread对象被创建之后就会产生一个与之关联的线程(线程和thread对象不一样),新的线程进入可调用对象的入口开始执行。 不论是哪一种线程的实现,在创建线程时都是需要指定一个入口的,通常是函数入口(或者其他等价的可调用对象),从这个角度也可以把主线程理解为一个以main函数为入口的线程。

join和detach

接下来的问题是,新线程和当前线程之间如何产生联系?通常有两种选择,都是基于thread对象实现的:

  • join(汇合):用于将当前线程(通常是主线程)阻塞,等待并确保被调用的thread对象关联的线程执行完毕,然后当前线程才会继续执行
  • detach(分离):用于将被调用的thread对象关联的线程和当前线程分离,同时thread对象也和关联的线程分离,不再负责管理对应线程。分离的线程又被称为后台线程或守护线程。
    • 分离后的线程在后台运行,和当前线程不再与其有关联,即使当前线程中的thread对象被析构,那个线程也会继续运行。
    • 当前线程不会等待分离的线程执行完毕,被分离的线程结束时,相应的资源会自动释放。
    • 整个进程退出时因为所有资源都被系统回收,所以分离的线程也会被销毁,无论是否完成。

thread对象有一个布尔属性来记录关联线程的状态:

  • 状态为true,称为joinable;状态为false,称为not joinable
  • 状态可以通过thread.joinable()获取
  • 含参数构造的thread对象默认为joinable,不含参数构造的thread对象默认为not joinable
  • 如果调用了一次thread.join()thread.detach(),则属性会被标记为false,从joinable切换为not joinable
  • 只有处于joinable状态的thread对象,才可以调用thread.join()thread.detach(),否则报错,这意味着不可以再次调用这两个方法,否则在运行时会报错
  • thread对象执行析构时,它不应该还处于joinable的状态,否则会调用std::terminate()立刻退出这个程序

习惯上,可以在joindetach之前判断一下状态,例如

1
2
3
4
if (mythread.joinable()){
mythread.join();
// or mythread.detach();
}

推荐的做法是在主线程的最后join所有的线程,并且慎用detach,因为分离后的线程与主线程无关联,在主线程结束后,分离的线程可能会无法完成任务而被系统强制结束,这可能导致资源泄漏或数据不一致。

join实验验证

我们使用如下的几个小实验来验证join的行为,这里会先后创建一个睡眠5秒和睡眠2秒的新线程,观察总的耗时。

如果在创建join第一个线程之后,再创建第二个线程并join,程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <chrono>
#include <iostream>
#include <thread>

void func(const char *message, int n) {
std::this_thread::sleep_for(std::chrono::seconds(n));
std::cout << message << std::endl;
}

int main() {
auto start = std::chrono::system_clock::now();

//------------------------------------------------------------
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);
thread1.join();

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);
thread2.join();
//------------------------------------------------------------

auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - start)
.count();
std::cout << "done! elapsed " << elapsed << " seconds.";

return 0;
}

运行结果如下,总耗时7秒

1
2
3
4
5
starting first thread...
first thread finished.
starting second thread...
second thread finished.
done! elapsed 7 seconds.

等待第一个线程耗费5秒,等待第二个线程耗费2秒。

如果先创建两个线程,然后join第一个线程,最后join第二个线程,即

1
2
3
4
5
6
7
8
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);

thread1.join();
thread2.join();

运行结果如下,总耗时5秒

1
2
3
4
5
starting first thread...
starting second thread...
second thread finished.
first thread finished.
done! elapsed 5 seconds.

因为第一个线程完成之后,第二个线程已经完成了,因此等待时间就是第一个线程的睡眠时间。

调换join的顺序呢,即

1
2
3
4
5
6
7
8
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);

thread2.join();
thread1.join();

运行结果如下,总耗时5秒,和上面的一样

1
2
3
4
5
starting first thread...
starting second thread...
second thread finished.
first thread finished.
done! elapsed 5 seconds.

detach实验验证

我们还可以实验detach的行为,程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <chrono>
#include <iostream>
#include <thread>

void func(const char *message, int n) {
std::this_thread::sleep_for(std::chrono::seconds(n));
std::cout << message << std::endl;
}

int main() {
auto start = std::chrono::system_clock::now();

//------------------------------------------------------------
std::cout << "starting first thread...\n";
std::thread thread1(func, "first thread finished.",5);
thread1.detach();

std::cout << "starting second thread...\n";
std::thread thread2(func, "second thread finished.",2);
thread2.detach();
//------------------------------------------------------------

auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - start)
.count();
std::cout << "done! elapsed " << elapsed << " seconds.";

return 0;
}

运行结果如下

1
2
3
starting first thread...
starting second thread...
done! elapsed 0 seconds.

现在主线程并没有进行任何的等待,耗时几乎为0。

可以发现这里两个线程并没有机会输出信息,这并不是因为主线程的结束而终止,而是因为主线程结束导致了控制台的关闭,因此仍然在执行的线程无法把信息输出到控制台。这个例子说明detach后的线程可能因为主线程的结束而无法正常完成任务。

补充

线程id

在多线程中还有一个很重要的需求是知道当前线程是谁?C++使用std::thread::id表示线程标识符的类型(正整数), 对每个线程都分配一个唯一的标识符,用来区分不同的线程。

  • 通过使用thread对象的方法thread.get_id()获取thread对象关联的线程的id
  • 通过std::this_thread::get_id()则可以获取当前线程自身的id

示例代码如下,这里主线程的睡眠是保证输出不会混乱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <chrono>
#include <iostream>
#include <thread>

void threadFunction() {
std::cout << "Thread ID: " << std::this_thread::get_id() << std::endl;
}

int main() {
std::thread t1(threadFunction);
std::this_thread::sleep_for(std::chrono::seconds(1));

std::thread t2(threadFunction);
std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;
std::cout << "t1 Thread ID: " << t1.get_id() << std::endl;
std::cout << "t2 Thread ID: " << t2.get_id() << std::endl;

t1.join();
t2.join();

return 0;
}

输出如下

1
2
3
4
5
Thread ID: 37428
Thread ID: 7352
Main thread ID: 21260
t1 Thread ID: 37428
t2 Thread ID: 7352

可以发现线程的id是一个正整数,不同的线程具有不同的id:在t1线程内部可以获取自身的id,在主线程中也可以通过t1对象获取对应的线程id。

线程睡眠

有两个接口可以实现当前线程的睡眠,使得执行流暂停一段时间再继续执行:

  • std::this_thread::sleep_for:暂停一段时间(前面已经使用过)
  • std::this_thread::sleep_until:暂停到指定的时刻

例如,线程睡眠直到当前时刻加500毫秒,可以通过第一种实现

1
std::this_thread::sleep_for(std::chrono::milliseconds(500));

也可以通过第二种实现

1
2
auto wakeup_time = std::chrono::system_clock::now() + std::chrono::milliseconds(500);
std::this_thread::sleep_until(wakeup_time);

这里还搜集并整理一下C/C++中常见的实现程序暂停功能的函数:

  • Linux系统可以使用sleep函数,传入秒数
  • Windows API提供了Sleep函数,传入毫秒数
  • Cpp提供的睡眠函数如前文,传入任意的chrono时间对象即可

这几种函数接口的名称和时间单位都不一样,而且需要注意的是,无论是Linux系统还是Windows系统都无法给用户提供准确的计时行为,这是系统本身所决定的。

Linux实现的暂停如下

1
2
3
4
5
6
#include <unistd.h>

int main(){
sleep(3); // sleep 3 seconds
return 0;
}

Windows API实现的暂停如下

1
2
3
4
5
6
#include <Windows.h>

int main(){
Sleep(3000); // sleep 3000 milliseconds
return 0;
}

std::thread实现的暂停如下

1
2
3
4
5
6
7
8
#include <iostream>
#include <thread>
#include <chrono>

int main() {
std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep 3 seconds
return 0;
}