Linux之条件变量,信号量,生产者消费者模型

news2024/12/14 16:27:43

Linux之条件变量,信号量,生产消费者模型,日志以及线程池

  • 一.条件变量
    • 1.1条件变量的概念
    • 1.2条件变量的接口
  • 二.信号量
    • 2.1信号量的重新认识
    • 2.2信号量的接口
  • 三.生产者消费者模型
    • 3.1生产者消费者模型的概念
    • 3.2基于阻塞队列的生产者消费者模型
    • 3.3基于环形队列的生产者消费者模型

一.条件变量

1.1条件变量的概念

在学习了互斥量后我们能够避免了多线程并发访问公共资源的情况下造成数据不一致的情况,但是在解决了这位问题后还会有另外一个问题:当有一个线程开锁后其他的线程会不断的继续申请锁从而造成对时间的浪费以及对开锁线程来说的饥饿问题。
想要解决这个问题我们就需要使用互斥和同步中的同步也就是利用条件变量,条件变量简单来说就是让线程在面对已经被申请走的了锁后进行有序的等待直到解锁后再被唤醒,也就如同条件变量这个名字在满足了条件后再继续申请锁在没有满足条件时就进行阻塞。

1.2条件变量的接口

在了解了条件变量后我们就来通过使用条件变量的接口来让大家更好的理解条件变量。

  1. 初始化和销毁条件变量
    在这里插入图片描述

注意:条件变量和互斥量相似,申请静态的条件变量时不需要销毁,而申请动态的条件变量就需要销毁

  1. 让线程在条件变量下等待
    在这里插入图片描述

为什么条件变量需要传入一个锁呢?因为让线程进行阻塞等待时必须让其把所拥有的锁资源进行释放以免造成死锁问题

  1. 唤醒线程
    在这里插入图片描述

在了解了条件变量的接口后我们可以快速利用之前的抢票程序写一个条件变量的使用场景

#include <iostream>
#include <pthread.h>
#include <unistd.h>

int ticket = 5;
pthread_mutex_t mutex;
pthread_cond_t cond;
void *PthreadRoutinue(void *args)
{
    std::string threadname = static_cast<char *>(args);
    while (true)
    {
        //在判断前进行申请锁
        pthread_mutex_lock(&mutex);
        //进行条件判断
        if (ticket > 0)
        {
            //判断成功就获得票
            std::cout << threadname << " get a ticker:" << ticket-- << std::endl;
            usleep(1000);
        }
        else
        {
            std::cout << "没有票了" << std::endl;
            //判断失败就让该线程进行等待直到被唤醒
            //将等待的操作放在临界区中是有其用意的
            //1.线程被阻塞时会释放锁资源从而让别的线程可以来申请锁
            //2.如果该线程被唤醒也是在wait的返回中,它会继续去申请锁资源
            //3.当线程被唤醒后继续申请锁资源也是要参与对于锁资源的竞争的
            pthread_cond_wait(&cond,&mutex);
        }
        pthread_mutex_unlock(&mutex);

    }
}

int main()
{
    pthread_t tid,tid1,tid2;
    pthread_mutex_init(&mutex,nullptr);
    pthread_cond_init(&cond,nullptr);

    pthread_create(&tid, NULL, PthreadRoutinue, (void*)"thread-1");
    pthread_create(&tid1, NULL, PthreadRoutinue, (void*)"thread-2");
    pthread_create(&tid2, NULL, PthreadRoutinue, (void*)"thread-3");

    sleep(5);

    while(true)
    {
        //对于临界资源的访问最好都要申请锁
        pthread_mutex_lock(&mutex);
        ticket += 5;
        pthread_mutex_unlock(&mutex);
        //一个一个的唤醒
        //pthread_cond_signal(&cond);
        //一次性全部唤醒
        pthread_cond_broadcast(&cond);

        sleep(6);

    }

    return 0;
}

在这里插入图片描述

二.信号量

2.1信号量的重新认识

在我们学习进程间通信的时候提到过systemV中的信号量当时我们说信号量的本质是一个计数器,信号量中存储的就是对于公共资源来说有多少块没有被使用的资源,也就是说信号量不像锁那样是将公共资源当作一整块来使用而是将其划分出了不同的区域来让线程进行访问。当然如果信号量中的值为1那么它其实就是一个互斥锁。
那么我们申请信号量的本质也就是预定资源,就像我们在看电影的时候只要买了票那么那个座位就一定是我的也就是我们预定了那个座位。同时我们在进程间通信的时候也说了信号量有pv操作而pv操作是原子的所以我们对于信号量的使用大致分为三步:申请信号量,访问指定的一个位置也就是访问临界资源,释放信号量。我们知道在申请锁了之后我们还需要进行判断通过判断我们才知道资源有没有满还能不能被访问但是申请信号量则不同,因为只要申请成功了信号量那么就说明临界资源中已经有一块资源是属于我们的了所以不需要再进行判断了。

