Linux — 多线程的互斥与同步,信号量

news2024/9/22 19:27:14

 

 

1.线程互斥

  进程线程间的互斥相关背景概念

  • 临界资源:多线程执行流共享的资源就叫做临界资源。
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
  • 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。

 互斥量mutex

        大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
多个线程并发的操作共享变量,会带来一些问题。

 下面的代码是模拟多个线程抢票的过程。

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <memory>
#include "thread.hpp"

using namespace std;

//共享资源  ->火车票
int tickets = 100;
void* Get_tickets(void* args)
{
    string username = static_cast<const char*>(args);
    while(true)
    {
        if(tickets > 0)
        {   //微秒时间,1秒 = 1000毫秒 1毫秒 = 1000微秒...
            usleep(1245); //模拟抢票时间

            cout << username << "正在抢票中... 票号:"<< tickets-- << endl;   
        }
        else
            break;
    }
    return nullptr;
}
int main()
{
    //这里用的是封装好的线程
    unique_ptr<Thread> thread1(new Thread(Get_tickets,(void*)"user1 ",1));
    unique_ptr<Thread> thread2(new Thread(Get_tickets,(void*)"user2 ",2));
    unique_ptr<Thread> thread3(new Thread(Get_tickets,(void*)"user3 ",3));
    unique_ptr<Thread> thread4(new Thread(Get_tickets,(void*)"user4 ",4));

    thread1->join();
    thread2->join();
    thread3->join();
    thread4->join();

    return 0;
}

把Linux线程库的接口进行封装,封装线程的代码如下:lesson11/test4/thread.hpp · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)

 我们发现多个线程进行抢票居然出现了负数,这是违反常理的,不论是火车票还是电影票,100个位置卖了102张票,多出了两个人就会有问题。

 为什么会出现上述的结果呢?

1. if 语句判断条件为真以后,代码并发的切换到其他线程。注:线程什么时候切换呢?比如时间片到了,来了更高优先级的线程,线程在等待的时候。
2. usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段。
3. --ticket 操作本身就不是一个原子操作。

-- 操作并不是原子操作,而是对应三条汇编指令:

  •         load :将共享变量ticket从内存加载到寄存器中
  •         update : 更新寄存器里面的值,执行-1操作
  •         store :将新值,从寄存器写回共享变量ticket的内存地址

要解决上面的问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到上面这三点,其实就是需要一把锁,Linux上提供的这把锁叫互斥量。

 互斥量的接口

  初始化互斥量

初始化互斥量有两种方法:

1.静态分配 :pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

上了锁后,抢票就正常了,但是我们发现只有一个线程一直在抢,这个问题后面说。

2.动态分配

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);

参数:
mutex:要初始化的互斥量
attr:NULL

  销毁互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex);
 

销毁互斥量需要注意:

  • 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁

  互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值:成功返回0,失败返回错误号

int pthread_mutex_trylock(pthread_mutex_t *mutex);

尝试申请锁,申请成功直接持有锁并返回0,申请失败出错返回错误码

调用 pthread_mutex_lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁
#include <iostream>
#include <string>
#include <vector>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <memory>
#include "thread.hpp"

using namespace std;

