Liunx下的消费者与生产者模型与简单线程池的实现

news2024/11/15 8:33:16

文章目录

  • 前言
  • 1.消费者与生产者模型
  • 2.信号量
    • 1.信号量的接口
    • 2.使用环形队列模拟生产者消费者模型
  • 3.简单实现线程池
  • 4.补充说明

前言

本文主要会结束消费者生产者模型,以及简单线程池的实现。

1.消费者与生产者模型

之前我们学了条件变量和互斥等概念。条件变量本质就是一个队列,它会将因为某种条件不满足不能往后执行的线程添加到这个队列中,避免线程做无用功,当条件满足时,会将队列中的线程重新唤醒继续执行。我们接下将利用条件变量和互斥锁实现一个消费者与生产者模型。

消费者与生产者模型是什么呢?

在生活中,工厂将加工好的产品运到超市,超市将商品卖给消费者。这就是一个生产者消费者模型,这样做的好处是消费者能方便买到商品,工厂也能将产品快速输出没有积压货物。这是一种高效的处理方式。在计算机中,也有这种消费者生产者模型,有些线负责从某种渠道拿到数据,这就是生产者;有些线程处理其他线程手中拿到的数据,这就是消费者。我们可以定义一个缓冲区,相当于超市。一边让消费者线程处理数据,一边让生产者线程产生数据。这就是消费者与生产者模型。

消费者与生产者模型的高效性体现在哪里呢?

我们为啥要定义一个缓冲区这么麻烦呢,这个缓冲区是怎么提高效率的呢?其实很简单,负责拿到数据的线程,其实在拿到数据的时候需要处理时间,如果没有缓冲区,这个时候负责处理数据的线程就会陷入等待状态,执行效率就大幅度降低,如果有了这种缓冲区,那么久可以保证消费者线程时时刻刻都会有数据处理。这样就提高了效率。

消费者生产者模型的注意事项

这个缓冲区需要被消费者和生产者看到,这就意味着这个缓冲区就是一个公共资源,也就是临界资源。既然是多线程访问这个临界资源,这里就涉及到多线程的互斥与同步。怎么维护多线程之间的互斥与同步呢?我们分析一下消费者与生产者之间的关系。

首先,生产者与生产者之间的关系:互斥关系;毕竟缓冲区只有一个,只能允许一个线程去访问这个资源。消费者与消费者的关系同理也是互斥。消费者与生产者的关系:互斥和同步。因为不管是消费者还是生产者,只能其中一个去访问临界资源,这就是互斥。只有生产者生产了数据,消费者才能从缓冲区里获得数据消费,这就是同步关系。简单总结一下消费者和生产者模型就是3 2 1原则,即3种关系,2种角色,1种场所。

我们搞明白了上述的道理之后,就可以使用条件变量和互斥锁简单实现一下这个消费者于生产者模型了。我们可以用队列来模拟这个缓冲区,在定义两个函数,一个处理数据一个生产数据,充当消费者和生产者。

BlockQueue.hpp

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
const int gacp=5;
template<class T>
class BlockQueue
{ 
  public:
    BlockQueue(const int cap=gacp )
    :_cap(gacp)
    {
         pthread_mutex_init(&_mutex,nullptr);
         pthread_cond_init(&_consumerCond,nullptr);
         pthread_cond_init(&_productorCond,nullptr);
    }
   ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumerCond);
        pthread_cond_destroy(&_productorCond);
    }
    void push(const T& in)
    {   
         pthread_mutex_lock(&_mutex);
         while(isFull())
         {
            pthread_cond_wait(&_productorCond,&_mutex);
         }
         _q.push(in);
         //if(_q.size()>=_cap/2)
         
         pthread_cond_signal(&_consumerCond);//唤醒消费者
         
         pthread_mutex_unlock(&_mutex);
    }
   bool isFull()
   {
     return _q.size()==_cap;
   }
   bool isEmpty()
   {
     return _q.empty();
   }
    void pop(T*out)
    {
        pthread_mutex_lock(&_mutex);
        while(isEmpty())
        {
            pthread_cond_wait(&_consumerCond,&_mutex);
        }
        *out=_q.front();
        _q.pop();
        
        pthread_cond_signal(&_productorCond);//唤醒生产者
        pthread_mutex_unlock(&_mutex);
    }
  
  