2.2信号量的接口

在重新认识了信号量后我们来学习一下信号量的接口

  1. 初始化信号量
    在这里插入图片描述
  2. 销毁信号量
    在这里插入图片描述
  3. 信号量的p操作即申请信号量
    在这里插入图片描述
  4. 信号量的v操作也就是释放信号量
    在这里插入图片描述

在了解了信号量的接口后我们就暂时不提供对信号量的测试代码了而是在我们之后学习生产者消费者模型中有一种实现方法需要利用信号量那时候我们会知道如何使用信号量。

三.生产者消费者模型

3.1生产者消费者模型的概念

生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
我们可以利用超市的例子来为大家更好的理解生产者消费者模型在这里插入图片描述
而在生产者消费者模型中有三种关系,两个角色和一个场所。
三种关系:

  • 生产和生产者之间是竞争即互斥关系
    联系现实中也是如此在超市中一种物品可能会有很多的品牌而这些不同的品牌来自不同的供货商,这些供货商之间肯定是竞争关系,他们都不想有其他的供货商参与自己的供货。
  • 消费者和消费者之间是竞争即互斥关系
    这个其实也很好理解就例如前几年的疫情期间有大批的人去超市中买盐当供不应求时消费者和消费者自然而然就形成了竞争关系,都想让自己买到。而如今大家无法太感同身受主要是现在的超市太大了不会产生供不应求的情况
  • 生产和消费者之间是竞争和同步即互斥和同步的关系
    同步关系可能比较好理解,对于超市来说希望顾客在上货完成之后立马就来买东西,在买完东西后再进行上货。而互斥也就是超市希望顾客在上货的期间不要来买东西。

两个角色:

  • 生产者
  • 消费者

一个场所:
-仓库

而生产者消费者模型的好处:

  1. 是让生产者产生数据和消费者消费数据形成一种解耦,利用超市即仓库这一个场所来完成解耦即消费者不找生产者要数据而是从仓库中取数据,生产者也不直接提供给消费者数据而是向仓库中放数据所以这个仓库就类似于一种缓冲区的作用从而完成解耦。
  2. 支持并发,我们利用互斥和同步来完成三种关系从而造成生产者和互斥者都可以拥有多个即多个生产线程和多个消费线程。
  3. 支持忙闲不均,也是利用互斥和同步来造成如果生产者少消费者那就让消费者等生产者生产,如果消费者少生产者多那就让生产者等消费者。
  4. 加快效率

3.2基于阻塞队列的生产者消费者模型

生产者消费者模型有很多种的实现方法,我们先来介绍基于阻塞队列的实现方法
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

我们先使用单生产者单消费者的生产者消费者模型来让大家更好的理解。

#Makefile
testmain:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
	
.PHONY:clean
clean:
	rm -f main
//block_queue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>

int defaultcapacity = 5;

// 不知道传入的是什么类型的数据所以使用模板
template <class T>
class block_queue
{
public:
    block_queue(int capacity = defaultcapacity)
        : _capacity(capacity)
    {
        // 初始化锁和条件变量
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    ~block_queue()
    {
        // 销毁锁和条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    bool IsFull()
    {
        return _q.size() == _capacity;
    }

    bool IsEmpty()
    {
        return _q.size() == 0;
    }

    void Push(const T &in)
    {
        // 生产者生产数据的过程
        // 1.先申请锁
        // 2.进行判断如果队列满了就进行阻塞,没满就进行生产数据
        // 3.生产数据
        // 4.解锁并唤醒一个阻塞的消费者

        pthread_mutex_lock(&_mutex);

        // 注意:不能使用if来进行判断
        // 因为使用if判断后当生产线程被唤醒后就默认当前队列是没有满的
        // 但是在实际使用过程中会有很多生产线程在生产数据所以线程在被唤醒后
        // 当前队列满没满是不一定的!!!所以最好使用while循环来判断从而当
        // 线程被唤醒后还要再进行一次判断,判断成功后才能生产数据
        // 这种循环判断让代码有了更强的鲁棒性也就是健壮性
        //  if(IsFull())
        //  {
        //      pthread_cond_wait(&_pcond,&_mutex);
        //  }
        while (IsFull())
        {
            pthread_cond_wait(&_pcond, &_mutex);
        }

        _q.push(in);

        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_ccond);
    }

    void Pop(T *out)
    {
        // 消费者消费数据的过程
        // 1.申请锁
        // 2.进行判断如果队列空了就进行阻塞,没空就进行消费数据
        // 3.消费数据
        // 4.解锁并唤醒一个阻塞的生产者.

        pthread_mutex_lock(&_mutex);

        while (IsEmpty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }

        *out = _q.front();
        _q.pop();

        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_pcond);
    }

private:
    std::queue<T> _q; // 队列
    int _capacity;    // 限制队列的最大容量

