【Linux】生产消费模型 + 线程池

news2024/11/23 9:05:35

文章目录

  • 📖 前言
  • 1. 生产消费模型
  • 2. 阻塞队列
    • 2.1 成员变量:
    • 2.2 入队(push)和出队(pop):
    • 2.3 封装与测试运行:
      • 2.3 - 1 对代码进一步封装
      • 2.3 - 2 分配运算任务
      • 2.3 - 3 测试与运行
  • 3. 循环阻塞队列
    • 3.1 POSIX信号量:
      • 3.1 - 1 sem_init / sem_destroy
      • 3.1 - 2 sem_wait
      • 3.1 - 3 sem_post
    • 3.2 成员变量:
    • 3.3 生产消费:
    • 3.4 构造与析构:
    • 3.5 测试:
  • 4. 线程池(懒汉模式)
    • 4.1 单例模式复习:
    • 4.2 成员变量:
    • 4.3 构造和析构:
    • 4.4 两次nullptr判断:
    • 4.5 线程池启动:
    • 4.6 封装加锁/解锁/通知线程等操作:
    • 4.7 测试:

📖 前言

上一章节我们学习了线程的同步与互斥,学习了互斥锁和条件变量的使用。本章我们将学习编程的一个重要模型,生产者消费者模型,并且运用之前学的线程同步和互斥的相关接口来实现阻塞队列和环形队列,最后再来实现一个简易的线程池。
目标已经确定,准备开讲啦……


1. 生产消费模型

在这里插入图片描述
生产者消费者模型是同步与互斥的最典型的应用场景:(重新认识条件变量)

  • 消费者在消费期间,不妨碍工厂去生产,工厂在生产期间不影响消费者消费。
  • 因为超市的存在,消费者和工厂间不再是强耦合的关系,而是一种解耦的关系。

1.消费者有多个,消费者之间是什么关系呢?

  • 竞争关系 —— 互斥

2.供应商有多个,供应商之间是什么关系呢?

  • 竞争关系 —— 互斥

3.消费者和供应商之间又是什么关系呢?

  • 互斥关系,同步关系

除了要保证临界资源的安全性之外,还要保证生产消费过程中的合理性。

  • 如果只有互斥的情况,那么生产者、消费者都要来轮询检测。
  • 通过互斥的方式,效率太低了,不合理。
  • 生产和消费应该要有一定的顺序,消费完了再生产,生产满了再消费。

3 2 1 原则:

  • 生产者和生产者(互斥)消费者和消费者(互斥)生产者和消费者(互斥 / 同步):3种关系
  • 生产者和消费者:线程承担的2种角色
  • 超市:内存中特定的一种内存结构(数据结构):1个交易场所

2. 阻塞队列

基于生产者和消费者模型的阻塞队列。

设计的这个队列要保证,队列元素如果为满的情况下,就不能让生产者生产了,如果为空的情况下,就不能让消费者来消费了,那么这个的队列就称作为阻塞队列。

生产接口:

  • 纯互斥的话,先进行加锁,再判断队列满了没。
  • 如果满了就不生产,然后解锁,之后退出。
  • 只是接口调用完成了,但是这个线程下次会又跑过来了,就又先加锁,再判断满不满足生产,如果满了就不生产,然后解锁,之后退出。
  • 因为优先级比较高,导致了就在这里重复,不断地申请锁,释放锁,导致消费者申请不到锁。

这就是纯互斥,生产者一直在抢占锁,而导致消费线程的饥饿。同样的道理,消费线程也是如此。

这种场景没错,但是不合理:

  • 我们是需要有一个条件变量的方式,让双方能够进行在特定条件不满足的时候,进入不生产并且还休眠的状态。
  • 同样的让消费者在消费的时候,不满足消费条件时,也进行休眠, 让双方彼此唤醒对方。
  • 这种就叫做同步式的阻塞队列。

2.1 成员变量:

既然是阻塞队列,再结合线程互斥与同步来维护该队列:

  • 首先我们需要一个队列来将对象存入(队列就好比是超市)。
  • 我们还需要用于访问控制的互斥锁,在同一时刻只能有一个线程访问队列。
  • 我们需要两个用户线程同步的条件变量,因为我们需要在不同的条件下通知的线程(生产者or消费者)。

代码演示:

#pragma once

#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>

