目录
1、生产消费者模型示意图
2、生产者消费者之间的关系
3、定义交易场所
4、实现生产消费者模型
5、伪唤醒
6、多生产多消费者的实际运用
7、POSIX信号量
7.1 初始化信号量
7.2 销毁信号量
7.3 等待信号量
7.4 发布信号量
8、生产消费的环形队列模型
8.1 实现多生产多消费环形模型
结语
前言:
使用生产消费者模型的原因如下:生产消费者模型常用于多线程并发式执行流,该模型具有同步与互斥机制。该模型的核心点在于用一个容器作为生产者和消费者相互通信的桥梁,使得生产者和消费者不直接进行交互,降低了生产者与消费者之间的强耦合度。好处在于生产者生产数据后即使消费者不来拿这个数据,生产者也可以继续生产,并且后续生产的数据不会覆盖之前的数据,只有当容器满了,生产者才会阻塞。只要容器内有数据,消费者就可以从容器中拿数据,无需等待生产者一个一个的生产数据。
1、生产消费者模型示意图
生产消费者模型具体示意图如下:
2、生产者消费者之间的关系
生产消费者模型具有互斥机制,因此生产者和消费者之间肯定存在互斥概念,这里例举3个关系(关系指的是生产者对交易场所的操作和消费者对交易场所的操作之间的关系,简写成生产者和消费者的关系):
1、生产者和消费者的关系必须是互斥和同步的,当生产者访问交易场所时消费者不能访问交易场所,只有当生产者访问结束后消费者才可访问交易场所,即访问具有原子性。
2、生产者和生产者之间也是互斥,他们的关系是类似竞争。
3、消费者和消费者之间也是互斥,比如一个资源只能让一个消费者获取,则另一个消费者就获取不了了。
综上所述可以发现生产者消费者在代码层面上是共享一把锁的。
3、定义交易场所
这里用队列来模拟交易场所,入队表示生产者访问场所,出队表示消费者访问场所。生产者和消费者就是两个线程,入队和出队是这两个线程要执行的操作。所以需要自己实现一个队列,并且对该队列的入队和出队进行同步互斥约束。
将队列封装成一个类,自定义队列代码如下:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
template <class T>
class BlockQueue
{
static const int defalutnum = 5;//队列的大小
public:
BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
}
//出队-消费者
T pop()
{
pthread_mutex_lock(&mutex_);
if(q_.size() == 0)
{
pthread_cond_wait(&c_cond_, &mutex_); //场所为空则消费者必须等待
}
T out = q_.front();
q_.pop();//消费了一个数据
pthread_cond_signal(&p_cond_); //唤醒生产者
pthread_mutex_unlock(&mutex_);
return out;
}
//入队-生产者
void push(const T &in)
{
pthread_mutex_lock(&mutex_);
if(q_.size() == maxcap_)
{
pthread_cond_wait(&p_cond_, &mutex_); //场所为满则生产者必须等待
}
// 1. 队列没满 2.被唤醒
q_.push(in);//生产了一个数据
pthread_cond_signal(&c_cond_);//唤醒消费者
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 复用容器队列
int maxcap_; // 队列大小
pthread_mutex_t mutex_;//生产者和消费者共用一把锁,因为他们的互斥的
pthread_cond_t c_cond_;//消费者的等待队列
pthread_cond_t p_cond_;//生产者的等待队列
};
上述代码的逻辑:刚开始消费者会先进入条件变量的等待队列中等待(等待时消费者会释放锁),生产者此时就申请到锁了,然后生产一个数据,只要场所里有了数据,就消费者就可以来拿数据,因此生产者进行入队操作后,就可以唤醒在等待队列里的消费者了。相反消费者只要出队了一个数据,表示场所里有剩余空间,消费者就可以唤醒生产者来生产数据了。
注意:采用的唤醒策略是交互性唤醒。
小细节:虽然唤醒了对方,但是不意味着唤醒了那一刻对面就拿到锁了,而是自己要先释放锁对方才能申请到锁,对方申请到锁了才可以从等待队列中出来。
4、实现生产消费者模型
有了上面的自定义栈结构,后续只需要在生产者线程中调用push函数,消费者线程中调用pop函数即可,实现生产消费者模型代码如下:
#include "queue.hpp"
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int t = bq->pop();//消费者消费数据
cout<<"消费者拿到一个数据"<<t<<endl;
sleep(1);
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int num = 0;
while (true)
{
bq->push(num);//生产者生产数据
cout<<"生产者生产一个数据"<<num<<endl;
num++;
}
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, bq);
pthread_create(&p, nullptr, Productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
运行结果:
从上述结果可以分析出两个关键点:
1、为什么是生产者先生产了5个数据,然后消费者才过来消费,而不是生产一个数据消费者就来消费。
答:因为上述代码中没有对生产者的循环调用进行sleep,并且生产者和消费者共用一把锁,在最开始的时候消费者会先触发条件变量(因为交易场所为空),会直接进入消费等待队列中。而生产者申请到锁后并不会触发条件变量(因为交易场所未满),所以生产者就可以拿着锁去访问临界资源,当他唤醒消费者等待队列并且释放锁后,此时还是他距离锁最近的(因为在代码中他释放锁之后的动作就是申请锁),因为生产者释放锁后没有对其进行任何的sleep,因此生产者在此时是对锁拥有最高优先级。
具体逻辑图如下:
2、为什么消费者消费一次数据后就可以直接唤醒生产者呢?
答:因为消费者调用完pop函数后sleep了1秒,导致消费者不再“离锁最近”,并且消费者消费数据后对生产者进行了唤醒,所以生产者的等待队列中没有任何线程跟其竞争锁,最终生产者被成功唤醒并申请到了锁,并执行push代码。
具体逻辑图如下:
5、伪唤醒
伪唤醒指的是当线程在等待队列中被唤醒了,实际上当前条件并不满足于该线程被唤醒,此时就会导致意料之外的错误,所以在线程被唤醒时,最好先检查一下当前的条件是否满足,因此上面代码中唤醒的条件应该从if改成while,以便防止伪唤醒发生,特别是在多生产多消费的情况下。
防止伪唤醒的代码如下:
T pop()
{
pthread_mutex_lock(&mutex_);
while(q_.size() == 0)//防止伪唤醒
{
pthread_cond_wait(&c_cond_, &mutex_); //场所为空则消费者必须等待
}
T out = q_.front();
q_.pop();//消费了一个数据
pthread_cond_signal(&p_cond_); //唤醒生产者
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &in)
{
pthread_mutex_lock(&mutex_);
while(q_.size() == maxcap_)//防止伪唤醒
{
pthread_cond_wait(&p_cond_, &mutex_); //场所为满则生产者必须等待
}
// 1. 队列没满 2.被唤醒
q_.push(in);//生产了一个数据
pthread_cond_signal(&c_cond_);//唤醒消费者
pthread_mutex_unlock(&mutex_);
}
举例说明伪唤醒的错误:在多生产的场景下,当我们只需要生产一个数据时,多个生产者竞争一个锁确实可以只生产一个数据,但是当这个竞争到锁的生产者释放锁时,该锁并不会被消费者拿到,而是被等待队列中另一个生产者申请到锁,就会导致数据溢出的风险。因为多个生产者是处于同一个条件变量的等待队列中,当其中某个生产者重新调用wait函数回到该队列中时,他会释放锁,但是这个锁不会被他唤醒的消费者先申请到,而是被他所在队列的其他生产者拿到,这就导致其他生产者继续向满的队列生产。(总而言之,伪唤醒是一种不稳定的唤醒方式,伪唤醒的出现和系统调度已经线程竞争条件都有关系,因此当多线程的情况变得复杂时要注意防护伪唤醒)
6、多生产多消费者的实际运用
在实际项目中,生产者通常是给消费者派发各种任务,有了生产消费者模型,就可以把这些任务写进交易场所中,然后消费者到场所中领取任务并执行,因此我们可以先定义一个任务类,用于任务的派发和执行。
任务类的代码如下:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";//随机运算符号
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op) : data1_(x),
data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;//除0错误
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;//模0错误
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()//实现函数重载
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()//将任务转成字符串的形式,方便打印
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;//计算结果
int exitcode_;//退出码
};
主函数执行生产消费者代码如下:
#include "queue.hpp"
#include "task.hpp"
#include <unistd.h>
#include <ctime>
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 消费
Task t = bq->pop();
// 计算
t();
std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " \
<< t.GetResult() << " thread id: " << pthread_self() << std::endl;
}
}
void *Productor(void *args)
{
int len = opers.size();
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
int x = 10;
int y = 20;
while (true)
{
// 模拟生产者生产数据
int data1 = rand() % 10 + 1;//随机数
usleep(10);
int data2 = rand() % 10;//随机数
char op = opers[rand() % len];//随机运算符号
Task t(data1, data2, op);
// 生产
bq->push(t);
std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " \
<< pthread_self() << std::endl;
sleep(1);
}
}
int main()
{
srand(time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, Consumer, bq);
}
for (int i = 0; i < 5; i++)
{
pthread_create(p + i, nullptr, Productor, bq);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
delete bq;
return 0;
}
运行结果:
7、POSIX信号量
POSIX信号量可以让多个线程访问同一个交易场所的不同区域,也就是说在这种情况下生产者和消费者不存在互斥,因为他们访问的是场所内不同的资源空间,当POSIX信号量为0时表示当前线程不能往下访问临界资源。
7.1 初始化信号量
接口介绍如下:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//sem表示要初始化的信号量
//pshared为0表示线程间共享,非零表示进程间共享
//value表示信号量初始值
初始化信号量时,必须先定义一个类型为sem_t的信号量变量。
7.2 销毁信号量
接口介绍如下:
int sem_destroy(sem_t *sem);//销毁一个信号量
7.3 等待信号量
接口介绍如下:
int sem_wait(sem_t *sem);//等待信号量,会将信号量的值减1
7.4 发布信号量
接口介绍如下:
int sem_post(sem_t *sem);//将信号量值加1
8、生产消费的环形队列模型
上面的生产消费模型用的是队列作为容器,现在用数组取模的方式模拟环形队列作为生产消费模型的交易场所,示意图如下:
上图表示交易场所为空时,生产者和消费者相遇,这时候生产者和消费者只有生产者可以往前走(生产者信号量不为0),因为场所内没有数据(消费者信号量为0),所以消费者无法消耗数据,也就无法往前走。
当交易场所满的情况如下:
可以发现当交易场所满的时候,生产者依然是和消费者指向同一块区域,并且此时也只有一方可以移动,即只有消费者可以移动(消费者信号量不为0,而生产者信号量为0),因为当前没有空间让生产者生产数据了,只能让消费者消费数据。
以上是空或满的情况,当然,只要消费者没有和生产者相遇,那么他们两个可以同时对交易场所进行操作,因为这种情况他们两个的信号量都不为0,而这是上述自定义队列办不到的,这也是POSIX信号量的优势,只有当交易场所为空或为满时(其中一方的信号量为0时),生产者和消费者才是互斥关系。
8.1 实现多生产多消费环形模型
首先先定义一个环形队列,逻辑和上面自定义队列的类是一样的,只不过环形队列用的是vector来当作容器,并且使用信号量约束生产者和消费者,代码如下:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
const static int defaultcap = 5;
template<class T>
class RingQueue{
private:
void P(sem_t &sem)//--信号量
{
sem_wait(&sem);
}
void V(sem_t &sem)//++信号量
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T &in) // 生产
{
P(pspace_sem_);
Lock(p_mutex_); // ?
ringqueue_[p_step_] = in;
// 位置后移,维持环形特性
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T *out) // 消费
{
P(cdata_sem_);
Lock(c_mutex_); // ?
*out = ringqueue_[c_step_];
// 位置后移,维持环形特性
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_; // 消费者下标
int p_step_; // 生产者下标
sem_t cdata_sem_; // 消费者关注的数据资源
sem_t pspace_sem_; // 生产者关注的空间资源
pthread_mutex_t c_mutex_;
pthread_mutex_t p_mutex_;
};
信号量的使用逻辑:生成一个数据,则生成者的信号量减减,消费者的信号量加加。相反,消费一个数据,则消费者的信号量减减,生成者的信号量加加。
主函数实现生成消费数据的代码如下(注意这里的任务列表依旧用上文的任务类,只不过加了一个默认构造):
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "sem.hpp"
#include "task.hpp"
using namespace std;
struct ThreadData
{
RingQueue<Task> *rq;
std::string threadname;
};
void *Productor(void *args)
{
// sleep(3);
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 1. 获取数据
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 2. 生产数据
rq->Push(t);
cout << "生成任务完毕 : " << t.GetTask() << " : " << name << endl;
sleep(1);
}
return nullptr;
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
// 1. 消费数据
Task t;
rq->Pop(&t);
// 2. 处理数据
t();
cout << "消费者拿到一个任务 : " << t.GetTask() << " : " <<
name << " 结果: " << t.GetResult() << endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(50);
pthread_t c[5], p[3];
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, td);
}
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, td);
}
for (int i = 0; i < 1; i++)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 1; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}
运行结果:
结语
以上就是关于生产消费者模型的讲解,生产消费者模型可以提高并发式执行程序的效率,他的核心点在于交易场所的设计,以及生产者和消费者访问场所的时机,若时机出现偏差则会导致线程对锁的竞争不公平。
最后如果本文有遗漏或者有误的地方欢迎大家在评论区补充,谢谢大家!!