    pthread_mutex_t _mutex; // 生产者生产数据和消费者消费数据都需要申请锁
    pthread_cond_t _pcond;  // 给生产者的条件变量
    pthread_cond_t _ccond;  // 给消费者的条件变量
};
//main.cc
#include "block_queue.hpp"

void *producter(void *args)
{
    block_queue<int> *q = static_cast<block_queue<int> *>(args);
    // 生产者
    // 生产数据
    int data = 1;
    while (true)
    {
        // 生产者减慢
        sleep(3);
        //  p->push()
        q->Push(data);
        std::cout << "producter data: " << data++ << std::endl;
    }
}

void *consumer(void *args)
{
    block_queue<int> *c = static_cast<block_queue<int> *>(args);
    // 消费者
    // 消费数据
    while (true)
    {
        // 消费者减慢
        //sleep(1);
        // c->pop()
        int t;
        c->Pop(&t);
        // 处理数据
        std::cout << "consumer data:" << t << std::endl;
    }
}

int main()
{
    // 以最简单的整型变量为例
    block_queue<int> *bq = new block_queue<int>();
    pthread_t pid, cid;
    pthread_create(&pid, nullptr, producter, bq);
    pthread_create(&cid, nullptr, consumer, bq);

    pthread_join(pid, nullptr);
    pthread_join(cid, nullptr);

    return 0;
}

在这里插入图片描述
单生产者单消费者的生产者消费者模型已经搭建了大概但是其中还是有可以改进的地方例如我们申请锁和解锁需要调用两个函数那么我们能否可以建立一个锁的类让其构造函数申请锁析构函数释放锁呢?但是如果直接构造一个锁的类是不是不太安全那么我们可以再构造一个锁的守护者的类让其包含锁类从而只要我们创建一个锁的守护者就可以申请锁然后等到其生命周期结束我们就可以让其自动释放锁。
还有我们在使用模板时说到我们不知道数据的类型是什么所以使用模板那么我们只能传内置类型吗?不止吧,我们甚至可以传一个类对象过去所以我们是可以让生产者去传一个任务给消费者让其进行执行任务的操作的。
而且大家再思考一下如果我们想要实现多生产者多消费者的模型需要改动什么代码吗?不需要!我们锁有了条件变量有了即使生产者消费者多了又如何我们已经维护好了三种关系不需要再改进其中的代码了。
所以下面我将改进后的代码也给大家

// block_queue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "LockGuard.hpp"

int defaultcapacity = 5;