// 默认容量大小
const uint32_t gDefaultCap = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(uint32_t cap = gDefaultCap) 
        : cap_(cap)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&conCond_, nullptr);
        pthread_cond_init(&proCond_, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&conCond_);
        pthread_cond_destroy(&proCond_);
    }
private:
    uint32_t cap_;           // 容量
    queue<T> bq_;            // blockqueue
    pthread_mutex_t mutex_;  // 保护阻塞队列的互斥锁
    pthread_cond_t conCond_; // 让消费者等待的条件变量
    pthread_cond_t proCond_; // 让生产者等待的条件变量
};

我们这里用的是C++的类模板,阻塞队列里的内容就可以相对灵活一些了。

2.2 入队(push)和出队(pop):

阻塞队列类内函数:

入队(push):

// 生产接口
void push(const T &in) // const &: 纯输入
{
    // 先把队列锁住
    lockQueue();
    while (isFull()) // ifFull就是我们在临界区中设定的条件
    {
        proBlockWait();      
    }

    // 条件满足,可以生产
    pushCore(in); // 生产完成
    // wakeupCon(); // 唤醒消费者
    
    // 把队列解锁
    unlockQueue();
    wakeupCon(); // 生产完了,生产者就要唤醒消费者
}

生产之前要判断判断,是否适合生产:

  • bq是否为满,程序员视角的条件
    • 满(不生产)
    • 不满(生产)
  • if(满)不生产(不仅仅要不生产),休眠(更要休眠),休眠期间消费线程就可以去申请锁了。
  • else if(不满)生产,唤醒消费者。

为什么要用while判断而不用if判断:

  • 等待条件变量前:当我等待的时候,会自动释放mutex_(因为不能拿着锁去等)。
  • 阻塞等待,等待被唤醒。
  • 被唤醒 != 条件被满足 (概率虽然很小),要做到:被唤醒 && 条件被满足
  • 因为一些原因导致了被伪唤醒了:
    • 有可能是系统的问题,也有可能是代码本身有问题。
    • 但是要保证代码的健壮性,继续向后执行。
  • 当我醒来的时候,我是在临界区里醒来的!!
    • 当线程被伪唤醒后,它会重新参与调度并尝试获取锁。
    • 如果其他线程已经持有了锁,并且没有释放,那么伪唤醒的线程将无法获得锁,它需要继续等待或者重新检查条件是否满足。
  • 从哪里被阻塞,就要从哪里醒来,醒来之后就是相当于没有锁就访问临界资源了。

先解锁还是先唤醒,以生产者为例:

  • 当消费者在解锁之前被唤醒时:
    • 生产完成,把消费者唤醒了,然后生产者被切走并且没有释放锁。
    • 消费者会在条件变量里被唤醒,然后去争锁,但是争不到,要等待(因为生产者的锁没解锁)。
    • 这次等待不在条件变量下去等了,而是在申请互斥锁上等。
    • 一旦生产者切回来,解锁以后,消费者直接会竞争锁成功。
  • 当消费者在解锁之后被唤醒时:
    • 一旦解锁了,唤醒消费者,消费者就会立马能够从pthread_cond_wait里返回并且把锁重新持有,接下来进行后续操作,进行消费。
    • 如果当刚解锁时,还没有唤醒消费者。
    • 那么此时其他消费者可能把锁拿走了,该消费者线程竞争锁失败了(在申请锁当中去等了)。
    • 其他消费者忙自己的事情,会自己释放锁的。

出队(pop):

// 消费接口
T pop()
{
    // 先把队列锁住
    lockQueue();
    
    while (isEmpty())
    {
        conBlockwait(); // 阻塞等待,等待被唤醒,?
    }

    // 条件满足,可以消费
    T tmp = popCore();

    // 把队列解锁
    unlockQueue();
    wakeupPro(); // 消费完了,消费者就要唤醒生产者

    return tmp;
}

消费之前要判断是否适合消费:

  • bq是否为空,程序员视角的条件:
    • 空(不消费)
    • 有(消费)
  • if(空)不消费,休眠。
  • else if(有)消费,唤醒生产者。

消费接口唤醒生产者和解锁顺序同上生产者操作。

2.3 封装与测试运行:

2.3 - 1 对代码进一步封装

为了代码的可读性,也是为了以后能够修改方便,我们对加锁,条件变量等进行了封装:

void lockQueue()
{
    pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
    pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
    return bq_.empty();
}
bool isFull()
{
    return bq_.size() == cap_;
}

// 生产者进行阻塞等待
void proBlockWait() // 生产者一定是在临界区中的!
{
    // 1. 在阻塞线程的时候,会自动释放mutex_锁
    pthread_cond_wait(&proCond_, &mutex_);
}

// 消费者进行阻塞等待
void conBlockwait() // 阻塞等待,等待被唤醒
{
    // 1. 在阻塞线程的时候,会自动释放mutex_锁
    pthread_cond_wait(&conCond_, &mutex_);
    // 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回
}

// 唤醒生产者
void wakeupPro()
{
    // 一定要在生产者所在的条件变量下唤醒
    pthread_cond_signal(&proCond_);
}

// 唤醒消费者
void wakeupCon()
{
    // 一定要在消费者所在的条件变量下唤醒
    pthread_cond_signal(&conCond_);
}

// 生产完成
void pushCore(const T &in)
{
    bq_.push(in);
}

// 消费
T popCore()
{
    T tmp = bq_.front();
    bq_.pop();

    return tmp;
}

2.3 - 2 分配运算任务

因为阻塞队列我们实现的时候是用了类模版,所以我们可以给队列分配Task对象(任务)

#pragma once

#include <iostream>
#include <string>

using namespace std;

class Task
{
public:
    Task() : elemOne_(0), elemTwo_(0), operator_('0')
    {
    }
    Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
    {
    }
    int operator() ()
    {
        return run();
    }
    int run()
    {
        int result = 0;
        switch (operator_)
        {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/':
        {
            if (elemTwo_ == 0)
            {
                cout << "div zero, abort" << endl;
                result = -1;
            }
            else
            {
                result = elemOne_ / elemTwo_;
            }
        }
        break;
        case '%':
        {
            if (elemTwo_ == 0)
            {
                std::cout << "mod zero, abort" << std::endl;
                result = -1;
            }
            else
            {
                result = elemOne_ % elemTwo_;
            }
        }
        break;
        default:
            cout << "非法操作: " << operator_ << endl;
            break;
        }
        return result;
    }

    // 输出型参数
    int get(int *e1, int *e2, char *op)
    {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;
    }
private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};

2.3 - 3 测试与运行

生产者生产任务并放入到阻塞队列当中:

void *productor(void *args)
{
    BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        // 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户
        int one = rand() % 50;
        int two = rand() % 20;
        char op = ops[rand() % ops.size()];
        Task t(one, two, op);

        // 2. 生产任务
        bqp->push(t);
        cout << "producter[" << pthread_self() << "] " 
             << (unsigned long)time(nullptr) << " 生产了一个任务: " 
             << one << op << two << "=?" << endl;
            
        sleep(1);
    }
}

消费者从队列里拿任务,并执行任务:

const std::string ops = "+-*/%";

void *consumer(void *args)
{
    BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        Task t = bqp->pop(); // 消费任务
        int result = t();    // 处理任务 --- 任务也是要花时间的!
        int one, two;
        char op;
        t.get(&one, &two, &op);
        cout << "consumer[" << pthread_self() << "] " 
             << (unsigned long)time(nullptr) << " 消费了一个任务: " 
             << one << op << two << "=" << result << endl;
    }
}
int main()
{
    // 生产者用来生产计算任务,消费者用来消费计算任务
    BlockQueue<Task> bq;

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, &bq);
    pthread_create(&p, nullptr, productor, &bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    
    return 0;
}

于是就实现了,生产一个任务消费一个任务:

在这里插入图片描述
生产者生产任务的时候和消费者消费任务的时候是并发执行的:(重点)

  • 并发并不是在交易场所中并发。
  • 并不是在临界区中并发(一般而言),而是生产前(before blockqueue),消费后(after blockqueue)对应的并发。
  • 在消费的同时也在制作任务,并发体现就在这里。

消费者必须按照生产的节奏来走,和管道一样,写得慢那么读的也慢(得有访问控制,互斥同步机制)。
解耦体现在生产者生产的任务,可以通过阻塞队列派发给消费者。

生产和消费的速度不一致,如何理解?

  • 并不仅仅指的是,在阻塞或者环形队列,里忙闲不均速度不均。
  • 更重要指的是,生产者生产制作一个任务,和消费者把一个任务全都处理完。
  • 这两个的时间是不一样的。

