目录
- 🌈前言
- 🌸1、POSIX信号量
- 🍨1.1、概念
- 🍧1.2、PV操作
 
- 🌺2、POSIX信号量相关API
- 🍨2.1、初始化和销毁信号量
- 🍧2.2、等待信号量(P)
- 🍰2.3、发布信号量(V)
 
- 🍀3、基于环形队列的生产消费模型
- 🍨3.1、设计原理
- 🍨3.1、实现代码
 
🌈前言
这篇文章给大家带来线程同步与互斥的学习!!!
🌸1、POSIX信号量
🍨1.1、概念
-  
 
  概念⭐⭐⭐⭐⭐
  
 
-  
  - POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步
 
-  
  - 信号量是一个计数器,描述临界资源数量的计数器
 
-  
  - 只要信号量申请成功了,就一定能获得指定的资源
 
-  
  - 临界资源可以当作整体,也可以结合场景分成一小块一小块(比如:看电影,整个电影院就是临界资源,里面的座位就是一小块资源)
 
-  
  - 多线程获取一部分资源时,如果访问到相同的资源,是由程序员保证它们不访问到相同的资源的(设置判断)
 

-  
 
  资源预定
  
 
-  
  - 当我们申请mutex时,只要我们拿到了锁,当前被锁保护的临界资源就是我的了
 
-  
  - 访问临界资源被切换时,也不用担心,因为锁已经被我申请了,别人申请不了
 
-  
  - 信号量也是一样,只要申请成功,就能获取指定的资源,切换也不受影响
 
-  
  - 资源预定机制:申请属于自己的资源,但还没有使用,就叫做资源预定
 
🍧1.2、PV操作
-  
 
  概念
  
 
-  
  - 既然信号是一个计数器,那么计数器就有自增和自减的操作
 
-  
  - 信号量中的自减操作对应的是P操作,自增操作对应的是V操作
 
-  
  - 原生的自增自减操作在汇编种不是原子的,但是信号量的PV操作是原子的
 
-  
  - PV操作对应的是申请信号量资源和归还信号量资源
 二元信号量
-  
  - 二元信号量:申请信号量时,设置计数器为1,它只有0和1二种操作
 
-  
  - 二元信号量PV操作:申请信号量时,1自减变成0,申请成功,归还时又会自减变成0
 
// 二元信号量其实是一个互斥锁
信号量:1
P -- 1 -- 0 -- 加锁
V -- 0 -- 1 --释放锁
// 二元信号量 == 互斥锁
🌺2、POSIX信号量相关API
信号量的全部接口在编译时,都要加【-lpthread】选项,因为它是第三方库中的头文件
🍨2.1、初始化和销毁信号量
初始化信号量:
#include <semaphore.h>
typedef union
{
  char __size[__SIZEOF_SEM_T];
  long int __align;
} sem_t;
int sem_init(sem_t *sem, int pshared, unsigned int value);
-  
 
  函数解析
  
 
-  
  - 作用:sem_init()初始化sem指向的地址处的未命名的信号量
 
-  
  - sem:取地址信号量(信号量类型是sem_t)
 
-  
  - pshared:0表示线程间共享,非零表示进程间共享
 
-  
  - value:用于指定信号量的初始值(可以认为是计数器的初始值)
 
-  
  - 返回值:成功时,sem_init()返回0;出现错误时,返回-1,并设置errno以指示错误
 注意
-  
  - 初始化已初始化的信号量会导致未定义的行为
 
销毁信号量:
#include <semaphore.h>
typedef union
{
  char __size[__SIZEOF_SEM_T];
  long int __align;
} sem_t;
int sem_destroy(sem_t *sem);
-  
 
  函数解析
  
 
-  
  - sem:取地址信号量(信号量类型是sem_t)
 
-  
  - 返回值:成功时,sem_destroy()返回0;出现错误时,返回-1,并设置errno以指示错误
 注意
-  
  - 销毁其他进程或线程当前被阻止的信号量会产生未定义的行为
 
-  
  - 在重新初始化信号量之前,使用已被破坏的信号量会产生未定义的结果
 
🍧2.2、等待信号量(P)
#include <semaphore.h>
typedef union
{
  char __size[__SIZEOF_SEM_T];
  long int __align;
} sem_t;
int sem_wait(sem_t *sem); //P() -- P操作,信号量减一
-  
 
  函数解析
  
 
-  
  - 功能:P操作,等待信号量,会将信号量的值减1
 
-  
  - sem:取地址信号量(信号量类型是sem_t)
 
-  
  - 返回值:sem_wait()成功时返回0;出错时,信号量的值保持不变,返回-1,并且设置errno以指示错误
 
-  
  - 注意:如果信号量当前的值为零,那么该调用将阻塞/挂起等待,直到可以执行减量(即:信号量值高于零)
 