//共享资源  ->火车票
int tickets = 100;
class ThreadData
{
public:
    ThreadData(const string& name,pthread_mutex_t* mutex_p)
    :_name(name),_mutex_p(mutex_p)
    {}
    ~ThreadData()
    {}
public:
    string _name;
    pthread_mutex_t* _mutex_p;
};
void* Get_tickets(void* args)
{
    ThreadData* td = static_cast<ThreadData*>(args);
    while(true)
    {
        pthread_mutex_lock(td->_mutex_p);   //加锁
        if(tickets > 0)
        {   //微秒时间,1秒 = 1000毫秒 1毫秒 = 1000微秒...
            usleep(1245); //模拟抢票时间

            cout << td->_name << "正在抢票中... 票号:"<< tickets-- << endl;   
            pthread_mutex_unlock(td->_mutex_p);  //解锁
        }
        else
        {   //不能直接在外面解锁,因为有break,会直接跳过,导致锁没有解开的场景
            pthread_mutex_unlock(td->_mutex_p);  
            break;
        }
    }
    return nullptr;
}
int main()
{
#define NUM 4
    pthread_mutex_t lock;
    pthread_mutex_init(&lock,nullptr);  //初始化锁

    vector<pthread_t> tids(NUM);

    for(int i = 0;i<NUM;++i)
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"thread %d",i+1);
        ThreadData* td = new ThreadData(buffer,&lock);
        pthread_create(&tids[i],nullptr,Get_tickets,td);
    }
    for(const auto& Tids:tids)
    {
        pthread_join(Tids,nullptr);
    }

    pthread_mutex_destroy(&lock);//销毁锁
    return 0;
}

这里也对互斥锁的接口进行了封装,感兴趣的可以看一下:lesson11/test4/Mutex.hpp · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com) 

 上了锁后,票数正常了,没有出现负数,但是出现了某一个线程一直抢票的现象。这是为什么?

 我们要知道,锁只规定了互斥访问,没有规定谁优先,谁先执行,也没有说必须轮着一个一个执行。所以我们让线程执行完抢票后,在等一会,模拟一下形成订单,有了这个等待的过程,其他的线程就可以抢到锁了。

如何看待锁(互斥量)?

        我们要对临界区的资源上锁,首先要让所有线程看到这把锁,那么这个锁的本身就属于共享资源。全局变量(比如票)是要被保护的,锁是用来保护全局资源的,那么锁本身也是全局资源,锁的完全谁来保护呢?

        这就需要保证加锁的过程必须是安全的,这个不需要我们操心,设计者在设计的时候就已经考虑到了,加锁的过程是原子的。也就是说,要么申请成功,要么申请不成功,不存在其他情况。

        如果锁申请成功,那么就继续执行临界区的代码,如果暂时申请不成功,那么执行流会阻塞,直到其他有执行流释放锁,那么操作系统会唤醒该执行流,再次申请锁。

如果一个线程申请锁成功,正在访问临界资源期间,该线程可不可以被切换呢(从CPU中切走)?        答案是可以!

而该线程被切走了以后,其他线程可以访问临界区吗?          答案是不可以!

      因为当持有锁的线程被切走时,是抱着锁被切走的,即便该线程被切走了,其他的线程依旧无法申请到锁,也无法访问临界区执行代码。直至该线程释放锁,其他线程才可以申请锁,访问临界资源。

对于其他线程而言,有意义的锁的状态只有两种:1.申请锁前,2.释放锁后。站在其他线程的角度,看待当前线程持有锁的过程,就是原子的!

互斥量实现原理(加锁解锁的原理)

经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
        为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

下图为加锁解锁的汇编指令:

1.CPU内的寄存器只有一套,被所有执行流共享,2.CPU内寄存器上的内容是每个执行流私有的,该数据属于运行时的上下文,执行流被切换时要带走自己的上下文。

加锁:

  • 第一步线程把0写入CPU的寄存器中(寄存器中的数据属于线程上下文,线程被切换时会带走上下文);
  • 第二步将寄存器和互斥量的值进行交换,交换的本质就是将共享的数据交换到我的上下文当中;
  • 第三步进行判断,当前占有CPU的线程,寄存器中的值是否大于0,大于则返回0,表示申请锁成功,否则申请不成功,线程挂起等待。

 解锁:

把1拷贝到mutex里,唤醒等待互斥量的线程,然后返回。

 因为线程在占用CPU执行时随时可能被切换,所以下图是线程A刚把第二步做完就被切换了。即使线程A还没有申请成功,但是已经拿到了锁的内容,我们就认为线程A已经申请到锁了,因为其他线程来申请也是申请不到的,只有等线程A释放了以后才可能申请到。