生产消费的交易场所就是一个内存,这个内存具体呈现的是:队列、双端队列、环形队列、可能是其他结构用来资源数据或者任务交换的。


3. 循环阻塞队列

在我们之前学习数据结构的时候,我们学习过环形队列,【环形队列复习】。

生产消费模型用上了循环队列之后,就会有一个很大的优势:

  • 因为生产者和消费者访问的(假设是数组实现的循环队列)是不同下标位置
  • 这二者访问的并非同一块内存空间,所以这就实现了同时访问
  • 这样就更加体现了生产消费的并发属性

对比与需求:

  • 之前学的queue是整体被使用的,没法被切割。
  • 现在想做一个公共的临界资源,但是这个临界资源可以被划分成不同的区域,要用信号量将这些区域保护起来。
  • 所以要写一个基于固定大小的环形队列。

此时就相当于把循环队列这个临界资源分成了一小块一小块,只有满或空的时候,头指针和尾指针才会指向同一块数组空间,其他时间都是不冲突的!

访问同一个位置有可能吗?答:有可能!什么时候会发生呢?

  • 两个指针指向同一个位置的时候,只有满或空的时候!(互斥和同步)
  • 其他时候,都指向的是两个不同的位置!(并发)
  • 让生产者和消费者同时访问数组的不同区域就可以让它俩同时进行生产和消费。

多线程情况下根本就不用考虑队列为满还是为空,因为信号量帮我们考虑。

3.1 POSIX信号量:

在之前的共享内存的学习中,我们简单的提到过信号量 传送门,信号量本质上是一个计数器,是一个描述临界资源数量的计数器。

保证不会在限有资源的情况下让多的线程进入到临界区对临界资源的访问。通过信号量来限制进入临界资源当中的线程的个数。

  • P操作:申请资源(原子的)
  • V操作:归还资源(原子的)
  • 临界资源可以当成整体,可以不可以看做一小部分一小部分呢?
  • 可以,是由应用场景决定的。
  • 信号量申请成功了,就一定能保证你会拥有一部分临界资源吗?
  • 只要信号量申请成功,那么一定会获得指定的资源。

持有0和1的信号量叫做,二元信号量 == 互斥锁

信号量:1

  • P — 1->0 — 加锁
  • V — 0->1 — 释放锁

小结:

  • 如果保证了信号量是多个,那么就可以保证临界资源被划分了不同的区域。
  • 所以此时每个线程想进入这个区域,就得先申请信号量,只要申请成功了,这个资源一定给你了。
  • 信号量是个计数器,用来衡量临界资源当中,资源数目的,申请信号量的本质叫做预定某种资源。
  • 当申请信号量成功的时候,这个信号量对应的资源才可以被唯一的使用。

3.1 - 1 sem_init / sem_destroy

初始化一个未命名的信号量:

在这里插入图片描述
销毁信号量:

在这里插入图片描述

3.1 - 2 sem_wait

在这里插入图片描述
介绍:

  • sem_wait是一个信号量操作函数,用于请求和等待信号量的可用性。
  • 它的作用是尝试获取信号量,如果信号量的值大于0,则将信号量的值减1,并立即返回。
  • 如果信号量的值为0,则当前线程会被阻塞,直到有其他线程释放信号量。

这个接口和锁 / 条件变量那里的等待是一样的,可以简单理解为,这个接口就是让信号量减减。

3.1 - 3 sem_post

在这里插入图片描述
介绍:

  • sem_post 是一个信号量操作函数,用于释放或增加信号量的值。
  • 它的作用是将信号量的值加1,并唤醒可能因为等待信号量而被阻塞的线程。

sem_postsem_wait是一对重要的信号量操作函数,用于实现并发控制和临界区的进入与退出。
通过调用sem_post来释放信号量,可以让其他线程获取信号量进入临界区,从而实现资源的共享和同步。

3.2 成员变量:

有了上述知识,我们就能可以来着手实现了:

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>

using namespace std;

// 默认容量
const int gCap = 10;

template <class T>
class RingQueue
{
private:
    vector<T> ringqueue_; // 环形队列
    sem_t roomSem_;       // 衡量空间计数器,productor
    sem_t dataSem_;       // 衡量数据计数器,consumer
    uint32_t pIndex_;     // 当前生产者写入的位置,如果是多线程,pIndex_也是临界资源
    uint32_t cIndex_;     // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源

