【ONE·Linux || 多线程(二)】

news2024/10/6 14:37:43

总言

  多线程:生产者消费者模型与两种实现方式(条件变量、信号量)、线程池。

文章目录

  • 总言
  • 4、生产者消费者模型
    • 4.1、基本概念
    • 4.2、基于BlockingQueue的生产者消费者模型(理解条件变量)
      • 4.2.1、单生产者单消费者模式(1.0)
        • 4.2.1.1、阻塞队列 BlocQueue.hpp:充当交易场所
        • 4.2.2.2、生产者消费者线程 ConProd.cc
        • 4.2.2.3、演示结果与补充说明
      • 4.2.2、多生产者多消费者模式(2.0)
        • 4.2.2.1、引入任务派发 Task.hpp
        • 4.2.2.2、加锁方式设计 lockGuard.hpp:PAII风格
        • 4.2.2.3、整体
        • 4.2.2.4、演示结果与补充说明
    • 4.3、基于环形队列的生产消费模型
      • 4.3.1、POSIX信号量介绍
      • 4.3.2、结构需求和结构说明
      • 4.3.3、单生产者单消费者模式(1.0)
        • 4.3.3.1、环形队列 ringQueue.hpp:充当交易场所
        • 4.3.3.2、信号量对象 Sem.hpp
        • 4.3.3.3、生产者消费者线程 testMain.cc
        • 4.3.3.4、演示结果
      • 4.3.4、多生产者多消费者模式(2.0)
        • 4.3.4.1、相关说明
        • 4.3.4.2、整体演示
  • 5、线程池
    • 5.1、概念介绍
    • 5.2、基本演示
      • 5.2.1、thread.hpp
      • 5.2.2、threadpool.hpp
      • 5.2.3、testMain.cc
      • 5.2.4、Task.hpp、log.hpp、lockGuard.hpp

  
  
  

4、生产者消费者模型

4.1、基本概念

在这里插入图片描述

  说明: 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

在这里插入图片描述

  
  PS:在下述编码实现过程中,始终要站在三种关系、两种角色、一个场所这类角色属性的角度来分析考虑,有助于理解代码操作。
  
  
  

4.2、基于BlockingQueue的生产者消费者模型(理解条件变量)

  
   阻塞队列与普通的队列区别在于
      当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
      当队列为满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
      PS:以上空和满的操作是基于不同的线程来说的。
在这里插入图片描述

  
  
  
  
  

4.2.1、单生产者单消费者模式(1.0)

4.2.1.1、阻塞队列 BlocQueue.hpp:充当交易场所

  框架搭建如下图:
在这里插入图片描述
  
  
  具体实现如下:

#pragma once
#include<iostream>
#include<pthread.h>
#include<time.h>
#include<unistd.h>
#include<queue>

using namespace std;

int gDefaultCap = 5;//默认的阻塞队列容量上限

template <class T> // 模板类型
class BlockQueue
{
private:
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }

public:
    //构造函数:初始化容量上限、初始化互斥锁、初始化条件变量
    BlockQueue(int capacity = gDefaultCap)
        : _capacity(capacity)//容量上限:由用户决定需要多大容量
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }

    //析构函数:销毁互斥锁、销毁条件变量
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

    //Push:放数据的接口:由生产者线程不断向阻塞队列中放数据
    void Push(const T& in)   
    {
        //临界资源区:先上锁
        pthread_mutex_lock(&_mutex);
        //判断当前临界资源是否满足条件:对生产者、isQueueFull
        while(isQueueFull()) pthread_cond_wait(&_Full, &_mutex);
        //代码执行到此步:当前生产者线程持有锁,且阻塞队列未满--->可放入数据
        _bq.push(in);
        //完成后,解锁,并唤醒消费者线程(告诉它此时交易场所中有资源,可以取用)
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_Empty);
    }

    //Pop:取数据的接口:由消费者线程不断从阻塞队列中取数据
    void Pop(T* out)
    {
        //临界资源区:先上锁
        pthread_mutex_lock(&_mutex);
        //判断当前临界资源是否满足条件:对消费者、isQueueEmpty
        while(isQueueEmpty()) pthread_cond_wait(&_Empty, &_mutex);
        //代码执行到此步:当前消费者线程持有锁,且阻塞队列不空--->可取到数据
        *out = _bq.front();
        _bq.pop();
        //完成后,解锁,并唤醒生产者线程(告诉它此时交易场所中资源被取出,可补充)
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_Full);
    }

private:
    queue<T> _bq;           // 使用库里的队列做阻塞队列
    int _capacity;          // 阻塞队列的容量上限
    pthread_mutex_t _mutex; // 互斥锁:保证队列安全
    pthread_cond_t _Empty;  // 判断阻塞队列为空的条件变量:关心该条件的是消费者(空了就不能取数据)
    pthread_cond_t _Full;   // 判断阻塞队列为满的条件变量:关心该条件的是生产者(满了就不能放数据)

    // PS:一些解释
    /*
        生产者消费者模型中,阻塞队列充当了交易场所的角色,生产者消费者线程能同时访问该交易场所,即访问了临界资源。
        1、多线程同时Push/pop:因此需要互斥,保证临界资源一次只能让一线程操作;
        2、在Push的同时存在Pop,因此需要同步,保证访问临界资源的顺序性。
    */
};

  
  
  