// 不知道传入的是什么类型的数据所以使用模板
template <class T>
class block_queue
{
public:
    block_queue(int capacity = defaultcapacity)
        : _capacity(capacity)
    {
        // 初始化锁和条件变量
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    ~block_queue()
    {
        // 销毁锁和条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    bool IsFull()
    {
        return _q.size() == _capacity;
    }

    bool IsEmpty()
    {
        return _q.size() == 0;
    }

    void Push(const T &in)
    {
        // 生产者生产数据的过程
        // 1.先申请锁
        // 2.进行判断如果队列满了就进行阻塞,没满就进行生产数据
        // 3.生产数据
        // 4.解锁并唤醒一个阻塞的消费者

        // pthread_mutex_lock(&_mutex);
        // 利用LockGuard
        // 当push完成后lg的生命周期到了就会自动调用析构函数来释放锁
        LockGuard lg(&_mutex);

        // 注意:不能使用if来进行判断
        // 因为使用if判断后当生产线程被唤醒后就默认当前队列是没有满的
        // 但是在实际使用过程中会有很多生产线程在生产数据所以线程在被唤醒后
        // 当前队列满没满是不一定的!!!所以最好使用while循环来判断从而当
        // 线程被唤醒后还要再进行一次判断,判断成功后才能生产数据
        // 这种循环判断让代码有了更强的鲁棒性也就是健壮性
        //  if(IsFull())
        //  {
        //      pthread_cond_wait(&_pcond,&_mutex);
        //  }
        while (IsFull())
        {
            pthread_cond_wait(&_pcond, &_mutex);
        }

        _q.push(in);

        // pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_ccond);
    }

    void Pop(T *out)
    {
        // 消费者消费数据的过程
        // 1.申请锁
        // 2.进行判断如果队列空了就进行阻塞,没空就进行消费数据
        // 3.消费数据
        // 4.解锁并唤醒一个阻塞的生产者.

        // pthread_mutex_lock(&_mutex);
        LockGuard lg(&_mutex);

        while (IsEmpty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }

        *out = _q.front();
        _q.pop();

        // pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_pcond);
    }

private:
    std::queue<T> _q; // 队列
    int _capacity;    // 限制队列的最大容量

    pthread_mutex_t _mutex; // 生产者生产数据和消费者消费数据都需要申请锁
    pthread_cond_t _pcond;  // 给生产者的条件变量
    pthread_cond_t _ccond;  // 给消费者的条件变量
};
// Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>

const int defaultvalue = 0;

enum
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};

const std::string opers = "+-*/%)(&";

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op)
        : data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
    {
    }
    void Run()
    {
        switch (oper)
        {
        case '+':
            result = data_x + data_y;
            break;
        case '-':
            result = data_x - data_y;
            break;
        case '*':
            result = data_x * data_y;
            break;
        case '/':
        {
            if (data_y == 0)
                code = div_zero;
            else
                result = data_x / data_y;
        }
        break;
        case '%':
        {
            if (data_y == 0)
                code = mod_zero;
            else
                result = data_x % data_y;
        }

        break;
        default:
            code = unknow;
            break;
        }
    }
    void operator()()
    {
        Run();
        //sleep(2);
    }
    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=?";

        return s;
    }
    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=";
        s += std::to_string(result);
        s += " [";
        s += std::to_string(code);
        s += "]";

        return s;
    }
    ~Task()
    {
    }

private:
    int data_x;
    int data_y;
    char oper; // + - * / %

    int result;
    int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};

// LockGuard.hpp
#pragma once
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *lock)
        : _lock(lock)
    {
    }
    ~Mutex()
    {
    }
    void Lock()
    {
        pthread_mutex_lock(_lock);
    }
    void Unlock()
    {
        pthread_mutex_unlock(_lock);
    }

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock)
        :_mutex(lock)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.Unlock();
    }

private:
    Mutex _mutex;
};
// main.cc
#include "block_queue.hpp"
#include "Task.hpp"
#include <time.h>

class PthreadDate
{

public:
    block_queue<Task> *_bq;
    std::string _name;
};
void *producter(void *args)
{
    PthreadDate *p = static_cast<PthreadDate *>(args);
    // 生产者
    // 生产数据
    // int data = 1;

    while (true)
    {
        int data1 = rand() % 10;
        usleep(rand() % 123);
        int data2 = rand() % 10;
        usleep(rand() % 123);
        char oper = opers[rand() % (opers.size())];
        Task t(data1, data2, oper);
        std::cout << p->_name << " " << " producter data: " << t.PrintTask() << std::endl;
        // 生产者减慢
        // sleep(1);
        //  p->push()
        p->_bq->Push(t);
    }
}

void *consumer(void *args)
{
    PthreadDate *c = static_cast<PthreadDate *>(args);
    // 消费者
    // 消费数据
    while (true)
    {
        // 消费者减慢
        // sleep(1);
        // c->pop()
        Task t;
        c->_bq->Pop(&t);
        // 处理数据
        t();
        std::cout << c->_name << " " << "consumer data:" << t.PrintResult() << std::endl;
    }
}

int main()
{
    // 传输任务
    srand((uint16_t)time(nullptr) * getpid() * pthread_self()); // 形成随机的数据

    // 以最简单的整型变量为例
    // block_queue<int> *bq = new block_queue<int>();

    // 多生产者多消费者
    // 传输任务
    block_queue<Task> *bq = new block_queue<Task>();
    pthread_t pid[3], cid[2];
    PthreadDate *td1 = new PthreadDate();
    td1->_bq = bq;
    td1->_name = "thread-1";
    pthread_create(&pid[0], nullptr, producter, td1);

    PthreadDate *td2 = new PthreadDate();
    td2->_bq = bq;
    td2->_name = "thread-2";
    pthread_create(&pid[0], nullptr, producter, td2);

    PthreadDate *td3 = new PthreadDate();
    td3->_bq = bq;
    td3->_name = "thread-3";
    pthread_create(&pid[0], nullptr, producter, td3);

    PthreadDate *td4 = new PthreadDate();
    td4->_bq = bq;
    td4->_name = "thread-4";
    pthread_create(&cid[0], nullptr, consumer, td4);

    PthreadDate *td5 = new PthreadDate();
    td5->_bq = bq;
    td5->_name = "thread-5";
    pthread_create(&cid[1], nullptr, consumer, td5);

    pthread_join(pid[0], nullptr);
    pthread_join(pid[1], nullptr);
    pthread_join(pid[2], nullptr);
    pthread_join(cid[0], nullptr);
    pthread_join(cid[1], nullptr);

    return 0;
}