    pthread_mutex_t pmutex_;
    pthread_mutex_t cmutex_;
};

除了两个信号量,生产消费的时候,还需要操生产和消费这两个指针,指向队列正确的位置。

3.3 生产消费:

操作的基本原则:

  • 空:消费者不能超过生产者,【生产者先行】:
    • 消费者前面的数据根本没有,如果超过去读取读到的全都是废弃的数据。
  • 满:生产者不能把消费者套一个圈,继续在往后写入,【消费者先行】:
    • 就会把曾经生产出来的,消费者还没来得及消费的数据就覆盖掉了。

生产者:最关心的是什么资源?

  • 空间 N:[N,0] 从N到0的过程

消费者:最关心的是什么资源?

  • 数据 N:[0,N] 从0到N的过程

代码演示:

// 生产 -- 先申请信号量
void push(const T &in)
{
    // 申请信号量在锁前面的话,如果是多线程,那么多个线程都可以申请到资源
    // 然后再去争锁
    sem_wait(&roomSem_); // 如果锁加在前面的话,信号量就无法被多次的申请(P操作)

    // 在锁这里等时,每个线程都是拿着信号量去等
    pthread_mutex_lock(&pmutex_);

    ringqueue_[pIndex_] = in; // 生产的过程,有线程安全的问题
    pIndex_++;   // 写入位置后移
    pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征

    pthread_mutex_unlock(&pmutex_);
    sem_post(&dataSem_); // V操作
}

// 消费
T pop()
{
    sem_wait(&dataSem_); // 申请数据资源
    pthread_mutex_lock(&cmutex_);

    T temp = ringqueue_[cIndex_]; // 消费
    cIndex_++;
    cIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征

    pthread_mutex_unlock(&cmutex_);
    sem_post(&roomSem_); // 数据已拿走,空间就露出来了,空间多了一个

    return temp;
}

生产者和消费者都为空的时候,一定能保证生产线程先运行,因为一开始消费线程的数据信号量一开始为0,sem_wait(&dataSem_)函数一开始要阻塞等待。

在这里插入图片描述
两个线程各自申请各自所关心的资源,各自释放对方所关心的资源,那么此时这两个就可以互相的互调,协同起来了。

环形队列的使用:(重点)

  • 生产者生产时:空间多了一个,申请了一个空间(空间信号量 - 1),数据信号量 + 1。
  • 消费者消费时:空间少了一个,释放了一个空间(空间信号量 + 1),数据信号量 - 1。

因为有信号量帮我们做了访问控制,所以我们不需要判断循环队列什么时候为满,什么时候为空:

  • 队列为满时,空间信号量为0,生产者无法申请空间。
  • 生产者无法生产,会在空间信号量里面等待,不会继续生产,消费者继续消费。
  • 队列为空的时候,空间信号量为满,数据信号量为0,没有可以消费的数据。
  • 消费者无法消费,会在数据信号量里面等待,不会继续消费,生产者继续生产。

3.4 构造与析构:

RingQueue(int cap = gCap)
    : ringqueue_(cap), pIndex_(0), cIndex_(0)
{
    // 生产(空间信号量)
    sem_init(&roomSem_, 0, ringqueue_.size());
    
    // 消费(数据信号量)
    sem_init(&dataSem_, 0, 0);

    pthread_mutex_init(&pmutex_ ,nullptr);
    pthread_mutex_init(&cmutex_ ,nullptr);
}

~RingQueue()
{
    // 销毁信号量计数器
    sem_destroy(&roomSem_);
    sem_destroy(&dataSem_);

    pthread_mutex_destroy(&pmutex_);
    pthread_mutex_destroy(&cmutex_);
}

3.5 测试:

#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>

void *productor(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rand()%10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
        // sleep(1);
    }
}

void *consumer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
        sleep(1);
    }
}

int main()
{
    srand((unsigned long)time(nullptr)^getpid());

    RingQueue<int> rq;

    pthread_t c1,c2,c3, p1,p2,p3;
    pthread_create(&p1, nullptr, productor, &rq);
    pthread_create(&p2, nullptr, productor, &rq);
    pthread_create(&p3, nullptr, productor, &rq);
    pthread_create(&c1, nullptr, consumer, &rq);
    pthread_create(&c2, nullptr, consumer, &rq);
    pthread_create(&c3, nullptr, consumer, &rq);


    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    return 0;
}