可重入和不可重入

重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

  常见可重入的情况

  • 不使用全局变量或静态变量
  • 不使用用malloc或者new开辟出的空间
  • 不调用不可重入函数
  • 不返回静态或全局数据,所有数据都有函数的调用者提供
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

  常见不可重入的情况

  • 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
  • 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
  • 可重入函数体内使用了静态的数据结构

线程安全和线程不安全

线程安全:多个线程并发执行同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。

  常见的线程不安全的情况

  • 不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数

  常见的线程安全的情况

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的类或者接口对于线程来说都是原子操作
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性

  可重入与线程安全的联系

  • 函数是可重入的,那就是线程安全的
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
  • 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

  可重入与线程安全区别

  • 可重入函数是线程安全函数的一种
  • 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
  • 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。

死锁

死锁是指在一组进程中的各个线程均占有不会释放的资源,但因互相申请被其他线程所占用不会释放的资源而处于的一种永久等待状态。

 一个执行流不释放锁,而且还重复申请锁也会导致死锁。

  死锁四个必要条件

  • 互斥条件:一个资源每次只能被一个执行流使用
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放(一个线程去申请另一把锁,但是不释放自己的锁)
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系(线程A拿着自己的锁去申请线程B的锁,线程B也拿着自己的锁去申请线程A的锁,他们两造成一个环路条件)

  避免死锁

  • 破坏死锁的四个必要条件
  • 加锁顺序一致
  • 避免锁未释放的场景
  • 资源一次性分配

避免死锁算法

死锁检测算法(了解)