4.2.2.2、生产者消费者线程 ConProd.cc
#include "BlockQueue.hpp"

// 生产者线程:调用push接口,存入数据
void *Produce(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;

    // 生产数据:
    while (true)
    {
        int data = rand() % 100;
        cout << "producer, 生产一个数据:" << data << endl;
        bqueue->Push(data);
        sleep(1);
    }

    return nullptr;
}

// 消费者线程:调用pop接口,取出数据
void *Consume(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;

    // 消费数据:
    while (true)
    {
        int data;
        bqueue->Pop(&data);
        cout << "consumer, 消费一个数据: " << data << endl;
        sleep(1);
    }

    return nullptr;
}

int main()
{
    //用于充当数据资源
    srand((unsigned int)time(nullptr));

    //创建交易场所:阻塞队列
    BlockQueue<int>* bqueue = new BlockQueue<int>();

    //创建两类线程:生产者、消费者
    pthread_t ptor, cmer;
    pthread_create(&ptor, nullptr, Produce, (void*)bqueue);//arg参数:传入阻塞队列,让两个线程能够看到同一个交易场所
    pthread_create(&cmer, nullptr, Consume, (void*)bqueue);

    //线程终止
    pthread_join(ptor, nullptr);
    pthread_join(ptor, nullptr);

    delete bqueue;//注意释放空间

    return 0;
}

  
  
  

4.2.2.3、演示结果与补充说明

  演示结果:
在这里插入图片描述
  1、可根据需求适当加入策略。如通过sleep控制生产消费双方线程速度,①生产慢、消费快;②生产快、消费慢;③生产消费速度同。等。
  2、但无论上述哪一情况,阻塞队列实际不关心生产者消费者线程谁先运行或者谁在等待,因为在内部实现中已经考虑到执行流程。
  
  
  
  对于生产者消费者模型,其效率优势体现在哪?
  以上述单生产者单消费者为例,一定程度上缓解了生产者和消费者之间的数据处理能力。
在这里插入图片描述
  对于多生产者多消费者,除了上述该点外,也有其它意义,后续说明。
  
  
  
  
  

4.2.2、多生产者多消费者模式(2.0)

4.2.2.1、引入任务派发 Task.hpp
#pragma once
#include <iostream>
#include <functional>

typedef std::function<int(int, int)> func_t;

// 用于派发任务对象:可根据需求设置,此处为演示(两数计算)
class Task
{
public:
    Task(){}; // 默认无参构造

    Task(int x, int y, func_t func) // 需要我们自己传递函数和对应变量
        : _x(x), _y(y), _func(func)
    {
    }

    int operator()()
    {
        return _func(_x, _y);
    }

public:
    int _x;
    int _y;
    func_t _func; // 函数指针
};

  
  
  

4.2.2.2、加锁方式设计 lockGuard.hpp:PAII风格
#pragma once
#include <iostream>
#include<pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *pmutex)
        : _pmutex(pmutex)
    {
    }

    void lock()//加锁
    {
        pthread_mutex_lock(_pmutex);
    }

    void unlock()//解锁
    {
        pthread_mutex_unlock(_pmutex);
    }

private:
    pthread_mutex_t *_pmutex;
};


class lockGuard
{
public:
    lockGuard(pthread_mutex_t* pmutex)//构造:在构造时加锁
    :_mutex(pmutex)//初始化列表初始化_mutex时,调用Mutex的构造函数,需要传入pthread_mutex_t * 类型变量
    {
        _mutex.lock();//上锁
    }

    ~lockGuard()//析构:在对类析构时,顺带就解锁
    {
        _mutex.unlock();//解锁
    }
private:
    Mutex _mutex;
};

  
  
  

4.2.2.3、整体

  实则改动不大。
  1、对BlockQueue.hpp,修改加锁风格,其余部分大体不变。

#pragma once
#include<iostream>
#include<pthread.h>
#include<time.h>
#include<unistd.h>
#include<queue>
#include"lockGuard.hpp"

using namespace std;

int gDefaultCap = 5;//默认的阻塞队列容量上限

template <class T> // 模板类型
class BlockQueue
{
private:
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }

public:
    //构造函数:初始化容量上限、初始化互斥锁、初始化条件变量
    BlockQueue(int capacity = gDefaultCap)
        : _capacity(capacity)//容量上限:由用户决定需要多大容量
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }

    //析构函数:销毁互斥锁、销毁条件变量
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

    //Push:放数据的接口:由生产者线程不断向阻塞队列中放数据
    void Push(const T& in)   
    {
        //临界资源区:先上锁
        lockGuard lockguard(&_mutex);//创建该对象,那么在构造时会加锁,在析构时会解锁。(局部变量,声明周期在该函数中)
        //判断当前临界资源是否满足条件:对生产者、isQueueFull
        while(isQueueFull()) pthread_cond_wait(&_Full, &_mutex);
        //代码执行到此步:当前生产者线程持有锁,且阻塞队列未满--->可放入数据
        _bq.push(in);
        //完成后,唤醒消费者线程(告诉它此时交易场所中有资源,可以取用)
        pthread_cond_signal(&_Empty);
        //出了作用范围,lockguard析构时解锁
    }

    //Pop:取数据的接口:由消费者线程不断从阻塞队列中取数据
    void Pop(T* out)
    {
        //临界资源区:先上锁
        lockGuard lockguard(&_mutex);//创建该对象,那么在构造时会加锁,在析构时会解锁。(局部变量,声明周期在该函数中)
        //判断当前临界资源是否满足条件:对消费者、isQueueEmpty
        while(isQueueEmpty()) pthread_cond_wait(&_Empty, &_mutex);
        //代码执行到此步:当前消费者线程持有锁,且阻塞队列不空--->可取到数据
        *out = _bq.front();
        _bq.pop();
        //完成后,唤醒生产者线程(告诉它此时交易场所中资源被取出,可补充)
        pthread_cond_signal(&_Full);
        //出了作用范围,lockguard析构时解锁

    }