在这里插入图片描述

环形队列允许生产和消费同时进入临界区,没问题,只要不同时访问同一个位置就可以,但是如果是多生产多消费,那么就必须维护生产者和生产者之间,消费者和消费者之间的互斥关系。
生产者和生产者之间争一个出来访问环形队列, 消费者和消费者之间争一个出来访问环形队列。
只允许一个线程进入临界资源写入,只允许一个线程从临界资源当中读取。


4. 线程池(懒汉模式)

我们只需要把任务交到这个线程的池子里面,其就能帮我们多线程执行任务,计算出结果。

当任务来时才创建线程,这个成本有点高,如果提前先把各种池化的东西准备好,等任务来的时候,直接把任务指派给某个线程。

无论是进程池还是线程池,本质上都是一种对于执行流的预先分配,当有任务时,直接指定,而不需要创建进程/线程来处理任务。

4.1 单例模式复习:

在我们之前学过的单例模式分为两种,一种是懒汉模式,一种是饿汉模式 [传送门] 。

  • 懒汉:刚开始先不创建对象,等第一次使用的时候再去创建。
    • 缺点:是第一次创建对象需要等待。
    • 优点:是程序启动快。
  • 饿汉:在main函数之前就将对象创建出来。
    • 缺点:是程序启动会比较慢。
    • 优点:是启动之后获取对象会比较快。

4.2 成员变量:

用懒汉模式实现一个线程池:

#pragma once

#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Log.hpp"
#include "Lock.hpp"

using namespace std;

int gThreadNum = 5;

template <class T>
class ThreadPool
{
private:
    bool isStart_; // 表示是否已经启动
    int threadNum_;
    queue<T> taskQueue_;
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;

    // 改成懒汉模式
    static ThreadPool<T> *instance;
    const static int a = 100;
};

因为不用关心线程的退出信息,也不需要对线程进行管理,在创建好线程之后,直接detach分离即可。

static变量我们需要在类外初始化,模板类型还需要带上template关键字:

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;

4.3 构造和析构:

构造:

