Linux_生产消费模型_Block_Queue

news2025/1/22 15:48:00

目录

一、互斥锁

1.1 错误的抢票

1.1.1 类的成员函数与构造

1.1.2 start 函数 

1.1.3 线程的回调函数 

1.1.4 main 函数

1.1.5 结果  

1.2 概念

1.3 相关系统调用

1.3.1 锁的创建

1.3.2 锁的初始化

1.3.2.1 动态初始化

1.3.2.2 静态初始化

1.3.3 锁的销毁

1.3.4 加锁和解锁

1.4 正确的抢票

1.4.1 简单锁的封装

1.4.2 线程封装的改进 

1.4.3 main 函数的改进

二、条件变量

2.1 概念

2.2 相关系统调用

2.2.1 条件变量的创建

2.2.2 条件变量的初始化

2.2.3 条件变量的销毁

2.2.4 条件变量的等待

2.2.5 条件变量的唤醒

三、单线程生产消费模型(BQ)

3.1 初步测试代码

3.2 阻塞队列的封装

3.2.1 成员变量

3.2.2 构造析构

3.2.3 生产者模型

3.2.4 消费者模型

3.2.5 整体代码 

3.3 Main.cc 的修改

3.3.1 整体代码

3.3.2 int main 部分

3.3.3 start 函数

3.3.4 wait 函数

3.4 整体代码

3.4.1 BlockQueue.hpp

3.4.2 Thread.hpp

3.4.3 Main.cc

四、多生产多消费模型


一、互斥锁

1.1 错误的抢票

下面先看封装的一个简单的多线程:

#pragma once

#include <iostream>
#include <string>
#include <pthread.h>
namespace ThreadMoudle
{
    typedef void(*func_t)(const std::string &name);//函数指针类型
    class Thread
    {
    public:
        void Excute()
        {
            std::cout << _name << " is running" << std:: endl;
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }
    public:
        Thread(const std::string &name, func_t func):_name(name), _func(func)
        {
            std::cout << "create: " << name << "done" << std::endl;
        }
        std::string Name()
        {
            return _name;
        }
        static void *ThreadRoutine(void *args)
        {
            Thread *self = static_cast<Thread*>(args);//获得了当前对象
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if (n != 0) return false;//创建函数成功返回0
            return true;
        }
        std::string Status()
        {
            if (_isrunning) return "running";
            else return "sleep";
        }
        void Stop()
        {
            if (_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
                std::cout << "Stop" << std::endl;
                std::cout << _name << " Stop" << std::endl;
            }
        }
        void Join()
        {
            if (!_isrunning)
            {
                ::pthread_join(_tid, nullptr);
                std::cout << _name << "Join" << std::endl;
                std::cout << _name << "Joined" << std::endl;
            }
        }
        ~Thread()
        {
        }
    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func;//线程要执行的回调函数
    };
}

1.1.1 类的成员函数与构造

现在来逐步看这个封装类,首先来看一下类成员与类的构造函数:

    typedef void(*func_t)(const std::string &name);//函数指针类型    
    public:
        Thread(const std::string &name, func_t func):_name(name), _func(func)
        {
            std::cout << "create: " << name << "done" << std::endl;
        }
    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func;//线程要执行的回调函数

这里为线程起了一个名字_name,线程的tid为_tid,线程的状态以表明该线程是否在运行_isrunning,以及线程要执行的回调函数。

1.1.2 start 函数 

        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if (n != 0) return false;//创建函数成功返回0
            return true;
        }

在start函数中,封装了 pthread_create 的函数调用,这里不进行详细说明,以后专门写一篇来描述单线程以及多线程方面的知识。这里的 pthreada_create 就是创建一个线程,而且因为传入的是_tid 的引用,所以该系统调用会自动把 _tid 修改为创建线程的 tid 。

1.1.3 线程的回调函数 

    public:
        void Excute()
        {
            std::cout << _name << " is running" << std:: endl;
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }
        static void *ThreadRoutine(void *args)
        {
            Thread *self = static_cast<Thread*>(args);//获得了当前对象
            self->Excute();
            return nullptr;
        }

这里并没有直接写线程要执行的回调函数操作,而是当作中介将参数作为 args参数包 传递给了Excute 函数,这样做的目的是为了让函数更简洁并且这里也有一个坑,成员函数不能直接用作线程函数,因为成员函数需要一个隐藏的 this 指针,而线程库无法自动传递这个指针(参考1.0.2的start函数中的 pthread_create 系统调用):