private:
    queue<T> _bq;           // 使用库里的队列做阻塞队列
    int _capacity;          // 阻塞队列的容量上限
    pthread_mutex_t _mutex; // 互斥锁:保证队列安全
    pthread_cond_t _Empty;  // 判断阻塞队列为空的条件变量:关心该条件的是消费者(空了就不能取数据)
    pthread_cond_t _Full;   // 判断阻塞队列为满的条件变量:关心该条件的是生产者(满了就不能放数据)

    // PS:一些解释
    /*
        生产者消费者模型中,阻塞队列充当了交易场所的角色,生产者消费者线程能同时访问该交易场所,即访问了临界资源。
        1、多线程同时Push/pop:因此需要互斥,保证临界资源一次只能让一线程操作;
        2、在Push的同时存在Pop,因此需要同步,保证访问临界资源的顺序性。
    */
};

  2、对ConProd.cc,创建多线程(多个生产者、多个消费者,此部分内容在之前的环节中也有涉及),派发任务与接收任务(任务对象)。

#include "BlockQueue.hpp"
#include "Task.hpp"

int add(int x, int y)
{
    return x+y;
}


// 生产者线程:调用push接口,存入数据
void *Produce(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;

    // 派发任务:
    while (true)
    {
        //生产前环节:获取一个任务
        int x = rand() % 100;
        usleep(1000);//主要是随机数由时间戳生成,这里延长些间隔时间
        int y = rand() % 100;
        Task assign(x, y, add);

        //生产环节:生产者生产任务到阻塞队列
        printf("%p---productor: (%d , %d )\n",pthread_self(), assign._x, assign._y);
        bqueue->Push(assign);
        sleep(2);
    }

    return nullptr;
}

// 消费者线程:调用pop接口,取出数据
void *Consume(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;

    while (true)
    {
        //消费环节:消费者从阻塞队列中获取任务
        Task assign;
        bqueue->Pop(&assign);

        //消费后环节:处理任务
        printf("%p---consumer: addtion result is  %d + %d = %d\n", pthread_self(), assign._x, assign._y, assign());
        printf("\n");
        sleep(2);
    }

    return nullptr;
}

int main()
{
    //用于充当任务对象派发的数据资源
    srand((unsigned int)time(nullptr));

    //创建交易场所:阻塞队列
    BlockQueue<Task>* bqueue = new BlockQueue<Task>();//向阻塞队列中存入的数据是Task任务对象

    //创建两类线程:生产者、消费者
    pthread_t ptor[2], cmer[2];
    
    pthread_create(ptor, nullptr, Produce, (void*)bqueue);//arg参数:传入阻塞队列,让两个线程能够看到同一个交易场所
    pthread_create(ptor+1, nullptr, Produce, (void*)bqueue);//arg参数:传入阻塞队列,让两个线程能够看到同一个交易场所
    pthread_create(cmer, nullptr, Consume, (void*)bqueue);
    pthread_create(cmer+1, nullptr, Consume, (void*)bqueue);

    //线程终止
    pthread_join(ptor[0], nullptr);
    pthread_join(ptor[1], nullptr);
    pthread_join(cmer[0], nullptr);
    pthread_join(cmer[1], nullptr);

    delete bqueue;//注意释放空间

    return 0;
}

  
  

4.2.2.4、演示结果与补充说明

  演示结果:
在这里插入图片描述

  
  
  对多生产者多消费者意义的补充说明
在这里插入图片描述
  至于使用单生产单消费还是多生产多消费,取决于具体场景中,接收任务和处理任务的耗时情况。假设这两环节在整体流程体系中所占据时间比例很小(即任务简单,都不太需要耗费时间),那么此时使用多生产多消费反而是一种累赘行为。
  
  
  
  

4.3、基于环形队列的生产消费模型

4.3.1、POSIX信号量介绍

  1)、信号量介绍
  1、什么是信号量?
  POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

  信号量本质:是一个计数器,用于预定资源。访问临界资源的时候,必须先申请信号量资源(sem–,预订资源,P),使用完毕信号量资源(sem++,释放资源V)
  
  
  2、如何理解信号量的使用?
  我们申请了一个信号量,当前执行流一定具有一个资源可以被使用。至于是哪一个资源,需要结合场景自定义编码完成。
  
  
  2)、相关接口
  PS:要理解信号量接口在生产者消费者模型中的使用,建议将相关接口的基本描述信息(DESCRIPTION)大致看一遍(man)。理解下述几个接口的含义。
  
  初始化信号量:

NAME
       sem_init - initialize an unnamed semaphore

SYNOPSIS
       #include <semaphore.h>
       int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:
       pshared:0表示线程间共享,非零表示进程间共享
       value:信号量初始值
       
