Linux:生产消费模型 & 读者写者模型
- 生产消费模型
- 阻塞队列
- 基本结构
- 构造与析构
- 投放任务
- 获取任务
- 总代码
- POSIX 信号量
- 基本概念
- 接口
- 环形队列
- 基本结构
- 构造与析构
- 投放任务
- 获取任务
- 总代码
- 读者写者模型
- 读写锁
生产消费模型
生产消费模型是一种用于处理多线程之间任务资源分配的设计模式,其可以提高多线程处理任务的效率。
假设我们现在有多个线程,一部分线程负责获取任务,称为productor
,另一部线程负责执行任务,称为consumer
。
有一个设计模式如图所示:
左侧是派发任务的线程productor
,右侧是执行任务的线程consumer
,他们之间是一对一的关系。上图中这个模式有以下问题:
现在生产者productor-3
有任务要派发,但是consumer-3
正在执行上一个任务,于是productor-3
就只能等待。但是此时明明有两个线程consumer-1
和consumer-2
是空闲的,它们却不能执行这个任务,这就出现了很明显的资源分配不合理问题。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题:
生产者和消费者彼此之间不直接通讯,而通过任务队列
来进行通讯:
- 生产者生产完数据之后不用等待消费者处理,直接扔给
任务队列
- 消费者不找生产者要数据,而是直接从
任务队列
里取
任务队列
就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个任务队列
就是用来给生产者和消费者解耦的。
本博客讲解两种任务队列
的实现方式:阻塞队列
和环形队列
。
阻塞队列
阻塞队列BlockQueue
常用于实现生产消费模型,示意如下:
相比于普通队列,阻塞队列有以下特性:
- 当阻塞队列满了,此时再放数据则会被阻塞等待,直到队列中有空间为止
- 当阻塞队列为空,此时读取数据则会被阻塞等待,直到队列中有数据为止
接下来我们就来实现一下这个阻塞队列:
基本结构
既然需要一个阻塞队列的类,那么第一步就是需要一个队列
,我们的队列要求有上限,超过某个值就要阻塞,所以还要有一个成员来标识队列的容量上限
。
template <typename T>
class blockQueue
{
private:
std::queue<T> _blockQueue; // 阻塞队列
int _cap; // 队列的上限
};
此处阻塞队列为STL自带的queue
,队列的容量上限为_cap
。此处的模板参数T
,最后是一个可调用对象
,也就是一个可以执行的任务。
接下来要考虑的就是线程之间的互斥与同步,那么就要回答以下几个问题:
消费者
与消费者
可以同时访问队列吗?不可以,所以消费者之间要互斥生产者
与生产者
可以同时访问队列吗?不可以,所以生产者之间要互斥生产者
与消费者
可以同时访问队列吗?不可以,所以生产者与消费者之间要互斥
综上,其实就是所有线程访问阻塞队列
时都要互斥,那么只需要一把锁就可以了。
随后就是同步问题:
- 什么时候
消费者
可以拿任务?当任务队列有任务时,否则消费者
去等待队列 - 什么时候
生产者
可以生产任务?当任务队列没满时,否则生产者
去等待队列
生产者
和消费者
可以共用一个等待队列
吗?或者说,它们可以共用一个条件变量吗?
是不行的,如果它们共用一个条件变量,就会导致唤醒等待队列
时,不知道唤醒的是生产者还是消费者。
综上:我们需要一把锁维护所有线程访问阻塞队列时的互斥。两个条件变量分别维护生产者与消费者的同步。
template <typename T>
class blockQueue
{
private:
std::queue<T> _blockQueue; // 阻塞队列
int _cap; // 队列的上限
pthread_mutex_t _mutex; // 访问阻塞队列时互斥
pthread_cond_t _consum_cond; // 消费者的条件变量
pthread_cond_t _product_cond; // 生产者的条件变量
};
以上就是我们所需要的所有成员了,接下来写一下构造函数和析构函数:
构造与析构
在构造函数中,需要外部传入一个值,来指定这个阻塞队列
的总上限,以及初始化互斥锁和条件变量。
代码:
template <typename T>
class blockQueue
{
public:
blockQueue(int cap = 5)
: _cap(cap)
, _blockQueue(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
pthread_cond_init(&_product_cond, nullptr);
}
~blockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consum_cond);
pthread_cond_destroy(&_product_cond);
}
};
投放任务
接下来就设计接口enQueue
,通过该接口可以往阻塞队列
中放任务。要考虑以下线程安全问题:
- 访问
阻塞队列
前先加锁,访问完毕后解锁 - 如果
阻塞队列
满了,就到条件变量_product_cond
下面去等待 - 如果
阻塞队列
没慢,或者被从条件变量下唤醒,此时就可以投入任务了 - 投入任务后,唤醒
_consum_cond
下面等待的消费之来执行任务
代码:
template <typename T>
class blockQueue
{
public:
void enQueue(const T& in)
{
pthread_mutex_lock(&_mutex);
while (_blockQueue.size() == _cap)
{
pthread_cond_wait(&_product_cond, &_mutex);
}
_blockQueue.push(in);
pthread_cond_signal(&_consum_cond);
pthread_mutex_unlock(&_mutex);
}
};
以上代码中, pthread_mutex_lock(&_mutex)
先加锁,随后while
语句判断当前阻塞队列
是否满,如果满了就执行pthread_cond_wait(&_product_cond, &_mutex)
去阻塞等待。
当不满足条件,或者线程被唤醒,此时就可以_blockQueue.push(in)
来投放任务了,当投放完任务,此时通过pthread_cond_signal(&_consum_cond)
唤醒一个消费者来执行任务。最后释放掉自己的锁。
获取任务
接下来就是接口popQueue
,消费者
要去队列中拿任务,相似的要考虑以下线程安全问题:
- 访问
阻塞队列
前先加锁,访问完毕后解锁 - 如果
阻塞队列
为空,就到条件变量_consum_cond
下面去等待 - 如果
阻塞队列
不为空,或者被从条件变量下唤醒,此时就可以获取任务了 - 获取任务后,唤醒条件变量
_product_cond
下等待的生产者
,表示当前有任务拿走了,位置空出来了,可以来生产任务了
代码:
template <typename T>
class blockQueue
{
public:
void popQueue(T& out)
{
pthread_mutex_lock(&_mutex);
while (_blockQueue.empty())
{
pthread_cond_wait(&_consum_cond, &_mutex);
}
out = _blockQueue.front();
_blockQueue.pop();
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
}
};
传入一个T&
类型的引用,用于输出任务。
一开始通过pthread_mutex_lock
加锁,保证访问阻塞队列的互斥。随后判断while
判断阻塞队列是否为空,如果为空就通过pthread_cond_wait(&_consum_cond, &_mutex)
进入等待队列阻塞等待。
当任务队列不为空,或者被从_consum_cond
的等待队列中唤醒,此时就可以拿走队列中的任务了。拿走任务后,通过pthread_cond_signal(&_product_cond)
唤醒生产者来投放任务,最后释放锁。
总代码
现在我把这个阻塞队列放到文件blockQueue.hpp
中。
blockQueue.hpp
:
#pragma once
#include <iostream>
#include <queue>
#include <string>
#include <unistd.h>
#include <pthread.h>
template <typename T>
class blockQueue
{
public:
blockQueue(int cap = 5)
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
pthread_cond_init(&_product_cond, nullptr);
}
~blockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consum_cond);
pthread_cond_destroy(&_product_cond);
}
void enQueue(const T& in)
{
pthread_mutex_lock(&_mutex);
while (_blockQueue.size() == _cap)
{
pthread_cond_wait(&_product_cond, &_mutex);
}
_blockQueue.push(in);
pthread_cond_signal(&_consum_cond);
pthread_mutex_unlock(&_mutex);
}
void popQueue(T& out)
{
pthread_mutex_lock(&_mutex);
while (_blockQueue.empty())
{
pthread_cond_wait(&_consum_cond, &_mutex);
}
out = _blockQueue.front();
_blockQueue.pop();
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
}
private:
std::queue<T> _blockQueue; // 阻塞队列
int _cap; // 队列的上限
pthread_mutex_t _mutex; // 访问阻塞队列时互斥
pthread_cond_t _consum_cond; // 消费者的条件变量
pthread_cond_t _product_cond; // 生产者的条件变量
};
接下来实现基于环形队列
的阻塞队列
,在那之前,我们要学一下POSIX 信号量
。
POSIX 信号量
基本概念
信号量(Semaphore)是一种用于控制多个线程或进程对共享资源访问的同步机制。它本质上是一个计数器,用来表示某种资源的可用数量。
信号量的核心概念:
- 计数器: 信号量维护一个计数器,它表示当前可用的资源数量。
- 等待: 当一个线程想要访问资源时,它会检查信号量的计数器。如果计数器大于 0,则表示有可用资源,线程可以访问资源并使计数器减 1。如果计数器为 0,则表示没有可用资源,线程会进入等待状态,直到有其他线程释放资源。
- 释放: 当一个线程完成对资源的访问后,它会将计数器加 1,并唤醒一个等待的线程(如果有)。
信号量的类型:
- 二元信号量(Binary Semaphore): 计数器只能取 0 或 1 的值,表示资源是可用(1)还是不可用(0)。此时就是互斥锁,保证同一时间只有一个线程访问共享资源。
- 计数信号量(Counting Semaphore): 计数器可以取任意非负整数,表示可用资源的数量。常用于控制对资源池的访问。
你可以简单地认为,信号量就是一个计数器,只不过是原子性的
接口
信号量的类型为
sem_t
,所有信号量的操作,都基于一个sem_t
类型的变量。
sem_init:
sem_init
函数用于初始化一个信号量,需要头文件<semaphore.h>
,函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem
:指向sem_t
信号量的指针pshared
:若该值为0
,则信号量在线程之间共享;若该非0
,信号量在进程之间共享。value
:信号量的初始值
sem_destory:
sem_destory
函数用于销毁一个信号量,需要头文件<semaphore.h>
,函数原型如下:
int sem_destroy(sem_t *sem);
sem_wait:
sem_wait
函数用于申请
一个信号量,需要头文件<semaphore.h>
,函数原型如下:
int sem_wait(sem_t *sem);
sem_post:
sem_post
函数用于释放
一个信号量,需要头文件<semaphore.h>
,函数原型如下:
int sem_post(sem_t *sem);
环形队列
生产消费模型使用的环形队列,与数据结构中的环形队列不同,此处的环形队列是基于信号量
完成的。
如图所示:
上图是一个环形队列,生产者
与消费者
都绕着环的顺时针走,黑色代表这个位置有数据,白色代表这个位置没有数据。生产者
在前面生产,消费者
跟在后面消费,这就是基于环形队列的生产消费模型。
我们可以把环形队列中的资源用信号量表示。一个信号量_room_sem
用于表示:该环形队列中有几个数据。另外一个信号量_data_sem
表示:该环形队列有几个空位置。
就上图来说:_room_sem = 4
,_data_sem = 4
。
- 只有
_room_sem
的值大于0
,表示还有空间可以放数据,此时生产者
才可以生产 - 只有
_data_sem
的值大于0
,表示当前有数据可以读取,此时消费者
才可以消费
接下来我们分析一下生产者
与消费者
之间的同步与互斥关系:
生产者
与生产者
之间:互斥,两个生产者
不能在同一个位置放入数据消费者
与消费者
之间:互斥,两个消费者
不能同时读取一个位置的数据生产者
与消费者
之间:有互斥关系,但无需额外维护
此处最重要的就是,第三条:基于环形队列的模型,生产者
与消费者
之间有互斥关系,但无需额外维护!
为什么呢?我们之前讲解阻塞队列
时,要保证生产者
与消费者
不会同时访问阻塞队列
,因为有可能会造成生产者
和消费者
访问同一块数据的问题。
但是在环形队列
中,什么情况下生产者
和消费者
才会访问同一块数据呢?环形队列全空,或者全满的时候。
也就是下图:
此时生产者
和消费者
访问同一块数据,但是:
- 对于全满:信号量
_room_sem = 0
,此时生产者
不会访问队列; - 对于全空:信号量
_data_sem = 0
,此时消费者
不会访问队列;
除去这两种情况,其余时候生产者
和消费者
可以同时访问同一个队列的不同部分!所以相比于阻塞队列
,环形队列
可以让生产和消费同时进行。
接下来就来实现一下这个基于信号量的环形队列
:
基本结构
我们的环形队列,不能用简单的队列,因为queue
只提供push
尾插,pop
头删这样的接口。而我们需要对这个队列进行随机访问,此时最好用vector
来模拟。
其次我们要有以下信息:
_cap
:该队列的最大值_consumer_step
:当前消费者
访问哪一个数据块,也就是数组下标_productor_step
:当前生产者
访问哪一个数据块,也就是数组下标
其次就是两个信号量:
_room_sem = 0
:当前有几个空位置_data_sem = 0
:当前有几个有数据的位置
还有两把锁:
_consumer_mutex
:消费者
与消费者
之间的互斥_productor_mutex
:生产者
与生产者
之间的互斥
代码:
template <typename T>
class ringQueue
{
private:
std::vector<T> _ringQueue; // 环形队列
int _cap; // 队列的总容量
int _consumer_step; // 消费者当前位置
int _productor_step; // 生产者当前位置
sem_t _room_sem; // 空位置数目
sem_t _data_sem; // 有数据的位置数目
pthread_mutex_t _consumer_mutex; // 消费者之间互斥
pthread_mutex_t _productor_mutex; // 生产者之间互斥
};
构造与析构
构造函数中,要用户传入cap
,指定该环形队列
的最大值,随后初始化各个信号量
,锁
。而析构函数中,则是销毁信号量
和锁
。
代码:
template <typename T>
class ringQueue
{
public:
ringQueue(int cap = 5)
: _cap(cap)
, _ringQueue(cap)
, _consumer_step(0)
, _productor_step(0)
{
sem_init(&_room_sem, 0, cap);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_consumer_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
}
~ringQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_consumer_mutex);
pthread_mutex_destroy(&_productor_mutex);
}
};
此处构造函数中,sem_init(&_room_sem, 0, cap);
,第二个参数0
表示该信号量在线程键共享,cap
为环形队列最大容量,因为一开始所有位置都为空,所以_room_sem = cap
,而_data_sem = 0
。
投放任务
接下来设计enQueue
接口,往环形队列
中放任务,过程如下:
- 申请信号量
- 竞争锁
- 放数据
先展示代码,后解释:
template <typename T>
class ringQueue
{
public:
void enQueue(const T& in)
{
sem_wait(&_room_sem);
pthread_mutex_lock(&_productor_mutex);
_ringQueue[_productor_step] = in;
_productor_step = (_productor_step + 1) % _cap;
pthread_mutex_unlock(&_productor_mutex);
sem_post(&_data_sem);
}
};
第一步sem_wait
为申请一个信号量_room_sem
,所谓申请一个信号量,就相当于_room_sem--
,即表示可用资源减少一个。但是如果_room_sem = 0
,那么此时申请这个信号量的线程就会阻塞,直到信号量不为0
。
随后pthread_mutex_lock
加锁,与其他生产者
保持互斥,再_ringQueue[_productor_step] = in
,即投放任务到环形队列的头部。
_productor_step = (_productor_step + 1) _cap
则是让队列头走到下一个位置,为了形成一个逻辑上的环,最后还要对_cap
取模。
最后释放锁,再sem_post(&_data_sem)
,这相当于信号量_data_sem++
,也就是当前队列中的数据多加了一个。
获取任务
popQueue
接口用于消费者
获取任务,有了上一个接口铺垫,这个接口就很简单了:
template <typename T>
class ringQueue
{
public:
void popQueue(T &out)
{
sem_wait(&_data_sem);
pthread_mutex_lock(&_consumer_mutex);
out = _ringQueue[_consumer_step];
_productor_step = (_consumer_step + 1) % _cap;
pthread_mutex_unlock(&_consumer_mutex);
sem_post(&_room_sem);
}
};
首先申请一个信号量sem_wait(&_data_sem)
,相当于_data_sem--
,表示这个当前队列中的数据少了一个,随后加锁保持消费者
之间的互斥。
当进入临界区后,out = _ringQueue[_consumer_step]
将数据读取到输出型参数out
中,再_productor_step = (_consumer_step + 1) % _cap
,让环形队列
的尾巴走到下一个位置。
最后释放锁,sem_post(&_room_sem)
相当于_room_sem++
,表示这个位置的数据已经拿走,空位置多了一个。
总代码
现在把环形队列
封装在文件ringQueue.hpp
中:
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
template <typename T>
class ringQueue
{
public:
ringQueue(int cap = 5)
: _cap(cap)
, _ringQueue(cap)
, _consumer_step(0)
, _productor_step(0)
{
sem_init(&_room_sem, 0, cap);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_consumer_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
}
~ringQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_consumer_mutex);
pthread_mutex_destroy(&_productor_mutex);
}
void enQueue(const T& in)
{
sem_wait(&_room_sem);
pthread_mutex_lock(&_productor_mutex);
_ringQueue[_productor_step] = in;
_productor_step = (_productor_step + 1) % _cap;
pthread_mutex_unlock(&_productor_mutex);
sem_post(&_data_sem);
}
void popQueue(T &out)
{
sem_wait(&_data_sem);
pthread_mutex_lock(&_consumer_mutex);
out = _ringQueue[_consumer_step];
_productor_step = (_consumer_step + 1) % _cap;
pthread_mutex_unlock(&_consumer_mutex);
sem_post(&_room_sem);
}
private:
std::vector<T> _ringQueue; // 环形队列
int _cap; // 队列的总容量
int _consumer_step; // 消费者当前位置
int _productor_step; // 生产者当前位置
sem_t _room_sem; // 空位置数目
sem_t _data_sem; // 有数据的位置数目
pthread_mutex_t _consumer_mutex; // 消费者之间互斥
pthread_mutex_t _productor_mutex; // 生产者之间互斥
};
读者写者模型
读者写者模型
也是是一种用于处理多线程之间资源分配的设计模式,其可以提高多线程处理任务的效率。
但是其与生产者消费者略有不同。生产消费模型中就像一个商店,生产者
向商店投放商品,消费者
消费商品后,会把商品带走。
而读者写者模型中就像一个黑板报,写者
在黑板报上写内容,读者
读完后,不会把黑板上的内容擦掉,其他人还可以看到同样的内容。
简单分析一下同步与互斥关系:
写者
与写者
:互斥关系,不能同时往一个区域写入读者
与写者
:互斥与同步,写者
在写的时候,读者
不能看,不然会拿到不完整的数据读者
与读者
:既不互斥,也不同步
黑板报是可以多个人一起看的,因为看黑板报的人不会修改这个黑板报。同理,因为读者
不会修改共享资源,此时可以被多个读者
一起访问。
读者写者模型有两种模式:读者优先模式
和写者优先模式
。
读者优先模式
:只要读者
在读,写者
就必须等待,直到没有任何读者
读取,写者
才可以写入写者优先模式
:如果写者
想要写入,后续来的读者
都不能读取,只能等待。当所有之前的读者
都走光了,写者
就可以写入了
Linux
已经提供了读者写者模型的相关接口 - 读写锁
,我们就不额外实现了。
读写锁
rwlock 读写锁
包含在pthread库
中,使用方式与mutex 互斥锁
几乎一模一样,我简单介绍一下:
读写锁的类型是pthread_rwlock_t
。全局的读写锁直接使用PTHREAD_RWLOCK_INITIALIZER
初始化,无需销毁。局部的读写锁则用init
初始化,destroy
销毁。
接口如下:
创建:
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
销毁:
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁:
略微不同的就是加锁,由于读者写者模型中,线程有两种身份,此时加锁也有两种方式:
- 读者加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
- 写者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
解锁:
unlock
可以同时释放rdlock
和wrlock
:
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);