互斥访问高效从何谈起(上节补充)
效率要考虑整体效率
放/取数据时串行,但造数据/处理数据可以并行
多线程时:数据在交易场所中传输确实是互斥,串行的(占比时间很短)
但生产者获取数据与消费者处理数据(任务)缺可以并行,提高了效率(占比时间最长)
所以总体来看,多线程确实提高了效率
POSIX信号量
头文件:semaphore(翻译:信号,信号量,信号灯)
信号量本质是一个保证了原子性的计数器,提供了信号量未申请成功的阻塞功能
信号量本质是预约机制
struct sem_t//模拟sem_t
{
int count;//计数器
mutex_t mutex;//PV原子性
cond//管理队列
}//信号量就是锁+条件变量
信号量本身也是临界资源
信号量PV操作是原子的
二元信号量就是互斥锁
semaphore接口
#include <semaphore.h>
// 创建和初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 销毁信号量
int sem_destroy(sem_t *sem);
// 获取信号量(阻塞直到成功)
int sem_wait(sem_t *sem);
// 非阻塞地获取信号量
int sem_trywait(sem_t *sem);
// 增加信号量的计数
int sem_post(sem_t *sem);
// 获取当前信号量的值
int sem_getvalue(sem_t *sem, int *sval);
返回值:都是成功0,失败-1
全局也要init初始化
PV操作是原子的
信号量管理环形队列生产消费者模型
单生产单消费
同一位置:互斥&同步
不在同一位置:并发
数据与空间和为capacity
生产者:P(空间)V(数据)空间变数据
消费者:P(数据)V(空间)数据变空间
两种情况分析
[code]RingBuffer.hpp
//单生产单消费
多生产多消费
多生产多消费,新增互斥关系
生产者与生产者之间互斥
消费者与消费者之间互斥
做法上,就是新加两把锁,分别管理生产者之间与消费者之间。这样多生产多消费就变成了单生产单消费
先PV还是先lock
Equeue中lock在PV之间,效率更高(先申请信号量,再加锁,可使申请信号量与申请锁并行)
void Equeue(const T &in)
{
// 生产者
_spacesem.P();
{
LockGuard lockguard(_p_lock);
_ring[_p_step] = in; // 生产完毕!
_p_step++;
_p_step %= _cap; // 维持唤醒特性
}
_datasem.V();
}
板书笔记
code
Main.cc
#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace RingBufferModule;
void *Consumer(void *args)
{
RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
while(true)
{
sleep(1);
// sleep(1);
// 1. 消费数据
int data;
ring_buffer->Pop(&data);
// 2. 处理:花时间
std::cout << "消费了一个数据: " << data << std::endl;
}
}
void *Productor(void *args)
{
RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
int data = 0;
while (true)
{
// 1. 获取数据:花时间
// sleep(1);
// 2. 生产数据
ring_buffer->Equeue(data);
std::cout << "生产了一个数据: " << data << std::endl;
data++;
}
}
int main()
{
RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源
// 单生产,单消费
pthread_t c1, p1, c2,c3,p2;
pthread_create(&c1, nullptr, Consumer, ring_buffer);
pthread_create(&c2, nullptr, Consumer, ring_buffer);
pthread_create(&c3, nullptr, Consumer, ring_buffer);
pthread_create(&p1, nullptr, Productor, ring_buffer);
pthread_create(&p2, nullptr, Productor, ring_buffer);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
delete ring_buffer;
return 0;
}
Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace LockModule
{
class Mutex
{
public:
Mutex(const Mutex&) = delete;
const Mutex& operator = (const Mutex&) = delete;
Mutex()
{
int n = ::pthread_mutex_init(&_lock, nullptr);
(void)n;
}
~Mutex()
{
int n = ::pthread_mutex_destroy(&_lock);
(void)n;
}
void Lock()
{
int n = ::pthread_mutex_lock(&_lock);
(void)n;
}
pthread_mutex_t *LockPtr()
{
return &_lock;
}
void Unlock()
{
int n = ::pthread_mutex_unlock(&_lock);
(void)n;
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
LockGuard(Mutex &mtx):_mtx(mtx)
{
_mtx.Lock();
}
~LockGuard()
{
_mtx.Unlock();
}
private:
Mutex &_mtx;
};
}
RingBuffer.hpp
体现了单生产单消费模型,多生产多消费模型在consumer与productor函数中加锁即可
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "Sem.hpp"
#include "Mutex.hpp"
namespace RingBufferModule
{
using namespace SemModule;
using namespace LockModule;
// RingQueue
template <typename T>
class RingBuffer
{
public:
RingBuffer(int cap)
: _ring(cap),
_cap(cap),
_p_step(0),
_c_step(0),
_datasem(0),
_spacesem(cap)
{
}
// 为什么没有判断??信号量,本身就是表示资源数目的,只要成功,就一定有,不需要判断!
void Equeue(const T &in)
{
// 生产者
_spacesem.P();
{
LockGuard lockguard(_p_lock);
_ring[_p_step] = in; // 生产完毕!
_p_step++;
_p_step %= _cap; // 维持唤醒特性
}
_datasem.V();
}
void Pop(T *out)
{
// 消费者
_datasem.P();
{
LockGuard lockguard(_c_lock);
*out = _ring[_c_step];
_c_step++;
_c_step %= _cap;
}
_spacesem.V();
}
~RingBuffer()
{
}
private:
std::vector<T> _ring; // 环, 临界资源
int _cap; // 总容量
int _p_step; // 生产者位置
int _c_step; // 消费位置
Sem _datasem; // 数据信号量
Sem _spacesem; // 空间信号量
Mutex _p_lock;
Mutex _c_lock;
};
} // namespace RingBufferModule
Sem.hpp
#pragma once
#include <semaphore.h>
namespace SemModule
{
int defalutsemval = 1;
class Sem
{
public:
Sem(int value = defalutsemval) : _init_value(value)
{
int n = ::sem_init(&_sem, 0, _init_value);
(void)n;
}
void P()
{
int n = ::sem_wait(&_sem);
(void)n;
}
void V()
{
int n = ::sem_post(&_sem);
(void)n;
}
~Sem()
{
int n = ::sem_destroy(&_sem);
(void)n;
}
private:
sem_t _sem;
int _init_value;
};
} // namespace SemModule