在我们的进程虚拟地址的代码区,对于代码中的每个函数都有对应的地址,每个函数中的每行代码都有对应的代码,并且每个函数中的每行代码的地址都是连续的。既然代码是连续的,也就意味着我们可以将我们代码分块,分成不同的代码块。划分出不同的代码块之后,我们能否按照其在页表中的连续映射将其分配给不同的进程并行运行呢?
如果可以的话,我们就完成了将一份代码并行运行,提高了运行的效率。
这就是多线程,多线程就可以让我们的代码并行运行起来,本篇从线程的基本概念开始介绍,然后比较了多线程与多进程之间的区别。接着介绍了在 Linux 系统下的线程控制,介绍了在 Linux 系统下使用多线程的函数接口。然后讲解了线程的互斥,需要使用锁互斥,同时揭示了互斥的缺点,然后引入了线程同步,需要使用条件变量来解决。在介绍完条件变量之后,我们又介绍了在多线程中常用的生产消费模型,实现了两种经典的生产消费模型:阻塞队列(使用条件变量和互斥锁完成)和环形队列(使用信号量和互斥锁完成)。同时还介绍了线程安全和可重入以及死锁的概念。
最后结合以上的知识,写了一个单例(懒汉)模式的线程池。
目录
线程的概念
1. Linux 和 windows 下的线程
2. 线程的创建与使用
3. 线程之间共享的数据和独有的数据
多线程和多进程的区别
1. 线程的优点
2. 线程的缺点
线程的控制
1. 线程的等待
2. 线程终止
3. 线程分离
4. 线程的 tid
5. __thread 修饰变量
线程的互斥 -- 线程加锁
1. 线程不安全的原因
2. 线程加锁
3. LockGuard
4. 加锁的原理
线程的同步
1. 条件变量接口
2. 生产消费模型
3. BlockQueue -- 阻塞队列(生产消费模型)
4. POSIX 信号量
5. RingQueue -- 环形队列
可重入与线程安全
死锁
1. 避免死锁
线程池 -- 懒汉模式
线程的概念
线程:在进程的内部运行,是 CPU 调度的基本单位。在 Linux 系统下的线程,是在进程的地址空间中运行。如下图:
在如上的进程中,我们有着多个进程的 PCB(task_struct),对于每个 PCB 都在代码区中分配了不同的代码的起始地址,每个 PCB 都分别运行属于分配自己的代码块,相当于每个线程拿了一部分页表,每个线程使用页表的一部分,最终达到实现并行运行。在没谈线程之前,每个进程的 PCB 都只有一个,现在谈到线程时,对于进程的定义我们可以更深一步:
进程的定义:
内核数据结构 + 进程的代码和数据;
承担分配系统资源的基本实体。(对于资源的分配包括CPU资源的分配,物理内存的分配等等)
以上两种说法都是进程的定义。 在一个进程中会存在多个进程 PCB,对于每个进程 PCB,系统都会给其在代码区分配不同的资源,同时也会表现在一个进程的不同 PCB 用着不同的 CPU 资源。
1. Linux 和 windows 下的线程
操作系统对于线程的设计,需要包括到在线程中的新建、暂停、销毁、调度等等的功能,也就意味着操作系统需要将我们的线程给管理起来,就需要设计出一个独立的数据结构,其中包括线程的 id、优先级、状态、连接属性等等(进程为 PCB,线程会 TCB),然而在我们的 windows 系统下就设计出来这样的结构(当 windows 下运行多线程的时候 CPU 在连接进程的同时,还需要连接连接在进程后面的线程,设计这样的结构的同时还需要单独的设计出对应的管理调度算法)。
然而在 Linux 系统下,由于在线程中的属性在进程中也存在,所以在 Linux 系统下就没有单独的设计出 TCB 的数据结构,而是沿用了 PCB 的数据结构,这样的设计还节省了调度算法的设计,TCB 与 PCB 使用同一套调度算法,减轻了线程的调度复杂度。所以在 Linux 系统中对于线程的说法,也可以叫做轻量级进程。
所以联系到线程的概念,线程是 CPU 调度的基本单位,在 Linux 中 CPU 调度线程的时候,根本就不会在意是线程还是进程,只会在意是当前调度的一个基本单位。
2. 线程的创建与使用
接下来介绍关于在 linux 系统下创建线程的函数,如下:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); pthread_t *thread 表示需要传入一个pthread_t 的参数,一个输出型参数 const pthread_attr_t *attr 线程的属性,通常传入 nullptr void *(*start_routine) (void *) 创建出的线程需要执行的方法 void *arg 传入方法的参数
当使用如上系统调用之后,主进程会继续向下运行,新线程会继续向下运行。另外在我们的 linux 下使用该系统调用需要连接动态库 pthread,使用如下代码测试创建出的多线程,如下:
#include <iostream> #include <pthread.h> #include <unistd.h> void* StartNewThread(void* args) { const char* name = static_cast<const char*>(args); while (true) { sleep(1); std::cout << name << " running..." << ", pid:" << getpid() << std::endl; } } int main() { pthread_t tid; pthread_create(&tid, nullptr, StartNewThread, (void*)"new thread"); while (true) { std::cout << "main thread running..." << ", pid:" << getpid() << std::endl; sleep(1); } return 0; }
测试结果如下:
当我们运行的时候,主线程和新线程的 pid 一模一样,然后我们使用指令 ps -aL 查询当前的执行流,我们可以看到一个关键字 LWP(light weight process 轻量级进程),其中 pid 和 lwp 相等的为主线程。所以在我们的操作系统中,操作系统调度的时候,看的执行流的 LWP,看的不是执行流的 pid,因为多线程的 pid 一样。
对于传入的方法 void *(*start_routine) (void *) 其中传入的参数为 void *arg ,这个参数的类型为一个 void* 的一个指针,说明我们可以对这个传入任意类型的指针,传入之后在方法中又强转回来,如下:
pthread_t tid; void func() { // .... } pthread_create(&tid, nullptr, StartNewThread, (void*)&func); pthread_create(&tid, nullptr, StartNewThread, (void*)"new thread"); int a = 0; pthread_create(&tid, nullptr, StartNewThread, (void*)&a); std::vector<std::string> vs; pthread_create(&tid, nullptr, StartNewThread, (void*)&vs);
所以我们可以根据这个 void* 类型的指针传入任意的方法或者类型给我们的线程。
3. 线程之间共享的数据和独有的数据
多线程拥有同一个地址空间和页表,也就意味着多线程的大部分数据都是共享的,当让也存在部分不是共享的数据。
共享的数据如下:
1. 文本段和数据段都是共享的,因为共享同一个地址空间,如定义了一个函数,在各个线程中都可以进行调用。
2. 文件描述符表。在每个线程中使用打印函数都可以打印到屏幕上,就是因为共享了文件描述符表。
3. 各种信号的处理方式(SIG_IGN、SIG_DFL、自定义方法)
4. 当前工作目录
5. 用户 id 和 组 id
不共享的数据:
1. 线程 ID 也即是 LWP
2. 一组寄存器。每个线程的一组寄存器对应着该线程执行的上下文数据,切换前执行到哪里,有着哪些数据。
3. 栈。线程在执行时产生的临时变量都会保存到自己的栈结构中。
4. erron 错误码
5. 信号屏蔽字
6. 调度的优先级。
多线程和多进程的区别
多线程和多进程都可以实现并行执行任务,那么多线程和多进程之间有哪些区别呢?
1. 进程创建成本非常高,一个进程的创建需要构建地址空间、页表,加载代码和数据以及建立映射关系,还有建立信号系统等等;然而创建线程只需要在原来的进程上创建出一个 pcb,以及将对应的代码和数据关联起来就可以,所以线程的创建相对于进程的创建的成本非常小。 —— 启动
2. 线程的调度成本低,对于线程的调度只需要切换相关的 pcb,对于进程而言则不是这样的,进程的切换需要先保存寄存器中的数据,然后切换其他线程地址空间和页表。(其实地址空间和页表的也换也仅仅只是几个寄存器进行切换,花的时间不是很多。真正耗时的为:在 CPU 内有着缓存 Cache,会将访问数据的附近数据也给加载进来,当我们切换线程的时候,很可能 Cache 中加载的数据还可以用得到,但是假设我们切换了进程, CPU 内 Cache 数据就作废了,就需要重新加载另一个进程的数据,这里才是真正耗时的)—— 运行
3. 删除一个线程的成本低,删除一个线程只需要删除对应的 pcb,而删除一个进程需要将进程的代码数据,地址空间,页表等等全都释放。 —— 删除
4. 多线程的安全性低于多进程,当多线程的代码发送错误的时候,操作系统会向进程 pid 发送信号,但是所有线程的 pid 都是一样的,也就意味着,一个线程出错,所有的线程都会被进程发送来的信号给杀掉,所有其他的任务都崩掉,正常的任务也没了。然而在多进程执行任务的时候,崩掉也只是崩掉一个,崩掉一个任务。
1. 线程的优点
以上对比的是线程和线程之间的区别,现在总结一下线程的优点:
1. 创建线程的代价小、线程占用的资源相对较少;
2. 能够充分利用多处理器的可并行数量(多进程也可以);
3. 等待慢速 I/O 操作结束的同时,程序可执行其他的计算任务;
4. 计算密集型应用,为了能在多处理器上运行,将计算分解到多个线程中实现(对于多线程的创建,最好根据 CPU 的核数来创建,线程并不是创建得越多越好,若 CPU 的核的数量少于线程数,有些线程还是需要串行运行,并且还增加切换线程的时间)
5. I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。(对于 I/O 密集型则可以允许创建多线程,比如我们需要下载10G的一个应用,我们则可以创建出10个线程,每个线程下载一个G,线程等待下载的时间可以重叠,则可以加快我们的下载速度)
2. 线程的缺点
线程既有优点,也会存在缺点:
1. 线程的健壮性低。多线程中的线程只要一个出现问题,那么所有的线程都可能会受到影响。
2. 缺少访问控制。对于一个进程中的大部分数据,线程都可以看见,也就意味着线程可以更改其他线程需要的数据,这样的访问机制不是很安全。
3. 性能缺失。当创建出的线程数大于 CPU 核数,就会导致有些线程处于串行运行状态,反而增加了运行的成本。
线程的控制
在 Linux 系统中没有真正意义上的线程(没有 tcb),只有轻量级进程,所以对于线程提供的线程控制,和线程的定制的标准的线程库有区别,Linux 向上提供的是轻量级进程的接口。所以为了将轻量级进程的接口和标准的线程库相符,Linux 向上提供了一个 pthread 动态库,对轻量级进程进行封装,所以这也是为什么我们在调用这些接口的时候,需要链接 pthread 库。
1. 线程的等待
当我们的线程任务执行结束之后,我们还是需要将对应的线程回收。所以这个时候就需要主线程使用系统调用 pthread_join 回收我们的线程,如下:
int pthread_join(pthread_t thread, void **retval); thread 表示线程id,我们需要等待的线程 retval 线程执行函数的返回值,不关心可以设置为nullptr 返回值为0表示我们的等待成功,若为其他值则表示等待失败
当主进程使用该系统调用等待我们的线程的时候,线程只要没有退出,主线程就会一直阻塞的等待我们的线程。(对于线程的等待,其实不管是否等待,等到主线程结束的时候,其余的线程同时会被释放,但是这样的作法是非常不推荐的。若不等待线程,可能造成线程还未执行完他的任务的时候就已经退出的情况,因为主线程可能提前退出;还有可能会照成类似僵尸进程的情况,线程执行任务结束之后,主线程未将其回收且主进程未退出,操作系统会将线程的 PCB 一直维护起来)
如下的线程等待代码:
void* StartNewThread(void* args) { const char* name = static_cast<const char*>(args); int cnt = 5; while (cnt--) { std::cout << name << " running..." << ", pid:" << getpid() << std::endl; sleep(1); } return (void*)name; } int main() { pthread_t tid; pthread_create(&tid, nullptr, StartNewThread, (void*)"new thread"); void* ret = nullptr; int n = pthread_join(tid, &ret); if (n == 0) std::cout << "wait thread success..." << std::endl; std::cout << static_cast<const char*>(ret) << std::endl; return 0; }
如上的代码,当我们想要获取对应线程执行函数的返回值,就可以使用一个 void* 类型的变量取地址去接收它(也可以使用其他变量指针强转成 void** 类型),若我们不关心则直接传入 nullptr。
2. 线程终止
线程的终止可以是线程执行函数直接 return,或者是主线程直接退出了。这些退出方式都是一些自然的退出方式,若想要让进程直接了当的退出,该如何退出呢?
我们可以使用函数 exit,当然这个函数需要谨慎使用,因为只要在线程执行函数中使用该函数就会使得整个进程退出,所以一般我们不适用该函数。
可以使用专门的接口 pthread_exit,如下:
void pthread_exit(void *retval); 函数中的参数就是线程返回函数的返回值,可以设置为nullptr
我们只需要在线程执行函数中调用该接口,就可以使得线程直接退出。
还存在一种线程退出方式,使用系统调用接口 pthread_cancel,在主线程中使用该函数之后可以直接将对应的线程给取消,如下:
int pthread_cancel(pthread_t thread); 传入线程的id 当使用该函数将线程取消之后,线程的返回值为:0 PTHREAD_CANCELED -> #define PTHREAD_CANCELED ((void *) -1)
在主线程中使用该函数可以直接终止线程。
3. 线程分离
我们将线程创建出来之后,可不可以不等待我们的线程,让线程结束执行完线程函数之后直接退出的呢?
我们只需要将我们的线程分离,就可以达到这样的目的,只需要使用我们的系统调用函数 pthread_detach,如下:
int pthread_detach(pthread_t thread); 参数传入线程id pthread_t pthread_self(void); 哪个线程调用,就返回哪个线程的id
当我们使用函数将线程分离之后,我们就不用在等待我们的线程了。若我们还是使用 pthread_join 等待我们的函数,就会等待失败,其返回值就不会为 0。但是我们在使用线程分离函数的时候需要注意:要保证主线程在其他线程结束之后在退出,否则会导致有的线程还未执行结束,就被迫退出。但是我们还是需要注意即使线程分离,我们的线程还是在进程内部,当某个线程出现错误,还是会导致进程接收信号而终止。
4. 线程的 tid
不管是使用系统调用创建我们的线程,还是 join 等待我们的线程,我们都需要将我们的线程 id 传入系统调用函数中,那么线程 id 到底是什么呢?如下图:
当我们获取到新线程的 tid 和新线程的 lwp 时,我们会发现它们其实并不一样。所以 OS 给用户提供的线程 id,不是在内核中的 LWP,而是 pthread 库维护的一个值(在库中维护不一定代表在库中开辟空间)。
但是从将线程 id 转换为 16 进制之后,我们不难发现线程 id 其实是一个地址。在我们运行程序的时候,先将程序的地址空间页表等等之类的构建好,同时将磁盘中的二进制文件映射到我们的物理内存中。当我们开始运行该进程的时候,进程运行到使用 pthead 库的时候,就会在磁盘中将 pthread 库映射到物理内存中,然后通过页表将其映射到虚拟地址的堆栈之中。接着就可以跳转到堆栈之中执行我们的 pthread 的库函数了。
在 OS 内核中,只会关心轻量级进程,也就是线程的 LWP,在我们程序员的角度只会关心我们创建出的线程的 tid,也就是说:在 OS 中根本没有线程 id 这一概念。如下图:
当我们在进程中创建一个线程之后,就会在 pthread 库中创建一个对应的 struct pthread其中会包含线程在用户最基本的属性,以及线程的独立的栈结构,创建出的 struct pthread 控制块存储在我们的的栈区(线程控制块中申请的一个大小合适的内存空间)和堆区之间(用户空间中),所以未来我们想要访问 pthread 控制块,只需要找到其地址,所以对于线程 id 来说,就是线程控制块的地址。
另外对于 pthread_join 接口而言,假若线程未分离,执行完任务结束之后,库还不会将线程对应的线程控制块释放,只有当使用 pthread_join 接口之后,库才会线程运行任务的返回值返回给 pthread_join 中的参数,然后在将对应的线程控制块释放。
在操作系统层面的线程 LWP,用于操作系统对线程进行调度的一个标识,而在库中维护的线程 id,以及线程的其他的属性方法,是对于用户层面的,方便用户进行使用。所以总得来说:Linux 线程 = pthread库中线程的属性集 + LWP。
5. __thread 修饰变量
我们创建出的线程也是可以共享着整个进程的代码和数据的,所以对于在代码中的全局变量,只要一个线程进行了修改,对于其他线程而言都被修改,但是假若我们想要让这种全局变量资源每一个线程一份,我们可以使用关键字 __thread 修饰这个变量(该关键字只在 Linux 中管用,并且只能用于修饰内置类型),这样就使得每个线程都可以拥有一个独有的变量,其原理就是在每个线程控制块的线程局部存储中单独形成一个变量。如下:
如上图所示,在代码中定义的 gval 的值和地址都不一样(在线程中修改了 gval,主线程中未修改)。
线程的互斥 -- 线程加锁
对于多个线程而言,可以轻易的看到同一份资源(因为本身就是在一个进程之内),也就是共享资源。但是既然存在共享资源,当不同的线程对同一份资源进行访问的时候,有的对资源进行写入,有的对资源进行拿出,这个时候我们就需要对资源进行保护,防止一个线程还没写完另一个线程就将数据读出。
其中对于共享资源保护的方法有互斥和同步,本段主要主要介绍的是互斥。如下:
代码:
Thread.hpp
#include <iostream> #include <functional> #include <string> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cerrno> using func_t = std::function<void(const std::string& name)>; // typedef void*(*func_t)(void*); const pthread_t ctid = -1; class Thread { private: void excute() { std::cout << _name << " begin to run" << std::endl; _isrunning = true; _func(_name); _isrunning = false; } static void* ThreadRoutine(void* args) { Thread* self = static_cast<Thread*>(args); self->excute(); return nullptr; } public: Thread(func_t func, const std::string& name) : _func(func), _isrunning(false), _tid(ctid), _name(name) {} ~Thread() {} void Start() { // 创建之后就开始运行了 int n = pthread_create(&_tid, nullptr, ThreadRoutine, (void*)this); if (n != 0) { std::cout << "thread create failed!!!" << std::endl; exit(1); } } void Stop() { // 将线程暂停,使用 if (_isrunning == false) return; std::cout << _name << " stop " << std::endl; int n = ::pthread_cancel(_tid); if (n != 0) { std::cout << "thread stop failed" << std::endl; } _isrunning = false; } void Join() { // 线程等待, if (_isrunning) return; int n = pthread_join(_tid, nullptr); if (n != 0) { std::cout << "thread wait failed!!!" << strerror(errno) << std::endl; } std::cout << _name << " join " << std::endl; } std::string Status() { if (_isrunning) return "running"; else return "sleep"; } private: pthread_t _tid; func_t _func; bool _isrunning; std::string _name; };
main.cc
#include "Thread.hpp" #include <vector> int gtickets = 10000; void BuyTickets(const std::string& name) { while (true) { if (gtickets > 0) { std::cout << "who: " << name << ", get a ticket:" << gtickets << std::endl; gtickets--; usleep(1000); // 1ms 表示抢票时间 } else { break; } } } int main() { Thread t1(BuyTickets, "thread-1"); Thread t2(BuyTickets, "thread-2"); Thread t3(BuyTickets, "thread-3"); Thread t4(BuyTickets, "thread-4"); t1.Start(), t2.Start(), t3.Start(), t4.Start(); t1.Join(), t2.Join(), t3.Join(), t4.Join(); return 0; }
当我们写了一个多线程抢票的代码,其中有的线程抢票出现抢到负数的情况。但是在代码中我们也给出了只有当剩余票数大于 0 的时候线程才可以继续进行抢票,这是为什么呢?这就是因为线程不安全。
1. 线程不安全的原因
以上面的抢票逻辑为例,只有当执行 if (gtickets > 0) 这条判断语句通过时,才会进行我们的抢票,而在我们的 CPU 当中,有着逻辑运算和算术运算两种计算方式。对于执行 if (gtickets > 0) 这条逻辑运算的时候,会先将 gtickets 加载到 CPU 内的寄存器中,然后将需要比较的数存放到另一个寄存器中,然后通过 CPU 内的 CU 计算得出结果,然后放到另一个寄存器中(算术运行过程和这个过程相仿),也就意味着不管是逻辑运算还是算术运算在 CPU 层面,在汇编层面,一条语句都可能被拆分成多条汇编语句。
当多线程运行抢票逻辑的时候,每个线程都会执行这个 if (gtickets > 0) 这个判断语句。但是在单核 CPU 内,寄存器只有一套,而寄存器里面的数据可以有多套(切换线程 tcb 的时候会将 CPU 数据存储到 tcb 中),既然存在这样的逻辑,那么就很有可能当某个进程刚在执行 if (gtickets > 0) 语句,将 gtickets 和 需要比较的值放入到寄存器之后,就被切换,当下一次被切换回来的时候,再一次将存储到 tcb 中寄存器的值存放会寄存器中,这个时候即使在磁盘中的 gtickets 变量已经被修改到小于 0,但还是会进入判断语句之中。当通过判断 if (gtickets > 0) 时,进入新的代码,遇到 gtickets-- 代码的时候,会重新从内存中读取数据,然后运算,然后写回数据,所以这就会导致票数出现负数的情况。所以这个时候就会出现票数小于 0 的情况。如下:
2. 线程加锁
对于如上的线程安全问题,其中最常用的解决方法就是给贡献资源加锁。对共享资源加锁,就是在任意时刻只允许一个线程访问共享资源。在 Linux 系统中,pthread_mutex_t 就是我们锁的类型。在使用锁之前需要对我们的锁进行初始化,如下:
pthread_mutex_t 互斥锁类型 int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); // 栈上开辟的,或者new出来的,就需要使用这个函数,第二个参数通常设置为nullptr; // 使用如上方法初始化的锁,还需要使用如下函数释放我们的锁: int pthread_mutex_destroy(pthread_mutex_t *mutex); // 如上两个系统调用调用成功返回值为 0。 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 用于全局锁或者静态锁的初始化,使用这个方法初始化的锁不需要进行释放
以上只是对于锁的初始化,若真正要达到对共享资源进行加锁,还需要使用如下函数:
int pthread_mutex_lock(pthread_mutex_t *mutex); // 对我们的共享资源进行加锁 // 每个线程进行加锁的时候,若抢到了锁就会向下运行,若没有抢到锁就会阻塞在这里 int pthread_mutex_unlock(pthread_mutex_t *mutex); // 共享资源访问结束之后,需要将我们的锁给释放掉,否则其他线程就不能访问到共享资源 int pthread_mutex_trylock(pthread_mutex_t *mutex); // 当抢锁失败之后就会出错返回,不会阻塞 // 以上系统调用都是调用成功的时候返回值为0
使用以上的加锁方式,我们就可以对我们的共享资源进行加锁了(共享资源也被称为临界资源,访问临界资源的代码被我们称为临界区。所以对共享资源的访问就是对临界资源的访问,也就是访问临界区。同时对临界资源进行保护就是对临界区代码进行保护。)。
所以对于共享资源(临界区)的加锁,可以理解为下图:
如下所示的加锁方式:
pthread_mutex_t gmtx = PTHREAD_MUTEX_INITIALIZER; int gtickets = 10000; void BuyTickets(const std::string& name) { while (true) { // 全局锁 pthread_mutex_lock(&gmtx); if (gtickets > 0) { std::cout << "who: " << name << ", get a ticket:" << gtickets << std::endl; gtickets--; usleep(1000); // 1ms 表示抢票时间 // 全局锁 pthread_mutex_unlock(&gmtx); } else { // 全局锁 pthread_mutex_unlock(&gmtx); break; } } }
以上为一种使用全局锁的方式,也可以使用局部申请的锁,不过需要注意要初始化和销毁。在我们使用锁的时候需要注意以下几点:
1. 对于加锁的范围,粒度一定要尽量的小。也就是串行部分的代码在保证能保护临界资源的前提下加锁范围尽可能小,执行的效率才会更高。
2. 任何线程在进行访问临界资源的时候,都得申请锁,原则上不应该有例外。
3. 对于线程申请的锁,前提是所有线程都得看到这把锁,所以对于这把锁而言,也是共享资源。由于加锁的过程是原子的,所以不会存在线程安全的问题。
4. 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成,没有正在完成这一个状态。(所以对于要访问临界区代码的线程而言,要么申请了锁,要么没有申请锁被阻塞了,所以当一个线程抢到锁之后,相对于其他线程而言就是原子的)
5. 对于线程锁的申请,申请失败就会被阻塞,直到另一个线程解锁然后抢到这把锁才解除阻塞;线程申请锁一旦成功,就会先后继续运行。
6. 线程锁申请成功的代码,在执行临界区代码的线程,也可以被切换,因为线程的切换可能出现在任意时刻。即使该线程被切换了,其他线程也不会访问临界区的资源,因为他们并没有抢到锁。
3. LockGuard
对于如上对代码的加锁,需要我们手动的进行加锁和解锁,而且在不同的作用域比较麻烦,所以这个时候我们就可以写一个类来封装我们的锁,只需要声明了该类就可以直接加锁,出来定义域折后就可以自动的解锁了,如下:
LockGuard.hpp
#pragma once #include <iostream> #include <pthread.h> class LockGuard { public: LockGuard(pthread_mutex_t* mtx) : _mtx(mtx) { pthread_mutex_lock(_mtx); } ~LockGuard() { pthread_mutex_unlock(_mtx); } private: pthread_mutex_t* _mtx; };
对于我们原来抢票的代码就可以修改为:
pthread_mutex_t gmtx = PTHREAD_MUTEX_INITIALIZER; int gtickets = 1000; void BuyTickets(const std::string& name, pthread_mutex_t* lock) { while (true) { // 全局锁 // pthread_mutex_lock(&gmtx); // pthread_mutex_lock(lock); // 使用 lockguard 封装 LockGuard lockguard(lock); if (gtickets > 0) { std::cout << "who: " << name << ", get a ticket:" << gtickets << std::endl; gtickets--; usleep(1000); // 1ms 表示抢票时间 // 全局锁 // pthread_mutex_unlock(&gmtx); // 局部锁 // pthread_mutex_unlock(lock); } else { // 全局锁 // pthread_mutex_unlock(&gmtx); // 局部锁 // pthread_mutex_unlock(lock); break; } } }
这种锁的风格为:RAII 风格的锁
4. 加锁的原理
在上文中我们已经提过,当使用 pthread_mutex_lock 函数申请锁的时候,申请成功则返回执行临界区的代码,申请失败则会阻塞在函数中,只有当锁被另一个线程释放的时候,该线程才有可能在 pthread_mutex_lock 函数内部被唤醒。同时锁也作为临界资源,也会被所有线程共享,说明在 pthread_mutex_lock 函数中抢锁的过程就是原子的。
为了实现互斥锁操作,大多数体系结构都提供了 swap 或者 exchange 指令(这些指令是原子的,是直接在硬件层面实现的指令),该指令就是将寄存器和内存单元的数据交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也是有先后,一个处理器上的交换指令执行时一个处理器的交换指令只能等待总线周期。
那么锁是如何实现的呢?如下:
如上图所示,当我们进入 lock 函数之后,会先将寄存器 al 赋值为 0 ,然后使用指令交换 mutex 内的值,所以 mutex 被交换之后一定为 0,而只要 al 得到为 1,就会继续向下运行,若没有得到则被在 lock 函数中被阻塞,即使拿到 al 值为 1 的线程被切换了,也会将 al 的值一起带走,而其他线程的 al 与 mutex 交换之后 al 的值还是会为 0。
对于 unlock 函数而言,会将 mutex 中的值恢复为 1,然后唤醒所有被挂起的线程,让再一次去抢 mutex(支撑上面 mutex 的值始终只有一个线程拿到的前提的是:把数据从内存中移动到 CPU 寄存器内,本质是把数据从共享变成线程私有,所以对于 mutex 而言,总会被某一个线程给私有)。
线程的同步
上文中已经提到,对于线程同步和线程互斥都是对临界资源保护的手段。但是对于线程互斥而言很可能会出现某个线程一直占用锁,而其他线程却一直被阻塞的情况。这样的情况虽然是在规则之内,但是很不合理,效率很低。所以这个时候就出现了线程同步,也就是在线程安全的前提下,各个线程访问临界资源具有一定的顺序性。对于顺序性来说,可以是严格的顺序(各个线程的顺序已经规定),也可以是相对的顺序(各个线程的顺序没有被严格规定,但是还是会让每个线程都能很好的访问临界资源)。
我们实现同步依靠条件变量实现的,线程同步既然要实现线程的顺序性,也就是让线程排队,所以条件变量就会维护着一个线程队列。维护队列的同时,当一个线程访问临界资源结束之后,还需要通知在排队的线程我已经访问结束。若想要实现严格的顺序性,那么通知就是只通知排在最前面的线程,若只实现相对的顺序性,那么就唤醒所有的线程。
1. 条件变量接口
使用条件变量的接口如下:
pthread_cond_t // 条件变量的类型 int pthread_cond_destroy(pthread_cond_t *cond); int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); // 局部条件变量的初始化与销毁 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 全局条件变量的初始化 int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); // 条件变量的等待系统调用,需要传入的参数为条件变量和锁 // 唤醒等待的线程 int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有在等待的线程 int pthread_cond_signal(pthread_cond_t *cond); // 唤醒最近在等待的线程 // 以上系统调用调用成功的返回值为 0
如上的条件变量接口,特别是初始化和销毁的使用方法和锁的初始化销毁一模一样。而线程同步中需要其他的线程进行等待,所以就需要使用 pthread_cond_wait 接口,当某个线程结束之后,让其他线程去访问资源前,需要将其唤醒,有唤醒排队最近的一个系统调用,也有唤醒所有等待线程的系统调用。
根据以上接口,写了一个测试代码,让主线程不断唤醒其他线程打印消息,而且是按照一定的顺序打印消息,如下:
#include <iostream> #include <string> #include <vector> #include <unistd.h> #include <pthread.h> const int threadnum = 5; pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; void* WaitRun(void* args) { std::string name = static_cast<const char*>(args); // 使用条件变量,使得线程按照顺序打印 while (true) { pthread_mutex_lock(&gmutex); pthread_cond_wait(&gcond, &gmutex); std::cout << "I am a thread, name: " << name << std::endl; usleep(10000); // pthread_cond_signal(&gcond); pthread_mutex_unlock(&gmutex); } return nullptr; } int main() { std::vector<pthread_t> threads; for (int i = 1; i <= threadnum; i++) { std::string* name = new std::string; *name += "thread-" + std::to_string(i); pthread_t tid; pthread_create(&tid, nullptr, WaitRun, (void*)name->c_str()); threads.push_back(tid); } while (true) { // 在主线程中不断的唤醒其他线程 pthread_cond_signal(&gcond); sleep(1); } for (auto& t : threads) pthread_join(t, nullptr); return 0; }
测试如下:
2. 生产消费模型
提出生产消费模型就是为了提高在线程运行时的效率,那么这个效率是如何实现的呢?
在我们的通常的线程使用中,存在生产任务的生产者线程,也存在解决任务的消费者线程,则在这两者之间是以内存空间为枢纽的;生产者生产出任务然后将其放在内存空间中,消费者从内存空间中拿出数据然后进行执行;同时生产者的生产任务和消费者的解决任务其实并不矛盾,他们可以并发执行,且互不干扰(生产者和消费者之间进行了解耦),他们一个写入内存,一个从内存中拿出数据,协调忙闲不均,最终就可以达到提高效率的功能。
对于以上生产者消费者模型就可以总结为(321原则):
1. 一个交易场所(特定数据结构形式存在的一段内存空间,可以是栈,队列数组,共享内存等等)
2. 两种角色(生产者、消费者)生产线程和消费线程
3. 三种关系(生产者和生产者、消费者和消费者、生产者和消费者),对于前两种关系为互斥关系,他们会出现竞争的情况。而对于生产者和消费者而言是互斥、同步的关系,生产者在向交易场所生产的时候,没有生产完是不允许消费者拿出数据的(这里表现为互斥),但是当消费者消费完数据之后,还要立刻通知生产者生产数据,生产者生产数据完成,要立刻通知消费者来消费(这里表现为同步)。
我们假若要实现生产消费模型,本质就是通过代码来实现321原则,实现其中的三种关系就依靠锁和条件变量
3. BlockQueue -- 阻塞队列(生产消费模型)
在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构。其与普通队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入元素;当队列满时,往队列中拿出元素的操作也会被阻塞,直到铀元素被从队列中取出(前文写的管道通信其实就是一种阻塞队列的实现,不管匿名管道还是命名管道:Linux中进程间通信--匿名管道和命名管道-CSDN博客)。如下图:
一个关于 BlockQueue 的测试代码,其中在 BlockQueue.hpp 中实现的是阻塞队列的主逻辑,在 Task.hpp 中实现的是一个我们需要执行的一个任务的类,在 main.cc 中则是主要调用阻塞队列,代码如下:
BlockQueue.hpp:
#pragma once #include <iostream> #include <queue> #include <functional> #include <pthread.h> const int defaultcap = 5; template <typename T> class BlockQueue { private: bool isFull() { return _blockqueue.size() == _max_cap; } bool isEmpty() { return _blockqueue.empty(); } public: BlockQueue(int cap = defaultcap) : _max_cap(cap) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_p_cond, nullptr); pthread_cond_init(&_c_cond, nullptr); } void Pop(T* out) { pthread_mutex_lock(&_mutex); while (isEmpty()) { // 这个时刻容量为空,不能在消费了,必须等待 // 调用wait函数的时候,处理让自己在这里排队等待,还会释放锁,让其他线程去抢锁 // 当被唤醒的时候,还是会在wait函数中竞争锁,只有重新抢到锁wait函数才会返回 // 添加尚未满足,但是线程被异常唤醒的情况,我们称其为伪唤醒 pthread_cond_wait(&_c_cond, &_mutex); } // 然后从队列中拿出数据 *out = _blockqueue.front(); _blockqueue.pop(); // 不管先解锁还是先唤醒都是一样,都可以达到目的 pthread_mutex_unlock(&_mutex); pthread_cond_signal(&_p_cond); } void Push(const T& in) { // 向内装入数据的时候需要加锁 pthread_mutex_lock(&_mutex); // 这里使用循环是防止唤醒的时候使用的broadcast函数,或者signal函数调用失败 // 使用该函数之后被唤醒且没有抢到锁的线程会阻塞在抢锁这个位置 // 当一个刚被唤醒且抢到锁的线程执行结束准备释放锁的时候 // 会立即被被阻塞在锁这个位置的生产者抢到,这样很可能导致生产的数量 // 超过容量 while (isFull()) { // 队列满,需要将生产者阻塞,然后唤醒消费者 pthread_cond_wait(&_p_cond, &_mutex); } // 现在将我们的任务放入到队列中 _blockqueue.push(in); // 先解锁,然后在唤醒,这样就可以在唤醒之后立刻就拿到锁 pthread_mutex_unlock(&_mutex); pthread_cond_signal(&_c_cond); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_p_cond); pthread_cond_destroy(&_c_cond); } private: pthread_mutex_t _mutex; // 锁 pthread_cond_t _p_cond; // 生产者的条件变量 pthread_cond_t _c_cond; // 消费者的条件变量 // 外面不断向内写入数据,内部将数据 std::queue<T> _blockqueue; // 临界资源 int _max_cap; // 阻塞队列的容量 };
Task.hpp:
#pragma once class Task { public: Task() {} Task(int x, int y) : _x(x), _y(y) {} int Excute() { return _x + _y; } private: int _x; int _y; };
main.cc:
#include "BlockQueue.hpp" #include "Task.hpp" #include <functional> #include <ctime> #include <unistd.h> using func_t = Task; int add(int x, int y) { return x + y; } void* Consumer(void* args) { BlockQueue<func_t>* bq = static_cast<BlockQueue<func_t>*>(args); while (true) { // 1. 获取数据 func_t func; bq->Pop(&func); // 2. 获取数据之后处理数据,将传入的数据 std::cout << "consumer -> " << func.Excute() << std::endl; } } void* Productor(void* args) { srand(time(nullptr) ^ getpid()); BlockQueue<func_t>* bq = static_cast<BlockQueue<func_t>*>(args); while (true) { // // 1. 构建数据 int x = rand() % 10 + 1, y = rand() % 10 + 1; func_t func(x, y); std::cout << "productor -> x: " << x << " y: " << y << std::endl; // // 2. 生产数据 // bq->Push(task); bq->Push(func); sleep(1); } } int main() { pthread_t c1, c2, p1, p2, p3; BlockQueue<func_t>* bq = new BlockQueue<func_t>(); pthread_create(&c1, nullptr, Consumer, (void*)bq); pthread_create(&c2, nullptr, Consumer, (void*)bq); pthread_create(&p1, nullptr, Productor, (void*)bq); pthread_create(&p2, nullptr, Productor, (void*)bq); pthread_create(&p3, nullptr, Productor, (void*)bq); pthread_join(c1, nullptr); pthread_join(c2, nullptr); pthread_join(p1, nullptr); pthread_join(p2, nullptr); pthread_join(p3, nullptr); return 0; }
测试结果如下:
对于以上编码,还存在几个细节。
为什么我们调用 pthread_cond_wait 函数不仅仅需要传入环境变量还需要传入锁?
这是因为当我们将某个线程在访问共享资源的同时,若这个时候需要让我们的线程进入等待状态,就必须将我们的锁给释放,要不然在等待的线程一直持有锁,其他的线程就不可以访问临界资源,所以该函数让线程等待的同时,将锁释放了,当唤醒线程之后,会在该函数中重新进行竞争锁,当竞争成功之后就会继续向下执行,竞争失败就会继续在等待阶段。
为什么我们在判断阻塞队列中是否需要加入元素或者拿出元素的时候,需要使用 while 循环来判断是否当前队列为满还是为空?
若我们唤醒线程使用的 pthread_cond_broadcast 函数,那么就会唤醒在等待的所有线程,然后被唤醒的线程就会在 pthread_cond_wait 中进行竞争锁,不会在继续呆在等待阶段,而是阻塞在抢锁阶段,这时假若有一个线程拿到了锁向下运行,然后消耗了一个资源(刚好消耗了这个资源后,资源为空),接着释放了锁,然后在 pthread_cond_wait 中的一个线程又立马抢到了锁,准备访问资源的时候就会发现资源是空的,出现错误访问的情况。
为什么在生产数据和消费数据的时候是串行的,那么为什么会说生产消费模型会提高线程运行的效率呢?
这是因为在正在的实际中生产数据和消费者拿出数据的时间是很短的,生产者生产数据和消费者消费数据才是真正消耗时间的,而且当某个线程在执行往队列中的放入(拿出)数据的时候,其他线程也可以在生产和消费数据。
为什么线程等待,一定要在临界区中等待呢?(也就是为什么 pthread_cond_wait 一定要在临界区中调用)
这是因为,无论是生产者还是消费者,在进行访问资源的时候一定要访问资源的状态,也是判断当前资源是为空还是为满,但是我们查资源的同时,本身就是对临界资源的访问,所以就意味着在查之前必定要加锁,同时判断资源为满还是为空,所以我们线程等待一定要在临界区中进行等待,得出的临界区访问结果也还是处于在临界区中的。
需要注意的一个点是,我们在实现阻塞队列的时候,特别需要注意我们是否完成了三种关系(生产者与生产者的互斥、消费者与消费者的互斥、生产者与消费者的互斥与同步)。
4. POSIX 信号量
在之前文章(Linux中的System V通信标准)中已经介绍过关于信号量实现的原理,信号量的本质是一个计数器,用于记录当前的共享资源可访问的数量,同时其作用的本质就是实现共享资源的预定机制,让不同的线程预定不同的共享资源,其中的 P 操作为让信号量减减,V 操作就是让信号量加加。
使用信号量就不用在使用我们的条件变量,因为信号量已经帮我们记录了共享资源的数量,当资源满或者资源空的时候,信号量会自动帮助我们阻塞。
接下来我们来认识关于信号量的接口,如下:
int sem_init(sem_t *sem, int pshared, unsigned int value); // sem 表示我们的定义的信号量变量 // pshared 表示是否需要不同进程间共享,通常设置为0 // value 表示信号量初始值设置为多少 int sem_destroy(sem_t *sem); // 对定义的信号量进行销毁 int sem_wait(sem_t *sem); // 信号量的 P 操作,对信号量进行减减,若信号量不足则阻塞在函数中 int sem_post(sem_t *sem); // 信号量的 V 操作,对信号量进行加加 以上函数调用成功的返回值都是0
5. RingQueue -- 环形队列
现在借助环形队列和信号量来实现生产消费模型,在实现该代码的同时的,同样需要满足生产消费模型的三个条件(三二一原则),以及其中的三种关系,如下:
RingQueue.hpp:
#pragma once #include <iostream> #include <string> #include <vector> #include <pthread.h> #include <semaphore.h> const unsigned int max_cap = 10; template <typename T> class RingQueue { private: void P(sem_t& sem) { sem_wait(&sem); } void V(sem_t& sem) { // 对 sem 进行加加,也就是释放对应的一个sem sem_post(&sem); } public: RingQueue(unsigned int cap = max_cap) : _max_cap(cap), _ringqueue(cap), _c_step(0), _p_step(0) { // 初始情况下的空间状态为满,但数据状态为0 sem_init(&_space_sem, 0, cap); sem_init(&_data_sem, 0, 0); pthread_mutex_init(&_c_mutex, nullptr); pthread_mutex_init(&_p_mutex, nullptr); } void Pop(T* out) { // 先上锁 // Pop出一个数据,空间量加一,数据量减一 // 信号量的申请需要在抢锁之前,因为信号量的申请可以同时多线程到原子的信号量 // 而抢锁只有一个线程能抢到,其他线程只能等待 P(_data_sem); pthread_mutex_lock(&_c_mutex); *out = _ringqueue[_c_step]; _c_step++; _c_step %= _max_cap; pthread_mutex_unlock(&_c_mutex); V(_space_sem); } void Push(const T& in) { // push 进入一个数据,数据量加一,空间量减一 P(_space_sem); pthread_mutex_lock(&_p_mutex); _ringqueue[_p_step] = in; _p_step++; _p_step %= _max_cap; pthread_mutex_unlock(&_p_mutex); V(_data_sem); } ~RingQueue() { sem_destroy(&_space_sem); sem_destroy(&_data_sem); pthread_mutex_destroy(&_c_mutex); pthread_mutex_destroy(&_p_mutex); } private: std::vector<T> _ringqueue; unsigned int _max_cap; unsigned int _c_step; unsigned int _p_step; sem_t _space_sem; sem_t _data_sem; pthread_mutex_t _c_mutex; pthread_mutex_t _p_mutex; };
main.cc:
#include "RingQueue.hpp" #include "Task.hpp" #include <functional> #include <unistd.h> using func_t = Task; int add(int x, int y) { return x + y; } void* Consumer(void* args) { RingQueue<func_t>* rq = static_cast<RingQueue<func_t>*>(args); while (true) { func_t func; rq->Pop(&func); std::cout << "consumer -> " << func.Excute() << std::endl; } } void* Productor(void* args) { srand(time(nullptr) ^ getpid()); RingQueue<func_t>* rq = static_cast<RingQueue<func_t>*>(args); while (true) { sleep(1); int x = rand() % 10 + 1, y = rand() % 10 + 1; func_t func(x, y); std::cout << "productor -> x: " << x << " y: " << y << std::endl; rq->Push(func); } } int main() { pthread_t c1, c2, p1, p2, p3; RingQueue<func_t>* rq = new RingQueue<func_t>(); pthread_create(&c1, nullptr, Consumer, (void*)rq); // pthread_create(&c2, nullptr, Consumer, (void*)bq); // pthread_create(&p1, nullptr, Productor, (void*)bq); // pthread_create(&p2, nullptr, Productor, (void*)bq); pthread_create(&p3, nullptr, Productor, (void*)rq); pthread_join(c1, nullptr); // pthread_join(c2, nullptr); // pthread_join(p1, nullptr); // pthread_join(p2, nullptr); pthread_join(p3, nullptr); return 0; }
测试如下,直接测试的为多消费者和多生产者:
我们在实现的环形队列的时候,并没有像我们在实现阻塞队列的时候,需要判断当前队列是否为满,而是直接申请信号量和申请锁,然后就可以开始运行了,这是因为对于信号量来说本身就是判断条件,是临界资源的计数器,实现出来共享资源的预定机制,让我们在外部可以不判断临界资源是否满足条件,就可以直到内部资源的情况(因为申请不到信号量就会自动在信号量中阻塞起来,直到有了信号量就被放出来了)。
假若我们实现的是单生产者和单消费者模型,则不需要在我们进行拿数据和放数据的时候进行加锁,因为在申请信号量的时候若没有资源会自动阻塞。
另外,对于信号量的容量只有一个的信号量,被我们称为二元信号量,这个时候就相当于锁的存在,因为申请到这一个资源则向下运行,申请不到资源则阻塞,和锁非常相似。
可重入与线程安全
线程安全:多个线程共同并发一段代码,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现线程安全的问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没执行结束,就有其他执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,这样的函数被我们称之为可重入函数。
重入是函数级别的概念,而线程安全则是线程级别的概念。
可重入与线程安全的联系:
当一个函数是可重入函数时,进入该函数一定是线程安全的。当一个函数是不可重入的,那么该函数就不能由多个线程使用,有可能会引发线程安全的问题。当一个函数中有全局变量,那么该函数既不是线程安全也不是可重入函数。
可重入与线程安全的区别:
可重入函数是线程安全函数的一种。线程安全不一定就是可重入的,而可重入函数一定是线程安全的。如果我们对临界资源加上锁,那么该函数是线程安全的,但是假若重入函数的锁还未释放,且再一次被调用,那么对于该重入函数会再次获取已经被它自己持有的锁,这就会导致函数陷入等待状态,然而自己持有却未释放,这就会导致该函数变为不可重入函数,导致死锁。
死锁
死锁:在一组线程中的各个线程均占有不会释放的资源,但因互相申请被其他线程锁占用不会释放的资源而处于一种永久等待的状态。(一个下次也会存在死锁的情况:当一个线程已经占有锁的时候,再一次申请同意把锁,这个时候就会导致死锁)
死锁的四个必要条件:
1. 互斥条件:一个资源(一把锁)只能被一个执行流使用;
2. 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。(当多个线程申请对方的资源时,对方保持不放,自己也不放)
3. 不剥夺条件:一个执行流已经获得的资源,在未使用完之前,不会强行被剥夺。
4. 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。(可以理解为申请锁的顺序不同,一个执行流先申请锁1,然后申请锁2,另一个则先申请锁2,在锁1,当各自拥有锁1,锁2的时候就会导致循环等待)
1. 避免死锁
避免死锁的方法就是只需要破坏死锁的四个条件。
1. 对于死锁的第一个条件,这个条件基本不会被破坏,因为只要涉及线程安全都会想办法上锁,不上锁就达不到要求。
2. 当申请一个资源申请不到的时候,同时也释放掉自己的资源。
3. 可以设计出优先级,然后当申请资源的时候,优先级高的可以剥夺优先级低的资源。
4. 可以让每个执行流申请锁的顺序一致,都是按照申请锁1、锁2……这样的顺序申请锁。
线程池 -- 懒汉模式
我们现在将根据以上的知识写一个线程池,其中实现的主逻辑为,我们在线程池中创建出多个线程,只要有任务向线程池中加入任务,线程池中的线程就会依次将池中的任务取出然后去执行。
所以我们在实现我们的线程池类中,需要有着维护线程的数据结构,有着维护着任务的队列,以及需要有着锁,防止出现线程安全的问题,所以同时我们需要一把锁,另外没有任务执行的同时,我们应该让线程池中的线程等待,直到有了任务才将其唤醒,所以还需要一个条件变量。实现线程池使用的是单例模式中的懒汉模式,所以将构造、拷贝构造和赋值重载都给私有了。
代码如下,其中只有 ThreadPool.hpp 代码为新代码,其余的代码均在上文中出现过,只是进行了些许调整,其中的 Log.hpp 已经在文章:日志Log程序(C++)中进行介绍:
ThreadPool.hpp:
#pragma once #include <iostream> #include <queue> #include <vector> #include <string> #include <pthread.h> #include "Thread.hpp" const int default_thread_num = 5; using namespace log_ns; template <typename T> class ThreadPool { private: void LockQueue() { pthread_mutex_lock(&_mutex); } void UnLockQueue() { pthread_mutex_unlock(&_mutex); } void WakeUpThread() { pthread_cond_signal(&_cond); } void SleepThread() { pthread_cond_wait(&_cond, &_mutex); } bool IsEmptyQueue() { return _task_queue.empty(); } void HandlerTask(std::string name) { while (true) { LockQueue(); while (IsEmptyQueue() && _isrunning) { // 只有当队列为空以及在运行的状态才会继续向下运行 LOG(DEBUG, "%s sleep\n", name.c_str()); _sleep_thread_num++; SleepThread(); _sleep_thread_num--; LOG(DEBUG, "%s wakeup\n", name.c_str()); } // 当队列为空且不运行时自动退出 if (IsEmptyQueue() && !_isrunning) { // std::cout << name << " quit..." << std::endl; LOG(DEBUG, "%s quit...\n", name.c_str()); UnLockQueue(); break; } // 运行到这个位置任务队列中一定有元素,且愿意运行下去 T t = _task_queue.front(); _task_queue.pop(); // t 执行任务 // std::cout << name << " -> " << t.result() << std::endl; LOG(DEBUG, "%s -> %s\n", name.c_str(), t.result().c_str()); UnLockQueue(); } } ThreadPool(int threadnum = default_thread_num) : _thread_num(default_thread_num), _sleep_thread_num(0), _isrunning(false) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); } ThreadPool(const ThreadPool<T>& tp) = delete; ThreadPool& operator=(const ThreadPool<T>& tp) = delete; void Init() { // 将线程池内中的handler任务绑定this,让其可以传入线程中运行 func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); for (int i = 0; i < _thread_num; i++) { std::string name = "thread-" + std::to_string(i + 1); _threads.emplace_back(func, name); LOG(DEBUG, "%s init\n", name.c_str()); } } void Start() { // 将线程池的状态设置为运行状态 _isrunning = true; for (auto& thread : _threads) thread.Start(); } public: static ThreadPool<T>* GetInstance() { if (_tp == nullptr) { // 创建线程池可能存在线程安全的问题 pthread_mutex_lock(&_sig_mutex); if (_tp == nullptr) { _tp = new ThreadPool(); _tp->Init(); _tp->Start(); LOG(INFO, "create thread pool\n"); } pthread_mutex_unlock(&_sig_mutex); } else { LOG(INFO, "get thread pool\n"); } return _tp; } void Stop() { LockQueue(); _isrunning = false; // 唤醒所有线程,让线程退出 pthread_cond_broadcast(&_cond); UnLockQueue(); LOG(DEBUG, "thread pool stop\n"); } void Push(const T& in) { LockQueue(); // 只有在运行状态我们才往任务队列中放入任务 if (_isrunning) { _task_queue.push(in); if (_sleep_thread_num > 0) WakeUpThread(); } UnLockQueue(); } ~ThreadPool() { for (auto& t : _threads) t.Join(); pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: int _thread_num; int _sleep_thread_num; std::vector<Thread> _threads; std::queue<T> _task_queue; bool _isrunning; pthread_mutex_t _mutex; pthread_cond_t _cond; static ThreadPool<T>* _tp; static pthread_mutex_t _sig_mutex; }; // 单例(懒汉)模式 template <typename T> ThreadPool<T>* ThreadPool<T>::_tp = nullptr; template <typename T> pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
Log.hpp:
#pragma once #include <iostream> #include <string> #include <cstdarg> #include <cstring> #include <fstream> #include <sys/types.h> #include <pthread.h> #include <unistd.h> namespace log_ns { enum { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; // 定义日子真正需要记录的信息 struct LogMessage { std::string _level; int _id; std::string _filename; int _filenumber; std::string _curtime; std::string _log_message; }; #define SCREEN_TYPE 1 #define FILE_TYPE 2 const std::string defaultlogfile = "./log.txt"; pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER; class Log { private: std::string LevelToString(int level) { switch(level) { case DEBUG: return "DEBUG"; case INFO: return "INFO"; case WARNING: return "WARNING"; case ERROR: return "ERROR"; case FATAL: return "FATAL"; default: return "UNKNOWN"; } } std::string CurTime() { // 获取当前的时间戳 time_t curtime = time(nullptr); // 将当前时间戳转换成结构体 struct tm* now = localtime(&curtime); char buff[128]; snprintf(buff, sizeof(buff), "%d-%02d-%02d %02d:%02d:%02d", now->tm_year + 1900, now->tm_mon + 1, now->tm_mday, now->tm_hour, now->tm_min, now->tm_sec ); return buff; } void Flush(const LogMessage& lg) { // 打印日志的时候可能存在线程安全,使用锁lock住 pthread_mutex_lock(&log_lock); switch(_type) { case SCREEN_TYPE: FlushToScreen(lg); break; case FILE_TYPE: FlushToFile(lg); break; } pthread_mutex_unlock(&log_lock); } void FlushToFile(const LogMessage& lg) { std::ofstream out; out.open(_logfile, std::ios::app); // 文件的操作使用追加 if (!out.is_open()) return; char buff[2024]; snprintf(buff ,sizeof(buff), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curtime.c_str(), lg._log_message.c_str() ); out.write(buff, strlen(buff)); out.close(); } void FlushToScreen(const LogMessage& lg) { printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curtime.c_str(), lg._log_message.c_str() ); } public: Log(std::string logfile = defaultlogfile) : _type(SCREEN_TYPE), _logfile(logfile) {} void Enable(int type) { _type = type; } void LoadMessage(std::string filename, int filenumber, int level, const char* format, ...) { LogMessage lg; lg._level = LevelToString(level); lg._filename = filename; lg._filenumber = filenumber; // 获取当前时间 lg._curtime = CurTime(); // std::cout << lg._curtime << std::endl; lg._id = getpid(); // 获取可变参数 va_list ap; va_start(ap, format); char buff[2048]; vsnprintf(buff, sizeof(buff), format, ap); va_end(ap); lg._log_message = buff; // std::cout << lg._log_message; Flush(lg); } void ClearOurFile() { std::ofstream out; out.open(_logfile); out.close(); } ~Log() {} private: int _type; std::string _logfile; }; Log lg; // LOG 宏 #define LOG(level, format, ...) \ do \ { \ lg.LoadMessage(__FILE__, __LINE__, level, format, ##__VA_ARGS__); \ } while (0) #define EnableToScreen() \ do \ { \ lg.Enable(SCREEN_TYPE); \ } while (0) #define EnableToFile() \ do \ { \ lg.Enable(FILE_TYPE); \ } while (0) // 清理文件 #define ClearFile() \ do \ { \ lg.ClearOurFile(); \ } while (0) }
Thread.hpp:
#pragma once #include <iostream> #include <functional> #include <string> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cerrno> #include "Log.hpp" // using func_t = std::function<void(const std::string& name, pthread_mutex_t* lock)>; using func_t = std::function<void(const std::string& name)>; using namespace log_ns; // typedef void*(*func_t)(void*); const pthread_t ctid = -1; class Thread { private: void excute() { // std::cout << _name << " begin to run" << std::endl; LOG(INFO, "%s begin to run\n", _name.c_str()); _isrunning = true; _func(_name); _isrunning = false; } static void* ThreadRoutine(void* args) { Thread* self = static_cast<Thread*>(args); self->excute(); return nullptr; } public: Thread(func_t func, const std::string& name) : _func(func), _isrunning(false), _tid(ctid), _name(name) {} ~Thread() {} void Start() { // 创建之后就开始运行了 int n = pthread_create(&_tid, nullptr, ThreadRoutine, (void*)this); if (n != 0) { std::cout << "thread create failed!!!" << std::endl; exit(1); } } void Stop() { // 将线程暂停,使用 if (_isrunning == false) return; // std::cout << _name << " stop " << std::endl; int n = ::pthread_cancel(_tid); if (n != 0) { std::cout << "thread stop failed" << std::endl; } _isrunning = false; } void Join() { // 线程等待, if (_isrunning) return; int n = pthread_join(_tid, nullptr); if (n != 0) { std::cout << "thread wait failed!!!" << strerror(errno) << std::endl; } // std::cout << _name << " join " << std::endl; } std::string Status() { if (_isrunning) return "running"; else return "sleep"; } private: pthread_t _tid; func_t _func; bool _isrunning; std::string _name; };
Task.hpp:
#pragma once #include <string> #include <iostream> class Task { public: Task() {} Task(int x, int y) : _x(x), _y(y) {} int Excute() { return _x + _y; } std::string debug() { std::string msg = std::to_string(_x); msg +=" + "; msg += std::to_string(_y); msg += " = ?"; return msg; } std::string result() { std::string msg = std::to_string(_x); msg +=" + "; msg += std::to_string(_y); msg += " = "; msg += std::to_string(Excute()); return msg; } private: int _x; int _y; };
main.cc:
#include "ThreadPool.hpp" #include "Task.hpp" #include "Log.hpp" #include <memory> #include <functional> #include <unistd.h> using namespace log_ns; int main() { ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance(); srand(time(nullptr) ^ getpid()); int cnt = 5; while (true) { int x = rand() % 10 + 1, y = rand() % 10 + 1; Task t(x, y); tp->Push(t); // 往线程池中push一个任务,打印日志信息 LOG(INFO, "Push a task: %s\n", t.debug().c_str()); sleep(1); if (cnt == 0) break; cnt--; } // std::cout << "begin to stop" << std::endl; // 让线程退出 LOG(INFO, "thread begin to stop\n"); tp->Stop(); sleep(5); return 0; }
测试如下:
如上所示,运行线程池之后就会开始初始化线程池中线程,当我们向线程池中推送任务之后,线程池就会开始运行,当我们将线程池关闭之后,从右边的监视窗口我们也可以发现只剩下主线程。