void* (*start_routine) (void*);//pthread库的线程函数签名是固定的

因此,需要定义一个静态的 ThreadRoutine 函数,这个函数不依赖于任何对象实例,可以被pthread_create 直接调用。在 Thread_Routine 中,通过 static_cast 将 void* 类型的参数转换为 Thread* 类型的指针,从而获得对象实例的指针,然后调用该对象的成员函数 Excute 。这样,Excute 函数就可以访问该对象的成员变量和成员函数。 

1.1.4 main 函数

首先以简单的思维创建一下多线程,模拟一下让几个线程一起进行抢票:

#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
using namespace ThreadMoudle;
int tickets = 1000;

void route(const std::string &name)
{
    while(true)
    {
        if(tickets > 0)
        {
            // 抢票过程
            usleep(1000); // 1ms -> 抢票花费的时间
            printf("who: %s, get a ticket: %d\n", name.c_str(), tickets);
            tickets--;
        }
        else
        {
            break;
        }
    }
}
int main()
{
    Thread t1("thread-1", route);
    Thread t2("thread-2", route);
    Thread t3("thread-3", route);
    Thread t4("thread-4", route);

    t1.Start();
    t2.Start();
    t3.Start();
    t4.Start();

    t1.Join();
    t2.Join();
    t3.Join();
    t4.Join();
}

按代码来说,在回调函数 route 中,设置了while (tikects > 0) 的条件,也就是说,当 tickets 为正数时,才会进入循环内部执行打印操作,理应抢到还有 0 张票时就停止,但是,运行几次可能会发现,最后会出现某个线程 get 了第 0 张票甚至是负数票。

1.1.5 结果  

观看下面的结果,就会发现错误,这里引入一个概念,票数在这里其实相当于临界资源,所有线程都可以访问并修改,那么就存在线程执行的过快,最后一张票可能沦落到多个线程手中,但是最快执行完的线程打印出了 get a ticket:1 ,并把 1 进行了修改,其他的线程因为执行速度过快,在票还没有变为 0 之前就已经进入了循环,所以会继续执行后续的操作。

打个比方,如果一辆公交车限乘30人,每个人上车后需要进行刷卡,当第30个人已经上车时,后面还有两三个人也挤上来了,第30个人完成刷卡后,公交车检测刷卡人数等于30人,关闭了车门,但此时车上的人已经多余30人,多余的人也可以继续刷卡乘车。

此时,就需要一个规则,比如公交车一次上一人,这个人刷卡后才能上第二个,迁移到 Linux ,这就是互斥锁。

1.2 概念

临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区

互斥锁就可以锁住临界区,当开锁后,临界区才会打开来接收下一个线程,下一个线程被成功接收后又进行了上锁,这里,每个线程的关系称为互斥。就好比公交车一次一人的规则,这里的每个乘客就是互斥关系。

互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

每个线程执行的操作不能被打断,要么就不进行,要么就进行完成,这就是原子性。好比一个人上公交车要么就不乘车,要么就刷卡上车,不能还没刷卡就被别人打断。

原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.3 相关系统调用

1.3.1 锁的创建

这是互斥锁的类型,当要设置互斥锁的时候就可以使用:

pthread_mutex_t _mutex;

1.3.2 锁的初始化

1.3.2.1 动态初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);

参数:
mutex:要初始化的互斥量,注意要传引用
attr:nullptr

1.3.2.2 静态初始化

以上是动态初始化,在定义 mutex 时,同样可以静态传值直接初始化:

pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;

1.3.3 锁的销毁

销毁互斥量需要注意:
1.使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
2.不要销毁一个已经加锁的互斥量
3.已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

注意这里也要传入锁的引用。

1.3.4 加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//返回值:成功返回0,失败返回错误号

加锁时,可能会遇到以下的情况:

1.互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
2.发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

这里的阻塞不是一直阻塞,当锁空着的时候就会自动加锁然后返回成功。

1.4 正确的抢票

知道了 mutex 的使用,就可以在上述代码中把抢票的代码改为临界区。

1.4.1 简单锁的封装

这里当调用锁的时候就会自动构造初始化加锁,出临界区后自动调用析构函数进行解锁。 

