多线程
- 生产消费模型
- 三种关系
- 两个角色
- 一个交易场所
- 交易场所的实现(阻塞队列)
- pthread_cond_wait 接口
- 判断阻塞队列的空或满时,需要使用while
- 测试一:单消费单生产案例
- 测试二:多生产多消费案例
生产消费模型
消费者与生产者这两个词是不是很熟悉?
生产者、消费者、分解者 … 这些词在中学生物课本中经常看到。但是,接下来所介绍的内容中没有分解者,只有生产者与消费者。
生产消费模型在日常生活中很常见,这里来举个比较贴切的案例:超市
在超市内部会存有大量的商品,这些商品提供给用户消费;超市并不是生产者,超市只是将多种商品汇聚到指定的柜台方便用户们选取。超市中的商品来自与各个供货商,供货商生产产品。
用户、超市、供货商中,用户是消费者;供货商是生产者;
那么,超市在这里担任一种什么角色呢?
先来想这样的一个问题,用户为什么去超市买商品,而不是直接去供货商那边去买呢?
首先,工厂在生产一种产品时,是批量生产的。并且在工厂中,不是单单只是生产一种产品。可能今天生产某个类型的产品,当数量达到甲方需求后就会停止生产,换成另一类的商品继续生产。如果,一个用户当天正好需要这个工厂的某个商品时,这个工厂正好生产着这个商品,这样是再好不过。如果,当天这个工厂恰好就没有生产这个商品呢?用户一直等待工厂来生产吗?这样的体验是很差的。
由此,超市的作用就是将各个供应商的商品汇总起来。什么商品我都收纳一些,不管当下商品卖不卖得出去,时间到了自然有用户需求。当用户需要某个商品时,不用说每次都要去供应商去询问等待,直接去超市购买即可。超市就起到了商品供应的缓冲作用。
用户、供应商、超市组成的的生产消费模型,允许生产与消费的工作步调不一致。供货商每时每刻都可以生产大批的商品汇总到超市,消费者在不同的时间点,都可以从超市中获取不同的商品需求,不需要时间的等待。
总的来说,超市的生产消费模式是高效的,允许忙闲不均。
铺垫了这么多,将上面提到的角色转换一下:
生产者与消费者 就是所谓的 多线程;
超市是交易场所,相当于一种特定的缓冲区(这个缓冲区可以是:队列、链表、哈希表… 等数据结构);
商品就是每个线程所需处理的数据
再将上面的角色关联一下,缓冲区、数据、多线程的之间的主要是为了完成一种功能:完成多线程之间的通信
所以,生产消费模型主要用于:不同线程之间完成通信工作
为了让多线程进行通信,就要让超市(交易场所)被所有的线程看到。注定了,交易场所是一个被多线程并发访问的公共场所。所以,交易场所就需要被保护起来;还要进行维护对应的互斥与同步的关系!(避免饥饿的发生)
三种关系
在生产消费模型中存在三种关系:
- 生产者与生产者之间处于:互斥关系
好比 可口可乐 与 百事可乐 两家厂商,都是生产者。超市中的货架摆放商品的容量是有限的。容量很大时还好,当容量很紧张时,可口可乐供货说来摆我的商品,百事可乐的供应商也说摆我的摆我的,不同的供应商之间就会出现竞争关系。所以,生产者与生产者之间是属于 互斥关系。
- 消费者与消费者之间处于:互斥关系
超市中的商品是有限的。当超市中的产品很紧缺,不同消费者之间又是同时需要这个产品时,就会出现竞争现象,也就是互斥。由此,消费者与消费者之间的关系是属于 互斥关系。
- 生产者与消费者之间处于:互斥且同步的关系
交易场所(超市)作为生产者与消费者之间公共场所,是需要发挥一定的作用的!下面来举些例子:
示例一:
假设某天超市中的一个产品没货了,恰好此时的消费者又需求这个商品时,如果超市没有及时补货,而且又没有对消费者进行提示的话,消费者会每时每刻都来超市寻求这个商品。消费者的每次的寻求,都会耗时、耗费精力。遇到好说的消费者还好,遇到不好说话的,可能就会引起不好的矛盾。由此,超市就应该做好对应的决策,当超市缺货时,如果有消费者来寻求商品时,应该对这个消费者进行沟通,让消费者等待,并且保留信息,待有货再来通知消费者来获取商品。
同样的,超市货架上的货物数量、货物价格等等,是生产者(供应商)需要关心的。超市需要对供应商做好对应的联系通知准备,没货了我就通知生产者上货,货满了就让供货商停止供货。也不要让生产者花费不必要的时间去往返询问等等。
示例二:
假设有生产者刚好正在为当前超市商家商品到货架上时,此时有消费者来了,寻求正在上货的商品。那么问题来了,是先上货呢?还是先让消费者拿货呢?货架属于公共资源,生产者与消费者共同访问货架,就会出现并发问题。
示例一中,生产者与消费者之间的关系,需要公共场所来维护。没货了就让消费者处于等待状态,让生产者来生产货物;货满了就要让生产者停止生产,让消费者来消费产品;由此,生产者与消费者要维护 同步的关系。
示例二中,生产者与消费者之间会存在并发访问的问题,由此,是 互斥的关系。
两个角色
- 生产消费模型中存在两个角色:生产者、消费者
一个交易场所
- 生产消费模型中的交易场所通常是:某种数据结构的缓冲区
总的来说,生产消费模型可以简单记忆概括为 “三二一原则”(这个原则主要用于方便记忆,不是官方名词)
存在三种关系、有两个角色、维护一个交易场所;手写CP问题,就可以转换为用代码去维护实现 “三二一原则”。
下面来简单实现一个交易场所:
交易场所的实现(阻塞队列)
创建一个 blockQueue.hpp 文件,其内部主要实现 BlockQueue 类(阻塞队列)。如何设计这个交易产所呢?
- 成员变量的设计
阻塞队列也是队列,成员变量中应该包含有一个队列 以及 队列容量的大小;
先前提到过,消费者与生产者之间的关系是需要交易场所来维护的(消费者与生产者所处的关系有:互斥关系 与 同步关系)。由此,BlockQueue 类中的成员需要包含:一把锁 和 条件变量。
但是,要考虑一点:当队列为满时,生产者要处于阻塞状态,停止生产;队列为空时,消费者也要处于阻塞状态,停止消费。条件变量只设置一个的话,是不能满足上面要求的。对此,我们要在类中设计两个条件变量,分别约束消费者与生产者。
先实现一部分代码,后续继续补充内容:
#pragma once
#include <iostream>
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <queue>
const int gcap = 5; // 阻塞队列的最大容量
template <class T>
class BlockQueue
{
public:
//构造
BlockQueue(const int cap = gcap)
: _cap(cap)
{
// 初始化互斥锁
pthread_mutex_init(&_mutex, nullptr);
// 初始化消费者、生产者的条件变量
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_productorCond, nullptr);
}
//析构
~BlockQueue()
{
// 释放锁
pthread_mutex_destroy(&_mutex);
// 释放条件变量
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
std::queue<T> _q;
size_t _cap; // 队列容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _consumerCond; // 消费者的条件变量
pthread_cond_t _productorCond; // 生产者的条件变量
};
如果对互斥锁,条件变量 接口不熟的老铁可以参考小编的这篇文章:多线程的互斥与同步,有较详细的介绍
- 成员函数的设计
来分析一下,关于生产者生产的商品 以及 消费者消费的商品 在交易场所中如何处理:
生产者生产的商品,会提供给超市,摆在货架上。换回来说就是存储到阻塞队列中。由此,我们需要在 BlockQueue 类中实现一个 push 成员函数,专门用于接收生产者生产的数据。同样的,消费者需要从超市货架上获取商品。所以,我们也需要实现一个 pop 成员函数,专门用于消费者拿取数据。实现如下:
template <class T>
class BlockQueue
{
public:
//构造
//析构
bool isFull() //判断阻塞队列是否满
{
return _q.size() == _cap;
}
bool isEmpty() //判断阻塞队列是否为空
{
return _q.empty();
}
//将生产者生产的数据放入阻塞队列中
void push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
while(isFull() == true)
{
//阻塞队列为满时,生产者停止生产,阻塞生产者
pthread_cond_wait(&_productorCond, &_mutex);
}
//推送数据到队列中
_q.push(in);
//当产品大于阻塞队列的容量一半时,不管消费者是否阻塞,都唤醒消费者消费
if (_q.size() >= _cap / 2)
pthread_cond_signal(&_consumerCond);
// 解锁
pthread_mutex_unlock(&_mutex);
}
// 消费产品
void pop(T *out)
{
// 加锁
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
//阻塞队列为空时,消费者停止消费,阻塞消费者
pthread_cond_wait(&_consumerCond, &_mutex);
}
//消费产品
*out = _q.front();
_q.pop();
//到这里,队列的最后一个产品可能被消费完了,不管生产者是否阻塞都唤醒生产者,进行生产工作
pthread_cond_signal(&_productorCond);
// 解锁
pthread_mutex_unlock(&_mutex);
}
private:
//成员变量...
}
上面实现的阻塞队列,有两个细节需要介绍:
pthread_cond_wait 接口
在调用 pthread_cond_wait
接口时,为什么要传入 互斥锁 变量?
先来回顾一下, 调用pthread_cond_wait
接口的缘由:为了实现线程的同步关系,防止某个线程重复申请互斥锁资源,导致其他线程处于饥饿状态。
就拿刚刚实现阻塞队列的 push 接口来说,生产者可不止只有一位,可能有很多个。为了防止多个线程同时访问同一块资源,导致并发问题。所以,要对当前 push 接口临界资源进行上锁保护。
生产者会源源不断的生产产品,当阻塞队列满时,需要对生产者要求停止生产。使用
pthread_cond_wait
接口,将生产者阻塞等待。判断队列满不满是在临界资源中进行的;由此,线程是申请了锁资源后,再调用pthread_cond_wait
接口。
为了避免其他线程不能使用锁资源,就得先解锁。这也是为什么在调用 pthread_cond_wait
接口时,要传入互斥锁的缘由。
pthread_cond_wait
接口内部实现:将当前线程申请的锁资源先归还(解锁),然后再将当前线程处于阻塞状态。
到这里就完了吗?其实不然:
在阻塞当前线程后,OS会将当前的线程所有数据,以及当前线程执行的代码位置先保存。然后,调度其他线程执行其他任务。假设某一时刻,队列不为满,生产者又可以继续生产产品时,OS 又会将当前线程调度回来(之前怎么转移当前线程运行的资源,现在就怎么原封不动的转移回来),这个线程又可以再临界区执行后续的代码了。
那么问题来了,当前线程自己是申请锁了的。但是锁,被 pthread_cond_wait
接口内部实现的代码释放了。
怎么办呢?实现 pthread_cond_wait
接口的设计者早就考虑到了。
pthread_cond_wait
内部并不是简简单单就将当前线程阻塞后就即刻返回。将当前线程处于阻塞状态后,线程被 OS 调度开。 之后当线程再被调度回来后,会继续执行pthread_cond_wait
内部的后续代码。
pthread_cond_wait
其内部会为当前线程申请互斥锁资源,没有申请成功会一直申请,直到申请成功,才会彻底的退出
这也说明了,为什么要给 pthread_cond_wait
接口传入 锁对象。
判断阻塞队列的空或满时,需要使用while
在判断队列是否为 空 / 满 时,为什么使用 while 循环?使用 if 可以吗?
因为消费者 和 生产者不仅只有一位,举个简单的例子:
当消费者只有一位,生产者有多个时,阻塞队列毋庸置疑很快会被生产者生产的数据占满。就拿上面的代码来说,当队列在处于满的状态下,所有的生产者都会被处于阻塞状态,只能等待消费者消费。由于消费者在获取数据时,会有对应的执行策略,唤醒生产者进行生产数据。上面在使用
pthread_cond_signal
接口还好,只是唤醒一个生产者。但是,在使用pthread_cond_broadcast
接口时就会出现问题。
前面讲到,pthread_cond_wait
内部实现有两部分执行功能:
- 先将当前线程锁资源归还,再将当前线程阻塞
- 当线程被唤醒,会为当前线程申请锁资源,申请成功继续向后执行对应的代码;否则会一直申请,直到成功
pthread_cond_broadcast
作用是唤醒所有被 pthread_cond_wait
阻塞的线程
回到刚刚的例子,消费者只有一个,在队列为满时,会去消费一个数据,阻塞队列腾就出一个空间。
如果此时消费者去调用pthread_cond_broadcast
就会出现这样的问题:所有的被阻塞的线程会被唤醒,都会去争着去申请锁资源
使用 if 条件判断进行判断的话只判断一次,并且生产者都是线程,就会出现并发的问题。第一个生产者被唤醒,在申请锁成功后,执行后续代码,生产数据。由于阻塞队列的容量只有一个,此时的生产就已经让阻塞队列满了。当第一个生产者释放锁后,后续又有其他生产者会申请锁资源,继续执行后续代码,生产对应的数据。这样直接导致队列容量的不足问题。
回看刚刚实现的代码:
//将生产者生产的数据放入阻塞队列中
void push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
while(isFull() == true)
{
//阻塞队列为满时,生产者停止生产,阻塞生产者
pthread_cond_wait(&_productorCond, &_mutex);
}
//推送数据到队列中
_q.push(in);
//当产品大于阻塞队列的容量一半时,不管消费者是否阻塞,都唤醒消费者消费
if (_q.size() >= _cap / 2)
pthread_cond_signal(&_consumerCond);
// 解锁
pthread_mutex_unlock(&_mutex);
}
使用 while 就可以很好的避免这样的问题发生。不管是唤醒单个线程,还是唤醒一批的线程。当阻塞队列的最后一个容量被占满后,其他被唤醒的线程还是会向后执行对应的代码,pthread_cond_wait
会为其他生产者申请锁资源。
使用 while 进行循环判断后,不管被唤醒的线程是否申请锁资源成功,都会再次进入循环对阻塞队列的容量进行判断。容量满,再次调用pthread_cond_wait
让这个线程阻塞,归还锁资源。循环往复,直到消费者再次消费数据。从而避免阻塞队列容量问题。
测试一:单消费单生产案例
交易场所实现完成后,简单测试一下代码:创建一个生产者、一个消费者,完成对应的任务
#include "blockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<int>* bp = static_cast<BlockQueue<int>*>(args);
while(true)
{
sleep(1); //消费者每次消费完,停顿一秒
int data = 0;
//将数据从blockqueue中获取 -- 获取到了数据
bp->pop(&data);
std::cout << "consumer get data:" << data << std::endl;
}
}
void* productor(void* args)
{
BlockQueue<int>* bp = static_cast<BlockQueue<int>*>(args);
while(true)
{
//sleep(1); //生产者每次生产完数据,停顿一秒
//生产随机数
int data = rand() % 10 + 1;
//将数据推送到blockqueue -- 完成生产过程
bp->push(data);
std::cout << "productor data:" << data << std::endl;
}
}
int main()
{
//321原则:3种关系、2个模型、1个场所
//创建阻塞队列
BlockQueue<int>* bq = new BlockQueue<int>();
//单消费者、单生产者模型
pthread_t p,c;
pthread_create(&c, nullptr, consumer,bq);
pthread_create(&p, nullptr, productor,bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
先让消费者慢于生产者生产的数据,每次消费完休眠一秒。运行效果如下:
由于在BlockQueue类中,设置了阻塞队列的最大容量为 5 。从运行结果看,生产者一上来就产生了5个数据,之后就阻塞了;当消费者消费数据后,才会继续生产数据。
下面修改代码,让生产者每次生产完数据后,暂停一秒。再来看看运行效果:
由于在生产者中,加了策略,当产品数量大于阻塞队列的一半时,唤醒消费者。所以,每次生产两个产品后,就会被消费者消费完。
测试二:多生产多消费案例
接下来修改测试代码,不再以整形为例子。在阻塞队列中传入 Task 对象,模拟实现一个简单的业务流程。实现 Tack 类,其成员函数主要完成基本的 加减乘除 运算任务:
#include "blockQueue.hpp" //包含交易场所的头文件
class Task
{
public:
Task() {}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op), _result(0), _codeRet(0)
{
}
// 完成运算任务
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '/':
{
if (_y == 0)
_codeRet = -1;
else
_result = _x / _y;
}
break;
case '*':
_result = _x * _y;
break;
case '%':
{
if (_y == 0)
_codeRet = -2;
else
_result = _x % _y;
}
break;
default:
std::cout << "没有这样的运算符!" << std::endl;
break;
}
}
//输出表达式字符串
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "=";
}
//输出结果以及运算成功与否
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_codeRet) + ")";
}
~Task() {}
private:
int _x;
int _y;
char _op; // 运算符
int _result; // 运算结果
int _codeRet; // 用于表示运算成功与否
};
Task 类实现后,在 main.cc 中创建多个生产者与消费者,分别生成数据与消费数据:
#include "task.hpp" //包含 Task 类头文件
//将线程编号转16进制
std::string switchPthreadId(pthread_t id)
{
char* str = new char[32];
snprintf(str, 32, "0x%x", id);
return str;
}
//消费
void* consumer(void* args)
{
BlockQueue<Task>* bp = static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t;
// 1. 将数据从blockqueue中获取 -- 获取到了数据
bp->pop(&t);
// 2. 结合某种业务逻辑,处理数据!
//执行运算任务
t();
std::cout << switchPthreadId(pthread_self()) << " | consumer data: " << t.formatArg() << t.formatRes() << std::endl;
}
}
//生产
void* productor(void* args)
{
BlockQueue<Task>* bp = static_cast<BlockQueue<Task>*>(args); //类型转换
std::string ops = "+-*/%";
while(true)
{
sleep(1);
// 1. 先通过某种渠道获取数据
char op = ops[rand() % ops.size()]; //取随机运算符
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
//获取数据
Task t(x, y, op);
// 2. 将数据推送到blockqueue -- 完成生产过程
bp->push(t);
std::cout << switchPthreadId(pthread_self()) << " | productor Task: " << t.formatArg() << "?" << std::endl;
}
}
int main()
{
//生产随机数
srand((uint64_t)time(nullptr) ^ getpid());
//321原则:3种关系、2个角色、1个场所
//创建阻塞队列
BlockQueue<Task>* bq = new BlockQueue<Task>();
//生产消费模型
pthread_t p[2],c[3];
//创建多个生产者与消费者
pthread_create(p, nullptr, productor,bq);
pthread_create(p + 1, nullptr, productor,bq);
pthread_create(c, nullptr, consumer,bq);
pthread_create(c + 1, nullptr, consumer,bq);
pthread_create(c + 2, nullptr, consumer,bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
return 0;
}
在这里我们先让生产者生成数据慢一点,每次停顿一秒后再生成数据;让消费者消费数据快一点,编译运行结果如下:
生产者消费者模型的 高效性 并不能只看生产者生成数据到交易场所中,再让消费者消费简单的线性流程。
要从整体的业务流程分析,生产消费模型高效性体现如下:
- 生产消费模型支持多个生产者同时向队列中提交数据,满足高并发的数据生成需求;
同时支持多个消费者同时向队列中获取数据,满足高并发,提高系统的 处理能力 和 吞吐量 - 生产者只需将消息放到队列中,就可以立即进行其他的操作,无需等待消费者处理完毕;
消费者也只需从队列中取出消息进行后续的逻辑处理,实现了生产者和消费者之间的异步操作
生成消费模型就介绍到这里,有感兴趣的老铁可以点点关注。