目录
一.生产者消费者概念
二.模拟实现基于阻塞队列的生产消费模型
2.1概念
2.2构造阻塞队列
三.信号量
3.1原理
3.2信号量函数
3.3信号量模拟互斥功能
一.生产者消费者概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
二.模拟实现基于阻塞队列的生产消费模型
2.1概念
1.阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护来。
2.生产者线程要向阻塞队列当中Push数据,若阻塞队列已经满了,那么此时该生产者需要进行等待,直到阻塞队列中有空间时再被唤醒。消费者线程要从阻塞队列当中Pop数据,若阻塞队列为空,那么此时该消费者需要进行等待,直到阻塞队列中有新的数据时再被唤醒。
3.需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。若是队列满了,生产者要进行等待,若是队列为空,消费者要进行等待。
4.当生产者或者消费者执行后,都要去唤醒另一方。
2.2构造阻塞队列
BlockQueue.hpp代码:
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
const int capacity=5;
template<class T>
class BlockQueue
{
public:
BlockQueue(int size=capacity)//初始化
:size_(size)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&proCond_,nullptr);
pthread_cond_init(&conCond_,nullptr);
}
~BlockQueue()//变量的销毁
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&proCond_);
pthread_cond_destroy(&conCond_);
}
void _push(const T in)
{
lockQueue();//生产之前先加锁
while(is_full())
{
pthread_cond_wait(&proCond_,&mutex_);//若队列满了,则进行等待,同时把锁释放,再次醒来时会重新获取锁
}
_q.push(in);
unlockQueue();//生产完后解锁
pthread_cond_signal(&conCond_);//消费者可能在等待,要去唤醒
}
T _pop()
{
lockQueue();//消费之前先加锁
while(_q.empty())
{
pthread_cond_wait(&conCond_,&mutex_);//若队列为空,则进行等待,同时把锁释放,再次醒来时会重新获取锁
}
T out=_q.front();
_q.pop();
unlockQueue();//消费完后解锁
pthread_cond_signal(&proCond_);//唤醒生产者
return out;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool is_full()
{
return size_==_q.size();
}
queue<T> _q;
pthread_mutex_t mutex_;
pthread_cond_t proCond_;
pthread_cond_t conCond_;
int size_;
};
模拟任务task.cpp代码:
#include <iostream>
#include <string>
class Task
{
public:
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator()()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
BlockQueue.cpp代码:
#include"BlockQueue.hpp"
#include"task.hpp"
#include <ctime>
const std::string ops = "+-*/%";
void *productor(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
//制作任务
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
bqp->_push(t);//生产任务
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;
sleep(1);
}
}
void *consumer(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t = bqp->_pop(); // 消费任
int result = t(); //处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr));
BlockQueue<Task> bq; //阻塞队列
pthread_t tid1, tid2;
pthread_create(&tid1, nullptr, consumer, &bq);
pthread_create(&tid2, nullptr, productor, &bq);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
return 0;
}
结果:
三.信号量
3.1原理
当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
概念:本质上就是一个计数器,去划分临界资源的数量。
每个执行流在进入临界区之前都要先申请信号量,申请成功就有了访问临界资源的权限,当操作完毕后再释放信号量。就是对信号量做加减操作。
信号量的PV操作
p操作:申请信号量为p操作,当申请成功,就获得了访问该临界资源的权限,同时,该临界资源的数量也减少了一份,所以计数器要减一。
v操作:释放信号量为v操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一。
补充:PV操作必须是原子性的。多个执行流访问临界资源也是竞争式的,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。当信号量值为零时被申请,那么该执行流会在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
3.2信号量函数
初始化信号量函数:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数解释:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:信号量的初始值(计数器的初始值)。
销毁信号量函数:
int sem_destroy(sem_t *sem);
申请信号量函数(等待):
int sem_wait(sem_t *sem);
申请成功后值减一
释放信号量函数(发布):
int sem_post(sem_t *sem);
释放信号量后值加一
上面这些函数调用成功返回0,失败返回-1。
3.3信号量模拟互斥功能
当把信号量的值设置为1时,那么它说明临界资源的数量只有一分,信号量的作用基本等价于互斥锁。
class Sem{
public:
Sem(int num)
{
sem_init(&_sem, 0, num);
}
~Sem()
{
sem_destroy(&_sem);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
Sem sem(1); //二元信号量
int tickets=1000;
void* getTickets(void* args)
{
string s=(char*)args;
while(true)
{
sem.P();
if(tickets>0)
{
usleep(10000);
cout<<s<<" "<<"抢到票了,"<<"票数还剩下:"<<--tickets<<endl;
sem.V();
}
else
{
cout<<"票已经完了"<<" "<<s<<" "<<"退出了"<<endl;
sem.V();
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1,nullptr,getTickets,(void*)"pthread1");
pthread_create(&tid2,nullptr,getTickets,(void*)"pthread2");
pthread_create(&tid3,nullptr,getTickets,(void*)"pthread3");
pthread_create(&tid4,nullptr,getTickets,(void*)"pthread4");
pthread_detach(tid1);
pthread_detach(tid2);
pthread_detach(tid3);
pthread_detach(tid4);
while(true)
{
;
}
return 0;
}