文章目录
- 前言
- Linux 线程概念
- 线程的优点
- 线程的缺点
- 线程异常
- 线程用途
- 使用
- pthread_create
- pthread_join
- 线程退出
- 线程id
- 分离线程
- 线程互斥
- 问题:临界资源访问问题
- 问题解决:互斥锁的使用
- RAII 风格的加锁方式
- 可重入 & 线程安全
- 死锁的概念
- 线程同步
- 条件变量
- 生产者消费者模型
- 基于 BlockingQueue 的生产者消费者模型
- POSIX 信号量
- 简介
- 基于环形队列的生产者消费者模型
- 线程池
前言
尽管通常我们认为一个进程只有单一的控制流,但是在现代系统中,一个进程实际上可以由多个称为 线程 的执行单元组成,每个线程都运行在进程的上下文中,并共享同样的代码和全局数据。由于网络服务器中对并行处理的需求,线程成为越来越重要的编程模型,因为多线程之间比多进程之间更容易共享数据,也因为线程一般来说都比进程更高效。当有多处理器可用的时候,多线程也是一种使得程序可以运行得更快的方法。
线程(thread) 就是运行在进程上下文中的逻辑流。线程由内核自动调度。每个线程都有它自己的 线程上下文(thread context),包括一个唯一的整数 线程 ID(Thread ID, TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码。所有的运行在一个进程里的线程共享该进程的整个虚拟地址空间。
Linux 线程概念
Linux 认为:进程和线程没有概念上的区分,只有一个概念——执行流。
在 Linux 中,线程是用进程模拟的,准确来说,是用进程的 PCB 模拟的。也就是说,Linux没有为线程设计新的 TCB,线程和进程都是用的同样的结构体 task_struct
描述的。
我们知道,fork 一个进程内核会为为你创建一个新的进程,包括新的虚拟地址空间,新的PCB、页表等。而创建一个新的线程,则只是创建一个新的 PCB,多个 PCB 共享同一个虚拟地址空间,一个进程的代码相当于被这多个 PCB 分割了。
之前我们说,进程 = 内核数据结构(PCB) + 进程对应的代码和数据,现在我们有了一个全新的理解:
从内核的视角:进程 = 承担分配系统资源的基本实体(进程的基座属性)。换句话说,不只是 PCB,还包括虚拟地址空间,页表,物理内存的数据和代码,这些全部合起来才叫进程。
线程是什么呢?线程 = 调度的基本单位,CPU 在调度的时候只能看到 task_struct,一个 task_struct 代表着进程中的一个执行流,也就是线程。
我们以前写的代码,都是一个进程只有一个执行流,这样的进程叫做 单执行流进程。对应的,内部有多个执行流的进程叫做 多执行流进程
总结:
- 在一个程序里的一个执行路线就叫做线程。更准确的定义是:线程是”一个进程内部的控制序列“。
- 一切进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在 Linux 系统中,CPU 眼中的看到的 PCB 都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流。
线程的优点
- 创建一个线程的代价比创建一个进程小得多
- 线程间的切换比进程间的切换操作系统所要做的工作少得多(不需要切换地址空间与页表)
- 线程占用的资源比进程少得多(线程本身用资源的就是进程里的一部分)
- 能充分利用多处理器的可并行数量
- 在等待慢速 I/O 操作结束的同时,程序可执行其他的计算任务(这一点进程也有)
- 计算密集型应用(代码大部分都是为了计算,如加密和解密),为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O 密集型应用,为了提高性能,将 I/O 操作重叠,线程可以同时等待不同的 I/O 操作。
线程的缺点
- 性能缺失
- 一个很少被外部事件阻塞的计算密集型线程往往无法与其他线程共享同一个处理器,如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失(线程间切换的成本变高)
- 健壮性降低
- 进程的独立性很高,健壮性也是非常高的,线程与线程之间的隔离性没那么好,在一个多线程里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说,线程之间是缺乏保护的。
- 缺乏访问控制
- 进程是访问控制的基本粒度,在一个线程中调用某些 OS 函数会对整个进程造成影响
- 编程难度提高
- 编写与调试多线程程序比单线程程序困难
线程异常
- 单个线程如果出现除以零,野指针问题导致线程崩溃,进程也会随之崩溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,该进程内的所有线程也就随即退出
线程用途
- 合理使用多线程,能提高计算密集型程序的执行效率
- 合理使用多线程,能提高 IO 密集型程序的用户体验
线程共享进程数据,但也拥有自己的一部分数据:
- 线程ID
- 一组寄存器
- 栈
- errno
- 型号屏蔽字
- 调度优先级
使用
Linux 下没有真正意义的线程,所以也就没有易于用户使用的线程相关的系统调用接口,但是有原生线程库,提供了封装好的线程接口供我们使用。Linux 默认都带有这个库。
pthread_create
创建线程:
NAME
pthread_create - create a new thread
SYNOPSIS
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
Compile and link with -pthread.
// 返回值:成功:返回0,失败:返回一个错误码
-
thread
: 输出型参数,表示线程 id -
attr
: 线程属性,我们不管,可以直接设置为 NULL -
start_routine
: 函数指针,线程调用的入口 -
arg
: 传入start_routine
函数的参数
pthread_join
线程等待,一个线程被创建了,最后一定要 join,否则就会造成如进程那样的内存泄漏问题。
NAME
pthread_join - join with a terminated thread
SYNOPSIS
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
Compile and link with -pthread.
// 返回值:成功:返回0,失败:返回错误码
thread
: 线程 id,表示你要等的线程
retval
: 退出结果
例子:
创建两个线程,然后让三个线程各自循环打印对应的信息。
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
void* callback1(void* args)
{
string name = (char*)args;
while (true)
{
cout << name << endl;
sleep(1);
}
}
void* callback2(void* args)
{
string name = (char*)args;
while (true)
{
cout << name << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, nullptr, callback1, (void*)"thread 1");
pthread_create(&tid2, nullptr, callback2, (void*)"thread 2");
while (true)
{
cout << "我是主线程..." << endl;
sleep(1);
}
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
return 0;
}
编译的时候一定要链接线程库
g++ -o mythread mythread.cpp -lpthread -std=c++11
运行结果:
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
我是主线程...
thread 2
thread 1
thread 2
我是主线程...
thread 1
thread 2
我是主线程...
thread 1
thread 2
我是主线程...
thread 1
thread 2
我是主线程...
在运行的同时,使用 ps -aL
查看线程
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ps -aL
PID LWP TTY TIME CMD
6876 6876 pts/3 00:00:00 mythread
6876 6877 pts/3 00:00:00 mythread
6876 6878 pts/3 00:00:00 mythread
7045 7045 pts/4 00:00:00 ps
LWP 意为轻量级进程(Lightweight process),其实就是 Linux 下的线程,它的编号如果与 PID 相等,那么这个线程就是主线程,另外两个线程的 PID 也与主线程相等,因为它们都是在同一个进程里运行的。
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, callback1, (void*)"thread 1");
cout << "new thread id: " << tid << endl;
while (true)
{
cout << "main thread 正在运行..." << endl;
sleep(1);
}
return 0;
}
通过如上代码来查看线程 id
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
new thread id: 140508908504832
main thread 正在运行...
thread 1: 32051
可以看到线程 id 是一个非常大的值。为什么会这么大呢?这个问题我们稍后再谈。
pthread_self
查看线程 id
NAME
pthread_self - obtain ID of the calling thread
SYNOPSIS
#include <pthread.h>
pthread_t pthread_self(void);
Compile and link with -pthread.
线程退出
下面我们来看 pthread_join 的第二个参数 retval
:
它是一个二级指针,是一个输出型参数,因为 start_routine
函数的返回值时 void*,作为一个输出型参数,要想拿到这个返回值,自然就是 void** 类型的。
例子:
void* startRoutine(void* args)
{
string name = (char*)args;
int cnt = 3;
while (cnt--)
{
printTid(name, pthread_self());
sleep(1);
}
cout << "线程退出了..." << endl;
// 1. 线程退出方式,return
return (void*)111;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, startRoutine, (void*)"thread 1");
void* ret = nullptr;
pthread_join(tid, &ret);
cout << "main thread join success, *ret: " << (long long)ret << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
thread 1 正在运行, thread id: 0xe5b0f700
thread 1 正在运行, thread id: 0xe5b0f700
thread 1 正在运行, thread id: 0xe5b0f700
线程退出了...
main thread join success, *ret: 111
线程退出方式2:使用 pthread_exit
NAME
pthread_exit - terminate calling thread
SYNOPSIS
#include <pthread.h>
void pthread_exit(void *retval);
Compile and link with -pthread.
retval
: 退出码
注意:pthread_exit 是专用来退出某个线程的。而 exit 是用来退出整个进程的,任何一个线程调用 exit 都会使整个进程退出。
线程退出方式3:使用 pthread_cancel
NAME
pthread_cancel - send a cancellation request to a thread
SYNOPSIS
#include <pthread.h>
int pthread_cancel(pthread_t thread);
Compile and link with -pthread.
thread
: 线程 id
例子:
新线程跑3秒后使用 pthread_cancel 终止
void* startRoutine(void* args)
{
string name = (char*)args;
while (true)
{
printTid(name, pthread_self());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, startRoutine, (void*)"thread 1");
sleep(3);
pthread_cancel(tid);
void* ret = nullptr;
pthread_join(tid, &ret);
cout << "main thread join success, *ret: " << (long long)ret << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
thread 1 正在运行, thread id: 0x46298700
thread 1 正在运行, thread id: 0x46298700
thread 1 正在运行, thread id: 0x46298700
main thread join success, *ret: -1
使用 pthread_cancel 退出的进程退出码为 -1
线程id
上面我们提到,线程 id 是一个非常大的值,其实这个值是一个地址,该地址处存储了线程的相关信息。这里的线程 id 是用户级别的 id,属于NPTL线程库的范畴,线程库的后续操作,就是根据该线程ID来操作线程的
首先要明确几点
- 线程是一个独立的执行流,这一点我们已经看到了
- 线程一定会在自己的运行过程中,产生临时数据(调用函数,定义局部变量等)
- 线程一定需要有自己的独立的栈结构
在全局变量前面加 __thread
,多个线程对它取地址,得到的地址是不同的
像这样的变量并不是所有线程共享的,它由各线程局部存储。
例子:
__thread int global_value = 100;
void* startRoutine(void* args)
{
while (true)
{
cout << "thread: " << pthread_self() << " global_value: " << global_value << " &global_value: " << &global_value << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 3");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
thread: 140564086306560 global_value: 100 &global_value: 0x7fd7a06eb6fc
thread: 140564077913856 global_value: 100 &global_value: 0x7fd79feea6fc
thread: 140564069521152 global_value: 100 &global_value: 0x7fd79f6e96fc
分离线程
- 默认情况下,新创建的线程是 joinable 的,线程退出后,需要对其进行 pthread_join 操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join 是一种负担(类似于进程等待,主线程会阻塞式地等),这时,我们可以告诉系统,当线程退出时,自动释放线程。
使用 pthread_detach 分离线程
NAME
pthread_detach - detach a thread
SYNOPSIS
#include <pthread.h>
int pthread_detach(pthread_t thread);
Compile and link with -pthread.
// 返回值:成功:返回0,失败:返回错误码
thread
: 要分离的线程 id
例子:
void* startRoutine(void* args)
{
pthread_detach(pthread_self());
cout << "线程已分离" << endl;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 3");
sleep(1);//注意这里的sleep,如果没有这个sleep,主线程可能在其他线程分离之前就把它们给join了
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
线程已分离
线程已分离
线程已分离
22:Invalid argument
22:Invalid argument
22:Invalid argument
可以看到,线程成功分离,之后调用 pthread_join 函数都失败并返回错误码了。
因为执行流很混乱,其实我们更倾向于让主线程分离其他线程:
void* startRoutine(void* args)
{
while (true)
{
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 3");
pthread_detach(tid1);
pthread_detach(tid2);
pthread_detach(tid3);
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
22:Invalid argument
22:Invalid argument
22:Invalid argument
注意:一般我们分离线程,对应的 main thread 不退出。主线程代表了整个进程,主线程退出相当于进程退出,其他线程都会跟着退出。
线程互斥
- 临界资源:多个执行流都能看到并能访问的资源
- 临界区:多个执行流中访问临界资源的代码被称为临界区
- 互斥:当我们访问某种资源的时候,任何时刻都只有一个执行流在进行访问,这就叫做互斥特性
问题:临界资源访问问题
这里补充一点:临界资源的访问可能会出现数据不一致的问题
例如有如下抢票系统:
创建三个线程,让它们都对全局变量减减,表示抢票操作
int tickets = 1000;
void *getTickets(void *args)
{
string name = (char*)args;
while (true)
{
// 临界区
if (tickets > 0)
{
cout << name << "抢到了票,编号为:" << tickets << endl;
--tickets;
}
else
{
cout << name << "抢不到票,票已抢完" << endl;
break;
}
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, getTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*)"thread 3");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
这个程序有一个漏洞,问题在于 --tickets
,在语言层面上,我们觉得它是一条语句,但是在系统层面,它其实要分三步来完成:
- 将 tickets 变量从内存取出到寄存器
- 在 CPU 中进行运算
- 将运算结果写回到内存
如 线程A 完成了第1、2两步,取tickets的值1000在CPU中减减到999,然后正要做第3步,还没把999写回到内存呢,线程就被切换了,线程B 过来就又取到了 1000,然后在CPU中减减到999,写回内存,此时 tickets 在内存中的值是999,而两个线程各取一次,预期值应该是998,二者不符,程序错误。
怎么解决这个问题呢?
很简单,我们只要保证 --tickets
这个操作具有原子性,即要么不做,要么就一次做到底。
即要让程序在执行 --tickets
期间不会被打扰,具体的解决方案就是加锁
问题复现
当面的代码出错的频率不是很高,下面我们多加一个线程,并使用usleep系统调用来增加线程切换的次数。
int tickets = 1000;
void *getTickets(void *args)
{
string name = (char*)args;
while (true)
{
// 临界区
if (tickets > 0)
{
usleep(1000);
cout << name << "抢到了票,编号为:" << tickets << endl;
--tickets;
}
else
{
cout << name << "抢不到票,票已抢完" << endl;
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*)"thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
//...
thread 3抢到了票,编号为:5
thread 4抢到了票,编号为:4
thread 2抢到了票,编号为:3
thread 1抢到了票,编号为:2
thread 3抢到了票,编号为:1
thread 3抢不到票,票已抢完
thread 4抢到了票,编号为:0
thread 4抢不到票,票已抢完
thread 2抢到了票,编号为:-1
thread 2抢不到票,票已抢完
thread 1抢到了票,编号为:-1
thread 1抢不到票,票已抢完
可以看到,运行到最后甚至出现了负数。
问题解决:互斥锁的使用
锁的类型:pthread_mutex_t
锁的初始化:
NAME
pthread_mutex_destroy, pthread_mutex_init - destroy and initialize a mutex
SYNOPSIS
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex); // 锁的销毁
int pthread_mutex_init(pthread_mutex_t *restrict mutex, // 局部锁
const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 全局锁
NAME
pthread_mutex_lock, pthread_mutex_trylock, pthread_mutex_unlock - lock and unlock a mutex
SYNOPSIS
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex); // 加锁,阻塞式
int pthread_mutex_trylock(pthread_mutex_t *mutex); // 加锁,非阻塞式
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 解锁
- 阻塞式,即当前进程需要等待其他线程把这把锁解锁后,再给自己加锁,非阻塞式则是直接查看这把锁是否已释放,若已释放则给自己加锁并返回0,若未释放则返回错误码然后直接继续执行后面的代码。
- 一把锁用完了必须解锁。
下面对上面的代码进行加锁:
int tickets = 1000;
// 创建锁
pthread_mutex_t mutex;
void *getTickets(void *args)
{
string name = (char*)args;
while (true)
{
// 临界区
// 加锁
pthread_mutex_lock(&mutex);
if (tickets > 0)
{
usleep(1000);
cout << name << "抢到了票,编号为:" << tickets << endl;
--tickets;
// 解锁,分支语句往往每个分支都要加,防止跳过导致漏解锁
pthread_mutex_unlock(&mutex);
}
else
{
cout << name << "抢不到票,票已抢完" << endl;
// 解锁
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
// 锁的初始化
pthread_mutex_init(&mutex, nullptr);
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*)"thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
// 销毁锁
pthread_mutex_destroy(&mutex);
return 0;
}
注意要创建锁,然后别忘了写上初始化和销毁。
关于加锁,要注意以下几点:
- 只对临界区加锁,加锁的粒度越小越好
- 加锁的本质是让线程执行临界区的代码串行化
- 加锁是一套规范,对所有临界区,要加就都要加
- 锁保护的是临界区,任何线程执行临界区代码访问临界资源,都必须先申请锁,前提是都必须先看到锁,所以锁本身就是临界资源,那么谁来保护它呢?
- 其实锁不用保护,因为竞争和申请锁的过程本身就是原子的。
RAII 风格的加锁方式
在 C++ 中,我们可以像封装智能指针那样封装锁:
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&lock_, nullptr);
}
void lock()
{
pthread_mutex_lock(&lock_);
}
void unlock()
{
pthread_mutex_unlock(&lock_);
}
~Mutex()
{
pthread_mutex_destroy(&lock_);
}
private:
pthread_mutex_t lock_;
};
class LockGuard
{
public:
LockGuard(Mutex* mutex)
: mutex_(mutex)
{
mutex_->lock();
}
~LockGuard()
{
mutex_->unlock();
}
private:
Mutex* mutex_;
};
例子:
#include <unistd.h>
#include "Lock.hpp"
int tickets = 1000;
// 创建锁
Mutex mutex;
bool getTickets()
{
bool ret = false;
// 加锁
LockGuard lockGuard(&mutex);
if (tickets > 0)
{
usleep(1001);
cout << "thread: " << pthread_self() << " get a ticket" << tickets << endl;
--tickets;
ret = true;
}
return ret;
}
void* startRoutine(void* args)
{
const char* name = static_cast<const char*>(args);
while (true)
{
if (!getTickets()) break;
cout << name << " get tickets success" << endl;
usleep(100);
}
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 3");
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 4");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
C++11 的线程库就是像这样封装了系统接口。
可重入 & 线程安全
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果则称这是线程安全的;常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现线程安全问题。
简单来说:可重入函数可以多线程调用,不可重入函数不可以多线程调用,否则会出现线程安全问题。
常见的线程不安全的情况:
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
- 类或者接口对于线程来说都是原子操作。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。
常见不可重入的情况:
- malloc/free 函数,因为 malloc 函数使用全局链表来管理堆的。
- 标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构。
- 可重入函数体内使用了静态的数据结构。
常见可重入的情况:
- 不使用全局变量或静态变量
- 不使用 malloc 或者 new 开辟的空间
- 调用不可重入函数
- 不返回静态或全局数据,所有数据都由函数的调用者提供
- 使用本地数据,或者通过制作迁居数据的本地拷贝来保护全局数据。
注意:
- C++ STL不是线程安全的,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全
- 智能指针 unique_ptr,由于由于只在当前代码块范围内生效,因此不涉及线程安全问题
- 对于 shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题,说那是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证了 shared_ptr 能够高效的原子的操作引用计数。
死锁的概念
死锁是指一组线程中的各个线程互相申请被其他线程所占用的不会释放的资源而处于一种永久等待的状态。
例子:
让两个线程分别申请A锁和B锁,等待1s后再让它们互相申请已被对方占用的锁。
pthread_mutex_t mutexA = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutexB = PTHREAD_MUTEX_INITIALIZER;
void* startRoutine1(void* args)
{
while (true)
{
pthread_mutex_lock(&mutexA);
sleep(1);
pthread_mutex_lock(&mutexB);
cout << "线程1, tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexA);
pthread_mutex_unlock(&mutexB);
}
}
void* startRoutine2(void* args)
{
while (true)
{
pthread_mutex_lock(&mutexB);
sleep(1);
pthread_mutex_lock(&mutexA);
cout << "线程2, tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexB);
pthread_mutex_unlock(&mutexA);
}
}
int main()
{
pthread_t t1, t2;
pthread_create(&t1, nullptr, startRoutine1, nullptr);
pthread_create(&t2, nullptr, startRoutine2, nullptr);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
运行后程序会一直处于等待状态:
死锁的必要条件:
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能强行剥夺
- 循环等待条件:拖杆执行流之间形成的一种头尾相接的循环等待资源的关系。
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
线程同步
- 同步:在保证临界资源安全的前提下,让线程具有一定顺序性地访问某种资源,从而有效避免饥饿问题,这种机制称为同步。(饥饿问题:指因为互斥可能导致一个执行流长时间得不到某种资源)。
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
条件变量
条件变量是 Linux 中最常用的同步策略。
条件:对应的共享资源的状态。
条件变量:条件满足或不满足的时候,进行 wait 或 signal 的一种方式。
- 当一个线程互斥地访问某个变量时,它可能发现在其他线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,直到其他线程将一个结点添加到队列中,这种情况就需要用到条件变量。
条件变量的类型:pthread_cond_t
条件变量函数:
初始化
// 局部初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
// cond: 要初始化的条件变量
// attr: NULL
// 全局初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
等待
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
// abstime: 可以设定等待时间
通过这个函数也可以知道,条件变量必须和互斥锁一起使用。
唤醒指定线程:
int pthread_cond_signal(pthread_cond_t *cond);
唤醒在 cond
条件变量下等的线程。
int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒在 cond
条件变量下等的所有线程。
例子
创建一个条件变量和一个互斥锁,在下面的例子中,我们没有使用互斥锁,只是作为 pthread_cond_wait
的参数。接下来在 main 函数中创建线程,然后我们每输入一个 n 就唤醒一个线程执行打印。
// 定义一个条件变量和一个互斥锁
pthread_cond_t cond;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* waitCommand(void* args)
{
while (true)
{
pthread_cond_wait(&cond, &mutex); // 让对应的线程等待被唤醒
cout << "thread id: " << pthread_self() << " run..." << endl;
}
}
int main()
{
pthread_cond_init(&cond, nullptr);
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, waitCommand, nullptr);
pthread_create(&t2, nullptr, waitCommand, nullptr);
pthread_create(&t3, nullptr, waitCommand, nullptr);
while (true)
{
char n = 'a';
cout << "请输入你的command(n/q): ";
cin >> n;
if (n == 'n') pthread_cond_signal(&cond); // 让在cond条件变量下等的线程被唤醒
else break;
sleep(1);
}
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_cond_destroy(&cond);
return 0;
}
运行结果:
可以看到,线程是被轮流唤醒的。
为什么 pthread_cond_wait
需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等待下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好地通知等待在条件变量上的线程。
- 条件不会无缘无故突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护,没有互斥锁就无法安全地获取和修改共享数据。
生产者消费者模型
这一模型中有三类对象,消费者,超市,工厂供应商(生产者),生产者提供货物上架超市,消费者从超市购买商品。超市在中间起到了一个缓冲区的效果。
- 消费者有多个,消费者之间需要竞争商品,是竞争关系,也就是互斥关系。
- 生产者有多个,生产者之间需要竞争货架,是竞争关系,也就是互斥关系。
- 消费者和生产者之间既是互斥关系又是同步关系。互斥,即供应商在给超市提供货物的和消费者消费不能同时进行。同步:即供应商供货与消费者消费存在一定的顺序性。
总结:321原则
- 3 种关系:生产者和生产者(互斥),消费者和消费者(互斥)生产者和消费者(互斥且同步)
- 2 种角色:生产者和消费者(都是由线程承担的)
- 1 个交易场所:超市(内存中特定的一种数据结构)
基于 BlockingQueue 的生产者消费者模型
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素。当队列为满时,我那个队列里放入元素的操作也会被阻塞,直到有元素被从队列中取出。这一特性类似于管道。
要判断条件是否成立,必然要访问共享资源,要访问共享资源,必然要先申请锁,当判断条件不成立时,就需要进行等待。但是我们不可能让持有着锁的线程等待,这样就造成死锁了。对于这一点,其实系统接口早就帮我们完善了。
- 通过
pthread_cond_wait
进入等待时,该线程会自动释放该锁,以让其他进程竞争,防止死锁。 - 当线程被重新唤醒时,该线程会自动重新加锁,继续保护共享资源的访问,线程会从等待语句开始继续向后执行。
例子:
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <cstdlib>
#include <pthread.h>
using namespace std;
const uint32_t gDefaultCap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap)
: cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
// 生产接口
void push(const T& in)
{
// 加锁
lockQueue();
// 这里必须使用while而不是if, 保证被唤醒时能够再次进行条件判断
while (isFull())
{
proBlockWait();
}
// 不满,可以生产
pushCore(in);
unlockQueue();
// 唤醒消费者
wakeupCon();
}
// 消费接口
T pop()
{
// 加锁
lockQueue();
// 判断是否为空
while (isEmpty())
{
conBlockWait();
}
// 为空,可以消费
T tmp = popCore();
unlockQueue();
// 唤醒生产者
wakeupPro();
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
void proBlockWait()
{
// 在阻塞线程的时候,会自动释放这把锁
pthread_cond_wait(&proCond_, &mutex_);
}
void conBlockWait()
{
pthread_cond_wait(&conCond_, &mutex_);
}
void wakeupPro()
{
pthread_cond_signal(&proCond_);
}
void wakeupCon()
{
pthread_cond_signal(&conCond_);
}
void pushCore(const T& in)
{
bq_.push(in);
}
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
queue<T> bq_; // 阻塞队列
uint32_t cap_; // 容量
pthread_mutex_t mutex_; // 保护阻塞队列的互斥锁
pthread_cond_t conCond_;// 让消费者等待的条件变量
pthread_cond_t proCond_; // 让生产者等待的条件变量
};
#include "BlockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(args);
while (true)
{
int data = pbq->pop();
cout << "consumer 消费数据完成: " << data << endl;
}
}
void* productor(void* args)
{
BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(args);
while (true)
{
int data = rand() % 10;
pbq->push(data);
cout << "productor 生产数据完成: " << data << endl;
sleep(2);
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
BlockQueue<int> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
生产者消费者模型的优点:
- 解耦
- 支持并发(体现在生产者与消费者对任务的制作和处理,而不是对临界区的存放与取出)
- 支持忙闲不均(指的是制作任务和处理任务)
POSIX 信号量
简介
信号量是一个描述临界资源数量的计数器。其主要有两种操作:
- 自减(P),原子操作,表示申请资源
- 自增(V),原子操作,表示归还资源
临界资源可以被看做一个整体,也可以被分块,多个线程并发访问临界资源的不同区域并不会出现线程安全问题。
如果我们把信号量初始值设为1,那么申请资源就进行P操作,变为0,归还资源对应V操作,变为1,像这样只有01的信号量称作二元信号量。
申请资源也对应了加锁,归还资源对应释放锁,所以二元信号量和互斥锁是等价的。
如果信号量初始值大于1,那么我们就应该保证临界资源被划分成了多个,不同的线程申请的是不同区域的资源。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared: 0表示线程间共享,非0表示进程间共享
// value: 信号量初始值
销毁信号量:
int sem_destroy(sem_t *sem);
等待信号量:
int sem_wait(sem_t *sem);
本质就是如果信号量不为0,那么将信号量的值减1。
发布信号量:
int sem_post(sem_t *sem);
表示资源使用完毕,可以归还资源了,信号量的值加1。
基于环形队列的生产者消费者模型
让环形队列作为临界资源,充当”超市“的角色。
当队列为空或满的时候,生产者和消费者指向的是同一个位置,此时需要互斥+同步。
其他时候,都指向的是不同的位置,可以并发访问。
- 生产者需要的资源是空间,其信号量初始值为n
- 消费者需要的资源是数据,其信号量初始值为0
例子:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 5;
template<class T>
class RingQueue
{
public:
RingQueue(int cap = gCap)
: ringqueue_(cap)
, pIndex_(0)
, cIndex_(0)
{
sem_init(&roomSem_, 0, ringqueue_.size());
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_, nullptr);
pthread_mutex_init(&cmutex_, nullptr);
}
// 生产
void push(const T& in)
{
sem_wait(&roomSem_);
pthread_mutex_lock(&pmutex_); // 加锁,保证生产者和生产者之间的互斥
ringqueue_[pIndex_] = in;
++pIndex_;
pIndex_ %= ringqueue_.size();
pthread_mutex_unlock(&pmutex_);
sem_post(&dataSem_);
}
// 消费
T pop()
{
sem_wait(&dataSem_);
pthread_mutex_lock(&cmutex_); // 加锁,保证消费者和消费者之间的互斥
T temp = ringqueue_[cIndex_];
++cIndex_;
cIndex_ %= ringqueue_.size();
pthread_mutex_unlock(&cmutex_);
sem_post(&roomSem_);
return temp;
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
vector<T> ringqueue_; // 唤醒队列
sem_t roomSem_; // 空间计数器
sem_t dataSem_; // 数据计数器
uint32_t pIndex_; // 当前生产者写入位置
uint32_t cIndex_; // 当前消费者读取位置
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
#include <pthread.h>
void* productor(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>*>(args);
while (true)
{
int data = rand() % 10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << "生产了一个数据: " << data << endl;
sleep(1);
}
}
void* consumer(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>*>(args);
while (true)
{
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "]" << "消费了一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr));
RingQueue<int> rq;
pthread_t c1, c2, c3, p1, p2, p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
线程池
线程池是一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建于销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。
线程池的一个简易实现:
#pragma once
#include <iostream>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
int gThreadNum = 5;
template<class T>
class ThreadPool
{
public:
ThreadPool(int threadNum = gThreadNum)
: isStart_(false)
, threadNum_(threadNum)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
// 类内成员,设置为static以去掉隐含的this指针,this指针只能手动传入。
static void* threadRoutine(void* args)
{
pthread_detach(pthread_self());
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
while (1)
{
tp->lockQueue();
while (!tp->haveTack())
{
tp->waitForTask();
}
T t = tp->pop();
tp->unlockQueue();
t.run(); // 规定:所有的任务都有一个run方法
}
}
// 运行线程池,创建线程
void start()
{
assert(!isStart_);
for (int i = 0; i < threadNum_; ++i)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
// 派发任务
void push(const T& in)
{
lockQueue();
taskQueue_.push(in);
choiceThreadForHandler();
unlockQueue();
}
private:
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
bool haveTack() { return !taskQueue_.empty(); }
void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
void choiceThreadForHeadler() { pthread_cond_signal(&cond_); }
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
private:
bool isStart_;
int threadNum_;
queue<T> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};