DESCRIPTION
       sem_init()  initializes  the  unnamed  semaphore at the address pointed to by sem.
       The value argument specifies the initial value for the semaphore.

  销毁信号量:

NAME
       sem_destroy - destroy an unnamed semaphore

SYNOPSIS
       int sem_destroy(sem_t *sem);

  
  等待信号量:

NAME
       sem_wait, sem_timedwait, sem_trywait - lock a semaphore

SYNOPSIS
       #include <semaphore.h>

       int sem_wait(sem_t *sem);//P(),等待信号量,会将信号量的值减1

       int sem_trywait(sem_t *sem);

       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

DESCRIPTION //具体理解信号量的值value递减至0时会做什么
       sem_wait() decrements (locks) the semaphore pointed to by sem.  If the semaphore's
       value is greater than zero, then the decrement proceeds, and the function returns,
       immediately.   If the semaphore currently has the value zero, then the call blocks
       until either it becomes possible to perform the  decrement  (i.e.,  the  semaphore
       value rises above zero), or a signal handler interrupts the call.

  发布信号量:

NAME
       sem_post - unlock a semaphore

SYNOPSIS
       #include <semaphore.h>

       int sem_post(sem_t *sem);//V(),功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
       
DESCRIPTION //具体理解信号量增加时是在做什么
       sem_post()  increments  (unlocks)  the  semaphore pointed to by sem.  If the sema-
       phore's value consequently becomes greater than  zero,  then  another  process  or
       thread blocked in a sem_wait(3) call will be woken up and proceed to lock the sem-
       aphore.

  
  
  PV操作:是一种实现进程互斥与同步的有效方法,与信号量的处理相关。P表示通过的意思,V表示释放的意思。
  PV操作是典型的同步机制之一。用一个信号量与一个消息联系起来,当信号量的值为0时,表示期望的消息尚未产生;当信号量的值非0时,表示期望的消息已经存在。用PV操作实现进程同步时,调用P操作测试消息是否到达,调用V操作发送消息。
  
  
  

4.3.2、结构需求和结构说明

在这里插入图片描述

  
在这里插入图片描述

  
  
  

4.3.3、单生产者单消费者模式(1.0)

4.3.3.1、环形队列 ringQueue.hpp:充当交易场所
#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<unistd.h>
#include<time.h>
#include"Sem.hpp"
using namespace std;

int g_defualt_num = 5;//环形队列默认容量上限

template<class T>
class ringQueue
{ 
public:
    //构造:初始化数据
    ringQueue(int num = g_defualt_num)
        :_ring_queue(num)//这里是对vector对象进行构造:explicit vector (size_type n, const value_type& val = value_type());
        ,_num(num)
        ,p_step(0)//初始下标为0
        ,c_step(0)//初始下标为0
        ,space_sem(num)//构造信号量,初始时空间资源有num个
        ,data_sem(0)//构造信号量,初始时数据资源有0个
    { }

    //析构
    ~ringQueue()
    {}

    // 向环形队列中放入数据:生产者
    void push(const T &in)
    {
        // 申请信号量:预定空间资源
        space_sem.P();
        // 在特定位置生产数据
        _ring_queue[p_step++] = in;
        p_step %= _num; //[0,_num-1]
        // 生产成功,则意味着数据资源多了一个
        data_sem.V();
    }

    // 向环形队列中取出数据:消费者
    void pop(T *out)
    {
        // 申请信号量:预定数据资源
        data_sem.P();
        // 在特定位置消费数据
        *out = _ring_queue[c_step++];
        c_step %= _num;
        // 消费成功,则意味着空间资源多了一个
        space_sem.V();
    }

private:
    vector<T> _ring_queue;//环形队列:以数组的方式实现
    int _num;//环形队列容量上限
    int p_step;//生产者下标:在当前下标位置放入资源
    int c_step;//消费者下标:在当前下标位置取出资源
    Sem space_sem;//生产者信号量:空间资源
    Sem data_sem;//消费者信号量:数据资源
};

  
  
  
  

4.3.3.2、信号量对象 Sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_

#include<iostream>
#include<semaphore.h>

//信号量对象:因环形队列中需要用到不止一个信号量(包括其相关操作),故对其进行封装处理
class Sem
{
public:
    //构造:对信号量进行初始化,设置该信号量的初始值。
    Sem(unsigned int value)
    {
        sem_init(&_sem, 0, value);
    }

    //析构:销毁信号量
    ~Sem()
    {
        sem_destroy(&_sem);
    }

    //P操作:资源预定,信号量--
    void P()
    {
        sem_wait(&_sem);
    }

    //V操作:资源释放,信号量++
    void V()
    {
        sem_post(&_sem);
    }

private:
    sem_t _sem;
};

#endif

  
  
  
  

4.3.3.3、生产者消费者线程 testMain.cc
#include"ringQueue.hpp"
void* Produce(void* args)
{
    ringQueue<int>* rq = (ringQueue<int>*)args;

    while (true)
    {
        // 创建数据
        int data = rand() % 100 +1 ;
        // 生产者将数据存入交易场所中(环形队列)
        rq->push(data);
        printf("生产者: %d\n",data);
        sleep(1);
    }
}

void* Consume(void* args)
{
    ringQueue<int>* rq = (ringQueue<int>*)args;

    while(true)
    {
        // 消费者从交易场所取出数据并做后续处理
        int data;
        rq->pop(&data);
        printf("消费者:%d\n", data);
        sleep(1);
    }
}

