目录
什么是信号量
如何理解信号量的使用
基于环形队列的生产消费者模型
如上问题我们如何用编码保证 ?(信号量)
编码:
POSIX信号量和SystemV信号量作用相同,都是用于同步操作。POSIX可以用于线程同步。
信号量本质上就是一个计数器。
什么是信号量
只要保证共享资源任何时刻都只有一个执行流在进行访问,就有了临界资源和临界区的概念。互斥(加锁)时候,共享资源是被当做整体使用的。如果不同线程访问的是共享资源中的不同的位置,这个共享资源就可以分开来使用,让不同的执行流访问不同的区域,就可以继续并发了。
那么我们如何知道这个一共有多少个资源,还剩多少个?自己初始化的
你怎么保证这个资源就是给你的?程序员编码
我怎么知道我一定可以具有一个共享资源呢?信号量
电影院的例子:
买票的本质:叫做资源(座位)的预定机制。
信号量本质:是一个计数器,访问临界资源的时候,必须先申请信号量资源(sem--, 预定资源, P),使用完毕信号量资源(sem++, 释放资源,V)。
如何理解信号量的使用
我们申请了一个信号量,代表我当前执行流一定具有一个资源,可以被它使用了,那么是哪一个资源呢?需要程序员根据场景自己编码完成!
基于环形队列的生产消费者模型
项目:就是 物理结构->逻辑结构,在加一些条件循环判断。看项目时,看核心的物理结构代码,联想到逻辑结构。
环形队列:
判空:start == end
判满:1.计数器 2.浪费一个格子
空的时候,消费者线程等待;满的时候,生产者线程等待。
如果生产和消费指向了环形的同一个位置(表示队列为空or未满):生产和消费要有互斥或者同步问题。但是大概率生产线程和消费线程指向的是不同的位置,所以只需要让生产和消费指向同一个位置,具有互斥同步关系就可以了,而让生产和消费不指向同一个位置时,并发指向!
我们的期望:生产者和消费者套圈(数据覆盖问题)、消费者不能超过生产者,指向同一个位置的情况:1.为空时,让生产者先运行;2.为满,让消费者先运行;其他情况可以并发访问。
如上问题我们如何用编码保证 ?(信号量)
先看看两者关注什么
生产者:最关注的是空间资源 -> spaceSem信号量 -> N
消费者:最关注的是数据资源 -> dataSem信号量 -> 0
进行生产:先申请信号量 P(spaceSem) -> spaceSem--,然后再特定位置生产。V(dataSem)
进行消费:先申请信号量P(dataSem) -> dataSem--, 然后消费特定的数据。V(spaceSem)
信号量申请失败,线程被挂起。
问题:如何保证在特定的位置访问自己的资源? 两者线程都有自己的下标即可。
编码:
#include "ringQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int myAdd(int x, int y)
{
return x + y;
}
void* consume(void* args)
{
RingQueue<Task>* rq = (RingQueue<Task>*)args;
while (true)
{
Task t;
rq->pop(&t);
std::cout << "线程 " << pthread_self() << " 消费成功:" << "=" << t() << std::endl;
}
}
void* product(void* args)
{
RingQueue<Task>* rq = (RingQueue<Task>*)args;
while (true)
{
int x = rand() % 100;
int y = rand() % 100;
Task t(x, y, myAdd);
rq->push(t);
std::cout << "线程 " << pthread_self() << " 生产任务成功:" << x << "+" << y << "=?" << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t c[3], p[5];
RingQueue<int> *rq = new RingQueue<int>();
for (int i = 0; i < 3; ++i) pthread_create(c + i, nullptr, consume, rq);
for (int i = 0; i < 5; ++i) pthread_create(p, nullptr, product, rq);
for (int i = 0; i < 3; ++i) pthread_join(c[i], nullptr);
for (int i = 0; i < 5; ++i) pthread_join(p[i], nullptr);
return 0;
}
#pragma once
#include "sem.hpp"
#include "Task.hpp"
#include <iostream>
#include <vector>
#include <pthread.h>
const int g_data_num = 5;
// 定义环形队列
template<class T>
class RingQueue
{
public:
RingQueue(int num = g_data_num)
:_num(num)
,_ring_queue(num)
,_c_step(0)
,_p_step(0)
,_space_sem(num)
,_data_sem(0)
{
pthread_mutex_init(&_c_mtx, nullptr);
pthread_mutex_init(&_p_mtx, nullptr);
}
void push(T& in)
{
// 先预空间申请信号量
_space_sem.p();
// 在进行实际处理
// std::cout << "空间申请成功" << std::endl;
pthread_mutex_lock(&_p_mtx);
_ring_queue[_p_step++] = in;
_p_step %= _num;
// std::cout << "线程 " << pthread_self() << " 生产任务成功:" << in << std::endl;
pthread_mutex_unlock(&_p_mtx);
_data_sem.v();
}
void pop(T* out)
{
// 先预数据申请信号量
_data_sem.p();
// 在进行实际处理
// std::cout << "数据申请成功" << std::endl;
pthread_mutex_lock(&_c_mtx);
*out = _ring_queue[_c_step++];
_c_step %= _num;
// std::cout << "线程 " << pthread_self() << " 任务消费成功:" << *out << std::endl;
pthread_mutex_unlock(&_c_mtx);
_space_sem.v();
}
~RingQueue()
{
pthread_mutex_destroy(&_c_mtx);
pthread_mutex_destroy(&_p_mtx);
}
private:
std::vector<T> _ring_queue; // 队列
int _num; // 数据最大个数
int _c_step; // 消费者下标
int _p_step; // 生产者下标
Sem _space_sem; // 空间信号量
Sem _data_sem; // 数据信号量
pthread_mutex_t _c_mtx; // 消费者锁
pthread_mutex_t _p_mtx; // 生产者锁
};
#ifndef __TASK_HPP
#define __TASK_HPP
#include <iostream>
#include <functional>
// typedef std::function<int (int, int)> func_t;
typedef int(*func_t)(int, int);
class Task
{
public:
Task()
{}
Task(int x, int y, func_t func)
:_x(x)
,_y(y)
,_func(func)
{
}
int operator()()
{
return _func(_x, _y);
}
private:
int _x;
int _y;
func_t _func;
};
#endif
#ifndef __TASK_HPP
#define __TASK_HPP
#include <iostream>
#include <functional>
// typedef std::function<int (int, int)> func_t;
typedef int(*func_t)(int, int);
class Task
{
public:
Task()
{}
Task(int x, int y, func_t func)
:_x(x)
,_y(y)
,_func(func)
{
}
int operator()()
{
return _func(_x, _y);
}
private:
int _x;
int _y;
func_t _func;
};
#endif
ring_queue:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ring_queue
多生产多消费的意义? 不要狭隘的认为,把认为或者数据放在交易场所,就是生产和消费,将数据或者任务生产前和那道之后的处理,才是最耗费时间的。
生产的本质:私有的任务->公共空间
消费的本质:公共空间中的认为->私有的
信号量的本质是一个计数器,计数器的意义是什么?可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断。
以前申请锁 -> 判断与访问 -> 释放锁 --- 这么做的本质是我们并不清楚临界资源的情况!!信号量要提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况!