private:
  std::queue<T>_q;
  int _cap;
  pthread_mutex_t _mutex;
  pthread_cond_t _consumerCond;//消费者对应条件变量
  pthread_cond_t _productorCond;//生产者对应的条件变量
  


};

用C++queue容器来封装这个阻塞队列作为缓冲区,push行为就是生产者的行为,pop行为就是消费者行为。我们使用条件变量,当队列为满时就不让生产者生产了,这个时候让生产行为处于等待状态,唤醒消费行为。当队列为空时就不让消费者消费了,这个时候让消费行为处于等待状态,唤醒生产行为。

Task.hpp

#include<iostream>
#include<string>
class Task
{
  public:
  Task()
  {

  }
  Task(int x,int y,char op):_x(x),_y(y),_op(op){}
  ~Task(){}
  void operator()()
  {
    switch(_op)
    {
        case '+':
        _ret=_x+_y;
        break;

        case '-':
        _ret=_x-_y;
        break;
       
        case '*':
        _ret=_x*_y;
        break;
        
        case '/':
        { 
            if(_y==0)
            {
                _exitCode=-1;
            }
            else
           _ret=_x/_y;
        }
        break;
        default:
        break;
    }
  }
  std::string formatRet()
  {
     return std::to_string(_ret)+" "+'('+std::to_string(_exitCode)+')';
  }
  std::string formatArg()
  {
    return std::to_string(_x)+" " +_op+" "+std::to_string(_y)+'=';
  }
    private:
    int _x;
    int _y;
    char _op;
    int _ret;
    int _exitCode;


};

这个是封装了一个任务类,模拟要处理的数据。这个任务类会产生一些需要计算的结果。将这些表达式和结果作为数据传入阻塞队列中由生产者和消费者进行处理。

test.cc

#include"BlockQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
 #include<ctime>
#include<unistd.h>
 void *consumer(void*arg)
 {   
    sleep(1);
    BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
    while(1)
    { 
        Task t;
        //1.从blockqueue中获取数据
        bq->pop(&t);
        t();
       //2.结合某种业务逻辑,处理数据
    std::cout<<"consumer data:"<<t.formatArg()<<t.formatRet()<<std::endl;

    }
 }
 void *producter(void*arg)
 {  
    BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
    std::string opers ="+-*/";
    while(1)
    {  
        //1.从某种渠道获取数据
        int x=rand()%10+1;
        int y=rand()%10+1;
        char op=opers[rand()%opers.size()];
        //2.将数据加入blockqueue中,完成生产过程
        Task t(x,y,op);
        bq->push(t);
        std::cout<<"prducter data:"<<t.formatArg()<<"=?"<<std::endl;
    }
 }
 int main()
 {
    //多产生单消费
    srand((uint64_t)time(nullptr));
    BlockQueue<Task>*bq=new BlockQueue<Task>();
    pthread_t  c[2],p[3];
    pthread_create(&c[0],nullptr,consumer,bq);
    pthread_create(&c[1],nullptr,consumer,bq);
    pthread_create(&p[0],nullptr,consumer,bq);
    pthread_create(&p[1],nullptr,producter,bq);
    pthread_create(&p[2],nullptr,producter,bq);
    
    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);
    pthread_join(p[2],nullptr);
  

    return 0;
 }

这里就是创建线程模拟生产者和生产者在缓冲区中处理数据。

在这里插入图片描述

以上就是生产者于消费者模型了,用多线程去模拟消费者和生产者,因为这里只有一份临界资源就是阻塞队列,我们已经对这个阻塞队列进行加锁保护了,同时又维护好了线程之间的同步互斥关系。所以一批线程是没问题的。

2.信号量

