「前言」文章是关于Linux多线程方面的知识,上一篇是 Linux多线程详解(三),今天这篇是 Linux多线程详解(四),内容大致是生产消费者模型,讲解下面开始!
「归属专栏」Linux系统编程
「笔者」枫叶先生(fy)
「座右铭」前行路上修真我
「枫叶先生有点文青病」
「每篇一句」
记住该记住的,忘记该忘记的。
改变能改变的,接受不能改变的。
——塞林格《麦田守望者》
目录
八、生产者消费者模型
8.1 概念
8.2 生产者消费者模型的特点
8.3 生产者消费者模型优点
九、基于BlockingQueue的生产者消费者模型
9.1 概念
9.2 C++ queue模拟阻塞队列的生产消费模型
八、生产者消费者模型
8.1 概念
生产者消费者模型是指在一个系统中,存在生产者和消费者两种角色,生产者负责生产数据,消费者负责消费数据,二者之间通过共享缓冲区进行通信,以实现数据的传输和处理。生产者生产数据后将数据放入缓冲区中,而消费者从缓冲区中取出数据进行处理。
这就类似于学校里面的小卖部:
- 学生是消费者,
- 小卖部是共享的缓冲区,称为交易场所,
- 提供产品给小卖部的供货商是生产者,
- 这里的产品就是数据
生产者生产产品,生产者把产品放入小卖部,最后学生进行消费产品。
- 明显发现,生产者与消费者互不影响,生产者只负责生产数据,消费者只负责消费数据,用计算机的术语来说就是:把生产者的生产过程与消费者的消费过程进行解耦
- 小卖部是交易场所,是一个缓冲区,这个缓冲区的存在可以提供一定量的产品供学生进行一段时间的消费。
- 消费者疯狂消费产品,生产者此时不生产,等小卖部的产品下降到了一定的水平线后,小卖部再通知生产者进行生产一定量的产品(而不是生产一个产品就消费一个产品)。这个过程说明消费者和生产者的步调可以不一致,这样可以同时提高生产者和消费者的效率
上面有小卖部,可以对生产者和消费者进行解耦,那下面就谈谈生产者和消费者强耦合的情况(也就是没有小卖部,即没有缓冲区的情况)
因为没有小卖部,消费者想要消费数据就只能去跟生产者说,消费者说:我要一根火腿肠,供货商你生产一下。这个过程就势必会让消费者进行等待,这就是生产者和消费者强耦合的情况,这样会导致生产者和消费者的效率大大降低
比如我们平时main主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种强耦合的关系。
对生产者消费者模型的角色之间的关系分析
- 一个消费者或多个消费者对应:一个线程或多个线程
- 一个生产者或多个生产者对应:也是一个线程或多个线程
- 小卖部对应:共享资源
角色之间的关系分析:
- 生产者与生产者之间:互斥关系(竞争生产产品)
- 消费者与消费者之间:互斥关系(共同竞争产品,即共同竞争数据)
- 生产者与消费者之间:互斥 && 同步(互斥:生产者生产数据未完成,消费者这时来消费数据;同步:有数据消费者才进行消费,没有数据降到一定水平线生产者才开始生产数据)
生产者和消费者为什么存在同步和互斥的关系?
- 同步关系:
- 如果生产者一直生产数据,消费者不消费数据,当生产者生产的数据将缓冲区塞满后,生产者再生产数据就会生产失败
- 如果消费者一直消费数据,生产者不生产数据,当消费者把缓冲区的数据消费完之后,生消费者再进行消费就会消费失败
- 虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的
- 所以,生产者和消费者需要存在同步的关系,生产者生产数据达到一定水平线后就不生产,等待消费者进行消费数据,缓冲区的数据下降到一定的水平线生产者就开始生产数据,这就是同步
- 互斥关系:
- 生产者生产数据未完成,消费者这时来消费数据,这是就会导致数据不一致的问题
- 所以,生产者和消费者需要存在互斥的关系,消费者必须等到生产者生产完数据,消费者才可以进行消费(保证共享资源的安全性)
注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来
8.2 生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典模型,其特点如下:
321原则(便于记忆)
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者(通常由线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区(共享资源)
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的
8.3 生产者消费者模型优点
- 解耦(生产者与消费者进行解耦)
- 支持并发(高效)
- 支持忙闲不均(忙闲不均,比如生产者生产快,消费者消费慢;消费者消费快,生产者生产慢等)
九、基于BlockingQueue的生产者消费者模型
9.1 概念
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
- (以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
9.2 C++ queue模拟阻塞队列的生产消费模型
为了便于理解,以单生产者,单消费者,来进行讲解,其中阻塞队列就是缓冲区
BlockQueue.hpp
#pragma once
#include <iostream>
#include <thread>
#include <queue>
const int gmaxcap = 5;
template <class T>
class BlockQueue
{
// public:
// 如果一个类是模板类,那么它的静态成员变量应该在类的外部进行定义,而不能在类的内部进行定义和初始化。
// 这是因为模板类在编译时并不会被实例化,只有在使用时才会进行实例化,因此编译器无法确定静态成员变量的类型和值。
// static const int gmaxcap = 5;
public:
BlockQueue(const int &maxcap = gmaxcap) : _maxcap(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
void push(const T &in) // 输入型参数,一般设置成 const &
{
pthread_mutex_lock(&_mutex);
// 1.判断,不能使用if
while (is_full())
{
// pthread_cond_wait这个函数的第二个参数,必须是我们正在使用的互斥锁!
// a. pthread_cond_wait: 该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起
// b. pthread_cond_wait: 该函数在被唤醒返回的时候,会自动的重新获取你传入的锁
pthread_cond_wait(&_pcond, &_mutex); // 队列满,不满足生产条件,去生产者的条件变量下等待
}
// 2.走到这里的代表队列不为满,该线程一定可以生产数据
_q.push(in); // 生产数据
// 3.走到这里,一定能保证阻塞队列里面有数据,此时可以唤醒一个消费者线程去消费数据
// pthread_cond_signal(&_ccond);//唤醒函数可以放在临界区内部,也可以放在临界区外面
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_ccond);
}
void pop(T *out) // 输出型参数,一般设置成 * ;如果是输入输出型:&
{
pthread_mutex_lock(&_mutex);
// 1.判断,不能使用if
while (is_empty())
{
pthread_cond_wait(&_ccond, &_mutex); // 队列为空,不满足消费条件,去消费者的条件变量下等待
}
// 2.走到这里的代表队列不为空,该线程一定可以消费数据
*out = _q.front();
_q.pop();
// 3.走到这里,一定能保证阻塞队列里面有一个空的位置,此时可以唤醒一个生产者线程去生产数据
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
bool is_empty() // 判断队列是否为空
{
return _q.empty();
}
bool is_full() // 判断队列是否满
{
return _q.size() == _maxcap;
}
private:
std::queue<T> _q;
int _maxcap; // 队列元素的上限
pthread_mutex_t _mutex;
pthread_cond_t _pcond; // 生产者对应的条件变量
pthread_cond_t _ccond; // 消费者对应的条件变量
};
注:由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可
主函数:Main.cc
在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据
#include "BlockQueue.hpp"
#include <unistd.h>
#include <ctime>
void *consumer(void *_bq)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(_bq);
while (true)
{
// 消费活动
int data;
bq->pop(&data);
std::cout << "消费数据:" << data << std::endl;
sleep(1);
}
return nullptr;
}
void *productor(void *_bq)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(_bq);
while (true)
{
// 生产活动
int data = rand() % 10 + 1;
bq->push(data);
std::cout << "生产数据:" << data << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr) ^ 0x1122334);
// 阻塞队列:共享资源
BlockQueue<int> *bq = new BlockQueue<int>();
// c:consumer p:productor
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
// 线程等待
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
编译运行(生产者消费者步调一致,都是sleep了1秒)
生产者生产慢,消费者消费快
修改一下代码
虽然消费者消费的很快,但一开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的
运行结果
生产者生产快,消费者消费慢
修改代码
此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列打满了,此时生产者想要再进行生产就只能在条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了
运行结果
执行计算任务生产者消费者模型
际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,这里只是为了方便测试
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
class Task
{
// 两者用法相同,写法不同
// typedef std::function<int(int,int)> func_t;
using func_t = std::function<int(int, int, char)>;
public:
Task()
{}
Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
{}
std::string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
const std::string oper = "+-*/%";
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
// do nothing
break;
}
return result;
}
修改main函数
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
void *consumer(void *_bq)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(_bq);
while (true)
{
// 消费活动
Task t;
bq->pop(&t);
std::string result = t();
std::cout << "消费者,完成计算任务: " << result << std::endl;
sleep(1);
}
return nullptr;
}
void *productor(void *_bq)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(_bq);
while (true)
{
// 生产活动
int x = rand() % 20 + 1;
int y = rand() % 10;
int operCode = rand() % oper.size();
Task t(x, y, oper[operCode], mymath);
bq->push(t);
std::cout << "生产者,生产计算任务: " << t.toTaskString() << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr) ^ 0x1122334);
// 阻塞队列:共享资源
BlockQueue<Task> *bq = new BlockQueue<Task>();
// c:consumer p:productor
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
// 线程等待
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
编译运行
--------------------- END ----------------------
「 作者 」 枫叶先生
「 更新 」 2023.6.1
「 声明 」 余之才疏学浅,故所撰文疏漏难免,
或有谬误或不准确之处,敬请读者批评指正。