#pragma once
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);//构造函数时直接加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);//析构函数时直接解锁
    }
private:
    pthread_mutex_t *_mutex;
};

1.4.2 线程封装的改进 

这里只是多加了一个线程的数据类型的类 ThreadData ,在这个类中设置了一个锁,并且在线程封装中使用 ThreadData 新建了变量 td ,这样在需要加锁时只需要创建 td 中的锁(详见main函数)即可。

所以这里的 Thread 的构造函数也调整了,可以直接使用 Thread 的构造函数为 _td 传入锁。

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>

namespace ThreadMoudle
{
    class ThreadData
    {
    public:
        ThreadData(const std::string &name, pthread_mutex_t *lock):_name(name), _lock(lock)
        {}
    public:
        std::string _name;
        pthread_mutex_t *_lock;
    };

    typedef void (*func_t)(ThreadData *td); // 函数指针类型

    class Thread
    {
    public:
        void Excute()
        {
            std::cout << _name << " is running" << std::endl;
            _isrunning = true;
            _func(_td);
            _isrunning = false;
        }
    public:
        Thread(const std::string &name, func_t func, ThreadData *td):_name(name), _func(func), _td(td)
        {
            std::cout << "create " << name << " done" << std::endl;
        }
        static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
        {
            Thread *self = static_cast<Thread*>(args); // 获得了当前对象
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if(n != 0) return false;
            return true;
        }
        std::string Status()
        {
            if(_isrunning) return "running";
            else return "sleep";
        }
        void Stop()
        {
            if(_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
                std::cout << _name << " Stop" << std::endl;
            }
        }
        void Join()
        {
            ::pthread_join(_tid, nullptr);
            std::cout << _name << " Joined" << std::endl;
            delete _td;
        }
        std::string Name()
        {
            return _name;
        }
        ~Thread()
        {
        }

    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func; // 线程要执行的回调函数
        ThreadData *_td;
    };
} // namespace ThreadModle

1.4.3 main 函数的改进

首先进行了锁的创建于初始化,其次将锁传入创建的线程中,当使用回调函数后,传入类的锁然后进行锁的构造,就可以完成加锁的操作,出生命域临时变量被销毁此时自动解锁。

#include <iostream>
#include <vector>
#include <cstdio>
#include <unistd.h>
#include "Thread.hpp"
#include "LockGuard.hpp"

using namespace ThreadMoudle;

int tickets = 10000; // 共享资源,造成了数据不一致的问题

void route(ThreadData *td)
{
    while (true)
    {
        LockGuard lockguard(td->_lock); // RAII风格的锁
        if (tickets > 0)
        {
            // 抢票过程
            usleep(1000); // 1ms -> 抢票花费的时间
            printf("who: %s, get a ticket: %d\n", td->_name.c_str(), tickets);
            tickets--;
        }
        else
        {
            break;
        }
    }
}

static int threadnum = 4;

int main()
{
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex, nullptr);

    std::vector<Thread> threads;
    for(int i = 0; i < threadnum; i++)
    {
        std::string name = "thread-" + std::to_string(i+1);
        ThreadData *td = new ThreadData(name, &mutex);
        threads.emplace_back(name, route, td);
    }

    for(auto &thread : threads)
    {
        thread.Start();
    }


    for(auto &thread : threads)
    {
        thread.Join();
    }

    pthread_mutex_destroy(&mutex);

}

二、条件变量

2.1 概念

  • 等待特定条件的发生: 条件变量允许一个或多个线程等待某个条件变为真,而无需不断轮询某个变量的状态,从而节省CPU资源线程可以在等待条件变量的同时进入休眠状态,当条件满足时被唤醒。

  • 线程间的协调和通信: 条件变量提供了一种机制,使得一个线程可以通知另一个线程某个条件已经满足。这对于需要在多个线程之间共享资源并协调访问的场景非常有用。

  • 解决生产者-消费者问题: 在生产者-消费者问题中,生产者线程生成数据并放入缓冲区,消费者线程从缓冲区中取出数据处理。条件变量可以用于协调生产者和消费者之间的工作,例如,当缓冲区为空时消费者等待,当缓冲区有数据时通知消费者。

这里也打个比方,如果想买某个商品但是商店缺货,我们需要反复跑去超市确认,毋庸置疑,这是一件费事费力的事情, 但是如果当第一次去超市时,把我们的电话留给店员,当有货时就电话通知我们,是不是就方便灵活了很多?