信号量之前就提到过,是用来描述临界资源数数目,它的工作机制和买票看电影类似,是一种资源预订机制。它本质就是一个计数器,P操作就是减减,V操作就是加加,PV操作本身是原子的,信号量申请成功表示资源可用,否则表示资源不可用。使用信号量就是相当于把对资源的判断转化成对信号量的申请行为。之前的互斥锁就可以看做是二元信号量,由1到0,再由0到1。信号量也是要被其他线程或者进程所看见的,本质上也是一种临界资源,所以在申请信号量和释放信号量的时候也需要加锁保护。

1.信号量的接口

初始化信号量的函数sem_init

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数说明:sem:需要初始化的信号量。pshared:传入0值表示线程间共享,传入非零值表示进程间共享。value:信号量的初始值(计数器的初始值)。

销毁信号量的函数sem_destroy

int sem_destroy(sem_t *sem);

参数说明:sem:需要销毁的信号量。

等待信号量的函数sem_wait(相当于P操作

int sem_wait(sem_t *sem);

sem:需要等待的信号量。

发布信号量(释放信号量)函数sem_pos(相当于V操作

int sem_post(sem_t *sem);

sem:需要发布的信号量。

2.使用环形队列模拟生产者消费者模型

这个信号量是对临界资源数目的描述,也就是说在某些情况下临界资源是可以拆成一份份的来访问。我们使用环形队列模拟生产者消费者模型时,就可以使用信号量。循环队列每个位置可以看成一份临界资源,为保证消费者和生产者能安全高效的生产的数据,我们规定生产者先生产者数据,消费者再消费数据,同事生产者不能套圈生产数据,不然之前生产的数据可能还没有被消费就会被新数据覆盖,造成数据丢失。消费者也不能追上生产者,因为它没有数据需要消费还可能导致一些异常情况出现。

在这里插入图片描述

生产消费关系分析

1.生产者和消费者关系的资源不一样,生产者关心空间,消费者关心数据。2.只要信号量不为0,表示资源可用,线程可访问。3.环形队列我们只要访问不同的区域,生产行为和消费行为是可以同时进行的。4.当队列为空的时生产者先行,当队列为满的时候消费者先行。生产者不能套圈消费者,消费者不能超过生产者。

在这里插入图片描述

当我们将资源整体使用的时候就优先考虑互斥锁,当资源可以分成多份使用的时候就优先考虑信号量。

RingQueue.h

#pragma once

#include<iostream>
#include<vector>
#include <semaphore.h>

static const int N=5;

template<class T>
class RingQueue
{
private:
void P(sem_t& s)
{
   sem_wait(&s);                                                                                                                
}
void V(sem_t& s)
{
     sem_post(&s);
}
public:
void push(const T&in)
{
    //生产
    P(_space_sem);
    //一定有对应的空间资源,不用判断 
    _ring[_p_step++]=in;
    _p_step%=_cap;
    V(_data_sem);

     

}
void pop(T*out)
{
   //消费
   P(_data_sem);
   *out=_ring[_c_step++];
   _c_step%=_cap;
   V(_space_sem);



}
RingQueue(int num=N):_ring(num),_cap(num)
{
      sem_init(&_data_sem,0,0);
      sem_init(&_space_sem,0,num);
      _c_step=_p_step=0;
}
~RingQueue()
{
       sem_destroy(&_space_sem);
       sem_destroy(&_data_sem);
}


private:
   std::vector<int>_ring; 
   int _cap;//环形队列容量
   sem_t _data_sem ;//数据信号量,消费者关心
   sem_t _space_sem ;//空间信号量,生产者关心
   int _c_step;//消费位置
   int _p_step;//生产位置
};

test.cc

#include<pthread.h>
#include<memory>
#include<unistd.h>
#include<sys/types.h>
#include"RingQueue.hpp"
using namespace std;

void *consumer(void *arg)
{
  RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
  while(1)
  {
    int data=0;
    rq->pop(&data);
    cout<<"consumer done: "<<data<<endl;

  }
}
void *productor(void *arg)
{
 RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
  while(1)
  {
    int data=rand()%10+1;
    rq->push(data);
    cout<<"productor done: "<<data<<endl;
    sleep(1);
  }
}
int main()
{   
    srand(time(nullptr));

    RingQueue<int>*rq=new RingQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,rq);
    pthread_create(&p,nullptr,productor,rq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
}

在这里插入图片描述

因为这里是单生产单消费者,所以对信号量的访问都没有做加锁保护,我们可以将其改造成多生产者多消费者模型。

Task.hpp

#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }
       //模拟处理过程所花费的时间
        usleep(10000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

将之前的模拟计算任务的头文件拿过来,充当数据分配给线程处理。

RingQueue

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

static const int N = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }
    void V(sem_t &s)
    {
        sem_post(&s);
    }
    void Lock(pthread_mutex_t &m)
    {
        pthread_mutex_lock(&m);
    }
    void Unlock(pthread_mutex_t &m)
    {
        pthread_mutex_unlock(&m);
    }

public:
    RingQueue(int num = N) : _ring(num), _cap(num)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, num);
        _c_step = _p_step = 0;

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }
    // 生产
    void push(const T &in)
    {  
        //先申请信号量,进行资源分配,再加锁效率高
        
        P(_space_sem);  
        Lock(_p_mutex); 
        _ring[_p_step++] = in;
        _p_step %= _cap;
        Unlock(_p_mutex);
        V(_data_sem);
    }
    // 消费
    void pop(T *out)
    {  //先申请信号量,进行资源分配,再加锁效率高
        P(_data_sem);
        Lock(_c_mutex); 
        *out = _ring[_c_step++];
        _c_step %= _cap;
        Unlock(_c_mutex);
        V(_space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);

        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    std::vector<T> _ring;
    int _cap;         // 环形队列容器大小
    sem_t _data_sem;  // 消费者关心 数据信号量
    sem_t _space_sem; // 生产者关心 空间信号量
    int _c_step;      // 消费位置
    int _p_step;      // 生产位置

    pthread_mutex_t _c_mutex;//消费者锁
    pthread_mutex_t _p_mutex;//生产者锁
};

