目录
- 🌈前言
- 🌸1、POSIX信号量
- 🍨1.1、概念
- 🍧1.2、PV操作
- 🌺2、POSIX信号量相关API
- 🍨2.1、初始化和销毁信号量
- 🍧2.2、等待信号量(P)
- 🍰2.3、发布信号量(V)
- 🍀3、基于环形队列的生产消费模型
- 🍨3.1、设计原理
- 🍨3.1、实现代码
🌈前言
这篇文章给大家带来线程同步与互斥的学习!!!
🌸1、POSIX信号量
🍨1.1、概念
-
概念⭐⭐⭐⭐⭐
-
- POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步
-
- 信号量是一个计数器,描述临界资源数量的计数器
-
- 只要信号量申请成功了,就一定能获得指定的资源
-
- 临界资源可以当作整体,也可以结合场景分成一小块一小块(比如:看电影,整个电影院就是临界资源,里面的座位就是一小块资源)
-
- 多线程获取一部分资源时,如果访问到相同的资源,是由程序员保证它们不访问到相同的资源的(设置判断)
-
资源预定
-
- 当我们申请mutex时,只要我们拿到了锁,当前被锁保护的临界资源就是我的了
-
- 访问临界资源被切换时,也不用担心,因为锁已经被我申请了,别人申请不了
-
- 信号量也是一样,只要申请成功,就能获取指定的资源,切换也不受影响
-
- 资源预定机制:申请属于自己的资源,但还没有使用,就叫做资源预定
🍧1.2、PV操作
-
概念
-
- 既然信号是一个计数器,那么计数器就有自增和自减的操作
-
- 信号量中的自减操作对应的是P操作,自增操作对应的是V操作
-
- 原生的自增自减操作在汇编种不是原子的,但是信号量的PV操作是原子的
-
- PV操作对应的是申请信号量资源和归还信号量资源
二元信号量
-
- 二元信号量:申请信号量时,设置计数器为1,它只有0和1二种操作
-
- 二元信号量PV操作:申请信号量时,1自减变成0,申请成功,归还时又会自减变成0
// 二元信号量其实是一个互斥锁
信号量:1
P -- 1 -- 0 -- 加锁
V -- 0 -- 1 --释放锁
// 二元信号量 == 互斥锁
🌺2、POSIX信号量相关API
信号量的全部接口在编译时,都要加【-lpthread】选项,因为它是第三方库中的头文件
🍨2.1、初始化和销毁信号量
初始化信号量:
#include <semaphore.h>
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
函数解析
-
- 作用:sem_init()初始化sem指向的地址处的未命名的信号量
-
- sem:取地址信号量(信号量类型是sem_t)
-
- pshared:0表示线程间共享,非零表示进程间共享
-
- value:用于指定信号量的初始值(可以认为是计数器的初始值)
-
- 返回值:成功时,sem_init()返回0;出现错误时,返回-1,并设置errno以指示错误
注意
-
- 初始化已初始化的信号量会导致未定义的行为
销毁信号量:
#include <semaphore.h>
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
int sem_destroy(sem_t *sem);
-
函数解析
-
- sem:取地址信号量(信号量类型是sem_t)
-
- 返回值:成功时,sem_destroy()返回0;出现错误时,返回-1,并设置errno以指示错误
注意
-
- 销毁其他进程或线程当前被阻止的信号量会产生未定义的行为
-
- 在重新初始化信号量之前,使用已被破坏的信号量会产生未定义的结果
🍧2.2、等待信号量(P)
#include <semaphore.h>
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
int sem_wait(sem_t *sem); //P() -- P操作,信号量减一
-
函数解析
-
- 功能:P操作,等待信号量,会将信号量的值减1
-
- sem:取地址信号量(信号量类型是sem_t)
-
- 返回值:sem_wait()成功时返回0;出错时,信号量的值保持不变,返回-1,并且设置errno以指示错误
-
- 注意:如果信号量当前的值为零,那么该调用将阻塞/挂起等待,直到可以执行减量(即:信号量值高于零)
🍰2.3、发布信号量(V)
#include <semaphore.h>
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
int sem_post(sem_t *sem);//V() -- V操作,信号量加1
-
函数解析
-
- 功能:V操作,发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
-
- sem:取地址信号量(信号量类型是sem_t)
-
- 返回值:成功时,sem_post()返回0;出现错误时,信号量的值保持不变,返回-1,并设置errno以指示错误
-
- 如果信号量的值因此变为大于零,则在sem_wait()调用中被阻塞的另一个进程或线程将被唤醒,并且继续往下运行
🍀3、基于环形队列的生产消费模型
上一篇文章已经讲了生产消费模型的原理【跳转多线程同步与互斥】
🍨3.1、设计原理
-
环形队列的设计方法
-
- 环形队列可以使用链表和数组来进行设计
-
- 队列是连续的,最好使用数组来设计,因为数组是连续存储的,在CPU高速缓存中命中率高,而链表是随机存储的,CPU高速缓存命中率低
-
- 环形队列实现方法可以使用计数器、取模运算、预留一个空位置方法等等来进行实现…
-
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
使用信号量设置基于生产消费的环形队列 – 信号量本质是一个计数器
图解:
-
总结
-
- 生产和消费线程可能会访问同一个位置
-
- 只有环形队列为空或为满的时候,才会访问同一个位置,我们让它们同步与互斥(信号量+互斥锁)的走就不会出现这个问题了
-
- 其他时候,都是访问不同的位置(可以并发的去执行)
-
- 队列为空时:消费者线程不能超过生产者线程,因为队列没有数据 – 生产者线程先走
-
- 队列为满时,生产者线程不能超过消费者线程,可能导致数据覆盖 – 消费者线程先走
🍨3.1、实现代码
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 10;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gCap): ringqueue_(cap), pIndex_(0), cIndex_(0)
{
// 生产
sem_init(&roomSem_, 0, ringqueue_.size());
// 消费
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_ ,nullptr);
pthread_mutex_init(&cmutex_ ,nullptr);
}
// 生产
void push(const T &in)
{
// P操作
sem_wait(&roomSem_);
pthread_mutex_lock(&pmutex_);
ringqueue_[pIndex_] = in; //生产的过程
pIndex_++; // 写入位置后移
pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
pthread_mutex_unlock(&pmutex_);
// V操作
sem_post(&dataSem_);
}
// 消费
T pop()
{
// P操作
sem_wait(&dataSem_);
pthread_mutex_lock(&cmutex_);
T temp = ringqueue_[cIndex_];
cIndex_++;
cIndex_ %= ringqueue_.size();// 更新下标,保证环形特征
pthread_mutex_unlock(&cmutex_);
// V操作
sem_post(&roomSem_);
return temp;
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
vector<T> ringqueue_; // 环形队列
sem_t roomSem_; // 衡量空间计数器,productor
sem_t dataSem_; // 衡量数据计数器,consumer
uint32_t pIndex_; // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源
uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
// 为什么要加锁呢? 因为多线程的情况下,需要保护下标自增操作,它们不是原子的
pthread_mutex_t pmutex_; // 生产者互斥锁
pthread_mutex_t cmutex_; // 消费者互斥锁
};
test.cpp – 测试代码
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
// 不要只关心把数据或者任务,从ringqueue 放拿的过程,获取数据或者任务,处理数据或者任务,也是需要花时间的!
void *productor(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while(true)
{
int data = rand()%10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
sleep(1);
}
}
void *consumer(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while(true)
{
//sleep(10);
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
RingQueue<int> rq;
pthread_t c1,c2,c3, p1,p2,p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}