生产者消费者
概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
321原则
-
三种角色:生产者、消费者、仓库
-
两种关系:生产者与生产者之间是互斥关系,消费者与消费者之间是互斥关系,生产者与消费者之间是同步与互斥关系。
-
一个交易场所:仓库
优点
-
解耦–生产者。消费者之间不直接通信,降低了耦合度。
-
支持并发
-
支持忙闲不均
条件变量实现
使用条件变量控制生产者消费者的极限状态:即产品满的时候,生产者进行等待;即产品空的时候,消费者进行等待
使用互斥锁对临界资源的访问进行保护,即保护产品队列
使用C++11线程支持库中std::condition_variable(条件变量)、std::mutex(互斥锁)完成多线程的访问控制。
class My_Queue {
private:
std::condition_variable _cond;
std::mutex _mtx;
queue<int> _que;
int _max;
public:
My_Queue(int max = 5) :_max(max) {}
bool IsFull() {
return _que.size() == _max;
}
bool IsEmpty() {
return _que.size() == 0;
}
int put(int val) {
std::unique_lock<std::mutex> Lock(_mtx);
while (IsFull()) {
_cond.wait(Lock);
}
_que.push(val);
_cond.notify_all();
return val;
}
int out() {
std::unique_lock<std::mutex> Lock(_mtx);
while (IsEmpty()) {
_cond.wait(Lock);
}
int res = _que.front();
_que.pop();
_cond.notify_all();
return res;
}
};
const int n = 5;
My_Queue que(n);
void Producter()
{
srand((unsigned int)time(NULL));
while (1) {
//std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cout << "Producter:" << que.put(rand() % 100) << endl;
}
}
void Consumer()
{
srand((unsigned int)time(NULL));
while (1) {
//std::this_thread::sleep_for(std::chrono::milliseconds(1000));
cout << "Consumer:" << que.out() << endl;
}
}
int main()
{
thread tha(Producter, 1);
thread thb(Producter, 2);
thread thc(Producter, 3);
thread thd(Consumer, 1);
thread the(Consumer, 2);
tha.join();
thb.join();
thc.join();
thd.join();
the.join();
}
测试
由于多线程发生并行,导致同一时间中,多个生产者产生相同随机值
信号量实现
两个信号量实现等待状态,生产者信号量与消费者信号量
当生产者信号量wait阻塞,说明产品到达极限,无法生产
当消费者信号量wait阻塞,说明产品消耗完成,无法继续获取
使用互斥锁对临界资源访问进行控制
在linux下使用原生线程支持库中pthread_mutex、sem完成
由于服务器为单核,所以不做多线程测试,有意者可自行尝试
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <semaphore.h>
#define SIZE 6
int Warehouse[SIZE];
pthread_mutex_t mutex;
sem_t sem_pro;//生产者信号量
sem_t sem_con;//消费者信号量
int num = 0;
void* Producter(void* arg)
{
while (1)
{
sem_wait(&sem_pro);
pthread_mutex_lock(&mutex);
Warehouse[num] = rand() % 100 + 1;
printf("product Warehouse[%d] = %d\n", num, Warehouse[num]);
num++;
sem_post(&sem_con);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void* Consumer(void* arg)
{
while (1)
{
sem_wait(&sem_con);
pthread_mutex_lock(&mutex);
num--;
printf("consum Warehouse[%d] = %d\n", num, Warehouse[num]);
sem_post(&sem_pro);
pthread_mutex_unlock(&mutex);
sleep(2);
}
}
int main()
{
pthread_mutex_init(&mutex, NULL);
sem_init(&sem_pro, 0, 6);//生产者信号量值初始化为6 意为可容纳产品空间为6
sem_init(&sem_con, 0, 0);
pthread_t id[2];
pthread_create(&id[0], NULL, Producter, NULL);
pthread_create(&id[1], NULL, Consumer, NULL);
pthread_join(id[0], NULL);
pthread_join(id[1], NULL);
exit(0);
}
测试