这里是多线程并发访问就需要加锁处理,消费者与生产者之间的关系可以通过信号量来维护,消费者与消费者,生产者与生产者的关系是互斥的,需要加锁来访问资源确保任意一个资源只能由一个线程访问,申请资源进行资源的分配,所以这里就定义了两把锁,分别作用与生产者和消费者。

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>

using namespace std;

const char *ops = "+-*/%";

void *consumerRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->pop(&t);
        t();
        cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
    }
}

void *productorRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[(x + y) % strlen(ops)];
        Task t(x, y, op);
        rq->push(t);
        cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
    }
}

int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq = new RingQueue<Task>();
    
    pthread_t c[3], p[2];
    for (int i = 0; i < 3; i++)
        {
            pthread_create(c + i, nullptr, consumerRoutine, rq);
        }
    for (int i = 0; i < 2; i++)
        { 
         pthread_create(p + i, nullptr, productorRoutine, rq);
        }

    for (int i = 0; i < 3; i++)
        {
            pthread_join(c[i], nullptr);
        }
    for (int i = 0; i < 2; i++)
        {
            pthread_join(p[i], nullptr);
        }

    delete rq;
    return 0;
}

在这里插入图片描述

简单总结一下:当我们将临界资源整体使用后,优先考虑互斥锁,当临界资源可以被拆分使用的时候就要考虑信使用号量。我们在信号量之后加锁比较合适,这样提前将资源分配好,这样锁申请成功之后就能直接使用临界资源了,一定程度上提高了效率。同时消费者和生产者还有一个高效点,就是在处理的数据的过程中是没有枷锁的吗,比如消费者函数在处理计算表达式的时候就没有加锁的,只有访问队列的时候才触及加锁,同样的生产者函数,产生计算表达式的式的时候也没有加锁。这才是高效的地方。

3.简单实现线程池

什么是线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。简单来数线程池就是提前构建一批线程处理任务,这样免去了构建线程的开销,有任务推送即可马上处理。下面我们简单模拟一下线程池的实现。