private:
    ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false)
    {
        assert(threadNum_ > 0);
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    
    // 将拷贝构造和赋值重载删掉
    ThreadPool(const ThreadPool<T> &) = delete;
    void operator=(const ThreadPool<T>&) = delete;

因为是懒汉模式的单例,提供一个指针作为单例,不对外开放构造函数。
同时,用delete关键字,禁止拷贝构造和赋值重载。

析构:

~ThreadPool()
{
    pthread_mutex_destroy(&mutex_);
    pthread_cond_destroy(&cond_);
}

4.4 两次nullptr判断:

static ThreadPool<T> *getInstance()
{
    static Mutex mutex;
    if (nullptr == instance) // 仅仅是过滤重复的判断
    {
        LockGuard lockguard(&mutex); // 进入代码块,加锁。退出代码块,自动解锁。
        if (nullptr == instance)
        {
            instance = new ThreadPool<T>();
        }
    }

    return instance;
}
  • 第一个判断是为了保证单例,只要单例对象存在了,就不再创建单例对象了。
  • 第二个判断是保证线程安全,可能会出现线程A在创建单例,线程B在申请锁中等待的情况。
  • 此时如果不进行第二次nullptr判断,线程B从锁中被唤醒后,又会继续执行,多创建了一个单例对象!

4.5 线程池启动:

处理任务:

static void *threadRoutine(void *args) // args收到了类内指针
{
    pthread_detach(pthread_self());

    // 此时就拿到了线程池对象指针
    ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
    prctl(PR_SET_NAME, "follower");
    while (1)
    {
        tp->lockQueue();
        // 处理任务
        while (!tp->haveTask())
        {
            tp->waitForTask();
        }
        // 这个任务就被拿到了线程的上下文中
        T t = tp->pop();
        tp->unlockQueue();

        // for debug
        int one, two;
        char oper;
        t.get(&one, &two, &oper);
        // 规定,所有的任务都必须有一个run方法
        Log() << "新线程完成计算任务: " << one << oper << two << "=" << t.run() << "\n";
    }
}

void start()
{
    // 作为一个线程池,不能被重复启动
    assert(!isStart_);
    for (int i = 0; i < threadNum_; i++)
    {
        pthread_t temp;
        pthread_create(&temp, nullptr, threadRoutine, this);
    }
    isStart_ = true;
}
  • 类内成员,成员函数,都有默认参数this,类内要是想把线程搞起来,只能是static。
  • static成员函数,无法访问类内成员函数和成员变量,只能通过接口来访问。

4.6 封装加锁/解锁/通知线程等操作:

private:
    void lockQueue() { pthread_mutex_lock(&mutex_); }
    void unlockQueue() { pthread_mutex_unlock(&mutex_); }
    bool haveTask() { return !taskQueue_.empty(); }
    void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
    void choiceThreadForHandler() { pthread_cond_signal(&cond_); }

    T pop()
    {
        T temp = taskQueue_.front();
        taskQueue_.pop();
        return temp;
    }

4.7 测试:

makefile中新的用法,可以更加泛型编程:

CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=threadpool
src=ThreadPoolTest.cc

$(bin):$(src)
	$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:
	rm -f $(bin)

测试代码:

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <thread>

// 如何对一个线程进行封装, 线程需要一个回调函数,支持lambda
// class tread{
// };

int main()
{
    // 给线程改名字
    prctl(PR_SET_NAME, "master");

    const string operators = "+/*/%";

    // unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());// 懒汉模式之后这个就不能用了

    unique_ptr<ThreadPool<Task>> tp(ThreadPool<Task>::getInstance());
    tp->start();

    srand((unsigned long)time(nullptr));

    // 派发任务的线程
    while (true)
    {
        int one = rand() % 50;
        int two = rand() % 10;
        char oper = operators[rand() % operators.size()];
        Log() << "主线程派发计算任务: " << one << oper << two << "=?"
              << "\n";
        Task t(one, two, oper);
        tp->push(t);
        sleep(1);
    }

    return 0;
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1036467.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能生成内容AIGC:AIGC for Various Data Modalities: A Survey

论文作者&#xff1a;Lin Geng Foo,Hossein Rahmani,Jun Liu 作者单位&#xff1a;Singapore University of Technology and Design (SUTD); Lancaster University 论文链接&#xff1a;http://arxiv.org/abs/2308.14177v1 内容简介&#xff1a; 人工智能生成内容&#xff…

(十)VBA常用基础知识:worksheet的各种操作之sheet复制

当前sheet确认 2.Copy Before&#xff1a;将复制的sheet放到指定sheet前边 Sub Hello()6 Copy Before把sheet6拷贝到sheet3前边Worksheets("Sheet6").Copy Before:Worksheets("Sheet3") End Sub3.Copy After&#xff1a;将复制的sheet放到指定sheet后边 …

Unity Shader 透明度效果

游戏中有以下两种达到透明度效果&#xff1a; 1.透明度测试 只要一个片元的透明度不满足条件&#xff08;通常小于某个阈值&#xff09;&#xff0c;那么就舍弃对应的片元。被舍弃的片元不会进行任何的处理&#xff0c;也不会对颜色缓冲产生任何影响。否则就会按照普通的不透…

P3842 [TJOI2007] 线段

[TJOI2007] 线段 - 洛谷 #include<bits/stdc.h> using namespace std; const int N2e410; int n; int f[N][2],a[N][2]; int dis(int a,int b) {return abs(a-b); } int main() {scanf("%d",&n);for(int i1;i<n;i)scanf("%d %d",&a[i][0]…

MySQL学习笔记12

MySQL 查询语句&#xff1a; 1、查询五子句&#xff1a;&#xff08;重点&#xff09; mysql> select */字段列表 from 数据表名称 where 子句 group by 子句 having 子句 order by 子句 limit 子句; 1&#xff09;where 子句&#xff1b;条件筛选。 2&#xff09;group…

Arch挂载错误

临时解决方案&#xff1a;手动挂载 到 /run/media/sonichy 目录打开终端 sudo mkdir DATA sudo mount /dev/sda5 /run/media/sonichy/DATA

微信CRM系统在旅游行业的应用

旅游业目前存在的问题 1. 产品同质化严重 各大旅游企业推出的产品雷同率高&#xff0c;缺乏创新性与唯一性&#xff0c;旅游景点的宣传方式和体验也大都雷同&#xff0c;客户在选择去旅游的时候会对比价格问题&#xff0c;哪里价格低去哪里。 2. 获客成本高 国内旅游景点众…

光电探测器指标分析

先来看一下一个光电探测器的数据手册 第一个光电二极管类型 常用的是PIN管和APD管&#xff0c;两种管子各有优劣 PIN&#xff1a;光电二极管&#xff08; Photo Diode&#xff09;&#xff0c;当半导体中的PN结受到光照射&#xff0c;且入射光能量高于光电二极管的带隙能时&am…

R语言贝叶斯广义线性混合(多层次/水平/嵌套)模型GLMM、逻辑回归分析教育留级影响因素数据...

全文下载链接&#xff1a;http://tecdat.cn/?p24203 本教程使用R介绍了具有非信息先验的贝叶斯 GLM&#xff08;广义线性模型&#xff09; &#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 当前教程特别关注贝叶斯逻辑回归在二元结果和计数/比例结果场景中的…

.NET 8 性能比 .NET 7 大幅提升

微软 .NET 开发团队的工程师 Stephen Toub 发表博客《Performance Improvements in .NET 8》&#xff0c;详细介绍了 .NET 8 中的性能改进。 介绍了 .NET 8 的性能表现&#xff0c;包括 JIT、原生 AOT、VM、GC、Mono、线程、文件 I/O、网络、JSON 处理、日志等。 .NET 7 was s…

数据结构与算法基础-(2)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

大模型的最大bug,回答正确率几乎为零,GPT到Llama无一幸免

目录 前言 1.名字和描述颠倒一下&#xff0c;大模型就糊涂了 2.实验及结果 3.未来展望 前言 大模型的逻辑&#xff1f;不存在的。 我让 GPT-3 和 Llama 学会一个简单的知识&#xff1a;A 就是 B&#xff0c;然后反过来问 B 是什么&#xff0c;结果发现 AI 回答的正确率竟然是…

Java正则表达式解析复杂跨行日志

Java正则表达式解析复杂跨行日志 解析内容正则使用完整代码 使用正则表达式解析日志 解析内容 String content "2023-09-23 11:31:54.705 INFO [ main] com.zlm.tools.ToolsApplication : Starting ToolsApplication using Java 1.8.0_201 on \n&qu…

SVG 基本语法

1. 概述 svg为可缩放矢量图形&#xff0c;使用 XML 格式定义图像。 2. 基础图形 2.1 矩形 &#xff08;1&#xff09; 基础语法 <rect x"20" y"20" rx"20" ry"20" width"150" height"100" fill"red&qu…

10.4Cookie和Session

一.概念: 二.相关方法: SendCookie: import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servle…

HDLBits-Edgedetect

刚开始写的代码如下&#xff1a; module top_module (input clk,input [7:0] in,output [7:0] pedge );reg [7:0] in_pre;always (posedge clk)begin in_pre < in;endassign pedge in & ~in_pre; endmodule但是提交结果是错误的。猜想原因如下&#xff1a; assign p…

关于地址存放的例题

unsigned int a 0x1234; unsigned char b *(unsigned char*)&a; 上面代码大端存储和小端存储的值分别是多少&#xff1f; 大端存储的是把高位地址存放在低位地址处&#xff0c;低位存放到高位。小端是高位存放在高位&#xff0c;低位在低位。因为a是整型&#xff0c;所…

Python 逢七拍手小游戏

"""逢七拍手游戏介绍&#xff1a;逢七拍手游戏的规则是&#xff1a;从1开始顺序数数&#xff0c;数到有7&#xff0c;或者是7的倍数时&#xff0c;就拍一手。例如&#xff1a;7、14、17......70......知识点&#xff1a;1、循环语句for2、嵌套条件语句if/elif/e…

java框架-Springboot3-基础特性+核心原理

文章目录 java框架-Springboot3-基础特性核心原理profiles外部化配置生命周期监听事件触发时机事件驱动开发SPISpringboot容器启动过程自定义starter java框架-Springboot3-基础特性核心原理 profiles 外部化配置 生命周期监听 事件触发时机 事件驱动开发 Component public c…

竞赛 基于深度学习的目标检测算法

文章目录 1 简介2 目标检测概念3 目标分类、定位、检测示例4 传统目标检测5 两类目标检测算法5.1 相关研究5.1.1 选择性搜索5.1.2 OverFeat 5.2 基于区域提名的方法5.2.1 R-CNN5.2.2 SPP-net5.2.3 Fast R-CNN 5.3 端到端的方法YOLOSSD 6 人体检测结果7 最后 1 简介 &#x1f5…