在完成了模拟实现生产者消费者模型后大家可能有一个疑问:我们在介绍生产者消费者模型的时候说到它有个优点是加快效率,但是现在我们模拟实现后发现生产者生产数据和消费者消费数据之间是互斥的并且还会对线程进行阻塞,也就是是生产者生产数据和消费者消费数据是串行运行的。那么这又有何高效之说呢?
这个问题的答案其实很简单,生产者生产的数据是从哪来的消费者消费的数据又是去哪了呢?我们模拟实现的生产者消费者模型中的数据都是我们提供的所以可能不太明显,在以后我们使用生产者消费者模型可能数据都是通过网络获得的并且这些数据是要被处理的。所以生产数据和消费数据只是模型中的一部分,生产者消费者模型的高效是体现在获取数据和处理数据上生产数据和消费数据即使是互斥的是串行的也只是多花费了那么一点点时间而已但是在获取数据和处理数据上节省的时间远大于这花费的数据所以生产者消费者模型是高效的。

3.3基于环形队列的生产者消费者模型

在了解了基于阻塞队列实现的生产者消费者模型后我们还有没有其他的方法来实现生产者消费者模型呢?有,那就是利用信号量和环形队列来实现。我们在学习信号量的时候知道信号量不同于锁是将临界资源看作一整块来进行访问而是将其划分成不同的区域从而让线程来访问临界资源中的一段资源。那么我们就可以利用环形队列这个数据结构来配合信号量从而实现生产者消费者模型,环形队列其实说是队列更像是一个数组,不过是把数组的头尾结合在一起我们利用对信号量的初始化来规定将其划分成多少个区域也就是这个数组中最大的容量是多少,对于超过这个容量的位置我们只需要将其模上容量即可。
在这里插入图片描述
那么对于这个生产者消费者模型也是符合三个关系,两个角色,一个场所的。只是实现方法不同则具体的实现逻辑也就不一样。
对于基于环形队列实现的生产者消费者模型中我们可以思考一下生产者和消费者之间的逻辑关系,如果生产者将队列生产满了也就是生产者对消费者套了一个圈的处理方法是什么,如果消费者的消费速度过快超过了生产者又会怎么样呢?这两种情况的处理方法其实也很简单一旦生产者套圈了消费者那么只能让消费者先跑,如果消费者超过了生产者那么只能让生产者先跑。
所以生产者和消费者只有两种情况会指向同一个位置:

  1. 队列为空
    只能让生产者先跑,这个代表了互斥,先跑代表了对于环形队列来说局部是需要进行同步的
  2. 队列为满
    只能让消费者先跑,同样是有互斥和同步蕴含在其中。

而其他情况下生产者和消费者根本不会指向同一个位置而这句话也就说明我们是多线程并发进入的临界区。
同时对于环形队列中的资源我们也需要重新认识一下,对于生产者来说它是生产数据的所以它需要环形队列中的空间资源,而对于消费者来说它是消费数据所以它需要的是环形队列中的数据资源。
所以我们在定义信号量变量的时候我们需要定义两个一个是空间资源的信号量也就是给生产者使用的还有一个是数据资源的信号量也就是给消费者使用的。而初始化这两个信号量时初始化的值也不同,空间资源需要初始化为满数据资源需要初始化为0。
那么我们也可以通过一段伪代码来大概看一下生产者和消费者生产和消费数据时需要干一些什么事情。
在这里插入图片描述

在了解了生产者和消费者大概需要干的事情后我们就来直接上代码吧

// Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>

const int defaultvalue = 0;

enum
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};

const std::string opers = "+-*/%)(&";

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op)
        : data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
    {
    }
    void Run()
    {
        switch (oper)
        {
        case '+':
            result = data_x + data_y;
            break;
        case '-':
            result = data_x - data_y;
            break;
        case '*':
            result = data_x * data_y;
            break;
        case '/':
        {
            if (data_y == 0)
                code = div_zero;
            else
                result = data_x / data_y;
        }
        break;
        case '%':
        {
            if (data_y == 0)
                code = mod_zero;
            else
                result = data_x % data_y;
        }

        break;
        default:
            code = unknow;
            break;
        }
    }
    void operator()()
    {
        Run();
        //sleep(2);
    }
    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=?";

        return s;
    }
    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=";
        s += std::to_string(result);
        s += " [";
        s += std::to_string(code);
        s += "]";

        return s;
    }
    ~Task()
    {
    }