我们可以用vector容器作为存分线程的载体,在vector中存放一批线程,同时将需要需要处理的任务放置在任务队列中,我们可以将先将推送的任务添加至任务队列中,再将任务队列中线程取出分配给线程。这样就能维护一个线程池了。同时如果任务队列中无任务了,就需要将线程池中处理任务的逻辑给休眠,等到有任务时在将其换新进行任务的处理。实际上,这个线程池就相当于消费者,缓冲区是封装在线程池中。这也是消费者与生产者模型。生产者我们可以直接自定义的导入任务放入线程池中。

ThreaPool.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"

const static int N = 5;

template <class T>
class ThreadPool
{
public:
    ThreadPool(int num = N) : _num(num)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    pthread_mutex_t* getlock()
    {
        return &_lock;
    }
    void threadWait()
    {
        pthread_cond_wait(&_cond, &_lock);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    bool isEmpty()
    {
        return _tasks.empty();
    }
    T popTask()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }
    static void threadRoutine(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        while (true)
        {
            // 1. 检测有没有任务
            // 2. 有:处理
            // 3. 无:等待
            //必定加锁
            T t;
            {
                LockGuard lockguard(tp->getlock());
                while (tp->isEmpty())
                {
                    tp->threadWait();
                }
                t = tp->popTask(); // 从公共区域拿到私有区域
            }
           
            t();
            std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
            
        }
    }
    void init()
    {
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(Thread(i, threadRoutine, this));
        }
    }
    void start()
    {
        for (auto &t : _threads)
        {
            t.run();
        }
    }
    void check()
    {
        for (auto &t : _threads)
        {
            std::cout << t.threadname() << " running..." << std::endl;
        }
    }
    void pushTask(const T &t)
    {
        LockGuard lockgrard(&_lock);
        _tasks.push(t);
        threadWakeup();
    }
    ~ThreadPool()
    {
        for (auto &t : _threads)
        {
            t.join();
        }
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

private:
    std::vector<Thread> _threads;//线程池
    int _num;

    std::queue<T> _tasks; //任务队列

    pthread_mutex_t _lock;
    pthread_cond_t _cond;
};

接下来就是对一些的简单封装了,比如简单封装一下互斥锁和线程以及任务。

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex // 自己不维护锁,有外部传入
{
public:
    Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmutex);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *_pmutex;
};

class LockGuard // 自己不维护锁,有外部传入
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        _mutex.lock();
    }
    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

Task

#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }

        usleep(100000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>

class Thread
{
public:
    typedef enum
    {
        NEW = 0,
        RUNNING,
        EXITED
    } ThreadStatus;
    typedef void (*func_t)(void *);

public:
    Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args)
    {
        char name[128];
        snprintf(name, sizeof(name), "thread-%d", num);
        _name = name;
    }
    int status() { return _status; }
    std::string threadname() { return _name; }
    pthread_t threadid()
    {
        if (_status == RUNNING)
            return _tid;
        else
        {
            return 0;
        }
    }
  
    static void *runHelper(void *args)
    {
        Thread *ts = (Thread*)args; // 就拿到了当前对象
        (*ts)();
        return nullptr;
    }
    void operator ()() //仿函数
    {
        if(_func != nullptr) _func(_args);
    }
    void run()
    {
        int n = pthread_create(&_tid, nullptr, runHelper, this);
        if(n != 0) exit(1);
        _status = RUNNING;
    }
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        if( n != 0)
        {
            std::cerr << "main thread join thread " << _name << " error" << std::endl;
            return;
        }
        _status = EXITED;
    }
    ~Thread()
    {
    }

private:
    pthread_t _tid;
    std::string _name;
    func_t _func; // 线程未来要执行的回调
    void *_args;
    ThreadStatus _status;
};

接着就是调用逻辑了。

#include"ThreadPool.hpp"
#include<memory>