比如多线程中有一个线程不做其他事情,专门检查有没有死锁的情况,如果有就去把那个锁释放掉。
银行家算法(了解


2.线程同步

  同步概念与竞态条件

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。

 生产者消费者模型

 为何要使用生产者消费者模型

        生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

        我们常见的生产者消费者模型比如 超市,供货商给超市供应商品,而学生或消费者到超市买商品或者说消费商品。那么供货商供应商品时,消费人群可以干任何事,可以在工作,在玩,在购买商品,当然消费者在购买商品时,供货商也可以干任何事,这个生产的过程和消费的过程是解耦的。我们把超市这个临时保存产品的场所叫做缓冲区。

        如果没有超市这个容器,那么你购买商品需要到供货商的厂区去购买,供货商不是说你到了他们厂区他就直接把商品给你了(他根本不知道你要什么),而是你到了之后提出需求后才开始生产。那么这个生产的过程需要你去等,等到生产完成,你拿回去自己使用这个过程,这些都需要时间支撑。此时生产者和消费者之间是强耦合的,也就是说消费者需要,生产者才去生产,消费者等待生产过程,消费者再去消费商品,这个时间消耗太大,不论是生产者还是消费者成本都太高了。

  • 生产者和生产者的关系:互斥 -> 容器的容量就那么大,同一块区域你放了商品别人就放不了商品了,当然也不能存在两个人同时往一块区域放商品的情况,这会出现数据不一致问题。
  • 消费者和消费者的关系:互斥 -> 商品是有限的,同一个商品你拿走了别人就不能拿了,当然也不能存在两个线程拿同一份资源的情况,也会造成数据不一致问题。
  • 生产者和消费者的关系:互斥与同步 ->我们知道计算机世界里数据是可以被覆盖的,互斥是因为消费者在这个区域拿商品,生产者正好也过来放商品,可能会出现消费者还没有把数据完全拿走,而生产者就把这个数据覆盖成其他的数据了。同步是超市里的商品空了,生产者放商品,消费者才能拿商品,或者超市的商品满了,消费者拿走商品,生产者才能放商品。

总结:“321原则”

  • 三种关系:生产者和生产者的关系(互斥),消费者和消费者的关系(互斥),生产者和消费者的关系(互斥(保证共享资源的安全性),同步) 他们之间的关系是对于商品(数据)而言的。
  • 两种角色:生产者线程,消费者线程
  • 一个交易场所:一段特定结构的缓冲区。

只要我们想写生产消费模型,其实本质工作就是维护“321原则”。

生产者消费者模型优点

1.生产者线程和消费者线程进行解耦。

2.支持生产和消费的一段时间的忙闲不均的问题

3.生产者和消费者并发的执行,能提高效率。

 条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

        当我们把一份临界资源用锁保护起来时,第一次到来的线程优先获得访问权,因为在锁之后一般有一个条件判断,该线程通过条件判断访问临界资源完毕后解锁,正常情况就应该换先一个线程访问临界资源了。但是很有可能该线程是一直循环访问,第一个循环结束后,他再次上来申请锁,判断条件是否满足,不满足条件后解锁...由于他得天独厚的条件,导致它申请锁会很快,那么该线程会一直重复申请锁,判断条件,不满足条件,解锁这个过程。导致其他线程申请不到锁,造成线程饥饿问题。此时就需要条件变量来解决问题。

        条件变量是pthread库提供的一个数据类型,每当有线程访问临界资源时,申请互斥锁不成功,如果没有条件变量就会被阻塞挂起,有条件变量的话,就会通过一个接口将线程的PCB链接到条件变量里的队列中进行等待,上一个线程访问完临界资源后,它再想继续访问临界资源,对不起,要去条件队列里等待,此时会调用接口唤醒条件队列中的第一个线程,让他进行访问。这样就不会出现线程饥饿问题了。

举个列子更好的理解条件变量:

        我们都知道在面试的时候,会一个一个的叫人去办公室面试,这个面试官就相当于共享资源,那么我们肯定不能一窝蜂的都挤到面试官面前等待提问,所以办公室就相当于互斥锁。如果仅仅只有互斥锁,那么一群人都在办公室门口等待被叫去面试,我们假设面试官只知道面试,不管面试的人是谁。那么第一个人面试完毕,刚走出办公室,门一关,那个人手都还在门把手上,他又直接把门打开进去面试了,就这样他一直重复的去做这样的事情,导致大家都面试不了,造成饥饿问题。

        这时就需要一个管理者,他对所有面试的人说,现在这有一个等待区,大家赶紧去排队领号,我一会叫号,叫到号的人才能去面试,没有号的人就别想面试了。此时大家就赶紧去排队,等待叫号,然后面试,不一会就井井有序了。我们把这个等待区和管理者叫做条件变量。

条件变量函数

  初始化

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrictattr);
参数:
        cond:要初始化的条件变量
        attr:NULL

  销毁

int pthread_cond_destroy(pthread_cond_t *cond);

        cond:要销毁的条件变量

  等待条件

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

作用:调用该函数的线程进行等待

参数:
        cond:要在这个条件变量上等待
        mutex:互斥量,后面详细解释

int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);

作用:设置一个等待时间,时间到了自动返回

  唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);

作用:唤醒一批线程,唤醒在该条件队列下的所有线程
int pthread_cond_signal(pthread_cond_t *cond);
作用:唤醒一个线程

 简单的使用:唤醒条件队列中的一个线程

唤醒条件队列中的所有线程

完整代码:lesson11/test5/1_test_cond/testCond.cc · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

 C++ queue模拟阻塞队列的生产消费模型

这里的代码先以单生产,单消费来演示。

#include <sys/types.h>
#include <unistd.h>
#include <ctime>

#include "BlockQueue.hpp"
#include "Task.hpp"