private:
    int data_x;
    int data_y;
    char oper; // + - * / %

    int result;
    int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};

// LockGuard.hpp
#pragma once
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *lock)
        : _lock(lock)
    {
    }
    ~Mutex()
    {
    }
    void Lock()
    {
        pthread_mutex_lock(_lock);
    }
    void Unlock()
    {
        pthread_mutex_unlock(_lock);
    }

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock)
        :_mutex(lock)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.Unlock();
    }

private:
    Mutex _mutex;
};
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <unistd.h>
#include "LockGuard.hpp"
#include "Task.hpp"

const int defaultcapacity = 5;

template <class T>
class ring_queue
{
public:
    ring_queue(int capacity = defaultcapacity)
        : _capacity(capacity), _v(capacity), _p_step(0), _c_step(0)
    {
        sem_init(&_space_sem, 0, capacity); // 生产者的信号量初始化时为满
        sem_init(&_data_sem, 0, 0);         // 消费者的信号量初始化时为空

        pthread_mutex_init(&_pmutex, nullptr);
        pthread_mutex_init(&_cmutex, nullptr);
    }
    ~ring_queue()
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }

    void Push(const T &in)
    {
        // 生产者生产数据分为三步
        // 1.申请空间信号量和锁
        // 2.生产数据
        // 3.释放数据信号量和锁

        // 是先申请信号量还是先申请锁呢?
        // 这两种方法其实都可以
        // 1.先申请信号量就是让线程提前预定资源后再去竞争锁去生产数据
        // 那些没有申请到锁资源的就先进行阻塞在解锁后再去竞争
        // 2.而先申请锁的就是让先申请到锁资源的线程去申请信号量
        // 而其他的线程只能等到解锁后再竞争再去申请信号量
        // 虽然说两种方法都可以但是推荐使用第一种方法因为先申请信号量
        // 避免了线程一个一个的申请信号量因为信号量是对资源的提前预定
        // 所以最好还是先让所有线程去申请信号量。
        P(_space_sem);
        // pthread_mutex_lock(&_pmutex);
        {
            // 利用局部代码块来限定lg的生命周期从而达到在V操作前解锁的作用
            LockGuard lg(&_pmutex);
            _v[_p_step] = in;
            _p_step++;
            _p_step %= _capacity;

            pthread_mutex_unlock(&_pmutex);
        }

        V(_data_sem);
    }

    void Pop(T *out)
    {
        // 消费者消费锁分为三步
        // 1.申请数据信号量和锁
        // 2.消费数据
        // 3.释放空间信号量和锁

        P(_data_sem);

        {
            LockGuard lg(&_cmutex);
            *out = _v[_c_step];
            _c_step++;
            _c_step %= _capacity;
        }

        V(_space_sem);
    }

private:
    std::vector<T> _v;
    int _capacity; // 容量

    sem_t _space_sem; // 生产者
    sem_t _data_sem;  // 消费者

    int _p_step; // 生产者的生产位置
    int _c_step; // 消费者的消费位置

    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;

    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
};

#include "ring_queue.hpp"

class PthreadDate
{
public:
    ring_queue<Task> *_bq;
    std::string _name;
};

void *producter(void *args)
{
    // 生产者
    PthreadDate *p = static_cast<PthreadDate *>(args);
    while (true)
    {

        // sleep(1);
        int data1 = rand() % 10;
        usleep(rand() % 123);
        int data2 = rand() % 10;
        usleep(rand() % 123);
        char oper = opers[rand() % (opers.size())];
        Task t(data1, data2, oper);
        std::cout << p->_name << " " << " producter data: " << t.PrintTask() << std::endl;
        //  rq.push()
        p->_bq->Push(t);
    }
}

void *consumer(void *args)
{
    // 消费者
    PthreadDate *c = static_cast<PthreadDate *>(args);

    while (true)
    {

        //sleep(5);
        // rq.pop()
        Task t;
        c->_bq->Pop(&t);
        t();

        std::cout << c->_name << " " << "consumer data:" << t.PrintResult() << std::endl;
    }
}