int main()
{
    std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
    tp->init();
    tp->start();
    while(1)
    {
     int x, y;
        char op;
        std::cout << "please Enter x> ";
        std::cin >> x;
        std::cout << "please Enter y> ";
        std::cin >> y;
        std::cout << "please Enter op(+-*/%)> ";
        std::cin >> op;

        Task t(x, y, op);
        tp->pushTask(t);

        // 充当生产者, 从读取数据,构建成为任务,推送给线程池
         sleep(1);
    
    }
}

在这里插入图片描述

以上就是对线程池的简单实现,这将之前写过的东西都给串起来了。

4.补充说明

STL中的容器是否是线程安全的?不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.

其他常见的几种锁

悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。

以上内容如有问题,欢迎指正!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vETSTStudio - CAPL - CAN报文未使用位

目录 ChkStart_PayloadGapsObservation 代码示例 ChkStart_PayloadGapsObservationTx 代码示例 ChkStart_PayloadGapsObservationRx 代码示例 我们在做CAN&CANFD通信或者CAN&CANFD网络管理的时候&#xff0c;就会测试到DBC中报文各种信号和位的使用状态&#xff…

从数据采集到智能控制,探寻锂电卷绕机的自动化之路

在锂电池制造过程中&#xff0c;卷绕机被视为关键设备之一。它负责将正负极材料和隔膜按照设计要求卷绕成电芯&#xff0c;是确保锂电池性能和质量的重要环节。为了提高生产效率、确保产品质量&#xff0c;锂电池行业越来越注重引入智能化技术和设备。 图.锂电池生产&#xff0…

spring boot+MySQL福聚苑社区团商品购系统

开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9

尚硅谷Linux学习笔记

文章目录 1. Linux概述2. Linux目录结构3. Linux操作命令3.1 vim编辑命令3.1.1 一般模式3.1.2 编辑模式3.1.3 指令模式 3.2 网络相关命令3.3 系统管理3.4 帮助命令3.4.1 man 获得帮助信息3.4.2 help 获得 shell 内置命令的帮助信息3.4.3 常用快捷键 3.5 文件目录类3.5.1 pwd、e…

ROS:rosbag的使用

目录 一、背景二、概念及作用三、rosbag命令行四、rosbag程序实现4.1C实现4.2Python实现 一、背景 机器人传感器获取到的信息&#xff0c;有时我们可能需要时时处理&#xff0c;有时可能只是采集数据&#xff0c;事后分析&#xff0c;比如: 机器人导航实现中&#xff0c;可能…

基于 BPF 的 Linux 系统自动调优工具:Oracle 开发了 “bpftune”

导读Oracle 开源了一个基于 BPF 的 Linux 参数自动调优工具 “bpftune”&#xff0c;这是一个自动配置器&#xff0c;可以监控 Linux 系统的工作负载并自动设置正确的内核参数值。 Oracle 开源了一个基于 BPF 的 Linux 参数自动调优工具 “bpftune”&#xff0c;这是一个自动配…

U盘写流程USB协议抓包分析

U盘写流程USB协议抓包分析 因好奇于操作系统在对U盘这个块设备是如何进行读写传递数据包&#xff0c;笔者通过抓包测试&#xff0c;做了一个简单分析。安装了wireshark的usbPcap即能抓取主机USB接口上的usb包。 A、基本包信息分析 让我们先从读流程开始分析一下USB包的包结构…

音视频技术开发周刊 | 301

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 微软、谷歌、亚马逊&#xff0c;打响大模型时代的云战争 过去数月&#xff0c;云巨头们砸下真金白银&#xff0c;研发大模型、战略投资、自研 AI 芯片……大模型的时代方兴…

ArcGIS Pro中的模型构建器演示

前言 ArcGIS Pro的模型构建器在功能上相较于大致没有什么改动,主要是界面上变得相对漂亮,流程中使用了一些半透明的效果,相较于arcmap中的模型构建器,可以说是颜值进化很大了。 实战 首先我们来看一下演示效果,怎么样,是不是很方便 先建立一个模型 对于模型构建器我一直…

java main 方法的理解

文章目录 理解命令行参数用法举例IDEA工具配置参数&#xff08;了解&#xff09; 理解 由于JVM需要调用类的main()方法&#xff0c;所以该方法的访问权限必须是public&#xff0c;又因为JVM在执行main()方法时不必创建对象&#xff0c;所以该方法必须是static的&#xff0c;该…