const string oper = "+-*/%";    //运算符号
int mymath(int x,int y,char op)
{
    int result = 0;
    switch(op)
    {
        case '+':
            result = x + y;
            break;
        case '-':
            result = x - y;
            break;
        case '*':
            result = x * y;
            break;
        case '/':
        {
            if(y == 0)
            {
                cerr<<"div zero err" << endl;
                result = -1;
            }
            else 
                result = x / y;
            break;
        }
        case '%':
        {
            if(y == 0)
            {
                cerr<<"mod zero err" << endl;
                result = -1;
            }
            else 
                result = x % y;
            break;
        }
    }
    return result;
}
void* consumer(void* args)  //消费者线程
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    while(true)
    {
        //消费活动
        
        Task t;
        bq->pop(&t);
        cout<<"消费任务:" << t() << endl;
        // sleep(1);
    }
    return nullptr;
}
void* productor(void* args) //生产者线程
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    while(true)
    {
        //生产活动
        
        int x = rand()%10+1;
        int y = rand()%5;
        int operidx = rand()% oper.size();  //运算符下标

        Task t(x,y,oper[operidx],mymath);
        bq->push(t);

        cout<< "生产任务:" << t.to_Task_string() << endl;
        sleep(1);

    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(0) ^ getpid());
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    delete bq;
    return 0;
}

 上面的代码只是主函数的接口,具体的实现有兴趣的自己看看把:lesson11/test5/BlockQueue · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)

在此基础上多添加了一个存储文件的线程,这么就不过多赘述了,想了解的自己看:

lesson11/test5/BlockQueue2 · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)

而多生产和多消费可以基于单生产单消费的代码直接进行添加线程,由于我们只有一个队列,一把锁,所以不管多个线程生产还是多个线程消费,都只有一个线程能申请到锁,

在前面说过生产者消费者模型很高效,那么它高效在哪里呢?

        它并不是高效在阻塞队列上,而是可以在生产之前和消费之后,让线程并发的执行。因为我们今天只是模拟任务,但如果是真的任务,那么生产任务和消费任务一定特别的耗时间,所以让他们并发的执行,就会大大提高效率。你生产你的,我消费我的,互不干涉,只有在队列为空或为满时才会有一方的线程暂时等待。

说一说上面的代码一些“不足”的地方:

我们知道一个线程操作临界资源时是先加锁,在检测是否满足条件,根据检测结果进行下一步动作。可是呢,我们无法在加锁之前就得知此次访问是否满足条件,这样的话就会造成不管满不满足条件,都要先加锁,然后检测,满足就挂起,这样线程啥也没干,转了一圈,还浪费时间。如果我们事先就知道判断条件满不满足,那线程就该执行执行,该挂起挂起,就不用每次都加锁了。此时就要引入信号量的概念了。

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

回顾一下信号量的概念:信号量是什么?

        信号量本质是一把计数器,这个计数器是用来衡量临界资源中资源数量多少的。只要拥有了信号量那么在未来就一定能拥有临界资源的一部分。申请信号量的本质就是对临界资源中特定小块的预定机制。为什么要有信号量呢?

信号量可以让线程在访问临界资源前就知道临界资源的使用情况,可以通过信号量评估资源的使用率。

信号量一般都需要什么操作?

我们知道信号量是计数器,线程要申请信号量,信号量就一定要被所有线程都能看到,那么信号量本身就是公共资源。由于信号量是公共资源,那么申请(p)信号量资源(sem--)和归还(v)信号量资源(sem++)的操作就必须是原子的。信号量的核心操作:PV原语。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性。环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态,但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。

生产者和消费者在什么情况下会访问同一个位置呢?1.队列为空的时候,2.队列为满的时候,其他情况下,生产者和消费者访问的是不同的位置。也就是说在环形队列中,大部分情况下,单生产和单消费是可以并发的执行的,只有在满或者空时,才会有互斥与同步的问题!

//RingQueue.hpp
#pragma once

#include <iostream>
#include <vector>
#include <cassert>

#include <semaphore.h>
#include <pthread.h>

using namespace std;