2.2 相关系统调用

2.2.1 条件变量的创建

pthread_cond_t

和互斥锁类似,条件变量的数据类型为 pthread_cond_t 

2.2.2 条件变量的初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);

参数:
cond:要初始化的条件变量
attr:nullptr 

2.2.3 条件变量的销毁

int pthread_cond_destroy(pthread_cond_t *cond)

2.2.4 条件变量的等待

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释

2.2.5 条件变量的唤醒

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

其中第一个是唤醒一个线程,第二个是唤醒所有的线程。 

三、单线程生产消费模型(BQ)

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

3.1 初步测试代码

这里直接复用上述封装的线程库:

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        Thread(func_t<T> func, T &data, const std::string &name="none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T> *self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T &_data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} // namespace ThreadModule

#endif

main 函数中,使用 vector<Thread<int>> 包装一个线程数组, 

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
int a = 10;
void Consumer(int &data)
{
    while (data)
    {
        std::cout << "Consumer: " << data-- << std::endl;
        sleep(1);
    }
}
void StartConsumer(std::vector<Thread<int>> *threads, int num)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "Thread-" + std::to_string(i + 1);
        threads->emplace_back(Consumer, a, name);//执行的函数 向函数传入的参数 name
        threads->back().Start();//取到最后一个线程并启动
    }
}

void Productor(int &data)
{
    while (data)
    {
        std::cout << "Productor: " << data-- << std::endl;
        sleep(1);
    }
}
void StartProductor(std::vector<Thread<int>> *threads, int num)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "Thread-" + std::to_string(i + 1);
        threads->emplace_back(Productor, a, name);
        threads->back().Start();
    }
}

void WaitAllThread(std::vector<Thread<int>> &threads)
{
    for (auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    std::vector<Thread<int>> threads;
    StartConsumer(&threads, 1);
    StartProductor(&threads, 1);
    WaitAllThread(threads);
    return 0;
}



以上是对消费者与生产者提供的接口,每次启动时把线程的数组传入,以及创建的线程数量传入即可,这里因为是单线程, num 默认传为 1 .



在对消费者与生产者提供的接口中,封装一个消费者与生产者执行的行为。其次,代表其行为的函数可以传入引用来标准执行的次数,所以就需要传入全局变量而不是局部变量,可以看到最上方定义了一个全局变量 a 来规定执行的次数。

下面看一下程序,生产者和消费者都在有序的访问并修改同一份空间。

3.2 阻塞队列的封装

3.2.1 成员变量

下面就要开始封装 blockqueue 阻塞队列,首先来看一下成员变量:

private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _product_cond;
    pthread_cond_t _consume_cond;

阻塞队列其中一定要包含一个队列,所以定义了一个 _block_queue 的队列,以后生产者和消费者就在这个队列中添加与取数据。 

其次,我们需要生产者与消费者有序的访问而且访问时不再受其他线程的打扰,所以还需要带锁, _mutex 。
最后,在阻塞队列中,当队列满了以后,生产者无法生产;当队列为空时,消费者无法消费。这就要求我们为生产者与消费者各定义一个条件变量:_product_cond  _consume_cond

3.2.2 构造析构

这两个成员函数相对来说比较简单,只需要继续使用系统调用即可:

public:
    BlockQueue(int cap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consume_cond, nullptr);
    }    
    ~BlockQueue()
    {
        _cap = 0;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consume_cond);
    }

3.2.3 生产者模型

生产者模型其实就是入队列的过程,当队列为满时,停止入队,此时让它的条件变量休眠,反之,唤醒条件变量并将生产的数据加入队列:

    void Enqueue(T &in)//提供给生产者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsFull())//队列已满,无法继续生产
        {
            //释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。
            //线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。
            pthread_cond_wait(&_product_cond, &_mutex);
        }
        //满足条件可以生产
        _block_queue.push(in);
        //唤醒消费者来消费
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }

其中, pthread_cond_wait 的作用值得再次阐述一遍:

释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。

线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足并重新拿到锁。

3.2.4 消费者模型