int main()
{
    //用随机数模拟数据
    srand((unsigned int)time(nullptr));

    //创建交易场所:环形队列
    ringQueue<int>* rq = new ringQueue<int>();

    //创建并初始化生产者、消费者线程
    pthread_t ptor, cmer;
    pthread_create(&ptor, nullptr, Produce, (void*)rq);
    pthread_create(&cmer, nullptr, Consume, (void*)rq);

    //线程捕获
    pthread_join(ptor,nullptr);
    pthread_join(cmer,nullptr);

    delete rq;

    return 0;
}

  
  
  
  

4.3.3.4、演示结果

   演示结果:对于数据输出部分可以根据需求调整修改。

在这里插入图片描述

  
  
  
  

4.3.4、多生产者多消费者模式(2.0)

4.3.4.1、相关说明

在这里插入图片描述

  
  
  

4.3.4.2、整体演示

  对testMain.cc:多增了线程创建和线程终止。

#include"ringQueue.hpp"
void* Produce(void* args)
{
    ringQueue<int>* rq = (ringQueue<int>*)args;

    while (true)
    {
        // 创建数据
        int data = rand() % 100 +1 ;
        // 生产者将数据存入交易场所中(环形队列)
        rq->push(data);
        printf("生产者--%p: %d\n",pthread_self(),data);
        sleep(1);
    }
}

void* Consume(void* args)
{
    ringQueue<int>* rq = (ringQueue<int>*)args;

    while(true)
    {
        // 消费者从交易场所取出数据并做后续处理
        int data;
        rq->pop(&data);
        printf("消费者--%p: %d\n", pthread_self(),data);
        sleep(1);
    }
}

int main()
{
    //用随机数模拟数据
    srand((unsigned int)time(nullptr)^getpid());

    //创建交易场所:环形队列
    ringQueue<int>* rq = new ringQueue<int>();

    //创建并初始化生产者、消费者线程
    pthread_t ptor[2], cmer[2];
    pthread_create(ptor, nullptr, Produce, (void*)rq);
    pthread_create(ptor+1, nullptr, Produce, (void*)rq);
    pthread_create(cmer, nullptr, Consume, (void*)rq);
    pthread_create(cmer+1, nullptr, Consume, (void*)rq);

    //线程捕获
    pthread_join(ptor[0],nullptr);
    pthread_join(ptor[1],nullptr);
    pthread_join(cmer[0],nullptr);
    pthread_join(cmer[1],nullptr);

    delete rq;

    return 0;
}

  
  
   对ringQueue.hpp:在环形队列中分别为生产者消费者创建了各自的锁,使用于pushpop接口。

#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<unistd.h>
#include<time.h>
#include"Sem.hpp"
using namespace std;

int g_defualt_num = 5;//环形队列默认容量上限

template<class T>
class ringQueue
{ 
public:
    //构造:初始化数据
    ringQueue(int num = g_defualt_num)
        :_ring_queue(num)//这里是对vector对象进行构造:explicit vector (size_type n, const value_type& val = value_type());
        ,_num(num)
        ,p_step(0)//初始下标为0
        ,c_step(0)//初始下标为0
        ,space_sem(num)//构造信号量,初始时空间资源有num个
        ,data_sem(0)//构造信号量,初始时数据资源有0个
    { 
        pthread_mutex_init(&p_mutex, nullptr);
        pthread_mutex_init(&c_mutex, nullptr);
    }

    //析构
    ~ringQueue()
    {
        pthread_mutex_destroy(&p_mutex);
        pthread_mutex_destroy(&c_mutex);
    }

    // 向环形队列中放入数据:生产者
    void push(const T &in)
    {
        // 申请信号量:预定空间资源
        space_sem.P();
        //访问临界资源前加锁:生产者内部竞争,胜者持锁访问交易场所
        pthread_mutex_lock(&p_mutex);
        // 在特定位置生产数据
        _ring_queue[p_step++] = in;
        p_step %= _num; //[0,_num-1]
        //解锁
        pthread_mutex_unlock(&p_mutex);
        // 生产成功,则意味着数据资源多了一个
        data_sem.V();
    }

    // 向环形队列中取出数据:消费者
    void pop(T *out)
    {
        // 申请信号量:预定数据资源
        data_sem.P();
        //访问临界资源前加锁:消费者内部竞争,胜者持锁访问交易场所
        pthread_mutex_lock(&c_mutex);
        // 在特定位置消费数据
        *out = _ring_queue[c_step++];
        c_step %= _num;
        //解锁
        pthread_mutex_unlock(&c_mutex);
        // 消费成功,则意味着空间资源多了一个
        space_sem.V();
    }

private:
    vector<T> _ring_queue;//环形队列:以数组的方式实现
    int _num;//环形队列容量上限
    int p_step;//生产者下标:在当前下标位置放入资源
    int c_step;//消费者下标:在当前下标位置取出资源
    Sem space_sem;//生产者信号量:空间资源
    Sem data_sem;//消费者信号量:数据资源

    pthread_mutex_t p_mutex;//生产者内部使用的锁
    pthread_mutex_t c_mutex;//消费者内部使用的锁

};

  
  
  演示结果:还可以将其修改为派发任务的情况。

在这里插入图片描述
  
  
  
  
  

5、线程池