static const int gcap = 5;
template<class T>
class RingQueue
{
public:
    void P(sem_t& sem)  //获取信号量资源
    {
        int n = sem_wait(&sem);
        assert(n == 0);
        (void)n;
        
    }
    void V(sem_t& sem)  //归还信号量资源
    {
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;

    }
public:
    RingQueue(const int& cap = gcap)
        :_queue(cap),_cap(cap)
    {
        int n = sem_init(&_spaceSem,0,_cap);
        assert(n == 0);
        n = sem_init(&_dataSem,0,0);
        assert(n == 0);
        _ProductorStep = _ConsumerStep = 0;
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);

    }
    void push(const T& in)
    {
        P(_spaceSem);
        pthread_mutex_lock(&_pmutex);

        _queue[_ProductorStep++] = in;
        _ProductorStep %= _cap;

        pthread_mutex_unlock(&_pmutex);
        V(_dataSem);
    }
    void pop(T* out)
    {
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);

        *out = _queue[_ConsumerStep++];
        _ConsumerStep %= _cap;

        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);
    }
    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    vector<T> _queue;
    int _cap;   //队列容量
    sem_t _spaceSem;    //生产者信号量,->空间资源
    sem_t _dataSem;     //消费者信号量,->数据资源
    int _ProductorStep; //生产者在环形队列的下标
    int _ConsumerStep;  //消费者在环形队列的下标
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};
//mian.cc
#include <ctime>
#include <cstdlib>
#include <unistd.h>

#include "RingQueue.hpp"
#include "Task.hpp"

string Selfname()
{
    char name[128];
    snprintf(name,sizeof(name),"thread[0x%x]",pthread_self());
    return name;
}
void* ProductorRoutine(void* args)
{
    // RingQueue<int>* rq = static_cast<RingQueue<int>* >(args);
    RingQueue<Task>* rq = static_cast<RingQueue<Task>* >(args);

    while(true)
    {
        //生产数据
        // int data = rand()%10+1;
        // rq->push(data);
        // cout<< "生产数据: "<< data <<endl;
        //获取或构建任务,是需要花时间的
        int x = rand()%10+1;
        int y = rand()%5;
        char op = oper[rand()%oper.size()];        // int operIdx = rand()%oper.size();
        Task t(x,y,op,mymath);                    // Task t(x,y,oper[operIdx],mymath);

        rq->push(t);
        cout<< Selfname()<<",生产任务:"<< t.to_Task_string() <<endl;
        sleep(1);
    }
}
void* consumerRoutine(void* args)
{
    // RingQueue<int>* rq = static_cast<RingQueue<int>* >(args);
    RingQueue<Task>* rq = static_cast<RingQueue<Task>* >(args);

    while(true)
    {
        // int data;
        // rq->pop(&data);
        // cout<< "获取数据: "<< data <<endl;

        //消费或执行任务,是需要花时间的
        Task t;
        rq->pop(&t);
        cout<<Selfname() <<",计算任务:"<< t() << endl;
        // sleep(1);
    }
}
//多生产,多消费
int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x1241235);
    RingQueue<Task>* rq = new RingQueue<Task>();

    pthread_t p[5],c[5];
    for(int i = 0;i<5;++i)  pthread_create(p+i,nullptr,ProductorRoutine,rq);
        
    for(int i = 0;i<5;++i)  pthread_create(c+i,nullptr,consumerRoutine,rq);

    for(int i = 0;i<5;++i)  pthread_join(p[i],nullptr);
    for(int i = 0;i<5;++i)  pthread_join(c[i],nullptr);
 
    delete rq;
    return 0;
}
// //单生产,单消费
// int main()
// {
//     srand((unsigned int)time(nullptr) ^ getpid() ^ 0x1241235);
//     RingQueue<Task>* rq = new RingQueue<Task>();
  
//     pthread_t p[5],c[5];
//     pthread_create(&p,nullptr,ProductorRoutine,rq);
//     pthread_create(&c,nullptr,consumerRoutine,rq);

//     pthread_join(p,nullptr);
//     pthread_join(c,nullptr);

 
//     delete rq;

//     return 0;
// }

 完整代码:

lesson11/6环形队列的生产消费模型 · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/480167.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用粒子群算法设计无线传感器网络中的最优安全路由模型(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 无线传感器网络&#xff08;WSN&#xff09;由数十个、数百个甚至数千个自主传感器组成。这些传感器以无线方式嵌入环境中&…

Day49 5.01 C++刷题

Go不是解释型语言&#xff0c;是编译型语言 Java是混合型语言

MOSFET正向导通,阻断,阈值电压研究

一 设计要求&#xff1a; N-为均匀掺杂、其他均为离子注入所形成的高斯掺杂&#xff1b;P的宽度为10、结深6um&#xff1b;氧化层oxide厚度为0.1um,宽度为10um&#xff1b;氧化层左侧空白需要定义为材料air&#xff1b;所有电极都定义为无厚度&#xff1b;所有的高斯掺杂峰值点…

( 数组和矩阵) 485. 最大连续 1 的个数 ——【Leetcode每日一题】

❓485. 最大连续 1 的个数 难度&#xff1a;简单 给定一个二进制数组 nums &#xff0c; 计算其中最大连续 1 的个数。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,0,1,1,1] 输出&#xff1a;3 解释&#xff1a;开头的两位和最后的三位都是连续 1 &#xff0c;所以最大…

Vision Transformer架构Pytorch逐行实现

前言 代码来自哔哩哔哩博主deep_thoughts&#xff0c;视频地址&#xff0c;该博主对深度学习框架方面讲的非常详细&#xff0c;推荐大家也去看看原视频&#xff0c;不管是否已经非常熟练&#xff0c;我相信都能有很大收获。论文An Image is Worth 16x16 Words: Transformers f…

iOS审核这些坑,腾讯游戏也踩过

WeTest 导读 在App上架苹果应用商店的过程中&#xff0c;相信大多数iOS开发者往往都有过这样的经历&#xff1a;辛苦开发出来的产品&#xff0c;测试验收也通过了&#xff0c;满怀期待的提交App给苹果审核&#xff0c;结果经常被苹果各种理由拒之门外&#xff0c;苦不堪言。 …

Prometheus监控系统存储容量优化攻略,让你的数据安心保存!

云原生监控领域不可撼动&#xff0c;Prometheus 是不是就没缺点&#xff1f;显然不是。 一个软件如果什么问题都想解决&#xff0c;就会导致什么问题都解决不好。所以Prometheus 也存在不足&#xff0c;广受诟病的问题就是 单机存储不好扩展。 1 真的需要扩展容量吗&#xff…

0x80070570文件或目录损坏且无法读取解决方法

第一种解决方法&#xff1a;命令提示符修复。 1、首先按下“Win标R”键&#xff0c;打开运行。 2、然后如果要修复的文件在E盘&#xff0c;那就输入&#xff1a;chkdsk e: /f&#xff0c;h盘就是&#xff1a;chkdsk h: /f&#xff0c;反正是哪个盘就把中间的字幕改成那个盘的…

ecs思考

VPC网络诊断&#xff0c;从router看起&#xff0c;连接公有子网路有一个默认&#xff0c;再新增一条指向igw路由&#xff1b;连接私有子网路由有一个默认&#xff0c;再新增一条指向NAT网关的路由&#xff0c;其中NAT网关一定要在公有子网中&#xff0c;否则&#xff0c;私有子…

Android 10.0 设置默认浏览器后安装另外浏览器后默认浏览器功能修复

1.前言 在10.0的系统rom定制化开发中,当在系统中有多个浏览器的时候,会在用代码启用浏览器的时候,让用户选择进入哪个浏览器,这样显得特别的不方便 所以产品开发中,要求用RoleManager的相关api来设置默认浏览器,但是在设置完默认浏览器以后,在安装一款浏览器的时候,默认…

〔金融帝国实验室〕(Capitalism Lab)v9.0.00官方重大版本更新!

〖金融帝国实验室〗&#xff08;Capitalism Lab&#xff09;v9.0.00正式发布&#xff01; ◎制作发行&#xff1a;Enlight Software ◎发布时间&#xff1a;2023年04月28日 ————————————— ※v9.0.00更新说明&#xff1a; 1.实现6项数据信息双窗口并列显示&#…

兴寿镇“春踏青,兴寿行”特色旅游线路点靓辛庄

记者&#xff1a;云飞 踏着欢乐的节拍&#xff0c;伴着春日的暖阳&#xff0c;2023年4月29日&#xff0c;北京市昌平区兴寿镇&#xff0c;2023党建引领文旅农产业融合发展系列旅游季——“春踏青&#xff0c;兴寿行”特色旅游线路第二站&#xff0c;在兴寿镇辛庄村圆满举办。 此…

【搭建私有云盘】无公网IP,在外远程访问本地微力同步

文章目录 1.前言2. 微力同步网站搭建2.1 微力同步下载和安装2.2 微力同步网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1.前言 私有云盘作为云存储概念的延伸&#xff0c;虽然谈不上多么新颖&#xff0c;但是其…

《QDebug 2023年4月》

一、Qt Widgets 问题交流 二、Qt Quick 问题交流 1.对 qml 基本类型 list 的编辑 在 Qt5 中&#xff0c;QML 的 list 类型只提供了 push 添加数据&#xff0c;或者重新赋值&#xff0c;没法 pop。到了 Qt6&#xff0c;实测可以对 list 调用 pop/shift 等操作。 Qt5 中可以先…

【Liunx】进程的程序替换——自定义编写极简版shell

目录 进程程序替换[1~5]1.程序替换的接口&#xff08;加载器&#xff09;2.什么是程序替换&#xff1f;3.进程替换的原理4.引入多进程5.系列程序替换接口的详细解析&#xff08;重点&#xff01;&#xff09; 自定义编写一个极简版shell[6~8]6.完成命令行提示符7.获取输入的命令…

Docker 架构

Docker 架构 简介Docker daemon &#xff08;守护进程&#xff09;Docker client &#xff08;客户端&#xff09;Docker registries &#xff08;仓库&#xff09;Images &#xff08;镜像&#xff09;Containers &#xff08;容器&#xff09;The underlying technology &…

前缀和 技巧小记

前缀和 子数组的元素之和&#xff1a;一维前缀和子矩阵的元素之和&#xff1a;二维前缀和前缀和 哈希表&#xff1a;寻找和为 target 的子数组 子数组的元素之和&#xff1a;一维前缀和 前缀和适用于快速、频繁地计算一个索引区间内的元素之和。 int res 0; // 存储区间[…

链表:常见面试题-拷贝特殊链表

题目&#xff1a; 一种特殊的单链表节点类描述如下: class Node { int value; Node next; Node rand; Node(int val) {value val} } rand指针是单链表节点结构中新增的指针&#xff0c;rand可能指向链表中的任意一个节点&#xff08;包括自己&#xff09;&#xff0c;也可…

计算机电脑中了勒索病毒怎么办,Windows系统中了faust勒索病毒解密数据恢复

电脑的操作系统被恶意软件攻击已不再是新鲜的话题了。而攻击的恶意软件中有一种叫做faust勒索病毒&#xff0c;常常袭击Windows电脑系统。如果我们的电脑在使用Windows操作系统时感染了faust勒索软件&#xff0c;请不要慌张&#xff0c;我们可以咨询专业的数据恢复厂商&#xf…

深度学习技巧应用11-模型训练中稀疏化参数与稀疏损失函数的应用

大家好,我是微学AI,今天给大家介绍一下深度学习技巧应用11-模型训练中稀疏化参数与稀疏损失函数的应用,在训练神经网络的过程中,将稀疏损失加入到常规损失函数的作用主要是降低模型复杂性和提高模型泛化能力。通过引入稀疏性约束,优化算法会在减小常规损失的同时,尽量让参…