python爬虫_selenuim登录个人markdown博客站点

文章目录 ⭐前言⭐selelunim⭐博客站点&#x1f496; 自动填充账号密码登录 ⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享python使用selenuim登录个人markdown博客站点。 该系列文章&#xff1a; python爬虫_基本数据类型 python爬虫_函数的使用 python爬虫…

接口中的默认方法和静态方法

接口中的默认方法和静态方法 接口中的默认方法 package cn.tedu.inter; //1.定义接口 public interface Inter1 {/*1.接口中默认方法的修饰符public可以省略*///2.定义接口中的默认方法public default void play(){System.out.println("我是接口中的默认方法&#xff0c;…

OSPFv2基础03_综合实验

目录 1.创建OSPF进程 2.创建OSPF区域 3.使能OSPF 4.创建虚连接&#xff08;可选&#xff09; 5.OSPF常用命令 6.实验配置步骤 7.实验效果 1.创建OSPF进程 OSPF是一个支持多进程的动态路由协议&#xff0c;OSPF多进程可以在同一台路由器上运行多个不同的OSPF进程&#x…

ES6基本知识点

目录 1.对象优化 1.1 新增API 1.2 object.assign方法的第一个参数是目标对象&#xff0c;后面的参数都是源对象 1.3 声明对象简写 1.4 对象的函数属性简写 1.5 对象拓展运算符 2.map和reduce 2.1 数组中新增的map和reduce方法 3 promise 3.1 promise封装异步操作 4.模…

C# PaddleInference OCR 验证码识别

说明 C# PaddleInference OCR 验证码识别 自己训练的模型&#xff0c;只针对测试图片类型&#xff0c;准确率99% 效果 项目 VS2022.net4.8OpenCvSharp4Sdcb.PaddleInference 测试图片 代码 using OpenCvSharp; using Sdcb.PaddleInference.Native; using Sdcb.PaddleInfer…

mac笔记本安装java环境以及idea设置

系列文章目录 文章目录 系列文章目录安装java环境一、安装jdk二、下载安装IntelliJ IDEA三、安装maven四、安装git五、安装tomcat六、安装appenv配置文件七、有关idea的设置1、快捷键设置2、新建类的命名3、字体的大小&#xff0c;有关菜单栏的大小4、框内的tab最多能有多少个窗…

【2 beego学习 - 项目导入与项目知识点】

0 项目导入 1 在英文路径下新建一个同名的项目,拷贝其他数据到这个文件 bee new 同名项目名 cd 同名项目名 go mod tidy go get -u -v github.com/astaxie/beego go get 同名项目名/models2 拷贝部分的项目文件到新目录 bee run 运行的其他错误,按照提示安装文件 1 后端获取…

微软MFC技术中的消息队列及消息处理

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天来聊聊微软MFC技术中的消息队列及消息处理。 MFC应用程序中由Windows 系统以消息的形式发送给应用程序的窗口。窗口接收和处理消息之后&#xff0c;把控制返回给Windows。Windows系统在同一时间可显示多…

HashMap底层原理:数据结构+put()流程+2的n次方+死循环+数据覆盖问题

导航&#xff1a; 【Java笔记踩坑汇总】Java基础进阶JavaWebSSMSpringBoot瑞吉外卖SpringCloud黑马旅游谷粒商城学成在线MySQL高级篇设计模式常见面试题源码_vincewm的博客-CSDN博客 目录 一、底层 1.1 HashMap数据结构 1.2 扩容机制 1.3 put()流程 1.4 HashMap是如何计算…

电话号码的字母组合问题

解题思路&#xff1a; 当我第一眼看到这题的时候&#xff0c;我直接举出来一个列子“258”&#xff0c;直接套用多重for循环遍历可以罗列出来&#xff0c;但是根据数字组合的长度不能确定for循环的多少&#xff08;除非把所有for循环个数情况都罗列一遍&#xff09; 所以只能…