5.1、概念介绍

  线程池:
  一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价,线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
  
   线程池的应用场景:
  1、需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
  
  
  

5.2、基本演示

  PS:以下只是最基本的演示案例,可根据需求继续修改、完善、补充。

5.2.1、thread.hpp

#pragma once
#include<iostream>
#include<string>
#include<pthread.h>

using namespace std;
typedef void* (*func_t)(void*);//函数指针:此处用于线程表示线程的执行函数


//args传参设置:设置成类,增加args传参选择
class ThreadData
{
public:
    string _name;//对应线程名称
    void* _args;//对应线程回调函数中args参数
};




class Thread
{
public:
    Thread(int inode, func_t rountine, void* args)
    :_routine_func(rountine)//注意:这里线程的执行函数、参数args都是需要通过外部传入的
    {
        char buffer[64]="";
        snprintf(buffer, sizeof(buffer), "thread-%d",inode);
        _name = buffer;

        _tdata._args = args;
        _tdata._name = _name;
    }

    ~Thread()
    {}

    void start()//启动线程:用于创建线程,构造函数只是做了线程名称、ID等各参数设置,实则并未真正创建出线程
    {
        pthread_create(&_tid, nullptr, _routine_func, (void*)&_tdata);
    }

    void join()//终止线程
    {
        pthread_join(_tid, nullptr);
    }

private:
    string _name;//线程名
    pthread_t _tid;//线程ID
    func_t _routine_func;//线程的执行函数
    ThreadData _tdata;
};

  
  

5.2.2、threadpool.hpp

#pragma once

#include<vector>
#include<queue>
#include "thread.hpp"
#include "lockGuard.hpp"


//线程池:内部放置有多个处于等待状态的线程(消费者),当有任务派遣进入时(线程池内置的仓库,存放任务的队列),可唤醒线程处理任务
//相对于把任务到来再从零开始创建线程,此法中线程属于预备状态,那么有任务时直接派发处理即可。

int g_thread_num = 5;

template<class T>//模板:用于表示交易场所中资源数据类型(如:任务对象)
class threadPool
{
private://此处接口主要是用于rountine线程执行函数(其为静态成员函数,无法直接访问到类中非静态的成员变量)
    
    bool is_TaskQueue_Empty()//用于判断任务队列是否为空
    {
        return _task_queue.empty();
    }

    void condwait()//根据条件变量挂起对应线性
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    pthread_mutex_t* getMutex()//用于获取锁
    {
        return &_mutex;
    }

    T getTask()
    {
        T tmp = _task_queue.front();
        _task_queue.pop();
        return tmp;
    }



public:
    threadPool(int thread_num = g_thread_num)//构造函数
    :_num(thread_num)//待创建线程的个数
    {
        //初始化锁、条件变量
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        //初始化线程对象
        for(int i = i; i <= _num; ++i)
        {
            _threads.push_back(new Thread(i, routine, this));
        }
    }

    ~threadPool()
    {
        //销毁锁、条件变量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);

        //线程终止
        for(auto & itor: _threads)
        {
            itor->join();
            delete itor;//销毁构造函数中,new出来的空间
        }
    }

    void run()//启动线程池:实际创建线程
    {
        for(auto & itor: _threads)//这里遍历调用线程对象中的start,用以创建线程
        {   
            itor->start();
        }
    }

    //消费者:用于执行任务的线程
    static void* routine(void* args)
    {
        ThreadData* pdata = (ThreadData*)args;
        threadPool<T>* pthis = (threadPool<T>*)pdata->_args;

        while(true)//线程非执行一次任务就终止
        {
            T task;// 此处受制于作用域,任务对象需要创建在{}外
            {                                 
                // 加上括号的原因是方便在创建lockGuard对象时自动加锁解锁
                lockGuard lockguard(pthis->getMutex()); // 先上锁
                while (pthis->is_TaskQueue_Empty())pthis->condwait(); // 判断临界资源是否满足
                // 到此步骤,说明线程持有锁,且临界资源满足条件(将任务从仓库中取出)
                task = pthis->getTask();
            }
            task(pdata->_name);
        }
        
    }

    //仓库:将获取到的任务对象存入队列中
    void pushTask(const T& task)
    {   
        //仓库属于临界资源,放入数据到仓库中是生产者做的事(外部调用),也要加锁处理
        lockGuard lockguard(&_mutex);
        _task_queue.push(task);
        //仓库中有数据资源,此时可以唤醒被挂起的线程,让其执行任务
        pthread_cond_signal(&_cond);
    }


private:
    vector<Thread*> _threads;//线程池
    int _num;//线程池中线程数目
    queue<T> _task_queue;//队列:用于充当交易场所
    pthread_mutex_t _mutex;//交易场所可被多个线程访问,属于临界资源,故而需要互斥锁
    pthread_cond_t _cond;//当某一线程持有锁时,为了防止其余线程屡次申请锁,引入条件变量
};

  
  

5.2.3、testMain.cc

#include<time.h>
#include<unistd.h>
#include"threadPool.hpp"
#include"Task.hpp"
#include"log.hpp"