消费者模型与生产者模型类似,都是一样的逻辑,但是当队列为空时:wait ;当队列为满时:signal 

    void Pop(T *out)//提供给消费者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsEmpty())
        {
            pthread_cond_wait(&_consume_cond, &_mutex);
        }
        *out = _block_queue.front();
        _block_queue.pop();
        //唤醒生产者来生产
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }

3.2.5 整体代码 

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template<typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consume_cond, nullptr);
    }
    void Enqueue(T &in)//提供给生产者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsFull())//队列已满,无法继续生产
        {
            //释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。
            //线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。
            pthread_cond_wait(&_product_cond, &_mutex);
        }
        //满足条件可以生产
        _block_queue.push(in);
        //唤醒消费者来消费
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T *out)//提供给消费者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsEmpty())
        {
            pthread_cond_wait(&_consume_cond, &_mutex);
        }
        *out = _block_queue.front();
        _block_queue.pop();
        //唤醒生产者来生产
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        _cap = 0;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consume_cond);
    }
private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _product_cond;
    pthread_cond_t _consume_cond;
};
#endif

3.3 Main.cc 的修改

3.3.1 整体代码

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
int a = 10;

void Consumer(BlockQueue<int> &bq)
{
    while (true)
    {
        int data;
        bq.Pop(&data);
        std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;
        sleep(1);
    }
}

void Productor(BlockQueue<int> &bq)
{
    int cnt = 1;
    while (true)
    {
        bq.Enqueue(cnt);
        std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;
        cnt++;
    }
}

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Productor);
}

void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{
    for (auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;

    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

3.3.2 int main 部分

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;

    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

下面从 main 函数作为切入点,首先我们把线程数组中的线程节点改为阻塞队列,紧接着就开始调用生产者与消费者的 start 函数,并传入了新建的阻塞队列 bq ,保证生产者与消费者使用的是同一份阻塞队列

3.3.3 start 函数

为了函数的简洁,我们创建了一个公共区域:

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Productor);
}

现在不论生产者或是消费者在启动时,只需要访问同一份代码即可,不同的是传入的线程执行方法不同。 

在 StartComm 中,对线程数组进行了 emplace_back ,emplace_back 会根据传入的参数自动构造一个数组的成员,也就是 thread ,传入参数为线程的执行方法、阻塞队列、线程名称。

然后来看一下 StartComm 中不同线程要执行的不同方法,就是分别打印信息:

void Consumer(BlockQueue<int> &bq)
{
    while (true)
    {
        int data;
        bq.Pop(&data);
        std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;
        sleep(1);
    }
}

void Productor(BlockQueue<int> &bq)
{
    int cnt = 1;
    while (true)
    {
        bq.Enqueue(cnt);
        std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;
        cnt++;
    }
}

下面重新回看 int main 部分:

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;
    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

首行的 bq 默认的容量设置为 10 ,我们创建的生产者线程数量为 1 ,所以生产者的执行函数会执行 10 次,依次打印 1 - 11 的线程名称。然后创建了消费者线程数量,此时消费者的执行函数又会执行 10 次,所以打印内容为:
 
可以看到它们访问的地址空间都是同一份。

3.3.4 wait 函数

线程等待就比较简单了,直接调用系统调用即可。

3.4 整体代码

3.4.1 BlockQueue.hpp

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template<typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consume_cond, nullptr);
    }
    void Enqueue(T &in)//提供给生产者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsFull())//队列已满,无法继续生产
        {
            //释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。
            //线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。
            pthread_cond_wait(&_product_cond, &_mutex);
        }
        //满足条件可以生产
        _block_queue.push(in);
        //唤醒消费者来消费
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T *out)//提供给消费者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsEmpty())
        {
            pthread_cond_wait(&_consume_cond, &_mutex);
        }
        *out = _block_queue.front();
        _block_queue.pop();
        //唤醒生产者来生产
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        _cap = 0;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consume_cond);
    }
private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _product_cond;
    pthread_cond_t _consume_cond;
};
#endif

3.4.2 Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        Thread(func_t<T> func, T &data, const std::string &name="none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T> *self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T &_data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} // namespace ThreadModule

#endif

3.4.3 Main.cc

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
//int a = 10;

void Consumer(BlockQueue<int> &bq)
{
    while (true)
    {
        int data;
        bq.Pop(&data);
        std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;
        sleep(1);
    }
}

void Productor(BlockQueue<int> &bq)
{
    int cnt = 1;
    while (true)
    {
        bq.Enqueue(cnt);
        std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;
        cnt++;
    }
}

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Productor);
}