int main()
{
    // 以整型变量为例子先
    // ring_queue<int> *rq = new ring_queue<int>();

    // 同样对于环形队列也可以传输类对象
    srand((uint16_t)time(nullptr) * getpid() * pthread_self()); // 形成随机的数据
    ring_queue<Task> *rq = new ring_queue<Task>();

    // 多生产者多消费者
    // 传输任务
    ring_queue<Task> *bq = new ring_queue<Task>();
    pthread_t pid[3], cid[2];

    PthreadDate *td1 = new PthreadDate();
    td1->_bq = bq;
    td1->_name = "thread-1";
    pthread_create(&pid[0], nullptr, producter, td1);

    PthreadDate *td2 = new PthreadDate();
    td2->_bq = bq;
    td2->_name = "thread-2";
    pthread_create(&pid[0], nullptr, producter, td2);

    PthreadDate *td3 = new PthreadDate();
    td3->_bq = bq;
    td3->_name = "thread-3";
    pthread_create(&pid[0], nullptr, producter, td3);

    PthreadDate *td4 = new PthreadDate();
    td4->_bq = bq;
    td4->_name = "thread-4";
    pthread_create(&cid[0], nullptr, consumer, td4);

    PthreadDate *td5 = new PthreadDate();
    td5->_bq = bq;
    td5->_name = "thread-5";
    pthread_create(&cid[1], nullptr, consumer, td5);

    pthread_join(pid[0], nullptr);
    pthread_join(pid[1], nullptr);
    pthread_join(pid[2], nullptr);
    pthread_join(cid[0], nullptr);
    pthread_join(cid[1], nullptr);

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2259427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何写出优秀的单元测试?

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 写出优秀的单元测试需要考虑以下几个方面&#xff1a; 1. 测试用例设计 测试用例应该覆盖被测试代码的不同场景和边界情况&#xff0c;以尽可能发现潜在的问题。…

【竞技宝】LOL:JDG官宣yagao离队

北京时间2024年12月13日,在英雄联盟S14全球总决赛结束之后,各大赛区都已经进入了休赛期,目前休赛期也快进入尾声,LPL大部分队伍都开始陆续官宣转会期的动向,其中JDG就在近期正式官宣中单选手yagao离队,而后者大概率将直接选择退役。 近日,JDG战队在官方微博上连续发布阵容变动消…

2024美赛数学建模C题:网球比赛中的动量,用马尔可夫链求解!详细分析

文末获取历年美赛数学建模论文&#xff0c;交流思路模型 接下来讲解马尔可夫链在2024年C题中的运用 1. 马尔科夫链的基本原理 马尔科夫链是描述随机过程的一种数学模型&#xff0c;其核心特征是无记忆性。 简单来说&#xff0c;系统在某一时刻的状态只取决于当前状态&#x…

图形学笔记 - 5. 光线追踪 - RayTracing

Whitted-Style Ray tracing 为什么要光线追踪 光栅化不能很好地处理全局效果 软阴影尤其是当光线反射不止一次的时候 栅格化速度很快&#xff0c;但质量相对较低 光线追踪是准确的&#xff0c;但速度很慢 光栅化&#xff1a;实时&#xff0c;光线追踪&#xff1a;离线~10K …

Nginx之配置防盗链(Configuring Anti-hotlinking in Nginx)

运维小白入门——Nginx配置防盗 什么是防盗链&#xff1a; 防盗链技术主要用于防止未经授权的第三方或域名访问网站的静态资源。例如&#xff0c;一个网站可能拥有独特的图片素材&#xff0c;为了防止其他网站通过直接链接图片URL的方式访问这些图片&#xff0c;网站管理员会采…

51c大模型~合集89

我自己的原文哦~ https://blog.51cto.com/whaosoft/12815167 #OpenAI很会营销 而号称超强AI营销的灵感岛实测成效如何&#xff1f; OpenAI 是懂营销的&#xff0c;连续 12 天发布&#xff0c;每天一个新花样&#xff0c;如今刚过一半&#xff0c;热度依旧不减。 毫无疑问&…

深度学习的unfold操作

unfold&#xff08;展开&#xff09;是深度学习框架中常见的数据操作。与我们熟悉的卷积类似&#xff0c;unfold也是使用一个特定大小的窗口和步长自左至右、自上至下滑动&#xff0c;不同的是&#xff0c;卷积是滑动后与核求乘积&#xff08;所以取名为卷积&#xff09;&#…

Jetpack Compose赋能:以速破局,高效打造非凡应用

Android Compose 是谷歌推出的一种现代化 UI 框架&#xff0c;基于 Kotlin 编程语言&#xff0c;旨在简化和加速 Android 应用开发。它以声明式编程为核心&#xff0c;与传统的 View 系统相比&#xff0c;Compose 提供了更直观、更简洁的开发体验。以下是对 Android Compose 的…

Dual-Write Problem 双写问题(微服务)

原文链接https://www.confluent.io/blog/dual-write-problem/ 双写问题发生于当两个外部系统必须以原子的方式更新时。 问题 说有人到银行存了一笔钱&#xff0c;触发 DepositFunds 命令&#xff0c;DepositFunds 命令被发送到Account microservice。 Account microservice需…

桥接模式的理解和实践

桥接模式&#xff08;Bridge Pattern&#xff09;&#xff0c;又称桥梁模式&#xff0c;是一种结构型设计模式。它的核心思想是将抽象部分与实现部分分离&#xff0c;使它们可以独立地进行变化&#xff0c;从而提高系统的灵活性和可扩展性。本文将详细介绍桥接模式的概念、原理…

kubeadm安装K8s集群之高可用组件keepalived+nginx及kubeadm部署

系列文章目录 1.kubeadm安装K8s集群之基础环境配置 2.kubeadm安装K8s集群之高可用组件keepalivednginx及kubeadm部署 3.kubeadm安装K8s集群之master节点加入 4.kubeadm安装K8s集群之worker1节点加入 kubeadm安装K8s集群之高可用组件keepalivednginx及kubeadm部署 1.安装kubeadm…

Avalonia实战实例三:实现可输入框的ComboBox控件

文章目录 一、Avalonia中的ComboBox控件二、更改Template&#xff0c;并添加水印 接着上篇关闭按钮实现登录界面 实现一个可输入&#xff0c;可下拉的用户名输入框 一、Avalonia中的ComboBox控件 Avalonia中Fluent主题里ComboBox实现&#xff1a; <ControlTheme x:Key&q…

TMS320C55x DSP芯片结构和CPU外围电路

第2章 DSP芯片结构和CPU外围电路 文章目录 第2章 DSP芯片结构和CPU外围电路TMS320C55x处理器的特点TMS320c55x CPU单元指令缓冲(Instruction Buffer Unit) I单元程序流程(Program Flow Unit) P单元地址数据(Address-data Flow Unit) A单元数据计算(Data Computation Unit) D单元…

Oracle 与 达梦 数据库 对比

当尝试安装了达梦数据库后&#xff0c;发现达梦真的和Oracle数据库太像了&#xff0c;甚至很多语法都相同。 比如&#xff1a;Oracle登录数据库采用sqlplus&#xff0c;达梦采用disql。 比如查看数据视图&#xff1a;达梦和Oracle都有 v$instance、v$database、dba_users等&a…

数据结构之五:排序

void*类型的实现&#xff1a;排序&#xff08;void*类型&#xff09;-CSDN博客 一、插入排序 1、直接插入排序 思想&#xff1a;把待排序的数据逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。 单趟&#x…

JavaWeb:JavaScript

学习 资源1 学习资源 2 黑马javaweb 1、引入方式 内部脚本&#xff1a; <script>内容</script> 外部脚本&#xff1a; <script src"js/test.js"></script> 2、基础语法 注释&#xff1a;// /* */ 结尾分号可有可无 大括号表示代码块 …

MySQL其五,索引详解,逻辑架构,SQL优化等概念

目录 一、索引 1、索引的概念 2、索引的优缺点 3、添加索引的原则 4、索引的分类 5、索引如何使用 6、存储过程讲解 7、测试索引的效率 7、索引的数据结构 8、覆盖索引&#xff08;SQL优化的点&#xff09; 9、最佳左前缀法则&#xff08;SQL优化的点&#xff09; 二…

考研数学【线性代数基础box(数二)】

本文是对数学二线性代数基础进行总结&#xff0c;一些及极其简单的被省略了&#xff0c;代数的概念稀碎&#xff0c;不如高数关联性高&#xff0c;所以本文仅供参考&#xff0c;做题请从中筛选&#xff01; 本文为初稿&#xff0c;后面会根据刷题和自己的理解继续更新 第一章…

全面解析租赁小程序的功能与优势

内容概要 租赁小程序正在逐渐改变人与物之间的互动方式。通过这些小程序&#xff0c;用户不仅可以轻松找到所需的租赁商品&#xff0c;还能够享受无缝的操作体验。为了给大家一个清晰的了解&#xff0c;下面我们将重点介绍几个核心功能。 建议&#xff1a;在选择租赁小程序时&…

Linux DNS 协议概述

1. DNS 概述 互联网中&#xff0c;一台计算机与其他计算机通信时&#xff0c;通过 IP 地址唯一的标志自己。此时的 IP 地址就类似于我们日常生活中的电话号码。但是&#xff0c;这种纯数字的标识是比较难记忆的&#xff0c;而且数量也比较庞大。例如&#xff0c;每个 IPv4 地址…