文章目录
- 绪论
- SIGCHLD
- volatile
- 概念
- 执行流
- 并行与并发
- 临界资源与临界区
- 互斥和同步
- 线程
- 线程定义
- 线程独立性
- 线程优点
- 线程缺点
- 线程ID
- 线程操作
- 线程创建
- 线程等待
- 线程终止
- 线程分离
- 线程替换
- 线程互斥与同步
- 互斥
- 同步
- 原子性
- 互斥锁/互斥量
- 原理
- 使用
- 抢票情景模拟
- 未加锁前
- 加锁后
- 代码
- 锁使用规范
- 重入与线程安全
- 线程安全
- 线程不安全的情况
- 线程安全的情况
- 重入
- 不可重入函数
- 常见不可重入函数
- 可重入函数
- 常见可重入韩函数
- 可重入函数与线程安全
- 死锁
- 死锁的4个必要不充分条件
- 避免死锁
- 银行家算法
- 死锁检测算法
- 同步
- 条件变量
- 初始化
- 销毁
- 触发条件,挂起等待
- 唤醒
- 操作
- 生成者消费者模型
- 概念
- 优点
- 321原则
- 基于BlockingQueue的生产者消费者模型
- BlockQueue.hpp
- CpTest.cc
- Task.hpp
- 信号量
- 信号量的P,V操作伪代码
- 申请信号量-P操作伪代码
- 释放信号量-V操作伪代码
- 信号量相关函数
- 初始化
- P操作
- V操作
- 销毁
- 基于循环队列生成者消费者模型
- ring_queue.hpp
- Task.hpp
- ring_cp.cpp
- 阻塞队列和循环队列生产者消费者模型对比
- 线程池
- thread_pool.hpp
- Task.hpp
- main.cpp
- 总结
绪论
-
在硬件材料没法断时间得到巨大突破,单核CPU已经不能满足需求,因此
多核CPU出现了,这意味着在同一时刻,可以运行多个程序。
-
模拟一种场景:一个程序的正常运行,需要网络资源,需要硬件资源,需要CPU资源等很多资源,我们可以通过让父进程给子进程分配这些任务去获得这些资源,但是因为进程具有独立性,子进程要想将获得的资源递达给父进程,必须给父子进程提供一个共享资源也就是临界资源。就像进程间通信一样,这种临界资源可以是管道,共享内存,信号量,互斥锁等。但是这就导致OS必须为这个临界资源进行管理,而管理一个对象必先描述(结构体)+对数据结构的增删查改。这种管理的维护代价非常高,因此线程(执行流)出来了,在OS中,线程又叫执行流。
-
一个进程可以有多个线程(1:N);所有线程都共享同一份内存,OS要将这块共享内存中的一部分内存作为每个线程的私有成分,但是因为所有线程共享了同一块内存,这就导致了线程间可以直接看到共享内存,数据的访问与存储就方便了很多很多。
-
所有线程共享了内存,解决访问数据与存储的方便,但是我们不的不考虑,因为CPU的调度导致执行流紊乱造成的数据紊乱问题(即内存中在不同时刻被不同的线程写入,造成数据不稳定,错乱),这种现象称为线程安全问题。
-
为了解决线程安全问题,提出互斥锁,但是又引入了死锁的问题了,为了避免死锁,死锁检测算法和银行家算法提出。同时锁被所有线程都可见的,因此也是一种临界资源,也有线程安全问题,因此锁是怎么保证自己的线程安全这也是一个问题–由此引入了原子性
-
同时临界资源是有限的,线程是怎么知道什么时候可以访问临界资源,由此提出条件变量,同时为了计数临界资源,提出信号量
-
最后基于以上线程的,总结出了:生产者消费者模型
-
对于多线程的,目前理解到了这里。
-
吐槽:这就和人生一样,我解决了当下的问题,又不得不面对新的问题,除了坦然的面对它,平凡的我似乎也只能这样。
SIGCHLD
-
进程在终止时会给父进程发SIGCHLD信号,该信号的默认处理动作是忽略即什么也不做。而实际中我们需要通过wait/waitpid的方式,让父进程等待子进程,以回收子进程的资源—这就导致了子进程必须等待父进程回收资源而处于Z模式也就是僵尸模式
-
实际中,我们可以对父进程捕捉SIGCHLD的handler进行修改,让handler函数中进行资源的回收,这样父进程就不需要显示的等待子进程,而是当信号到来时,中断进程以响应SIGCHLD对应的handler。
-
当父进程不想关心子进程的退出,可以通过修改SIGCHLD的handler,实际中Linux下,可以通过SIG_IGN函数来完成上述需求
void GetChild(int signo)
{
pid_t id = 0;
//可能有多个进程同时退出,
//而pending位图只能记录一个,
//同时无法预料同一时刻的有多少个子进程退出。
//因此要非阻塞的循环等待
while ((id = waitpid(-1, NULL, WNOHANG)) > 0)
{
printf("wait child %d success\n", id);
printf("child quit success \n");
sleep(1);
}
}
int main()
{
// signal(SIGCHLD,GetChild);
signal(SIGCHLD, GetChild);
//当父进程收到这个信号时,会中断响应这个信号
if (fork() == 0)
{
int cnt = 0;
while (cnt < 5)
{
printf("I am a child\n");
sleep(1);
++cnt;
}
if (fork() == 0)
{
sleep(1);
exit(1);
}
sleep(2);
exit(1);
}
while (1)
{
printf("I am a parent\n");
sleep(1);
};
//yonGetchild 替代waitpid
// waitpid(-1,NULL,0);
return 0;
}
volatile
- 保存内存可见性
- 有时编译器会基于代码,会将数据直接优化到寄存器里面,这就导致了不需要访问内存,只看寄存器中的数据,这有时会引起错误。为了避免这种错误,可以在需要的数据前面加volatile,要求编译器必须穿过内存访问数据。
概念
执行流
在CPU中被执行的实体,大部分是线程。因此线程也可以叫执行流,只不过在CPU中多称为执行流。
并行与并发
因为CPU的速度非常快,在看一个执行流的运行状态时,我们是分微观和宏观的。
- 在同一时刻,在多个CPU中,被同时调度的执行流,即无论微观还是宏观多个执行流就是同时被执行的。这就是并行
- 在一个单核CPU中的一段时间内,将所有执行流都执行一段时间,因为CPU太快,宏观上我们感觉执行流是同时在运行着,但在微观是串行的。
临界资源与临界区
被线程共享访问的资源称为临界资源(显示器就是一种临界资源),代码中真正访问临界资源的区域称为临界区(如使用向显示器打印数据相关的代码)
互斥和同步
- 对于任何临界资源,保证任何时刻只能有一个线程访问,这种称为互斥
- 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题(即一个线程因为优先级等优势一直具有对临界资源的访问权限,其它线程都不能得到对临界资源的访问,造成其它线程一直不能被推进),叫做同步 。
线程
线程定义
用代码粗鄙的理解:程序中,将不同函数分配给多个线程,这些函数可以完成对网络资源,对内存资源,硬盘资源的申请,这些申请的资源都放在了共享内存的不同区域。这样通过多线程就可以直接完成对于程序启动的所需资源的申请。
操作系统书上定义线程:是在进程内部运行的一个执行分支(执行流),属于进程的一部分,粒度要比进程更加细和轻量化。
-
线程作为一个可以访问内核数据的对象,OS肯定会进行管理,而要想管
理,就要先描述(结构体:TCB)+组织(对数据结构的增删查改);这也是绝大部分操作系统书所采用的,也是windows系统所使用。
-
无论是线程还是进程都要被CPU调度,这就意味着,PCB和TCB中有很多相似的属性:id,上下文,程序计数器等。同时又要提供基于TCB的调度算法,维护TCB和PCB间的关系。
-
线程和进程是如此相像,因此在linux中,线程用进程PCB进行表示,这就避免了提供新的基于TCB‘的相关的算法,也不用维护进程与线程间的关系,CPU以统一的视角调用执行流,不用区分执行流是线程还是进程。
-
无论是教材中的TCB,还是linux中线程复用进程PCB,都满足:是在进程内部运行的一个执行分支(执行流),属于进程的一部分,粒度要比进程更加细和轻量化。
-
创建进程是时间+空间的消耗,成本非常高,一但创建成功,各种线程就存在了,因此进程是承担分配系统资源的基本实体,而线程是CPU调度的基本单位
-
linux因为是用进程模拟的线程,不直接提供操作线程的api,而是给我们提供了同一个地址空间创建PCB的方法和分配资源给指定PCB的接口。但是这非常不利于用户使用,因此一般使用第三方库便于用户操作线程
myctrl:myctrl.cpp g++ -o $@ $^ -std=c++11 -lpthread;
-
地球就可以看作是一个临界资源,每个家庭是一个线程,所以的家庭都有自己的私有资源,也有公用资源。
线程独立性
-
线程中大部分资源是共享的,因此这个独立性是相对的独立性
-
线程中的独立数据(PCB,栈,上下文,这3个主要)
- 线程ID
- 一组寄存器(上下文)
- 栈
- errno
- 信号屏蔽字(block表)
- 调度优先级
-
线程中共享数据
- 文件描述符表
- 每种信号的处理方式(handler表)
- 当前工作目录
- 用户id和组id
线程优点
- 进程是系统资源的承担者,一但进程创建完毕,线程就有了,因此创建线程的代价要比进程小很多
- 当CPU调度线程的时候,需要切换的东西很多(上下文,页表,地址空间等等),而线程因为共享地址空间和页表,这就导致了CPU调度线程的时候,更多的是切换上下文,一组寄存器等;这说明线程间的切换要比进程间的切换消耗的时间与工作量少很多
- 线程占用的资源比进程少很多
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现:加密,大数据运算等主要使用CPU资源
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作 :网络下载,网盘,ssh,在线看电影等内存和外设的IO资源
- 有限的使用多线程可以提高效率,过多可能因为调度消耗,总体上降低效率
线程缺点
-
性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变 -
健壮性降低
- 编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的
- 一个进程中只要一个线程崩溃,进行就崩溃了
-
缺乏访问控制
线程间共享了临界资源,线程可以随意的访问其它线程的数据,不受限制
-
编程难度提高
编写与调试一个多线程程序比单线程程序困难得多
线程ID
-
每个线程都有自己的临时数据,也就要求其必然有自己的独立栈结构,而地址空间的栈是无法满足要求的,这个栈是给主线程/进程用的同时线程的独立结构体也需要保存。这一般由第三方库 pthread库提供并维护。
-
pthread库中可能采用数组的形式组织线程的所有私有资源,这样就可以通过地址空间区分所有线程。因此pthread_self()函数获得是用户层/pthread库层的虚拟内存地址。
-
内核与用户层的描述线程的ID是1:1的对应关系(不过也有N:1的,只是市场上很少有实体),提供在库中的struct thread中设置一个存放内核线程ID的属性,内核线程的数据界结构中也设置一个指向用户层的属性,这样就能确定对应关系,当CPU调度线程需要使用栈时,只需要跳转即可
线程操作
线程创建
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t * pthread_attr_t,void *(*start_routine) (void *), void *arg);
//thread 存放线程id的变量
//attr 线程属性
//start_routine 线程的任务
//arg start_routine的参数
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <pthread.h>
void *(thread_run)(void *args)
{
const char *s = (const char *)args;
while (1)
{
sleep(1);
printf("我是%s,pid:%d lwp:%lu\n ", args, getpid(), pthread_self());
}
}
void handler(int signo)
{
sleep(1);
printf("pid:%d,get a signo %d\n", getpid(), signo);
exit(1);
}
int main()
{
signal(2, handler);
// int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine) (void *), void *arg);
pthread_t thread;
pthread_create(&thread, NULL, thread_run, (void *)"线程1");
printf("thread: %lu\n",thread);
while (1)
{
sleep(1);
printf("我是 main 线程 pid:%d lwp:%d\n", getpid(), pthread_self());
}
return 0;
}
线程等待
- 进程需要等待,线程也需要等待,等待的目的是获取线程的结果,但是只要有一个线程崩溃了,整个进程就崩溃了,因此等待的结果中的异常退出我们没法处理,只能处理正常退出,结果不对的2种情况。
- 线程的等待只能是阻塞式等待,没有进程等待的 WNOHANG,不可同时等待
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
void *(thread_run)(void *args)
{
printf("我的线程ID:%lu\n ",pthread_self());
return (void*)111;
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void *)1);
void *status=NULL;
pthread_join(tid,&status);
//status可以灵活强转,比如:结构体
printf("ret:%lu\n",(int*)status);
return 0;
}
线程终止
-
一般函数中return是没法退出进程的,但是线程的函数中return可以实现线程的退出,main函数中return代表主线程/进程退出
-
pthread_exit(void *retval),终止线程;exit()是终止进程的,不可轻易使用
-
int pthread_cancel(pthread_t thread);
-
取消指定线程ID的线程,如果取消的是主线程,并不能取消进程,不推荐使用。
-
取消线程的返回值为-1
-
线程分离
如果不想等待线程,也就达到 signal(SIGCHLD,SIG_IGN)的效果,可以考虑线程分离,一旦分离,就不需要join,会自动释放其资源
#include <pthread.h>
int pthread_detach(pthread_t thread);
线程替换
- 一般是进程间进行替换,如果线程替换了,其它线程也会受影响
- 一个线程分离后,join就没意义了
#include <pthread.h>
int pthread_detach(pthread_t thread);
线程互斥与同步
- 对内存的数据的访问一般要经历3个过程:从内存读到CPU寄存器;CPU进行相关运算;寄存器重新写到内存。
- 如果在这三个过程中的任意一个,突然被CPU调度,虽然可以通过PCB的上下文保存数据,当再次调度时,恢复现场。但是可能将老的数据覆盖新的数据造成数据紊乱与无效化。这是一种影响线程安全的因素。
- 另外如果一个线程一直占有着临界资源,导致其它线程一直挂起等待,进度不被推进造成的一种“饥饿”现象,这也是一种影响线程安全的因素。
互斥
为了避免线程访问数据时,造成数据紊乱与无效化,提出了互斥的概念,让所有线程去竞争临界资源的使用权,保证同一时刻对临界资源的访问与修改只能有一个线程,只有当这个线程访问结束,其它线程才能去访问临界资源
同步
为了避免“饥饿”问题,提出同步的概念,在互斥的基础上,让所有线程有序的访问临界资源,保证所有线程进度都能被推进。
原子性
官方定义:指事务的不可分割性,一个事务的所有操作要么不间断地全部被执行,要么一个也没有执行。
我的理解:临界区代码要么不执行,要么执行后不会产生线程安全问题—这个很难解释,过于抽象了。建议从整体上体会原子性,因为我们不得不考虑CPU切换。
互斥锁/互斥量
每种临界资源都应有一把锁,来预防多线程下的线程安全问题。
原理
-
我们认为一条汇编语言是具有原子性,指的是:执行这条汇编语言时,不会被CPU调度切换。
-
大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性。
-
而CPU的寄存器中的数据对于线程是私有的,因此如果我们把锁的数据保存到寄存器中,其它就不能获得锁的数据而被挂起等待。在原子性的基础上完成对锁资源的竞争
-
竞争锁本质就是竞争锁资源中的那个“1”,一旦竞争成功,其它线程就被挂起等待。
-
一旦竞争成功,即使线程临界区被CPU调度,也是抱着锁被切换的,也就意义着其它线程是不可能进入临界区的。
-
也就是线程进入临界区,从整体上看是“原子的”,当线程进入临界区,其它线程不可能进入临界区。
-
有了互斥锁,线程进入临界区就成串行
-
如果一个锁一直抱着锁,不被切换来,那就出现了死锁了。
使用
系统锁相关代码
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);//锁的销毁
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);//锁的初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//当锁是全局的时候,可以考虑使用宏PTHREAD_MUTEX_INITIALIZER进行初始化
int pthread_mutex_lock(pthread_mutex_t *mutex);//竞争锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);//释放锁,释放锁的时候,会唤醒在锁等待队列中的所有线程
int pthread_mutex_trylock(pthread_mutex_t *mutex);
很多语言上都提供属于他们语言风格的锁,本质是对系统锁接口的封装
因为封装具有RAII特性,我们不需要关心锁的初始化与销毁,而是关心它的lock和unlock
mutex mtx;//C++11锁
mtx.lock();//竞争锁
mtx.unlock();//申请锁
抢票情景模拟
未加锁前
加锁后
代码
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#include <time.h>
#include <mutex>
#define NUM 5
class Buy_Ticket
{
private:
int _tickets;
// pthread_mutex_t mtx; //互斥锁,系统线程库
std::mutex mtx;//C++语言内置的,底层是对系统锁的封装
public:
Buy_Ticket(int tickets = 100)
: _tickets(tickets)
{
//pthread_mutex_init(&mtx, NULL);
}
~Buy_Ticket()
{
//pthread_mutex_destroy(&mtx);
}
bool Get_Ticket()
{
bool ret = true;
//pthread_mutex_lock(&mtx);
mtx.lock();
if (_tickets > 0)
{
usleep(1000);//1ms
printf("线程 %d 得到了%d号票\n", pthread_self(), _tickets--);
// usleep(rand() % 100 + 1);
}
else
{
usleep(1000);//1ms
std::cout << "票卖完了" << std::endl;
// usleep(rand() % 100 + 1);
ret = false;
}
//pthread_mutex_unlock(&mtx);
mtx.unlock();
//即使在这里被CPU切换也不会造成线程安全
return ret;
}
};
void *thread_run(void *args)
{
Buy_Ticket *bt = (Buy_Ticket *)args;
while (true)
{
if (bt->Get_Ticket())
{
continue;
}
else
{
break;
}
}
}
int main()
{
Buy_Ticket bt(100);
srand(time(NULL));
pthread_t tid[NUM];
for (int i = 0; i < NUM; ++i)
{
pthread_create(tid + i, NULL, thread_run, (void *)&bt);
}
for (int i = 0; i < NUM; ++i)
{
pthread_join(tid[i], NULL);
}
return 0;
}
锁使用规范
进入每个临界区前都要加锁,这是一种规范,否则因为多线程造成的线程安全问题使用者自己负责。
重入与线程安全
线程安全
因为没对临界区进行加锁造成临界资源数据紊乱与无效化的现象称为线程安全。
线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作(加锁)
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
重入
当线程执行一个函数时,因为CPU调度问题,造成其它线程又再次进入该函数的现象称为重入。很显然,如果函数中共享某种数据,会因为线程切换造成数据紊乱和线程安全问题,基于会出现问题和不出现问题分为:不可重入函数,可重入函数
不可重入函数
一个函数在重入的情况下,运行结果会出现问题,造成线程安全的,则该函数被称为不可重入函数
常见不可重入函数
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
可重入函数
一个函数在重入的情况下,运行结果不会出现任何问题/线程安全,则该函数被称为可重入函数
常见可重入韩函数
不使用全局变量或静态变量
不使用用malloc或者new开辟出的空间
不调用不可重入函数
不返回静态或全局数据,所有数据都有函数的调用者提供
使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入函数与线程安全
- 函数是可重入的,一定是线程安全的即可重入函数是线程安全的一种
- 线程安全的函数不一定是可重入的,因为不可重入函数可以通过加锁而变的线程安全。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但是如果这个函数锁还未释放,自己又重新进入,就有可能造成死锁,该函数还是不可重入的。
死锁
-
死锁就像哈希冲突一样不可避免,只能通过某些规范、措施尽量减少死锁发生的可能性
-
当一个线程占用资源A而不释放,又继续申请资源X,同时另外一个线程占有资源X而不释放,申请资源A,这样就会出现一个环状申请链路,会出现死锁。
就像锁,一个线程占有着锁,抱着锁挂起后,重新申请锁,就会出现死锁
死锁的4个必要不充分条件
产生死锁一定触发4个条件,但是触发4个条件不一定产生死锁
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
规范化使用锁,能少使用锁就少使用锁,锁多了也不好
- 破坏死锁的四个必要条件
- 加锁顺序一致:申请资源ABC,都是按ABC顺序申请的
- 避免锁未释放的场景 :编程规范问题
- 资源一次性分配
银行家算法
核心思想:是一种假设法,假设给线程分配了资源,如果以现有的资源,尽可能多的满足一些线程的需要,并回收这些线程占有的资源。最终所有线程都能完成任务说明该分配方案没问题,否则就说明该分配有死锁风险。
这里就不多缀述了,造的轮子放下面
#pragma once
#include <iostream>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <string>
#include <time.h>
namespace My_Banker
{
class Process; //前置声明
class Resoure
{
public:
friend class Process; //将 Process 声明为友元类,可以让友元类直接访问_resoure;
friend void Print(const std::vector<Process> &v, const Resoure &resoure);
friend void Safe_Test(std::vector<Process> vp, Resoure resoure, std::vector<std::string> &vs);
//作为共享资源考虑线程安全--加锁
pthread_mutex_t mtx;
Resoure()
{
pthread_mutex_init(&mtx, NULL);
}
~Resoure()
{
pthread_mutex_destroy(&mtx);
}
void Init(std::vector<int> resoure)
{
_resoure = resoure; //赋值是对2个已存在的对象的操作,拷贝是对刚创建的对象时的操作
}
void Show() const
{
for (int i = 0; i < _resoure.size(); ++i)
{
printf(" %d", _resoure[i]);
}
}
bool Is_Empty()
{
for (int i = 0; i < _resoure.size(); ++i)
{
if (_resoure[i] == 0)
{
if (i == _resoure.size() - 1)
{
return true;
}
}
else
{
break;
}
}
return false;
}
private:
std::vector<int> _resoure;
};
//成员是自定义类,不需要关心构造与析构函数
class Resoure;
class Process
{
public:
friend void Print(const std::vector<Process> &v, const Resoure &resoure);
friend void Safe_Test(std::vector<Process> vp, Resoure resoure, std::vector<std::string> &vs);
friend int Finish(std::vector<Process> &vp);
Process() = default;
~Process() = default;
void Init(std::string name, std::vector<int> Max_Need, std::vector<int> Allocation, My_Banker::Resoure &resoure)
{
while (pthread_mutex_lock(&resoure.mtx))
; //申请锁成功返回0
_name = name;
_Max_Need = Max_Need;
_Allocation = Allocation;
for (int i = 0; i < Max_Need.size(); ++i)
{
_Need.push_back(_Max_Need[i] - _Allocation[i]);
resoure._resoure[i] -= _Allocation[i];
}
while (pthread_mutex_unlock(&resoure.mtx))
;
}
//打印数据
void Show() const
{
printf("%s ", _name.c_str());
//打印最大需求
for (int i = 0; i < _Max_Need.size(); ++i)
{
printf(" %d", _Max_Need[i]);
}
printf("|");
//打印已占有的资源
for (int i = 0; i < _Allocation.size(); ++i)
{
printf(" %d", _Allocation[i]);
}
printf("|");
for (int i = 0; i < _Need.size(); ++i)
{
printf(" %d", _Need[i]);
}
printf("|");
if (_Finish)
{
printf("True ");
}
else
{
printf("False ");
}
printf("|");
}
bool Is_Req_Safe(std::vector<int> &request, const My_Banker::Resoure &resoure)
{
//判断request是否小于 _Need
bool ret = true;
for (int i = 0; i < request.size(); ++i)
{
if (request[i] > _Need[i])
{
ret = false;
}
}
if (ret)
{
//判断request 是否小于resoure
for (int i = 0; i < request.size(); ++i)
{
if (request[i] > resoure._resoure[i])
{
ret = false;
}
}
}
return ret;
}
// acciable去除需要
void Change(const std::vector<int> &request, Resoure &resoure)
{
for (int i = 0; i < request.size(); ++i)
{
_Need[i] -= request[i];
_Allocation[i] += request[i];
resoure._resoure[i] -= request[i];
}
}
bool Is_Finsh()
{
return _Finish;
}
private:
bool _Finish = false;
std::string _name;
std::vector<int> _Max_Need; //对各个资源的最大需求
std::vector<int> _Allocation; //进程占用的资源
std::vector<int> _Need; //还需多少资源达到执行线程的要求 // need[i]= Max_Need[i]- Allocatio[i];
};
void Print(const std::vector<Process> &v, const Resoure &resoure)
{
printf("%s |%s |%s |%s |%s |%s \n", "进程名字", "Max_Need ", "Alocation ", "Need", "Status", "Accessible");
for (int i = 0; i < v.size(); ++i)
{
v[i].Show();
if (i == 0)
{
resoure.Show();
}
std::cout << std::endl;
}
}
//安全检测是一种假设法,vp这里用拷贝构造,而不是引用
//
void Safe_Test(std::vector<Process> vp, Resoure resoure, std::vector<std::string> &vs)
{
while (pthread_mutex_lock(&resoure.mtx))
; //加锁
bool change = false;
do
{
change = false;
for (int i = 0; i < vp.size(); ++i)
{
if (vp[i]._Finish == false)
{
int j = 0;
for (; j < resoure._resoure.size(); ++j)
{
if (vp[i]._Need[j] <= resoure._resoure[j])
{
continue;
}
else
{
break;
}
}
if (j == resoure._resoure.size()) // vp[i]可以分配资源
{
vp[i]._Finish = true; //让该进程获得足够资源去完成任务
vs.push_back(vp[i]._name);
change = true;
//回收资源
for (int t = 0; t < resoure._resoure.size(); ++t)
{
resoure._resoure[t] += vp[i]._Allocation[t];
}
}
}
}
} while (change);
while (pthread_mutex_unlock(&resoure.mtx))
;
}
int Finish(std::vector<Process> &vp)
{
int ret = vp.size();
for (int i = 0; i < vp.size(); ++i)
{
for (int j = 0; j < vp[i]._Need.size(); ++j)
{
if (vp[i]._Need[j] == 0)
{
if (j == (vp[i]._Need.size() - 1))
{
vp[i]._Finish = true;
--ret;
}
}
else
{
break;
}
}
}
return ret;
}
void Text()
{
int N = 0; //资源数
int M = 0; //线程数
printf("请输入资源数,线程数:");
std::cin >> N >> M;
std::vector<int> resoure(N, 0);
//处理共享资源
printf("请输入个资源数:");
for (int i = 0; i < N; ++i)
{
std::cin >> resoure[i];
}
Resoure res;
res.Init(resoure);
//处理各个线程的输入数据
printf("请输入T0时刻下,各线程的资源分配情况\n\n\n");
std::vector<Process> vp(M, Process());
for (int i = 0; i < M; ++i)
{
std::string name;
std::vector<int> Max_Need(N, 0); //对各个资源的最大需求
std::vector<int> Allocation(N, 0); //进程占用的资源
printf("请输入线程的名字:");
std::cin >> name;
printf("该线程的对%d个资源的最大需求数组:", N);
for (int j = 0; j < N; ++j)
{
std::cin >> Max_Need[j];
}
printf("该线程的对%d个资源占有的量:", N);
for (int j = 0; j < N; ++j)
{
std::cin >> Allocation[j];
}
vp[i].Init(name, Max_Need, Allocation, res);
}
int ret = Finish(vp);
Print(vp, res);
std::vector<std::string> vs;
Safe_Test(vp, res, vs);
if (vs.size() == ret && ret > 0)
{
printf("T0时刻,找到一个安全序列:");
for (auto e : vs)
{
std::cout << e << "->";
}
std::cout << std::endl;
}
else
{
printf("T0时刻,不存在安全序列,有线程安全问题,进入死锁,程序终止\n");
printf("################################## -BY New_Young\n");
exit(1);
}
vs.clear(); //清空数组
int cnt = 0;
srand(time(NULL)); //随机数种子
while (Finish(vp) != 0)
{
if (res.Is_Empty())
{
printf("无可使用资源,进程结束\n");
printf("################################## -BY New_Young\n");
exit(1);
}
int key = rand() % M;
if (vp[key].Is_Finsh())
{
continue;
}
std::vector<int> request(N, 0);
printf("请输入T%d时刻 ,P%d的申请资源情况\n", cnt + 1, key);
//检测此时是否线程安全
ret = Finish(vp);
Print(vp, res);
Safe_Test(vp, res, vs);
if (vs.size() == ret && ret > 0)
{
printf("该时刻下,能找到一个安全序列,线程安全:");
for (auto e : vs)
{
std::cout << e << "->";
}
std::cout << std::endl;
}
else
{
printf("该时刻下,不存在安全序列,有线程安全问题,进入死锁,程序终止\n");
printf("################################## -BY New_Young\n");
exit(1);
}
vs.clear(); //清空
for (int i = 0; i < N; ++i)
{
std::cin >> request[i];
}
if (vp[key].Is_Req_Safe(request, res))
{
ret = Finish(vp);
Process tmp = vp[key]; //假设法,因此需要保留原始数据
Resoure restmp = res; //假设法,因此需要保留原始数据
vp[key].Change(request, res);
Safe_Test(vp, res, vs);
if (vs.size() == ret && ret > 0)
{
printf("T%d时刻,通过假设给P%d分配资源,能找到一个安全序列:", cnt + 1, key);
for (auto e : vs)
{
std::cout << e << "->";
}
std::cout << std::endl;
printf("已重新分配资源\n");
Finish(vp);
Print(vp, res);
++cnt;
}
else
{
printf("T%d时刻,通过给P%d分配资源,不能找到一个安全序列,存在线程安全问题", cnt + 1, key);
vp[key] = tmp; //恢复现场
res = restmp;
std::cout << std::endl;
}
vs.clear(); //清空
//
}
else
{
printf("T%d时刻 ,P%d的申请资源是不正确的,挂起等待其它线程申请资源\n", cnt + 1, key);
}
}
printf("################################## -BY New_Young\n");
}
}
死锁检测算法
死锁和银行家算法是很象的,最大的区别是
死锁检测算法对于假设分配资源时只看了request和need的关系,并没有考虑此时系统中的Accessible,这就有可能导致 Accessible出现负数;银行家算法2者都考虑了。
总体来说死锁检测算法有点糙,没有银行家算法严谨。
同步
-
在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
-
同步机制需要一个维序者规范线程访问临界资源的顺序,避免饥饿问题。
-
维序者通过条件变量控制线程顺序,这也是生成者所采用的措施。
-
一般条件变量的使用是搭配锁
-
一个任务中的条件变量个数是依据需求的,生产者消费者需要2个条件变量是因为:只有生产者知道消费者什么时候可以消费,只有消费者知道生产者什么时候可以生产
struct pthread_cond
{
//....
queue q;//等待队列,存放的在该条件下挂起的线程
//...
}
条件变量
条件变量就是一个结构体,在该条件变量下等待就是把线程放到条件变量队列中即push,唤醒就是pop
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
触发条件,挂起等待
等待就是把线程放到条件变量的队列成员中即push
- 该函数会先将锁锁放再挂起线程
- 当函数返回时,该函数会让线程自动竞争锁,保证返回时也持有锁
- 该函数功能很丰富也意味着一但函数出问题,会出现很多错误。
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
//线程挂起等待时,会把锁先释放掉,让其它锁竞争锁资源,之后唤醒时会重新获得锁。
唤醒
就是将队列头元素pop掉
int pthread_cond_signal(pthread_cond_t *cond);
//唤醒在该条件变量下等待队列头元素线程
int pthread_cond_broadcast(pthread_cond_t *cond);
///唤醒在该条件变量下等待队列头的所有线程
操作
- 一个线程控制另外一个线程启动与挂起(生产者消费者模型的雏形)
- 结果表明,线程的挂起是存在等待队列了,单次唤醒只能唤醒队列头元素
- 通过条件变量实现了线程的有序性
#include <unistd.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <pthread.h>
using namespace std;
pthread_mutex_t mtx;
pthread_cond_t cond;
void *ctrl(void *args)
{
string name = (char *)args;
sleep(5);
while (1)
{
printf("%s is say:beging work...\n", name.c_str());
//唤醒在该条件变量下等待队列的头元素
pthread_cond_signal(&cond);
//唤醒在该条件变量下等待队列的所有线程
//pthread_cond_broadcast(&cond);
sleep(1);
}
}
void *work(void *args)
{
int p = *(int *)args;
delete (int *)args;
while (1)
{
pthread_mutex_lock(&mtx);
pthread_cond_wait(&cond,&mtx);
printf("worker %d is working...\n", p);
pthread_mutex_unlock(&mtx);
}
}
int main()
{
#define NUM 5
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t boss;
pthread_t worker[NUM];
pthread_create(&boss, NULL, ctrl, (void *)"boss");
for (int i = 0; i < NUM; ++i)
{
int *p = new int(i);
pthread_create(worker + i, NULL, work, (void *)p);
// sleep(1);
}
for (int i = 0; i < NUM; ++i)
{
pthread_join(worker[i], NULL);
}
pthread_join(boss, NULL);
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
生成者消费者模型
生活场景:超市的商家和顾客就是生产者消费者模型。生产者:商家;消费者:顾客;”交易场所“:超市
内核场景:用户创建进程,进程在等待队列等待,OS调度等待队列中的进程到CPU中执行。生产者:用户;消费者:OS;”交易场所“:进程等待对列
概念
-
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
-
只有生产者才知道什么时候消费可以消费:装完货后
-
只有消费者知道什么时候生产者能生产:消费后
-
生产者消费者模型重心的是在数据的生成和数据的处理过程,这2个是有时间消耗的,数据的存和取是原子的。
-
数据可以是某种任务,这样当生产将任务放到等待队列中,消费者获得之后处理该任务,这就可以实现多个消费者同时并发消费,多个生产者同时并发生产的,消费者生产者之间并行
优点
- 解耦:将数据的生成与消费分离,提供效率
- 支持并发:当生产者生产数据还未放到临界资源区的时候,消费者可以一直消费
321原则
3种关系,2个角色,一个临界资源
- 消费者与消费者之间是互斥关系
- 生产者与生产者之间是互斥关系
- 生产者与消费者之间是互斥,同步关系。
基于BlockingQueue的生产者消费者模型
- 阻塞队列是典型的并发生产者消费者模型
- 阻塞队列的特殊性,某一时刻只能进或出一个数据,该模型下,消费者与生产只能并发而不能并行。同时所有生产者竞争临界资源生产,所有消费者竞争临界资源消费,,消费者和生产者之间也竞争临界资源,因此只需要一把锁即可;
- 生产者生产完后唤醒消费者消费,消费者消费后唤醒生产者生产
BlockQueue.hpp
采用模板,这样阻塞队列可以存放的数据类型就不局限于内置类型,可以是如何类型的数据:进程,线程等
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
namespace ns_blockqueue
{
const int default_capacity = 10;
template <class T>
class BlockQueue
{
private:
std::queue<T> _q; //等待对列
size_t _capacity; //容量上限
pthread_mutex_t _mtx; //保护临界资源的锁
pthread_cond_t cond_full; //当队列满时,生成者在该条件变量下等待
pthread_cond_t cond_empty; //当队列空时,消费者在该条件变量下等待
public:
BlockQueue(int cap = default_capacity)
: _capacity(cap)
{
pthread_mutex_init(&_mtx, NULL);
pthread_cond_init(&cond_full, NULL);
pthread_cond_init(&cond_empty, NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&cond_full);
pthread_cond_destroy(&cond_empty);
}
private:
bool Is_Full()
{
return _q.size() == _capacity;
}
bool Is_Empty()
{
return _q.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&_mtx);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mtx);
}
void ProductWait()
{
// 1. 该函数会首先释放锁,之后挂起自己,其它线程竞争锁
// 2. 该函数返回的时候,会让线程自动的竞争锁,成功或取锁后返回
pthread_cond_wait(&cond_full, &_mtx);
}
void WakeProduct()
{
//唤醒在cond_full条件下挂起等待的消费者
pthread_cond_signal(&cond_full);
}
void ConsumerWait()
{
// 1. 该函数会首先释放锁,之后挂起自己,其它线程竞争锁
// 2. 该函数返回的时候,会让线程自动的竞争锁,成功或取锁后返回
pthread_cond_wait(&cond_empty, &_mtx);
}
void WakeConsumer()
{
//唤醒在is_emoty条件下挂起等待的消费者
pthread_cond_signal(&cond_empty);
}
public:
//生成者生产
//只有生产者知道消费者什么时候可以消费,因此生产者生产完后就唤醒消费者消费
void Push(const T &data)
{
LockQueue();
//临界区
while(Is_Full())
{
ProductWait();
}
_q.push(data);
///_capacity++;
// cout<<"生产者 生产了 "<<data<<endl;
//唤醒在前:可能锁还在身上,但是当释放锁的时候回唤醒锁下的等待队列
//唤醒在后:可能因为切换,条件变量下的线程没被唤醒,
// 但是最终一定不是所有生产者被挂起,至少有一个回来执行唤醒代码
//唤醒在前在后都一样
UnLockQueue();
WakeConsumer(); //唤醒消费者消费
}
//消费者消费
//只有消费者知道生产者什么时候可以生产,因此消费者消费完后就唤醒生产生产
void Pop(T *out) // out输出型参数
{
LockQueue();
//临界区
while(Is_Empty())
{
ConsumerWait();
}
*out = _q.front();
_q.pop();
//_capacity--;
//cout<<"消费者 消费了 "<<*out<<endl;
UnLockQueue();
WakeProduct(); //唤醒生产者生产
}
};
}
CpTest.cc
#include "./BlockQueue.hpp"
#include "./Task.hpp"
#include <time.h>
using namespace ns_blockqueue;
using namespace ns_task;
using namespace std;
void *product(void *args)
{
// BlockQueue<int> *bp = (BlockQueue<int> *)args;
BlockQueue<Task> *bp = (BlockQueue<Task> *)args;
const std::string ops = "+-*/%";
while (true)
{
//int data=rand()%20+1;
int x=rand()%20+1;
int y=rand()%20+1;
char ch=ops[rand()%ops.size()];
Task t(x,y,ch);
cout<<"生产者派发了一个任务:"<<x<<" "<<ch<<" "<<y<<"="<<"?"<<endl;
bp->Push(t);
sleep(1);
}
}
void *consume(void *args)
{
BlockQueue<Task> *bp = (BlockQueue<Task> *)args;
while (true)
{
//int ret=0;
Task t;
bp->Pop(&t);
t();
sleep(1);
}
}
int main()
{
srand(time(NULL));
BlockQueue<Task> bq; //等待队列
pthread_t producter; //生产者
pthread_t consumer; //消费者
pthread_create(&producter, NULL, product, (void *)&bq);
pthread_create(&consumer, NULL, consume, (void *)&bq);
pthread_join(producter,NULL);
pthread_join(consumer,NULL);
}
Task.hpp
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op; //"+-*/"
public:
Task() = default;
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
void operator()()
{
int ret = 0;
//printf("hello ->%d\n",_op);
switch (_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
{
//std::cout<<"bug??" << _x <<" "<< _op <<" "<< _y << " ="<< "?" << std::endl;
printf("bug?? %d %d %c=?\n",_x,_y,_op);
exit(1);
}
break;
}
std::cout << "消费者->当前任务正在被LWP为" << pthread_self() << "的线程获得,处理:" << _x << " " << _op << " " << _y << "=" << ret << "的任务" << std::endl;
}
~Task() = default;
};
}
信号量
- 场景:电影院买票。票仅仅是代表你有了进入电影院看电影的权限,只有当你进入电影院并坐下的那一刻,才代表你可以与别人并行的看电影。信号量就是那个票,电影院的座位就是一个一个小的临界资源。
- 因此信号量是用来描述临界资源的计数器,它用来告诉用户层还有多少临界资源可以分配。
- 信号量仅仅只是计数不具有分配资源的功能,得到一个票,并不意味着你真正分到了属于自己的座位,只有当线程进入临界区时,才拥有一块属于自己的临界资源,也就是说买票是一种预定资源的行为,真正分配资源给线程可以是内核,可以是用户(循环队列中,确定一个位置给消费者或者生产者)
信号量的P,V操作伪代码
- 信号是一种计数器,预定资源就是–count,释放资源就是++count
- 预定资源对应P操作,释放资源对应V操作
- 信号作为一个可以被所有线程看到的计数器,也是一种临界资源,也有线程安全问题,要加锁,保正原子性
申请信号量-P操作伪代码
–count伪代码
start;
lock();//这个锁是维护信号量的
if(count<=0)
{
unlock();//释放锁,防止抱着锁挂起,进入死锁状态
//...挂起等待,
goto start;
//考虑可能挂起失败,因此跳到start是为了保正进入申请资源的地方一定count>0的
}
count--;
unlock();
释放信号量-V操作伪代码
++count伪代码
lock();
++count;
unlock();
信号量相关函数
初始化
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//sem 全局信号量
//pshared:确实信号量是在线程间共享还是在进程间共享,一般0线程,非0值代表进程
// value:计数对象的数量
P操作
#include <semaphore.h>
int sem_wait(sem_t *sem);
//向信号量的计数对象进行预订资源
V操作
#include <semaphore.h>
int sem_post(sem_t *sem);
销毁
#include <semaphore.h>
int sem_destroy(sem_t *sem);
基于循环队列生成者消费者模型
数据结构中实现环形队列主流是使用数组形式,也可以用双向带头循环链表,以下以数组的形式讨论。环形队列主要麻烦点是为满和为空的判断情况,为了处理这种情况,提出2种思路:
- 计数器方式:记录有效数据的个数
- 牺牲一个空间的方式
- 无论是那种方式都要用取模运算控制下标,只是计数器的相对容易些。
- 环形队列的特殊性,可以保证取数据和存数据并行,这就意味着生产者和消费者之间不在像阻塞队列那样串行,而是并行,在操作上也有很大不同
- 环形队列有固定大小,而生产者关心没有数据位置的个数,消费者关心有数据的个数,因此这里使用2个信号量分别计数。这2个信号量也可以表明循环队列的满和空
- 使用信号量保证了消费者和生产者的互斥性,另外生产者和生产者之间竞争生产的启始位置,消费者之间竞争消费的启始位置,这2种位置都是临界资源都要加锁,加2把锁
- 同样对于生产者消费者模型,存取数据不是关键,数据来源与处理是关键
ring_queue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <sys/bitypes.h>
#include <vector>
#include <semaphore.h>
namespace ns_ring_queue
{
//生产者和消费者都要竞争信号量,预订临界资源
//预订资源后,生产者与生产者间竞争启始位置,一但竞争成功就代表可以访问临界资源了
//预订资源后,消费者与消费者间竞争启始位置,一但竞争成功就代表可以访问临界资源了
//
template <class T>
class RingQueue
{
private:
static int g_dafault_capacity ;
std::vector<T> _rqv;
size_t _capacity; //循环队列的容量
size_t pro_pos; //生产者生产的启始位置
size_t con_pos; //消费者消费的启始位置
//锁
//生产者与生产者间竞争启始位置,一但竞争成功就代表可以访问临界资源了
//消费者与消费者间竞争启始位置,一但竞争成功就代表可以访问临界资源了
pthread_mutex_t p_mtx;
pthread_mutex_t c_mtx;
//信号量
//需要2个信号量,一个用于计数生产关心的空位置个数,一个用于计数消费者关心的有效数据的个数
sem_t empty_count; //空位置
sem_t data_count; //数据位置
public:
RingQueue(size_t cap = g_dafault_capacity)
: _rqv(cap, T()), _capacity(cap), pro_pos(0), con_pos(0)
{
//初始化信号量
sem_init(&empty_count, 0, cap);
sem_init(&data_count, 0, 0);
//初始化锁
pthread_mutex_init(&p_mtx, NULL);
pthread_mutex_init(&c_mtx, NULL);
}
~RingQueue()
{
sem_destroy(&empty_count);
sem_destroy(&data_count);
pthread_mutex_destroy(&p_mtx);
pthread_mutex_destroy(&c_mtx);
}
public:
void Push(const T &data)
{
sem_wait(&empty_count); //所有生产者竞争信号量,竞争成功代表预约临界资源成功,竞争失败就挂起,
//可以在之前就加锁,但是这几不能让所有生产者竞争信号量了,不符合循环队列并行的特性
pthread_mutex_lock(&p_mtx); //预约成功后 竞争启始位置,竞争成功代表可以访问临界资源--循环队列
_rqv[pro_pos] = data;
++pro_pos;
pro_pos %= _capacity;
sem_post(&data_count); //有效数据+1
pthread_mutex_unlock(&p_mtx);
}
void Pop(T *out)
{
sem_wait(&data_count);
pthread_mutex_lock(&c_mtx);
*out = _rqv[con_pos];
++con_pos;
con_pos %= _capacity;
sem_post(&empty_count); //空位置+1
pthread_mutex_unlock(&c_mtx);
}
};
template<class T>
int RingQueue<T>::g_dafault_capacity=10;
}
Task.hpp
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op; //"+-*/"
public:
Task() = default;
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
void operator()()
{
int ret = 0;
//printf("hello ->%d\n",_op);
switch (_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
{
//std::cout<<"bug??" << _x <<" "<< _op <<" "<< _y << " ="<< "?" << std::endl;
printf("bug?? %d %d %c=?\n",_x,_y,_op);
exit(1);
}
break;
}
std::cout << "消费者->当前任务正在被LWP为" << pthread_self() << "的线程获得,处理:" << _x << " " << _op << " " << _y << "=" << ret << "的任务" << std::endl;
}
~Task() = default;
};
}
ring_cp.cpp
#include "./ring_queue.hpp"
#include "./Task.hpp"
using namespace ns_ring_queue;
using namespace ns_task;
void *product(void *args)
{
// RingQueue<int> *rq = (RingQueue<int> *)args;
RingQueue<Task> *rq = (RingQueue<Task> *)args;
const std::string ops = "+-*/%";
while (true)
{
// int data = rand() % 20 + 1;
// 1. 制造数据
int x = rand() % 20 + 1;
int y = rand() % 20 + 1;
char ch = ops[rand() % ops.size()];
Task t(x, y, ch);
rq->Push(t);
std::cout << "生产者->派发了一个任务:" << x << " " << ch << " " << y << "="
<< "?" << std::endl;
// printf("生产者生产了%d\n", data);
sleep(1);
}
}
void *consume(void *args)
{
// RingQueue<int> *rq = (RingQueue<int> *)args;
RingQueue<Task> *rq = (RingQueue<Task> *)args;
while (true)
{
Task t;
rq->Pop(&t);
t();
sleep(1);
// sleep(5);
// int ret = 0;
// rq->Pop(&ret);
// printf("消费消费了%d\n", ret);
}
}
int main()
{
RingQueue<Task> rq;
pthread_t producter;
pthread_t consumer;
pthread_create(&producter, NULL, product, (void *)&rq);
usleep(1000);
pthread_create(&consumer, NULL, consume, (void *)&rq);
pthread_join(producter, NULL);
pthread_join(consumer, NULL);
return 0;
}
阻塞队列和循环队列生产者消费者模型对比
- 阻塞队列的特性(一个时刻只能有一个线程访问)决定了存和取是串行的,但是循环队列是不同的,它的特性(下标的随机访问)保证存和取是并行的
- 阻塞队列使用条件变量,循环队列使用信号量
- 阻塞队列只有一个临界资源,循环队列有多个(生产/消费的启始位置,信号量,循环队列)
线程池
-
计算机中的IO操作是很费时间的,OS为提供效率就提前申请一大堆空间,并在这堆空间提前创建一批线程,这样当用户层传达任务时,这些已经存在的线程就直接竞争这个任务就行,提高效率
-
这个存有大批线程的空间称为线程池,
-
线程池其实就是一个生产者消费者模型
thread_pool.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>
namespace ns_thread_pool
{
template <class T>
class Thread_Pool
{
private:
static int g_num; //线程池中提前开辟的线程个数
std::queue<T> _task_queue;
pthread_mutex_t _mtx; //临界资源:队列,需要加锁
pthread_cond_t _cond; //条件变量
size_t _num;
public:
Thread_Pool(size_t num = g_num)
: _num(num)
{
pthread_mutex_init(&_mtx, NULL);
pthread_cond_init(&_cond, NULL);
}
~Thread_Pool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
private:
bool Is_Empty()
{
return _task_queue.empty();
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void Wake()
{
pthread_cond_signal(&_cond);
}
//线程是没办法在类的内部执行非静态函数,因为非静态函数有隐形的this作为参数传递和线程执行函数是有矛盾的
//
static void *Run(void *args)
{
pthread_detach(pthread_self()); //线程分离,就不用等待线程
Thread_Pool<T> *tp = (Thread_Pool<T> *)args;
while (true)
{
T t;
tp->Pop(&t);
t();
sleep(1);
}
}
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
public:
void InitThreadPool()
{
pthread_t tid[_num];
for (int i = 0; i < _num; ++i)
{
pthread_create(tid + i, NULL, Run, (void *)this);
}
}
void PushTask(const T &data)
{
Lock();
_task_queue.push(data);
Unlock();
Wake();
}
void Pop(T *out)
{
Lock();
while (Is_Empty()) // while,考虑挂起失败和伪唤醒
{
Wait();
}
*out = _task_queue.front();
_task_queue.pop();
Unlock();
}
};
template <class T>
int Thread_Pool<T>::g_num = 10;
}
Task.hpp
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op; //"+-*/"
public:
Task() = default;
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
void operator()()
{
int ret = 0;
//printf("hello ->%d\n",_op);
switch (_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
{
//std::cout<<"bug??" << _x <<" "<< _op <<" "<< _y << " ="<< "?" << std::endl;
printf("bug?? %d %d %c=?\n",_x,_y,_op);
exit(1);
}
break;
}
std::cout << "当前任务正在被线程池中的线程->LWP为" << pthread_self() << "的线程获得,处理:" << _x << " " << _op << " " << _y << "=" << ret << "的任务" << std::endl;
}
~Task() = default;
};
}
main.cpp
#include "./thread_pool.hpp"
#include "./Task.hpp"
#include <string>
#include <time.h>
using namespace ns_task;
using namespace ns_thread_pool;
using namespace std;
int main()
{
srand(time(NULL));
Thread_Pool<Task> pp;
pp.InitThreadPool();
std::string ch("+-*/%");
while(1)
{
int x=rand()%20+1;
int y= rand()%20+1;
char op=ch[rand()%5];
Task t(x,y,op);
printf("主线程->派发了一个任务:%d %c %d =?\n",x,op,y);
pp.PushTask(t);
sleep(1);
}
}
总结
对于多线程,多造轮子,造多了就熟悉了