void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{
    for (auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;
    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

四、多生产多消费模型

假设一个场景,假设以下为消费者,当队列有1个数据时,多个线程同时被唤醒,那么此时只有一个线程可以竞争得到锁,看一下我们的程序:


此时线程已经被唤醒,应该直接执行下面的代码,但是锁并不在它身上,当它访问的时候,阻塞队列中唯一的数据已经被竞争锁成功的线程拿走了,此时再进行访问就会出现错误,这种唤醒被称作伪唤醒。所以我们需要把判断中的 if 改为 while ,每次都要判断阻塞队列是否为空,同时,生产者线程中每次都要判断队列是否为满。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1859967.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在电脑上免费下载并安装 VMware Workstation Pro

下载 VMware 首先我们先来看一看免费下载VMware Workstation Pro 的具体步骤&#xff1a; 首先我们可以先进入 Broadcom 注册页面&#xff1a;https://profile.broadcom.com/web/registration&#xff0c;然后这时候就需要注册&#xff0c;我们可以使用电子邮件进行注册。 在…

B端系统:增删改查中的新建(增)页面如何设计体验更爽。

在B系统中&#xff0c;增删改查是最基本、最常用的功能之一。这四个操作对于系统的正常运行和数据管理至关重要。其中&#xff0c;新增&#xff08;新建&#xff09;页面的设计尤为关键&#xff0c;因为它直接影响着用户体验和系统功能的完整性。 一、新增&#xff08;新建&…

springboot加载注入bean的方式

在SpringBoot的大环境下&#xff0c;基本上很少使用之前的xml配置Bean&#xff0c;主要是因为这种方式不好维护而且也不够方便。 springboto注入bean主要采用下图几种方式&#xff0c;分为本地服务工程注解声明的bean和外部依赖包中的bean。 一、 springboot装配本地服务工程…

Linux环境下安装MySQL5.7.20(源码安装)

&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;本专栏主要发表mysql实战的文章&#xff0c;文章主要包括&#xff1a; 各版本数据库的安装、备份和恢复,性能优化等内容的学习。。 &#x1f4e3; ***如果需要观看配套视频的小伙伴们&#xff0c;请…

单体架构改造为微服务架构之痛点解析

1.微服务职责划分之痛 1.1 痛点描述 微服务的难点在于无法对一些特定职责进行清晰划分&#xff0c;比如某个特定职责应该归属于服务A还是服务B? 1.2 为服务划分原则的痛点 1.2.1 根据存放主要数据的服务所在进行划分 比如一个能根据商品ID找出商品信息的接口&#xff0c;把…

Redis持久化(RDB、AOF)详解

Redis持久化详解 一、Redis为什么需要持久化&#xff1f; Redis 是一个基于内存的数据库&#xff0c;拥有极高的读写性能&#xff0c;但是内存中的数据在断电或服务器重启时会全部丢失&#xff0c;因此需要一种持久化机制来将数据保存到硬盘上&#xff0c;以便在需要时进行恢复…

R语言数据分析案例34-基于ARIMA模型的武汉市房价趋势与预测研究

一、背景阐述 房地产行业对于国民经济和社会及居民的发展和生活具有很大的影响&#xff0c;而房价能够体现经济运转的好坏&#xff0c;因而房价的波动牵动着开发商和购房者的关注&#xff0c;城市房价预测是一个研究的热点问题&#xff0c;研究房价对民生问题具有重要意义。 …

【面试干货】Java中的++操作符与线程安全性

【面试干货】Java中的操作符与线程安全性 1、什么是线程安全性&#xff1f;2、 操作符的工作原理3、 操作符与线程安全性4、如何确保线程安全&#xff1f;5、 结论 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java编程中&#xff0c;操…

思维导图MindManager2024最新版,让你的思维飞起来!

亲爱的朋友们&#xff0c;今天我要跟大家分享一款我近期深度使用并彻底被种草的神器——MindManager2024最新版本的思维导图软件。作为一位对效率和创意有着极高追求的内容创作者&#xff0c;我几乎尝试过市面上所有的思维导图工具&#xff0c;而MindManager2024无疑是其中的佼…

跟着DW学习大语言模型-什么是知识库,如何构建知识库

建立一个高效的知识库对于个人和组织来说非常重要。无论是为了个人学习和成长&#xff0c;还是为了组织的持续创新和发展&#xff0c;一个完善的知识管理系统都是不可或缺的。那么&#xff0c;如何建立一个高效的知识库呢&#xff1f; 在建立知识库之前&#xff0c;首先需要确定…

【办公类-51-01】月评估数字生成01-平均数空值

期末需要制作月评估&#xff0c;每月给孩子的能力水平打分。 以前我是做在EXCEL里&#xff0c;手动打分&#xff0c;然后用公式计算1、2、3出现的个数&#xff0c;然后计算平均数&#xff0c;最后复制到Word里。 因为是手动计算&#xff0c;每次都要算很长时间&#xff0c;确保…

借助TheGraph 查询ENS信息

关于ENS (以太坊域名服务) ENS 全称是 Ethereum Name Service,它是一个建立在以太坊区块链上的去中心化域名系统。 ENS 在 Web3 领域发挥着重要作用,主要有以下几个方面: 可读性更好的地址: ENS 允许用户将复杂的以太坊地址(如 0x12345…) 映射为更简单易记的域名。这极大地提…

KUBIKOS - Animated Cube Mini BIRDS(卡通立方体鸟类)

软件包中添加了对通用渲染管线 (URP) 的支持! KUBIKOS - 动画立方体迷你鸟是17种不同的可爱低多边形移动友好鸟的集合!每只都有自己的动画集。 完美收藏你的游戏! +17种不同的动物! + 低多边形(400~900个三角形) + 操纵和动画! + 4096x4096 纹理图集 + Mecanim 准备就绪…

生命在于学习——Python人工智能原理(4.3)

三、Python的数据类型 3.1 python的基本数据类型 3.1.4 布尔值&#xff08;bool&#xff09; 在Python中&#xff0c;布尔值是表示真或假的数据类型&#xff0c;有两个取值&#xff0c;True和False&#xff0c;布尔值常用于控制流程、条件判断和逻辑运算&#xff0c;本质上来…

项目实训-接口测试(十八)

项目实训-后端接口测试&#xff08;十八&#xff09; 文章目录 项目实训-后端接口测试&#xff08;十八&#xff09;1.概述2.测试对象3.测试一4.测试二 1.概述 本篇博客将记录我在后端接口测试中的工作。 2.测试对象 3.测试一 这段代码是一个单元测试方法&#xff0c;用于验证…

idea 开发工具properties文件中的中文不显示

用idea打开一个项目&#xff0c;配置文件propertise中的中文都不展示&#xff0c;如图&#xff1a; 可修改idea配置让中文显示&#xff1a; 勾选箭头指向的框即可&#xff0c;点击应用保存&#xff0c;重新打开配置文件&#xff0c;显示正常

篮球联盟管理系统

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;球员管理&#xff0c;用户管理&#xff0c;球队管理&#xff0c;论坛管理&#xff0c;篮球资讯管理&#xff0c;基础数据管理 前台账户功能包括&#xff1a;系统首页&#xff0…

Cell2Sentence:为LLM传输生物语言

像GPT这样的LLM在自然语言任务上表现出了令人印象深刻的性能。这里介绍一种新的方法&#xff0c;通过将基因表达数据表示为文本&#xff0c;让这些预训练的模型直接适应生物背景&#xff0c;特别是单细胞转录组学。具体来说&#xff0c;Cell2Sentence将每个细胞的基因表达谱转换…

前端架构(含演进历程、设计内容、AI辅助设计、架构演进历程)

前端架构的演进历程 前端架构师的必要条件 全面的技术底蕴全局观&#xff08;近期 远期&#xff09;业务要有非常深刻的理解沟通协调能力和团队意识深刻理解前端架构的原则和模式 前端架构的设计内容 技术选型(库、工具、标准规范、性能、安全、扩展性 )设计模式及代码组织(模…

ADS SIPro使用技巧之RapidScan-Z0

PCB走线的阻抗对每个网络的信号完整性至关重要&#xff0c;但是&#xff0c;验证每个信号是不切实际的&#xff0c;尤其对于设计复杂度很高的产品而言&#xff0c;设计者的有限精力只能用于关注关键的设计点&#xff0c;这一过程往往会造成一些设计的疏忽从而导致错误。 ADS SI…