目录
- 一、POSIX信号量
- 1.1 初始化信号量
- 1.2 销毁信号量
- 1.3 等待信号量
- 1.4 发布信号量
- 1.5 基于环形队列的生产消费模型(用信号量控制生产者和消费者之间的同步互斥关系)
- 1.5.1 makefile
- 1.5.2 RingQueue.hpp
- 1.5.3 Sem.hpp
- 1.5.4 Task.hpp
- 1.5.5 main.cc
- 二、信号量控制的环形队列原理图
一、POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于、线程间同步。
什么是信号量?信号量的本质就是一把计数器。
1.1 初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
1.2 销毁信号量
int sem_destroy(sem_t *sem);
1.3 等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()操作
1.4 发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()操作
上一个生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
(POSIX信号量)。
1.5 基于环形队列的生产消费模型(用信号量控制生产者和消费者之间的同步互斥关系)
环形队列采用数组模拟,用模运算来模拟环状特性。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
1.5.1 makefile
ring_queue:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ring_queue
1.5.2 RingQueue.hpp
#pragma once
#include <iostream>
using namespace std;
#include <pthread.h>
#include <vector>
#include <ctime>
#include <unistd.h>
#include "Sem.hpp"
static const int default_num = 5;
// 用数据模拟环形队列
template <class T>
class Ring_Queue
{
public:
Ring_Queue(int num = default_num)
: _rq(num)
, _num(num)
, _p_step(0)
, _c_step(0)
, _space_sem(num) // 刚开始空间信号量量等于队列的长度
,_data_sem(0) // 刚开始数据信号量等于0,因为没有数据
{
pthread_mutex_init(&_p_mtx, nullptr);
pthread_mutex_init(&_c_mtx, nullptr);
}
~Ring_Queue()
{
pthread_mutex_destroy(&_p_mtx);
pthread_mutex_destroy(&_p_mtx);
}
void Push(const T &in)
{
//这里为啥要先申请信号量,然后再加锁呢?为了提高并发度,提高效率
//因为我们知道加锁和解锁之间的代码,即临界区的代码是越少越好的,
//而如果先申请锁,那么每次只有一个线程申请信号量,然后执行临界区的
//代码,等到这个线程解锁后,其它线程才能申请锁,然后申请信号量,
//这就会导致线程在运行临界区代码的时候其它线程是阻塞在锁外面的,而
//在锁外面又申请不到信号量,那就是白白地等,但是如果是先申请信号量,
//然后再申请锁,那么当一个线程在临界区执行代码的时候,其它的线程也
//可以同步地申请信号量,从而提高了并发度,等到锁释放后,其它的线程
//只需要申请到锁就可以访问临界区的代码了,而不用在申请到锁之后才申
//请信号量
// 插入数据之前要申请空间信号量
_space_sem.p();
pthread_mutex_lock(&_p_mtx);
_rq[_p_step++] = in;
_p_step %= _num; // 体现环形结构
pthread_mutex_unlock(&_p_mtx);
// 插入了数据之后,这个数据并不是立刻就可以被删除的,即
// 插入数据不是目的,被读取才是目的,所以插入数据之后应该
// 释放_data_sem,即插入数据之后数据信号量应该++,而不是
// 空间信号量++
_data_sem.v();
}
void Pop(T &out)
{
//先申请信号量的原因同上
_data_sem.p();
pthread_mutex_lock(&_c_mtx);
out = _rq[_c_step++];
_c_step %= _num;//体现环形结构
pthread_mutex_unlock(&_c_mtx);
// 同理,这里要释放空间信号量
_space_sem.v();
}
private:
vector<T> _rq; // 环形队列
int _num; // 环形队列的大小
int _p_step; // 生产者生产的下标
int _c_step; // 消费者消费的下标
Sem _space_sem; // 空间信号量,这是生产者关心的
Sem _data_sem; // 数据信号量,这是消费者关心的
// 锁是防止多线程访问环形队列时出现数据不一致问题
//多生产多消费的场景下需要加这两把锁,那如何分析得出多生产多消费
//场景下是加这两把锁维护的呢???
//首先我们要想的不是如何加锁的问题,而是要想我要维护什么更关系的问题,
//比单生产单消费我多维护了什么关系的问题。
//原来的单生产单消费中只有生产者和消费者之间的关系,就是同步和互斥的关系
//现在多生产多消费很明显新增了生产者和生产者,消费者和消费者这两对关系,
//这两对关系都是互斥关系,所以我们要多维护了两对互斥关系,即生产和生产,消费和消费的关系。
//所以我们可以给生产者们加一把锁,给消费者们也加一把锁,需要加锁才能访问的资源是
//临界资源,那么生产者们的临界资源是什么?其实就是_p_step下标,所有的生产者都是
//想抢到_p_step下标,从而向该下标位置放入数据,同理消费者们的临界资源就是_c_step下标,
//所以如果是多生产多消费,我们应该加两把锁,保护_p_step和_c_step临界资源
// 这把锁的本质是锁住生产者的下标,保证访问下标时不会出错
pthread_mutex_t _p_mtx;
// 这把锁的本质是锁住消费者的下标,保证访问下标时不会出错
pthread_mutex_t _c_mtx;
};
1.5.3 Sem.hpp
#include <semaphore.h>
class Sem
{
public:
Sem(int num)
{
//初始化信号量
sem_init(&_sem,0,num);
}
~Sem()
{
//销毁信号量
sem_destroy(&_sem);
}
void p()
{
//p操作就是获取一个信号量,该操作保证是原子的
sem_wait(&_sem);
}
void v()
{
//v操作是释放一个信号量,该操作保证是原子的
sem_post(&_sem);
}
private:
sem_t _sem;//信号量
};
1.5.4 Task.hpp
#include <functional>
using namespace std;
#include <string>
typedef function<int(int,int)> func_t;
class Task
{
public:
Task()
{}
Task(int x, int y, char op,func_t func)
: _x(x), _y(y), _op(op), _func(func)
{}
int operator()()
{
return _func(_x,_y);
}
public:
int _x;
int _y;
char _op;
func_t _func;
};
1.5.5 main.cc
#include "RingQueue.hpp"
#include <cmath>
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
// void *producter(void *args)
// {
// Ring_Queue<Task> *rq = static_cast<Ring_Queue<Task> *>(args);
// while (true)
// {
// int x = 0;
// int y = 0;
// char op;
// cout << "Please input the first num : ";
// cin >> x;
// cout << "Please input the second num : ";
// cin >> y;
// Task t(x, y, op, Add);
// rq->Push(t);
// printf("生产者生产了一个数据:%d %c %d = ?\n", x, op, y);
// usleep(1000);
// }
// return nullptr;
// }
int main()
{
srand((unsigned int)time(nullptr));
pthread_t p,c;
Ring_Queue<int>* rq=new Ring_Queue<int>(10);
pthread_create(&p,nullptr,producter,rq);
pthread_create(&c,nullptr,consumer,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
// //单生产单消费
// int Add(int x, int y)
// {
// return x + y;
// }
// int Sub(int x, int y)
// {
// return x - y;
// }
// int Mul(int x, int y)
// {
// return x * y;
// }
// int Div(int x, int y)
// {
// return x / y;
// }
// int Mol(int x, int y)
// {
// return x % y;
// }
// #include "Task.hpp"
// void *producter(void *args)
// {
// Ring_Queue<Task> *rq = static_cast<Ring_Queue<Task> *>(args);
// while (true)
// {
// int x = 0;
// int y = 0;
// char op;
// pthread_mutex_lock(&mtx);
// usleep(1000);
// cout << "Please input the first num : ";
// cin >> x;
// cout << "Please input the second num : ";
// cin >> y;
// cout << "Please input the operator : ";
// cin >> op;
// printf("生产者 [%d] 生产了一个数据:%d %c %d = ?\n", pthread_self(), x, op, y);
// pthread_mutex_unlock(&mtx);
// switch (op)
// {
// case '+':
// {
// Task t(x, y, op, Add);
// rq->Push(t);
// break;
// }
// case '-':
// {
// Task t(x, y, op, Sub);
// rq->Push(t);
// break;
// }
// case '*':
// {
// Task t(x, y, op, Mul);
// rq->Push(t);
// break;
// }
// case '/':
// {
// Task t(x, y, op, Div);
// rq->Push(t);
// break;
// }
// case '%':
// {
// Task t(x, y, op, Mol);
// rq->Push(t);
// break;
// }
// default:
// {
// cout << "运算符错误!!!" << endl;
// exit(1);
// }
// }
// }
// return nullptr;
// }
// void *consumer(void *args)
// {
// Ring_Queue<Task> *rq = static_cast<Ring_Queue<Task> *>(args);
// while (true)
// {
// sleep(1);
// Task out;
// rq->Pop(out);
// printf("消费者 [%d] 消费了一个数据:", pthread_self());
// cout << out._x << " " << out._op << " " << out._y << " = " << out() << endl;
// }
// return nullptr;
// }
// int main()
// {
// srand((unsigned int)time(nullptr));
// pthread_t p, c;
// Ring_Queue<Task> *rq = new Ring_Queue<Task>(10);
// pthread_create(&p, nullptr, producter, rq);
// pthread_create(&c, nullptr, consumer, rq);
// pthread_join(p, nullptr);
// pthread_join(c, nullptr);
// return 0;
// }
// //多生产多消费
// int Add(int x, int y)
// {
// return x + y;
// }
// int Sub(int x, int y)
// {
// return x - y;
// }
// int Mul(int x, int y)
// {
// return x * y;
// }
// int Div(int x, int y)
// {
// return x / y;
// }
// int Mol(int x, int y)
// {
// return x % y;
// }
// void *producter(void *args)
// {
// Ring_Queue<Task> *rq = static_cast<Ring_Queue<Task> *>(args);
// while (true)
// {
// int x = 0;
// int y = 0;
// char op;
// pthread_mutex_lock(&mtx);
// usleep(1000);
// cout << "Please input the first num : ";
// cin >> x;
// cout << "Please input the second num : ";
// cin >> y;
// cout << "Please input the operator : ";
// cin >> op;
// printf("生产者 [%d] 生产了一个数据:%d %c %d = ?\n", pthread_self(), x, op, y);
// pthread_mutex_unlock(&mtx);
// switch (op)
// {
// case '+':
// {
// Task t(x, y, op, Add);
// rq->Push(t);
// break;
// }
// case '-':
// {
// Task t(x, y, op, Sub);
// rq->Push(t);
// break;
// }
// case '*':
// {
// Task t(x, y, op, Mul);
// rq->Push(t);
// break;
// }
// case '/':
// {
// Task t(x, y, op, Div);
// rq->Push(t);
// break;
// }
// case '%':
// {
// Task t(x, y, op, Mol);
// rq->Push(t);
// break;
// }
// default:
// {
// cout << "运算符错误!!!" << endl;
// exit(1);
// }
// }
// }
// return nullptr;
// }
// void *consumer(void *args)
// {
// Ring_Queue<Task> *rq = static_cast<Ring_Queue<Task> *>(args);
// while (true)
// {
// sleep(1);
// Task out;
// rq->Pop(out);
// printf("消费者 [%d] 消费了一个数据:", pthread_self());
// cout << out._x << " " << out._op << " " << out._y << " = " << out() << endl;
// }
// return nullptr;
// }
// int main()
// {
// srand((unsigned int)time(nullptr));
// pthread_t p[3], c[2];
// Ring_Queue<Task> *rq = new Ring_Queue<Task>(10);
// for (int i = 0; i < 3; i++)
// {
// pthread_create(&p[i], nullptr, producter, rq);
// }
// for (int i = 0; i < 2; i++)
// {
// pthread_create(&c[i], nullptr, consumer, rq);
// }
// for (int i = 0; i < 3; i++)
// {
// pthread_join(p[i], nullptr);
// }
// for (int i = 0; i < 2; i++)
// {
// pthread_join(c[i], nullptr);
// }
// return 0;
// }
void *producter(void *args)
{
Ring_Queue<int> *rq = static_cast<Ring_Queue<int> *>(args);
while (true)
{
int in = rand() % 100 + 1;
rq->Push(in);
pthread_mutex_lock(&mtx);
printf("[0x%x]:",pthread_self());
cout << "生产者生产了一个数据 in = " << in << endl;
pthread_mutex_unlock(&mtx);
sleep(1);
}
}
void *consumer(void *args)
{
Ring_Queue<int> *rq = static_cast<Ring_Queue<int> *>(args);
while (true)
{
sleep(1);
int out;
rq->Pop(out);
pthread_mutex_lock(&mtx);
printf("[0x%x]:",pthread_self());
cout << "消费者消费了一个数据 out = " << out << endl;
pthread_mutex_unlock(&mtx);
}
}
二、信号量控制的环形队列原理图
以上就是基于信号量的环形队列实现的生产者消费者模型啦,你学会了吗?如果感觉到有所帮助的话,那就点点小心心,点点关注呗,后期还会持续更新Linux系统编程的相关知识哦,我们下期见!!!