目录
1.生产消费者模型
1.1.概念引入
1.2.基于阻塞队列的生产消费模型
1.3.POSIX信号量
1.3.1.再识信号量
1.3.2.信号量接口的学习
1.4.基于环行队列的生产消费模型
1.5.深刻理解生产消费模型
2.可重入函数与线程安全
1.生产消费者模型
1.1.概念引入
生产者-消费者模型(Producer-Consumer Model)是一个经典的多线程并发协作模型,在分布式系统中非常常见。这个模型主要由两类线程和一个缓冲区组成:
- 生产者线程:负责生产数据,并将数据放入缓冲区中。
- 消费者线程:从缓冲区中取出数据,并进行消费或处理。
缓冲区是存放生产者数据的地方,它用于在生产者和消费者之间传递数据
对于生产消费者模型我们更多可以看成:供货商--超市---消费者 模型!
结合生产消费者模型,供货商本质上是“生产线程”,消费者是“消费线程”,而超市就是实现生产、消费线程进行通信的一块存储数据的缓冲区(内存空间)。那么生产者、消费者之间就天然的需要具有三种关系:
- 生产者之间由于竞争,需要实现互斥关系,也就是需要加锁!
- 消费者之间由于竞争,需要实现互斥关系,也就是需要加锁!
- 生产者与消费者不仅需要实现互斥关系,还要实现同步!
最终我们总结出:
由生产者、消费者在一段开辟的内存空间内需要满足以上3种关系。即为“321”原则,三种关系,两个角色,一个通信场所……
1.2.基于阻塞队列的生产消费模型
在这个模型中,我们以单个生产者---单个消费者来实现,首先我们知道生产消费模型中,生产者和消费者需要满足互斥和同步!因此我们需要通过锁、条件变量来实现 ,另外生产者的线程访问时需要加锁,消费者线程访问时也需要加锁,因此我们可以实现一把锁,来保证,某一次对阻塞队列访问时只有一个线程。
在了解最基本的原理之后,具体的代码如下:
BlockQueue.hpp
#include <pthread.h>
#include <iostream>
#include <queue>
using namespace std;
template <class T>
class BlockQueue
{
public:
BlockQueue(int capacity = 5)
: _capacity(capacity)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
// 生产和消费的基本原则:
// 1.当共享空间满了后,生产者不允许生产,为空不允许消费
// 2.何时生产由消费者决定,何时消费由生产者决定
void Push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
while (IsFull() == true)
{
// 如果队列满了我们需要阻塞生产者
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
// 只有生产后才能唤醒消费者,也可以制定策略来通知!
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 传入输出型参数,退出队列
{
pthread_mutex_lock(&_mutex);
while (Empty() == true) // 这里需要轮询判断,防止伪唤醒
{
// 如果队列为空,不允许消费者消费
pthread_cond_wait(&_c_cond, &_mutex);
}
// 退出的数即为队头
*out = _q.front();
_q.pop();
// 只要消费成功可以唤醒生产者
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex)
}
bool Empty() { return _q.empty(); }
bool IsFull() { return _capacity == _q.size(); }
private:
queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
// 生产者的条件变量
pthread_cond_t _p_cond;
// 消费者的条件变量
pthread_cond_t _c_cond;
};
在我们实现的过程中,我们需要知道:
- 消费者什么时候可以消费,生产者什么时候需要生产,前者必须保证阻塞队列中有资源,后者需要在资源未满的前提下,这就延伸到了---当我们通过条件变量进行等待时,必须由另一方进行唤醒,具体看代码注释
- 另外我们在阻塞队列中为了实现线程安全,直接对阻塞队列对象进行加锁,而阻塞队列本身可以分成很多个小对象,这里我们在后续信号量的部分会讲解。
- 为什么我们判断阻塞队列为空、为满需要用while循环轮询判断,这是因为我们可能通过pthread_cond_broadcast一次性唤醒多个线程,造成伪唤醒。例如当生产者刚生产了1件商品,就一次性唤醒了多个线程,那么就会导致狼多肉少,进而产生错误!
- 在这个模块中我们生产、消费关系是:只要不为空,就允许消费,进行通知,只要不为满就允许生产。实际上我们可以根据我们的生产、消费策略,来设定生产多少件后再通知消费、消费多少件后再通知生产,本质是设定策略,生产和消费“水位线”。
main.cpp
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include "BlockQueue.hpp"
// 基于生产消费者模型的阻塞队列
void *Consumer(void *args)
{
BlockQueue<int> *bq = (BlockQueue<int> *)args;
while (1)
{
int data = 0;
bq->Pop(&data);
cout << "Consumer data: " << data << endl;
// 消费者的消费速度也会影响生产者
}
}
void *Producter(void *args)
{
BlockQueue<int> *bq = (BlockQueue<int> *)args;
while (1)
{
int data = rand() % 10 + 1;
bq->Push(data);
cout << "Producter data: " << data << endl;
// 生产者休眠速度会影响消费者
sleep(1);
}
}
// 内置类型进行生产者消费模型交换
// 单个生产者-单个消费者
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, bq);
pthread_create(&p, nullptr, Producter, bq);
// 主线程进行等待!
pthread_join(c, nullptr);
pthread_join(p, nullptr);
}
在main函数中,我们可以控制生产者、消费者函数的休眠来模拟生产速度、消费速度。
- 比如生产时sleep(1),消费时不sleep,就会出现消费者等待生产者生产的现象,就是一步一步的先出产品然后立马被消费,同步性较强
- 或者是在消费时sleep,生产时不sleep,那么就会出现先生产出很多,然后生产者等待消费者消费再进行生产的现象。
1.3.POSIX信号量
1.3.1.再识信号量
在操作系统:进程间通信 | System V IPC-CSDN博客 博客3.2.模块中我们对信号量概念进行了提及,并总结出如下三点:
信号量的本质是一把计数器,信号量的使用是为了预定资源,信号量的操作是原子的。
那么为什么需要信号量呢?
首先我们需要知道锁的本质就是信号量,也就是信号量的使用是为了将并发访问资源强制转化为串行访问,至于为什么我们已经很熟悉了,我们也知道锁又称为二元信号量,处理的是只有一块资源的问题。而多元信号量处理的是多块资源的问题……
当我们多线程访问时,对于队列资源内部的每一个小资源可以存放数据,访问时需要加锁操作,那么这样子就需要许多把锁,显然是不合理的,所以就衍生出了多元信号量来处理多线程访问多块资源的问题。
说到了这里我们只需要把多元信号量当成处理多资源问题的“锁”即可!!!那么接下来直接开始我们多元信号量接口的学习……
1.3.2.信号量接口的学习
信号量的基本基本接口:
// 定义信号量变量
sem_t sem;
// 对信号量进行初始化
sem_init(sem_t *sem,
int pshared, // 传入0表示线程间共享,非零表示进程间共享
unsigned int value); // 传入信号量初始值,表示资源个数
// 销毁信号量
sem_destroy(sem_t *sem);
对信号量操作的接口---P、V操作
// 等待信号量,会将信号量的值减1,表示预定资源
// 如果没有资源会进行等待V操作释放资源,对应着P操作
sem_wait(sem_t *sem)
// 发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
// 不用考虑资源没有被使用,对应着V操作
sem_post(sem_t *sem);
1.4.基于环行队列的生产消费模型
环形队列和阻塞队列的生产消费模型的区别:前者把空间看为一个整体,线程访问时就不需要考虑队列资源同步和互斥的问题。后者因为需要实现线程可以同时进行空间的占用,和数据的读取,即需要考虑队列资源同步和互斥的问题,即把队列空间分成若干个小空间
那么天然的,在环形队列的生产消费模型中,我们需要对队列资源进行同步和互斥的实现,共享的多个资源我们可以借助学过信号量来进行访问的控制。接下来我们来分析一下环形
接下来我们通过代码来体会一下:
RingQueue.hpp
#include <pthread.h>
#include <iostream>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include "LockGuard.hpp"
using namespace std;
template <class T>
class RingQueue
{
private:
// 对应资源占用一个
void P(sem_t *sem)
{
sem_wait(sem);
}
// 对应资源释放一个
void V(sem_t *sem)
{
sem_post(sem);
}
public:
RingQueue(int size = 5)
: _queue(5), _size(size), _p_flag(0), _c_flag(0)
{
// 空间信号量设置为该RingQueue的大小
sem_init(&_space_sem, 0, _size);
// 数据信号量为0
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
void Push(const T &in)
{
// 要先申请信号量,减少锁竞争的次数,让线程在锁外面排队
// 是要先把车票买了,再排队上车?
// 还是先排队买票,再上车?
P(&_space_sem);
// 设置临界区
{
pthread_mutex_lock(&_p_mutex);
_queue[_p_flag] = in; // 覆盖式写入
_p_flag++;
_p_flag %= _size;
pthread_mutex_unlock(&_p_mutex);
}
V(&_data_sem);
}
void Pop(T *out)
{
P(&_data_sem);
{
pthread_mutex_lock(&_c_mutex);
*out = _queue[_c_flag];
_c_flag++;
_c_flag %= _size;
pthread_mutex_unlock(&_c_mutex);
}
V(&_space_sem);
}
size_t GetProducterFlag() const { return _p_flag; }
size_t GetConsumerFlag() const { return _c_flag; }
private:
vector<T> _queue;
size_t _size; // 整个队列的大小,用来计算新的下标
size_t _p_flag; // 生产者的下标
size_t _c_flag;
sem_t _space_sem; // 空间信号量 -- 生产者
sem_t _data_sem; // 数据信号量
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
};
- 因为我们需要做到生产者、消费者在非互斥、互斥这两种情况下,进行生产和消费,那么我们就需要定义下标来判断,并且我们如何判断队列为满或空,我们也需要传入信号量来辅助我们判断。
- 当我们实现了基本的变量,那么就需要实现基本函数!空间的占用和数据读取,前者占用空间,表示空间资源减少一个需要对空间P操作,数据增加一个需要对数据V操作。后者,数据减少,空间增加也需要对应的PV、操作。
- 而我们对应的P、V操作本身就是对定义资源分别调用信号量函数来实现的,P表示对应信号量资源减一,V表示对应信号量加一
这时我们就完成了单生产者、单消费者的环形队列生产消费模型,那我们如何实现多生产、多消费者的模型呢?
- 我们在阻塞队列中实现多生产、多消费时,因为阻塞队列本身就自带着锁用来实现队列资源的同步和互斥,所以当我们多个线程进入时,都需要共同的竞争唯一的一份队列资源,申请唯一一把锁,那么天然的就不用在对生产者、消费者内部进行加锁。
- 而在环形队列中,当我们队列非互斥的情况时,即生产者生产位置和消费者消费位置不一致的情况下,我们不需要对生产、消费者之前的互斥关系考虑。这时我们需要考虑的是,防止多个生产者对同一个位置的生产和多个消费者对同一个位置的消费。
- 所以我们在变量中也能看到,我们实现了生产者的锁和消费者的锁,目的就是为了防止多个生产者对同一个位置的生产和多个消费者对同一个位置的消费。
- 最后还有一个问题是“先申请信号量还是先加锁”?显然先申请信号量,可以减少锁竞争的次数,线程都外面乖乖排队。
main.cc
#include "RingQueue.hpp"
void *Producter(void *args)
{
cout << "生产者故意休眠3s" << endl;
sleep(3);
RingQueue<int> *rq = (RingQueue<int> *)args;
int count = 100;
while (1)
{
rq->Push(count);
cout << "Producter data: " << count <<", flag is: "<< rq->GetProducterFlag()<< endl;
count--;
}
}
void *Consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
int data = 0;
while (1)
{
sleep(1);
rq->Pop(&data);
cout << "Consumer data: " << data << endl;
}
}
int main()
{
pthread_t p[2], c[2];
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&p[0], nullptr, Producter, rq);
pthread_create(&p[1], nullptr, Producter, rq);
pthread_create(&c[0], nullptr, Consumer, rq);
pthread_create(&c[1], nullptr, Consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
}
那么相比较阻塞队列和环形队列实现的生产消费模型?
- 首先环形队列的引入,使得生产者和消费者在大部分时候,可以同时进行生产和消费,支持高并发,不用考虑队列资源的互斥问题,但是缺点就体现在队列的大小固定,当生产者和消费者速度相差过大时,可能会导致队列满或空的情况频繁发生,从而影响程序的性能。
- 而阻塞队列,支持队列资源的扩容,因为他支持动态的等待、唤醒线程,当生产者和消费者速度相差过大时,也能够保持较高的效率,但是因为他的实现会导致生产者和消费者无法同时进行,从而并发能力较差!
1.5.深刻理解生产消费模型
可能大家在上面的这两个例子中没有体会到,生产者消费者模型的并发作用!这是因为我们的代码都只是demo,并不是面向实际开发的……怎么理解这一句话呢?
首先我们要知道生产者的生产的数据从哪里来,消费者要如何进行数据的处理?也就是生产者消费者模型并不是为了完成生产者、消费者的任务的,更多的是提供一个中间空间。实际开发中,生产者通过网络获取数据,并将数据放入共享资源区,消费者从共享资源区读取数据,进而进行数据的处理。
自从有了生产消费者模型,生产者和消费者进行数据交互的同时,通过多线程可以实现生产者在从网络端获取数据,并发的传入共享资源。消费者在从共享资源中获取数据,并发的进行数据处理。这里体现了生产者、消费者、生产者和消费者三个整体的并发,大大提高了效率。另外,我们也要知道从网络端获取数据、对数据进行处理都是需要时间的,这段时间就可以通过生产消费者模型进行异步,实现并发……
生产消费模型帮助我们在各模块下实现并发,实现了低耦合、高并发!提高了对时间的利用,解决各模块之间互相等待的问题。
2.可重入函数与线程安全
线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作, 并且没有锁保护的情况下,会出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们 称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
可重入函数是函数级别的概念,线程安全是线程级别的概念。当线程访问可重入函数时,表示该线程是安全的,访问不可重入函数时 是不安全的。在我们抢票函数中,就是一个不可重入函数,而访问它的线程是不安全的。当我们加锁之后,就实现了访问他的线程是安全的,但是这时该函数不支持并发的访问了……