int main()
{   

    //随机数:用于模拟任务数据
    srand((unsigned long)time(nullptr)^getpid());

    //创建线程池
    threadPool<Task>* tp = new threadPool<Task>();//无参,此处线程数目为默认值
    logMessage(DEBUG,"成功创建线程池");

    //运行线程池
    tp->run();
    logMessage(DEBUG,"成功运行线程池");

    //派发任务
    while(true)
    {
        int x = rand()%100;
        usleep(2333);
        int y = rand()%100;
        //拉姆达表达式
        Task t(x,y, [](int x, int y)->int{return x + y; });
        
        logMessage(DEBUG,"成功制作任务--- add: %d, %d", t._x, t._y);

        //生产者:将任务存放入线性池的仓库中
        tp->pushTask(t);

        sleep(1);
    }

    return 0;
}

  
  
  

5.2.4、Task.hpp、log.hpp、lockGuard.hpp

  Task.hpp

#pragma once
#include <iostream>
#include <functional>
#include <string>

typedef std::function<int(int, int)> task_func_t;

// 用于派发任务对象:可根据需求设置,此处为演示(两数计算)
class Task
{
public:
    Task(){}; // 默认无参构造

    Task(int x, int y, task_func_t func) // 需要我们自己传递函数和对应变量
        : _x(x), _y(y), _func(func)
    {
    }

    void operator()(string name)//执行函数调用时报一下线程名称
    {
        printf("任务结果: %d + %d = %d \n",_x, _y, _func(_x, _y));
    }

public:
    int _x;
    int _y;
    task_func_t _func; // 函数指针
};

  
  
  log.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>

// 日志级别
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};

#define LOGFILE "./threadpool.log"

// 完整的日志功能,至少有 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)//const char *format, ... 可变参数
{
#ifndef DEBUG_SHOW
    if(level== DEBUG) return;
#endif

    //标准部分:固定输出的内容
    char stdBuffer[1024]; 
    time_t timestamp = time(nullptr);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);


    //自定义部分:允许用户根据自己的需求设置
    char logBuffer[1024]; 
    va_list args; //定义一个va_list对象
    va_start(args, format); 
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args); //相当于 args == nullptr

    printf("%s%s\n", stdBuffer, logBuffer);
}

  
  
   lockGuard.hpp

#pragma once
#include <iostream>
#include<pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *pmutex)
        : _pmutex(pmutex)
    {
    }

    void lock()//加锁
    {
        pthread_mutex_lock(_pmutex);
    }

    void unlock()//解锁
    {
        pthread_mutex_unlock(_pmutex);
    }

private:
    pthread_mutex_t *_pmutex;
};


class lockGuard
{
public:
    lockGuard(pthread_mutex_t* pmutex)//构造:在构造时加锁
    :_mutex(pmutex)//初始化列表初始化_mutex时,调用Mutex的构造函数,需要传入pthread_mutex_t * 类型变量
    {
        _mutex.lock();//上锁
    }

    ~lockGuard()//析构:在对类析构时,顺带就解锁
    {
        _mutex.unlock();//解锁
    }
private:
    Mutex _mutex;
};

  
  
  
  
  其它:待补充。
  
  
  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1060223.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法训练-数组 三】【数组矩阵】螺旋矩阵、搜索二维矩阵

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是螺旋矩阵&#xff0c;使用【二维数组】这个基本的数据结构来实现 螺旋矩阵【EASY】 二维数组的结构特性入手 题干 解题思路 根据题目示例 mat…

WEB3 创建React前端Dapp环境并整合solidity项目,融合项目结构便捷前端拿取合约 Abi

好 各位 经过我们上文 WEB3 solidity 带着大家编写测试代码 操作订单 创建/取消/填充操作 我们自己写了一个测试订单业务的脚本 没想到运行的还挺好的 那么 今天开始 我们就可以开始操作我们前端 Dapp 的一个操作了 在整个过程中 确实是没有我们后端的操作 或者说 我们自己就…

延迟队列

KEYS命令和SCAN命令都可以用于在Redis中查找匹配指定模式的键名&#xff0c;但它们之间有以下区别&#xff1a; 1. 阻塞 vs 非阻塞&#xff1a;KEYS命令是一个阻塞操作&#xff0c;它会遍历整个键空间来查找与给定模式匹配的键名。在执行KEYS命令期间&#xff0c;Redis服务器会…

oracle linux8.8上安装oracle 19c集群

1、操作系统版本告警 处理办法&#xff1a;export CV_ASSUME_DISTIDRHEL7.6 2、ssh互信故障 查看ssh版本 [rootdb1 ~]# ssh -V OpenSSH_8.0p1, OpenSSL 1.1.1k FIPS 25 Mar 2021 处理办法-2个节点都需要操作 安装前配置 # mv /usr/bin/scp /usr/bin/scp.orig # echo "…

以太网基础学习(四)——IP协议

一 、IP协议概述 IP&#xff08;Internet Protocol&#xff0c;互联网协议&#xff09;是互联网通信的基础协议&#xff0c;它负责将数据包从源地址传输到目的地址。IP协议定义了如何封装数据包&#xff0c;如何寻址数据包以及如何路由数据包&#xff0c;它是随着互联网的出现而…

[VIM]spcaevim

Home | SpaceVim SpaceVim - 知乎 关于Vim/Neovim/SpaceVim的一些思考 - 知乎 vim高配版(1) – SpaceVim 简介 SpaceVim 是国内的一个大佬将一些NB的插件整合到一起的一个插件包. 一键式安装, 功能强大. 官网参见 Home | SpaceVim vim高配版(2) – vimplus 简介 vimplu…

