文章目录
- 生产者消费者模型
- 为何要使用生产者消费者模型
- 生产者消费者模型优点
- 基于BlockingQueue的生产者消费者模型
- 条件变量
- 条件变量代码
- POSIX信号量
- 基于环形队列的生产消费模型
生产者消费者模型
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
条件变量
#include <iostream>
#include <string>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;
const int num = 5;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *active(void *args)
{
string name = static_cast<const char *>(args);
while (true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex); // pthread_cond_wait,调用的时候,会自动释放锁, TODO
cout << name << " 活动" << endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t tids[num];
for (int i = 0; i < num; i++)
{
char *name = new char[32];
snprintf(name, 32, "thread-%d", i + 1);
pthread_create(tids + i, nullptr, active, name);
}
sleep(3);
while (true)
{
cout << "main thread wakeup thread..." << endl;
pthread_cond_signal(&cond);//唤醒单个线程
//pthread_cond_broadcast(&cond);//唤醒全部线程
sleep(1);
}
for (int i = 0; i < num; i++)
{
pthread_join(tids[i], nullptr);
}
}
条件变量代码
main.cc
#include<iostream>
#include<stdlib.h>
#include <unistd.h>
#include"blockQueue.hpp"
#include"task.hpp"
using namespace std;
void* consumer(void* args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t;
// 1. 将数据从blockqueue中获取 -- 获取到了数据
bq->pop(&t);
t();
// 2. 结合某种业务逻辑,处理数据! -- TODO
std::cout << pthread_self() << " | consumer data: " << t.formatArg() << t.formatRes() << std::endl;
}
}
void* productor(void* args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
std::string opers = "+-*/%";
while (true)
{
sleep(1);
// 1. 先通过某种渠道获取数据
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
char op = opers[rand() % opers.size()];
// 2. 将数据推送到blockqueue -- 完成生产过程
Task t(x, y, op);
bq->push(t);
std::cout << pthread_self() << " | productor Task: " << t.formatArg() << "?" << std::endl;
}
}
int main()
{
BlockQueue<Task> *bq=new BlockQueue<Task>();
pthread_t c[2], p[3];
pthread_create(&c[0], nullptr, consumer, bq);
pthread_create(&c[1], nullptr, consumer, bq);
pthread_create(&p[0], nullptr, productor, bq);
pthread_create(&p[1], nullptr, productor, bq);
pthread_create(&p[2], nullptr, productor, bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
delete bq;
return 0;
}
blockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gcap = 5;
// 不要认为,阻塞队列只能放整数字符串之类的
// 也可以放对象
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gcap):_cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_productorCond, nullptr);
}
bool isFull(){ return _q.size() == _cap; }
bool isEmpty() { return _q.empty(); }
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
// 细节1:一定要保证,在任何时候,都是符合条件,才进行生产
while(isFull()) // 1. 我们只能在临界区内部,判断临界资源是否就绪!注定了我们在当前一定是持有锁的!
{
// 2. 要让线程进行休眠等待,不能持有锁等待!
// 3. 注定了,pthread_cond_wait要有锁的释放的能力!
pthread_cond_wait(&_productorCond, &_mutex); // 我休眠(切换)了,我醒来的时候,在哪里往后执行呢?
// 4. 当线程醒来的时候,注定了继续从临界区内部继续运行!因为我是在临界区被切走的!
// 5. 注定了当线程被唤醒的时候,继续在pthread_cond_wait函数出向后运行,又要重新申请锁,申请成功才会彻底返回
}
// 没有满的,就要让他进行生产
_q.push(in);
// 加策略
// if(_q.size() >= _cap/2)
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_consumerCond);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
*out = _q.front();
_q.pop();
// 加策略
pthread_cond_signal(&_productorCond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
std::queue<T> _q;
int _cap;
// 为什么我们的这份代码,只用一把锁呢,根本原因在于,
// 我们生产和消费访问的是同一个queue&&queue被当做整体使用!
pthread_mutex_t _mutex;
pthread_cond_t _consumerCond; // 消费者对应的条件变量,空,wait
pthread_cond_t _productorCond; // 生产者对应的条件变量,满,wait
};
task.hpp
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "=";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
(POSIX信号量)
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
static const int N = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
public:
RingQueue(int num = N) : _ring(num), _cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
// 生产
void push(const T &in)
{
// 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
// 2. 什么时候用锁,什么时候用sem?你对应的临界资源,是否被整体使用!
//
P(_space_sem); // P()
Lock(_p_mutex); //? 1
// 一定有对应的空间资源给我!不用做判断,是哪一个呢?
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem);
}
// 消费
void pop(T *out)
{
P(_data_sem);
Lock(_c_mutex); //?
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ring;
int _cap; // 环形队列容器大小
sem_t _data_sem; // 只有消费者关心
sem_t _space_sem; // 只有生产者关心
int _c_step; // 消费位置
int _p_step; // 生产位置
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
Main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
using namespace std;
const char *ops = "+-*/%";
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
// 单生产单消费
// pthread_t c, p;
// pthread_create(&c, nullptr, consumerRoutine, rq);
// pthread_create(&p, nullptr, productorRoutine, rq);
// pthread_join(c, nullptr);
// pthread_join(p, nullptr);
// 多生产,多消费,该如何更改代码呢?done
// 意义在哪里呢?意义绝对不在从缓冲区冲放入和拿去,意义在于,放前并发构建Task,获取后多线程可以并发处理task,因为这些操作没有加锁!
pthread_t c[3], p[2];
for (int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, consumerRoutine, rq);
for (int i = 0; i < 2; i++)
pthread_create(p + i, nullptr, productorRoutine, rq);
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
delete rq;
return 0;
}
什么时候用锁,什么时候用sem?你对应的临界资源,是否被整体使用!
在上面写过阻塞队列的代码可以发现他是一个整体,而环型队列是将他自己打散了,通过下标去访问。
在阻塞队列当中可以发现消费者和生产者是不能并发的,而环形队列就是可以的,因为他将每一个下标当作一个整体也。所以消费者和生产者就是可以并发的互不影响。