🍰2.3、发布信号量(V)
#include <semaphore.h>
typedef union
{
  char __size[__SIZEOF_SEM_T];
  long int __align;
} sem_t;
int sem_post(sem_t *sem);//V() -- V操作,信号量加1
-  
 
  函数解析
  
 
-  
  - 功能:V操作,发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
 
-  
  - sem:取地址信号量(信号量类型是sem_t)
 
-  
  - 返回值:成功时,sem_post()返回0;出现错误时,信号量的值保持不变,返回-1,并设置errno以指示错误
 
-  
  - 如果信号量的值因此变为大于零,则在sem_wait()调用中被阻塞的另一个进程或线程将被唤醒,并且继续往下运行
 
🍀3、基于环形队列的生产消费模型
上一篇文章已经讲了生产消费模型的原理【跳转多线程同步与互斥】
🍨3.1、设计原理
-  
 
  环形队列的设计方法
  
 
-  
  - 环形队列可以使用链表和数组来进行设计
 
-  
  - 队列是连续的,最好使用数组来设计,因为数组是连续存储的,在CPU高速缓存中命中率高,而链表是随机存储的,CPU高速缓存命中率低
 
-  
  - 环形队列实现方法可以使用计数器、取模运算、预留一个空位置方法等等来进行实现…
 
-  
  - 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
 

使用信号量设置基于生产消费的环形队列 – 信号量本质是一个计数器
图解:

-  
 
  总结
  
 
-  
  - 生产和消费线程可能会访问同一个位置
 
-  
  - 只有环形队列为空或为满的时候,才会访问同一个位置,我们让它们同步与互斥(信号量+互斥锁)的走就不会出现这个问题了
 
-  
  - 其他时候,都是访问不同的位置(可以并发的去执行)
 
-  
  - 队列为空时:消费者线程不能超过生产者线程,因为队列没有数据 – 生产者线程先走
 
-  
  - 队列为满时,生产者线程不能超过消费者线程,可能导致数据覆盖 – 消费者线程先走
 
🍨3.1、实现代码
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 10;
template <class T>
class RingQueue
{
public:
    RingQueue(int cap = gCap): ringqueue_(cap), pIndex_(0), cIndex_(0)
    {
        // 生产
        sem_init(&roomSem_, 0, ringqueue_.size());
        // 消费
        sem_init(&dataSem_, 0, 0);
        pthread_mutex_init(&pmutex_ ,nullptr);
        pthread_mutex_init(&cmutex_ ,nullptr);
    }
    
    // 生产
    void push(const T &in)
    {
    	// P操作
        sem_wait(&roomSem_);
        pthread_mutex_lock(&pmutex_);
        ringqueue_[pIndex_] = in; //生产的过程
        pIndex_++;   // 写入位置后移
        pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
        pthread_mutex_unlock(&pmutex_);
        // V操作
        sem_post(&dataSem_);
    }
    
    // 消费
    T pop()
    {
    	// P操作
        sem_wait(&dataSem_);
        pthread_mutex_lock(&cmutex_);
        T temp = ringqueue_[cIndex_];
        cIndex_++;
        cIndex_ %= ringqueue_.size();// 更新下标,保证环形特征
        pthread_mutex_unlock(&cmutex_);
        // V操作
        sem_post(&roomSem_);
        return temp;
    }
    ~RingQueue()
    {
        sem_destroy(&roomSem_);
        sem_destroy(&dataSem_);
        pthread_mutex_destroy(&pmutex_);
        pthread_mutex_destroy(&cmutex_);
    }
private:
    vector<T> ringqueue_; // 环形队列
    sem_t roomSem_;       // 衡量空间计数器,productor
    sem_t dataSem_;       // 衡量数据计数器,consumer
    uint32_t pIndex_;     // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源
    uint32_t cIndex_;     // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
	// 为什么要加锁呢? 因为多线程的情况下,需要保护下标自增操作,它们不是原子的
    pthread_mutex_t pmutex_; // 生产者互斥锁
    pthread_mutex_t cmutex_; // 消费者互斥锁
};
test.cpp – 测试代码
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
// 不要只关心把数据或者任务,从ringqueue 放拿的过程,获取数据或者任务,处理数据或者任务,也是需要花时间的!
void *productor(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while(true)
    {
        int data = rand()%10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
        sleep(1);
    }
}
void *consumer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while(true)
    {
        //sleep(10);
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
    }
}
int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    RingQueue<int> rq;
    pthread_t c1,c2,c3, p1,p2,p3;
    pthread_create(&p1, nullptr, productor, &rq);
    pthread_create(&p2, nullptr, productor, &rq);
    pthread_create(&p3, nullptr, productor, &rq);
    pthread_create(&c1, nullptr, consumer, &rq);
    pthread_create(&c2, nullptr, consumer, &rq);
    pthread_create(&c3, nullptr, consumer, &rq);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);
    return 0;
}











![[电商实时数仓] 用户行为数据和业务数据采集以及ODS层](https://img-blog.csdnimg.cn/8d5a0268932544fc995056ee36f0d513.png)