小谈设计模式(16)—抽象工厂模式

小谈设计模式&#xff08;16&#xff09;—抽象工厂模式 专栏介绍专栏地址专栏介绍 抽象工厂模式结构抽象工厂&#xff08;AbstractFactory&#xff09;具体工厂&#xff08;ConcreteFactory&#xff09;抽象产品&#xff08;AbstractProduct&#xff09;具体产品&#xff08;C…

解决每次重启ganache虚拟环境,十个账号秘钥都会改变问题

很多时候 我们启动一个 ganache 环境 然后 通过私钥 在 MetaMask 中 导入用户 但是 当我们因为 电脑要关机呀 或者 ETH 消耗没了呀 那我们就不得不重启一个ganache虚拟环境 然后 你在切一下网络 让它刷新一下 你就会发现 上一次导入的用户就没有了 这是因为 你每次 ganache…

Ae 效果:CC Power Pin

扭曲/CC Power Pin Distort/CC Power Pin CC Power Pin &#xff08;CC 强力边角定位&#xff09;与同组内的边角定位 Corner Pin效果非常类似&#xff0c;常用于对源图像的透视扭曲变形和四点跟踪合成。使用 CC Power Pin 会有更多的调整属性和更直观的操作。 ◆ ◆ ◆ 效果…

RobotFramework流程控制(最新版本)

文章目录 一 分支流程1. 关键字&#xff1a;Run Keyword If2. 关键字&#xff1a;IF/ELSE3. 嵌套IF/ELSE4. 关键字&#xff1a;Set Variable If 二 循环流程1. 普通FOR循环2. 嵌套FOR循环3. 退出循环4. 其它常用循环 一 分支流程 1. 关键字&#xff1a;Run Keyword If Run Key…

2023年10月4日

服务器 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);//实例化一个服务器server new QTcpServer(this);//此时&#xff0c;服务器已经成功进入监听状态&…

Docker通过Dockerfile创建Redis、Nginx--详细过程

创建Nginx镜像 我们先创建一个目录&#xff0c;在目录里创建Dockerfile [rootdocker-3 ~]# mkdir mynginx [rootdocker-3 ~]# cd mynginx [rootdocker-3 ~]# vim Dockerfile Dockerfile的内容 FROM daocloud.io/library/centos:7 RUN buildDepsreadline-devel pcre-devel o…

Ventoy万能U盘安装系统,支持任何的操作系统安装

Ventoy万能U盘安装系统&#xff0c;支持任何的操作系统安装&#xff1a; Download . VentoyVentoy is an open source tool to create bootable USB drive for ISO files. With ventoy, you dont need to format the disk again and again, you just need to copy the iso fil…

【网络安全---ICMP报文分析】Wireshark教程----Wireshark 分析ICMP报文数据试验

一&#xff0c;试验环境搭建 1-1 试验环境示例图 1-2 环境准备 两台kali主机&#xff08;虚拟机&#xff09; kali2022 192.168.220.129/24 kali2022 192.168.220.3/27 1-2-1 网关配置&#xff1a; 编辑-------- 虚拟网路编辑器 更改设置进来以后 &#xff0c;先选择N…

基于SSM的宿舍管理系统

基于SSM的学生宿舍管理系统的设计与实现&#xff0c;前后端分离 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 【主要功能】 系统主要分学生和管理员两个角色&#xff0c;功能有…

RSA攻击:模数分解

目录 一、模数分解总览 1.1直接分解法 1.2费马分解与Pollard_rho分解 1.3公约数分解 1.4其他模数分解 二、实战特训 2.1[黑盾杯 2020]Factor 2.2[GWCTF 2019]babyRSA 2.3[LitCTF 2023]yafu (中级) 2.4[RoarCTF 2019]RSA 2.5[CISCN 2022 西南]rsa 三、总结 一、模数分解总览 …

进程调度的时机,切换与过程以及方式

1.进程调度的时机 进程调度&#xff08;低级调度〉&#xff0c;就是按照某种算法从就绪队列中选择一个进程为其分配处理机。 1.需要进行进程调度与切换的情况 1.当前运行的进程主动放弃处理机 进程正常终止运行过程中发生异常而终止进程主动请求阻塞&#xff08;如等待l/O)…

(粗糙的笔记)动态规划

动态规划算法框架&#xff1a; 问题结构分析递推关系建立自底向上计算最优方案追踪 背包问题 输入&#xff1a; n n n个商品组成的集合 O O O&#xff0c;每个商品有两个属性 v i v_i vi​和 p i p_i pi​&#xff0c;分别表示体积和价格背包容量 C C C 输出&#xff1a; …

【C语言】函数的定义、传参与调用(二)

&#x1f497;个人主页&#x1f497; ⭐个人专栏——C语言初步学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; 目录 导读&#xff1a; 1. 函数的嵌套调用 1.1 什么是嵌套调用 1.2 基础实现 1.3 调用流程解析 2. 函数的链式访问 2.1 …

算法通过村第十二关-字符串|青铜笔记|隐形的王者

文章目录 前言转换成小写字母字符串转换整数总结 前言 提示&#xff1a;为别人而活着&#xff0c;其实是最简单的一种活法。 --蔡崇达《命运》 字符串本身并不是一种数据结构&#xff0c;但是由于其本身的特殊性&#xff0c;额可以产生很多特殊的算法问题。另外&#xff0c;字符…