零基础Linux_24(多线程)线程同步+条件变量+生产者消费模型_阻塞队列版

news2024/11/22 22:25:28

目录

1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

1.2 线程同步的概念

1.3 生产者消费者模型的优点

2. 线程同步的应用

2.1 条件变量的概念

2.2 条件变量操作接口

3. 生产者消费者模型_阻塞队列

3.1 前期代码(轮廓)

3.2 中期代码(简单使用)

BlockQueue.hpp:

3.3 生产者消费者模型高效的体现

3.4 后期代码(处理任务)

Task.hpp

ProdCon.cc

3.5 最终代码(RAII风格的锁)

lockGuard.hpp

BlockQueue.hpp

本篇完。


1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

以生活中消费者生产者为例:

生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。此时我们就是消费者,供应商就是生产者,而超市就是一个交易场所。

  • 将读取数据的线程叫做消费者线程
  • 将产生数据的线程叫做生产者线程
  • 将共享的特定数据结构叫做缓冲区

超市的供应商肯定不止一家,即使同一种商品的供应上也不止一家,不同牌子方便面的生产者它们之间的关系是竞争关系,竞争的表现就是互斥。

站在超市的角度,假设只有一块区域是买方便面的,当生产者来供货的时候,这块区域只能让一家供应商来供货,否则就会导致不同的东西混着放,对消费者来说很不友好。

  • 生产者线程和生产者线程之间是互斥关系
  • 在同一时间只能有一个生产者线程来访问缓冲区。

假设现在超市只有一包方便面了,但是同时来了好多消费者都要买方便面,此时这些消费者之间的关系也是竞争关系,我买上你就买不上了。所以当只有一包方便面的时候,只能有一个买方便面的消费者进入超市。

  • 消费者线程和消费者线程之间是互斥关系
  • 在同一时间只能由一个消费者线程来访问缓冲区。

再假设,超市的方便面卖完了,生产者正在给超市供货,而消费者也正在买方便面,那消费者到底买没买到方便面?有可能生产者刚把方便面搬下来,还没来及摆上去,那么消费者就没有买到,也由可能生产者把方便面摆上去了,那么消费者就买到了。所以最好的办法就是生产者供货的时候,不让消费者进来。

在Linux中,缓冲区存放的都是数据,数据是可以覆盖的,比如消费者线程在读取缓冲区中的数据时,数据是"hello world",刚刚读取完"hello",生产者线程把"world"改成了"rtx",那么消费者线程读取到的就成了"hello rtx",就出错了。所以最好的办法就是当消费者线程访问缓冲区的时候,生产者线程不能访问缓冲区。

  • 消费者线程和生产者线程之间是互斥关系
  • 在同一时间内只有一个线程可以访问缓冲区。

1.2 线程同步的概念

保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。(在计算机操作系统中,饥饿(Starvation)是指某些进程或线程由于资源分配不公平或调度策略不合理而无法获得所需资源或执行时间的现象。当某个进程或线程长时间无法满足其资源需求时,就会出现饥饿问题。)

就像上篇博客中抢票的互斥代码,在每个线程抢完票以后没有进行延时代替其他处理动作时,所有票都被一个线程抢了,其他线程没有机会抢。

由于竞争能力弱而缺乏调度的线程就处于饥饿状态。

而同步就是让所有线程按照一定顺序来抢票,做到尽量公平,避免线程饥饿问题产生。具体如何实现后面会详细讲解。

继续拿超市来说,生产者不能无休止的向超市中供货,否则消费者无法进来消费,最终方便面会放不下。同样,消费者也不能无休止来买方便面,否则生产者进不来,方便面就会卖完,而且没有人来供货。所以最好的办法就是生产者供货,当货架摆满了就不供货了,让消费者来买,当方便面卖完了再让生产者来供货,从而让消费者和生产者协同起来。

  • 消费者线程和生产者线程之间又是同步关系
  • 生产者线程和消费者线程按照一定顺序去访问缓冲区。

根据上面例子和分析,对于生产者消费模型的本质可以总结为123原则(非官方版)

  • 1个交易场所:一段特定结构的缓冲区(上面例子就是超市)
  • 2种角色:生产者和消费者(上面例子就是客人和供货商)
  • 3种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥+同步关系)。

1.3 生产者消费者模型的优点

有了超市这个交易场所,生产者只要给超市供大量的货即可,比如几万包方便面,不用关心是消费者什么时候来买,只需要专注自己的生成即可。

对于消费者而言,只需要直接去超市买方便面就行,也不用等待方便面的生产。

超市只需要做的就是方便面卖完了,告诉生产者来上货,然后告诉消费者来买。消费者和生产者完全独立,不存在任何交集。

生产者消费者模型实现了消费者线程和生产者线程之间的解耦。(低耦合,高内聚)

我们平时写的C/C++代码,如果将main函数看成是生产者,普通函数看出是消费者,那么它两就存在高度耦合。

比如main函数里调用func函数:当执行func函数的时候,main函数在等待,只有func执行完毕以后main函数才能继续执行下去。如果将这两个函数看出两个执行流,那么它们就存在高度耦合。

而生产者消费者模型就成功的让生产者执行流和消费者执行流解耦了,生产者只管向缓冲区生产数据,消费者只管从缓冲区消耗数据,不用关心对方的状态。

再比如大部分人在周一到周五上班,在周六日休息,上班时候时间比较少,去超市消费的人也比较少,由于消费者和生产者互斥,所以就可以让生产者在周一到周五的时候来上货。

当周六日消费者休息的时候,去超市消费的人就比较多,方便面也卖的比较快,但是由于生产者供货量足够,所以并不会因为买的人多了就不够了的情况。

生产者消费者模型有助于解决生产者线程和消费者线程忙闲不均的问题。因为缓冲区能够缓存一定量的数据。

再比如我们买东西肯定不会直接去找供应商,因为人家不零售,因为生产者如果零售的话,每次开机器就仅生成几包方便面,成本高,效率低。

对于消费者而言,直接去找生产者还需等待生成者完成商品生成,消耗时间成本高,效率也低。

生产者消费者模型提高了了生产者线程和消费者线程的执行效率。


2. 线程同步的应用

同步是为了让多线程按照一定顺序互斥访问临界资源,在上面的生产者消费者模型中,如何实现同步呢?就要涉及下面的条件变量。

2.1 条件变量的概念

条件变量:用来描述某种临界资源是否就绪的一种数据化描述

当一个线程互斥地访问某个临界资源时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如,存在一个共享的队列,生产者线程负责生产数据到队列中,消费者线程负责从队列中读取数据,当消费者线程发现队列为空时就不要再去竞争锁去访问了,而是应该等待,直到生产者线程将数据生成到队列中。

要想让消费者线程等待,就需要使用到条件变量。

那么条件变量是什么呢?继续拿超市举例:假设现在超出的架子上一次只放一包方便面,只有这包方便面被人买走了,才会放上新的方便面。

此时来了一堆消费者消费者都要买方便面,因为只有一包,所以只能去竞争了,那些竞争能力强的才能买上方便面,甚至不停的抢不停的买,此时那些竞争能力弱的消费者就会始终都买不到方便面。

竞争能力弱的消费者就会始终抢不到锁,就会产生饥饿问题。

为了维持秩序,超市的工作人员设置了一个等待区,所有消费者都在这里排队购买,方便面被摆出来了,工作人员让一个消费者进去买,没有摆出来就等着。如果消费者想买两包甚至多包,只能重新排队。

等待区及工作人员就相当于条件变量

多线程互斥访问一个临界资源时,为了让这些线程按照一定顺序访问,将这些线程都放在条件变量的等待队列中,当另一个线程让条件变量符合条件(唤醒线程)时,队列中的第一个线程就去访问临界资源。


2.2 条件变量操作接口

条件变量同样是由POSIX线程库维护的,所以使用的是POSIX标准,和互斥锁的接口非常相似。

创建条件变量:

pthread_cond_t cond;
  • 同样要加pthread_。同样是类似int a;
  • cond是英文condition的缩写。

条件变量的初始化,释放:man pthread_cond_init:

使用类似互斥锁,只是传递的参数是创建好的条件变量。

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

cond:要初始化的条件变量

attr:nullptr

返回值:成功返回0,失败返回错误码

man pthread_cond_wait:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • 第一个参数cond:创建的条件变量地址。
  • 第二个参数mutex:互斥锁的地址,这个必须有,后面再讲解为什么必须传锁。
  • 返回值:放入条件变量的等待队列成功返回0,失败返回错误码。

pthread_cond_wait的作用:调用该接口的线程放入传入条件变量的等待队列中。

唤醒条件变量等待队列中的一个线程:

man pthread_cond_signal:

int pthread_cond_signal(pthread_cond_t *cond);
  • 参数cond:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_signal作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。

唤醒条件变量等待队列中的所有线程:

int pthread_cond_broadcast(pthread_cond_t *cond);
  • 参数:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_broadcast作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。

将条件变量用到上一篇抢票的代码中,实现多线程按照一定顺序互斥抢票:

  •  创建全局的条件变量(后面就不创建成全局的了)。
  •  每个线程一抢上锁以后就进入条件变量的等待队列。
  •  主线程每个一秒钟唤醒一个等待的线程进行抢票。
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <cassert>
#include <thread>
#include <unistd.h>
#include <pthread.h>
using namespace std;

#define THREAD_NUM 4
int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题 -> 临界资源
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 创建条件变量

class ThreadData
{
public:
    ThreadData(const std::string &n,pthread_mutex_t *pm):tname(n), pmtx(pm)
    {}
public:
    std::string tname;
    pthread_mutex_t *pmtx;
};

void *getTickets(void *args)
{
    ThreadData *td = (ThreadData*)args;
    while(true) // 抢票逻辑
    {
        int n = pthread_mutex_lock(td->pmtx); // 加锁
        assert(n == 0);

        pthread_cond_wait(&cond, td->pmtx); // 进入等待队列
        // 临界区
        if(tickets > 0) // 判断的本质也是计算的一种
        {
            usleep(rand()%1500);
            printf("%s: %d\n", td->tname.c_str(), tickets);
            tickets--; // 也可能出现问题
            n = pthread_mutex_unlock(td->pmtx); // 解锁
            assert(n == 0);
        }
        else
        {
            n = pthread_mutex_unlock(td->pmtx);  // break之前解锁
            assert(n == 0);
            break;
        }
        
        usleep(rand()%2000); // 抢完票,其实还需要后续的动作
    }
    delete td;
    return nullptr;
}

int main()
{
    time_t start = time(nullptr);
    pthread_mutex_t mtx;
    pthread_mutex_init(&mtx, nullptr);

    pthread_t t[THREAD_NUM];

    for(int i = 0; i < THREAD_NUM; i++) // 多线程抢票的逻辑
    {
        std::string name = "thread ";
        name += std::to_string(i+1);
        ThreadData *td = new ThreadData(name, &mtx);
        pthread_create(t + i, nullptr, getTickets, (void*)td);
    }

    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond); // 唤醒一个等待线程
        cout << "main thread wakeup a new thread" << endl;
    }

    for(int i = 0; i < THREAD_NUM; i++)
    {
        pthread_join(t[i], nullptr);
    }

    pthread_mutex_destroy(&mtx);
    return 0;
}

此时就按照4 1 2 3 的顺序抢票了。

  • 每个线程都会被先挂起到等待队列中,等待主线程的唤醒。
  • 唤醒一个线程抢完票以后会继续进入等待队列,并且排在队列的后面。

如果不使用同步,就可能会只有一个线程在抢票,其他线程就会处于饥饿状态。

再放一份代码:(条件变量不再是全局的,还加了函数指针的方法让新线程执行不同的任务)

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

#define TNUM 4
typedef void (*func_t)(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;

// pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 可以定义成全局的,这样很多地方不用传参了,但是这里写正式点
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

class ThreadData
{
public:
    ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
        : name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
    {
    }

public:
    std::string name_;
    func_t func_;
    pthread_mutex_t *pmtx_;
    pthread_cond_t *pcond_;
};

void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        // wait一定要在加锁和解锁之间进行wait
        pthread_mutex_lock(pmtx);
        // if(临界资源是否就绪 -> 没就绪) pthread_cond_wait 但是现在没有这样的判断
        if (!quit) // 这个if加不加不重要
        {
            pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        }
        std::cout << name << " running  --  a播放" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}
void func2(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- b下载" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}
void func3(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- c刷新" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}
void func4(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- d扫描" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}

void *Entry(void *args) // 添加了一个类似软件层的东西,只是演示下
{
    ThreadData *td = (ThreadData *)args;         // td在每一个线程自己私有的栈空间中保存
    td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回
    delete td;
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx;
    pthread_cond_t cond;
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funcs[TNUM] = {func1, func2, func3, func4};
    for (int i = 0; i < TNUM; i++)
    {
        std::string name = "Thread ";
        name += std::to_string(i + 1);
        ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);
        pthread_create(tids + i, nullptr, Entry, (void *)td);
    }

    sleep(5); // 主线程和所有新线程都休眠五秒 -> 新线程阻塞等待

    // ctrl new thread
    int cnt = 10;
    while (cnt)
    {
        std::cout << "resume thread run code ...." << cnt-- << std::endl;
        pthread_cond_signal(&cond);
        sleep(1); // 每隔一秒唤醒一个线程,线程不用自己休眠了

        // pthread_cond_broadcast(&cond); // 全部唤醒 -> 设置条件变量有效
    }

    std::cout << "ctrl done" << std::endl; // 控制结束
    quit = true;
    pthread_cond_broadcast(&cond); // quit = true;后再全部唤醒一次

    for (int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        std::cout << "thread: " << tids[i] << "quit" << std::endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

这个代码只是演示一下不同的使用方法和场景、


3. 生产者消费者模型_阻塞队列

上图所示就是要实现的模型,有一个生产者线程,一个消费者线程,还有一个阻塞队列。

  • 阻塞队列这里使用C++STL容器中的queue来实现。
  • 阻塞队列是公共资源,所以要保证它的安全,线程A和线程B要互斥访问,只需要一把锁就能实现生产者和消费者,生产者和生产者,消费者和消费者之间的互斥。
  • 阻塞队列中有数据消费者才能读,此时生产者不能进行生产,生产者线程要进入它的等待队列中。
  • 阻塞队列中没有数据或者不满时,生产者才能进行生产,消费者在生产的时候不能读,要进入它的等待队列。

3.1 前期代码(轮廓)

先敲个轮廓出来就是这样的:(代码具体就不讲解了,看注释就行)

Makefile:

ProdCon:ProdCon.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ProdCon

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>

const int gDefaultCap = 7;

template <class T>
class BlockQueue
{
public:
    BlockQueue(int capacity = gDefaultCap) 
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    void push(const T& in)
    {
    }
    void pop(T* out)
    {
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

protected:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 保证队列安全
    pthread_cond_t _Empty; // 表示bq 是否空的条件
    pthread_cond_t _Full;  // 表示bq 是否满的条件
};

ProdCon.cc:(Producer consumer model)

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* consumer(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    while(true) // 获取任务
    {
        sleep(1);
    }

    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    while(true) // 制作任务
    {
        sleep(1);
    }

    return nullptr;
}

int main()
{
    BlockQueue<int> *bqueue = new BlockQueue<int>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;
    return 0;
}

3.2 中期代码(简单使用)

然后现在来写一写关键的pop和push:

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>

const int gDefaultCap = 7;

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap) : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    void push(const T& in) // 生产者
    {
        pthread_mutex_lock(&_mtx);

        // pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用while
        while(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件
        {
            pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待
            // 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?
            // 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放
            // 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的
            // 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
        }

        _bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的

        pthread_cond_signal(&_Empty); // 唤醒
        pthread_mutex_unlock(&_mtx); // 解锁
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_mtx);
        while (isQueueEmpty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }

        *out = _bq.front(); // 访问临界资源
        _bq.pop();

        pthread_cond_signal(&_Full); // 唤醒
        pthread_mutex_unlock(&_mtx); // 解锁
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

protected:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 用它来表示bq 是否空的条件
    pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

ProdCon.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* consumer(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    while(true) // 获取任务
    {
        int a = 0;
        bqueue->pop(&a);
        std::cout << "消费一个数据" << a << std::endl;
        sleep(1);
    }
    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    int a = 1;
    while(true) // 制作任务
    {
        bqueue->push(a);
        std::cout << "生产一个数据" << a++ << std::endl;
        // sleep(1); // 两个换着sleep看看能不能看到约束的效果
    }
    return nullptr;
}

int main()
{
    BlockQueue<int> *bqueue = new BlockQueue<int>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;
    return 0;
}

编译运行试试效果:

此时就实现了生产者生产一批数据,然后两个线程一人消费一个,然后再生成,再消费。

把代码里生产者和消费者的sleep注释换一下:(此时就应该是消费者跟着生产者走了)

编译运行:

也成功达到了预想的效果。

如果不想类似生产一个消费一个的话还可以在生产者或者消费者里的唤醒线程前加if判断语句,比如:在生产者线程里:if(_bq.size() >= _capacity / 2) pthread_cond_signal(&_Empty);


3.3 生产者消费者模型高效的体现

前面在分析生产者消费者模型时,一直都在说该模型高效,那么到底体现在什么地方呢?

多个生产者线程向阻塞队列中生成数据,多个消费者线程从阻塞队列中消费数据。

该模型的三种关系决定了访问阻塞队列的线程同一时间只有一个。

尤其是上面代码现象中,消费和生产是一前一后的,对于阻塞队列的访问是串行的,凭什么说这个模型是高效的呢?

因为在生产者线程和消费者线程中,访问阻塞队列临界资源的代码都只有一条,只有临界区的代码才是串行访问的。

除了临界区的代码,其他部分代码所有线程都是并发执行的。

实际的线程中,临界区之外的代码会有很多,而且有可能会非常耗时,但是这些代码是可以多线程并发执行的,该模型的效率就会很高。

生产者消费者模型的高效体现在:非临界区的代码,多线程可以并发执行。

该模型的高效并不体现在对临界资源(阻塞队列)的访问上。


3.4 后期代码(处理任务)

生产者消费者模型实际上并不是仅仅用来生产消费整型数据的,它更多的是处理任务的。

这里创建一个计算任务的类Task(这里写在了一个头文件下,只弄了加法)。在类中的仿函数调用回调函数执行具体的计算逻辑,还有一个显示任务的接口。把传给阻塞队列的int传成Task

BlockQueue.hpp和前面一样

Task.hpp

#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:
    Task() 
    {}
    Task(int x, int y, func_t func)
        : _x(x)
        , _y(y)
        , _func(func)
    {}

    int operator()()
    {
        return _func(_x, _y);
    }

public: // 不想写get接口就直接弄公有了
    int _x;
    int _y;
    // int type;
    func_t _func;
};

ProdCon.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

inline int myAdd(int x, int y)
{
    return x + y;
}
void* consumer(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while(true) // 获取任务
    {
        Task t;
        bqueue->pop(&t);
        std::cout << pthread_self() <<" consumer: "<< t._x << " + " << t._y << " = " << t() << std::endl;
        // sleep(1);
    }
    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    int a = 1;
    while(true) // 制作任务
    {
        int x = rand() % 10 + 1;
        usleep(rand() % 1000);
        int y = rand() % 5 + 1;
        Task t(x, y, myAdd);
        bqueue->push(t);
        std::cout <<pthread_self() <<" productor: "<< t._x << " + " << t._y << " = ?" << std::endl;
        sleep(1); // 两个换着sleep看看能不能看到约束的效果
    }
    return nullptr;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x777); // 只是为了更随机
    BlockQueue<Task> *bqueue = new BlockQueue<Task>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;
    return 0;
}

编译运行:


3.5 最终代码(RAII风格的锁)

ProdCon.cc和Task.hpp跟前面一样,

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* mtx) 
        :_pmtx(mtx)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmtx);
        std::cout << "进行加锁成功" << std::endl;
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmtx);
        std::cout << "进行解锁成功" << std::endl;
    }
    ~Mutex()
    {}
protected:
    pthread_mutex_t* _pmtx;
};

class lockGuard // RAII风格的加锁方式
{
public:
    lockGuard(pthread_mutex_t* mtx) // 因为不是全局的锁,所以传进来,初始化
        :_mtx(mtx)
    {
        _mtx.lock();
    }
    ~lockGuard()
    {
        _mtx.unlock();
    }
protected:
    Mutex _mtx;
};

BlockQueue.hpp

(只是加锁方式变了,不用自己解锁了)

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 7;

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap) 
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    void push(const T& in) // 生产者
    {
        lockGuard lockgrard(&_mtx); // 自动调用构造函数
        //pthread_mutex_lock(&_mtx);

        // pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用while
        while(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件
        {
            pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待
            // 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?
            // 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放
            // 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的
            // 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
        }

        _bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的

        pthread_cond_signal(&_Empty); // 唤醒
        // pthread_mutex_unlock(&_mtx); // 解锁
    } // 出了代码块自动调用析构函数

    void pop(T* out)
    {
        lockGuard lockgrard(&_mtx); // 自动调用构造函数
        // pthread_mutex_lock(&_mtx);

        while (isQueueEmpty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }

        *out = _bq.front(); // 访问临界资源
        _bq.pop();

        pthread_cond_signal(&_Full); // 唤醒
        // pthread_mutex_unlock(&_mtx); // 解锁
    } // 出了代码块自动调用析构函数
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

protected:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 用它来表示bq 是否空的条件
    pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

成功运行。


本篇完。

下一篇:零基础Linux_25(多线程)信号量+自选锁+读写锁(基于环形队列的生产者消费模型)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1148236.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Starknet开发工具

1. 引言 目前Starknet的开发工具流可为&#xff1a; 1&#xff09;Starkli&#xff1a;音为Stark-lie&#xff0c;为替换官方starknet-CLI的快速命令行接口。Starkli为单独的接口&#xff0c;可独自应用&#xff0c;而不是其它工具的组件。若只是想与Starknet交互&#xff0…

C语言 定义一个函数,并调用,该函数中打印显示直角三角形

#include<stdio.h> void chengfabiao() {for (int i 1; i < 5; i){for (int j 1; j < i; j){printf("*");} printf("\n");} } int main(int argc,const char *argv[]) {chengfabiao();return 0; }

leetCode 136.只出现一次的数字 + 位运算

136. 只出现一次的数字 - 力扣&#xff08;LeetCode&#xff09; 给你一个 非空 整数数组 nums &#xff0c;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现两次。找出那个只出现了一次的元素。你必须设计并实现线性时间复杂度的算法来解决此问题&#xff0c;且该算…

SpringBoot面试题8:运行 Spring Boot 有哪几种方式?Spring Boot 需要独立的容器运行吗?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:运行 Spring Boot 有哪几种方式? 运行Spring Boot应用有多种方式,具体取决于你的需求和环境。以下是几种常见的运行Spring Boot应用的方式: 使…

Ubuntu学习---跟着绍发学linux课程记录(第4部分)

第3部份内容记录在&#xff1a;Ubuntu学习—跟着绍发学linux课程记录&#xff08;第3部分&#xff09; 文章目录 14 ubuntu服务器上的java14.1 Java的安装14.2 运行java程序14.3 Java启动脚本 15 ubuntu服务器上的tomcat15.1 Tomcat服务器15.2 Tomcat的配置15.3 Tomcat启动日志…

【多线程面试题十一】、如何实现子线程先执行,主线程再执行?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;如何实现子线程先执行&a…

beyond compare 4密钥2023大全,beyond compare 4注册码最新

beyond compare 4是一款文件对比软件&#xff0c;可以快速比较文件和文件夹、合并以及同步&#xff0c;非常实用&#xff0c;一些用户想知道beyond compare 4密钥有哪些&#xff0c;本文为大家带来了介绍哦~ 密钥&#xff1a; w4G-in5u3SH75RoB3VZIX8htiZgw4ELilwvPcHAIQWfwf…

redis原理 主从同步和哨兵集群

主从库如何实现数据一致 我们总说的 Redis 具有高可靠性&#xff0c;又是什么意思呢&#xff1f;其实&#xff0c;这里有两层含义&#xff1a;一是数据尽量少丢失&#xff0c;二是服务尽量少中断。AOF 和 RDB 保证了前者&#xff0c;而对于后者&#xff0c;Redis 的做法就是增…

微服务框架Consul--新手入门

Consul Consul 是由 HashiCorp 开发的一款软件工具&#xff0c;提供了一组功能&#xff0c;用于服务发现、配置管理和网络基础设施自动化。它旨在帮助组织管理现代分布式和微服务架构系统的复杂性。以下是Consul的一些关键方面和功能&#xff1a; 服务发现&#xff1a;Consul …

[Python]unittest-单元测试

目录 unittest的大致构成: Test Fixture Test Case-测试用例 Test Suite-测试套件 Test Runner 批量执行脚本 makeSuite() TestLoader discover() 用例的执行顺序 忽略用例执行 skip skipIf skipUnless 断言 HTML测试报告 错误截图 unittest是python中的单元测…

禁用Google Chrome自动升级、查看Chrome版本号

问题 查看Chrome版本时&#xff0c;会自动升级&#xff0c;这个设计很垃圾,对开发者不友好&#xff1b;查看Chrome版本方法&#xff1a;chrome浏览器右上角—>自定义及控制Google Chrome(三个竖着的点号)------>帮助---->关于Google Chrome。 解决办法 禁用自动升级…

二叉树题目合集(C++)

二叉树题目合集 1.二叉树创建字符串&#xff08;简单&#xff09;2.二叉树的分层遍历&#xff08;中等&#xff09;3.二叉树的最近公共祖先&#xff08;中等&#xff09;4.二叉树搜索树转换成排序双向链表&#xff08;中等&#xff09;5.根据树的前序遍历与中序遍历构造二叉树&…

木疙瘩文字变形动画

H5-大 第一步 创建文字到新动画图层 第二步&#xff1a;直接在时间末尾插入变形动画帧 文字专属的参数和形状变化一样都是变形动画才可以动起来&#xff01;关键帧动画主要服务元件&#xff01; 第三步&#xff1a;改变文字参数即可&#xff01;

苍穹外卖-day03

苍穹外卖-day03 课程内容 公共字段自动填充新增菜品菜品分页查询删除菜品修改菜品 **功能实现&#xff1a;**菜品管理 菜品管理效果图&#xff1a; 1. 公共字段自动填充 1.1 问题分析 在上一章节我们已经完成了后台系统的员工管理功能和菜品分类功能的开发&#xff0c;在…

Linux C/C++ 实现网络流量分析(性能工具)

网络流量分析的原理基于对数据包的捕获、解析和统计分析&#xff0c;通过对网络流量的细致观察和分析&#xff0c;帮助管理员了解和优化网络的性能、提高网络安全性&#xff0c;并快速排查和解决网络故障和问题。 Linux中的网络流量常见类型 在Linux中&#xff0c;网络流量可以…

数组与链表算法-链表与多项式

目录 数组与链表算法-链表与多项式 多项式链表表示法 C代码 数组与链表算法-链表与多项式 使用链表的最大好处就是减少内存的浪费&#xff0c;并且能增加使用上的弹性。例如数学上常用的多项式表示法&#xff0c;虽然可以使用数组方式来处理&#xff0c;但当数据内容变动时…

AC修炼计划(AtCoder Regular Contest 164)

传送门&#xff1a;AtCoder Regular Contest 164 - AtCoder A.签到题&#xff0c;在此不做赘述 B - Switching Travel 这题本来该是秒的&#xff0c;但是因为没有考虑清楚环的问题而被卡半天&#xff0c;其实我们不难发现&#xff0c;要想使题目存在节点&#xff0c;就得让该节…

【PC电脑windows-学习样例generic_gpio-ESP32的GPIO程序-基础样例学习】

【PC电脑windows-学习样例generic_gpio-ESP32的GPIO程序-基础样例学习】 1、概述2、实验环境3、 物品说明4、自我总结5、本次实验说明6、实验过程&#xff08;1&#xff09;复制目录到桌面&#xff08;2&#xff09;手动敲写&#xff08;3&#xff09;反复改错&#xff08;4&am…

将知识图谱结合到地铁客流预测中:一个分散注意力关系图卷积网络

导读 论文题目《Combining knowledge graph into metro passenger flow prediction: A split-attention relational graph convolutional network 》。该论文于2023年发表于《Expert Systems With Applications》&#xff0c;文章基于知识图谱&#xff0c;提出了一种分割注意力…

【操作系统】考研真题攻克与重点知识点剖析 - 第 1 篇:操作系统概述

前言 本文基础知识部分来自于b站&#xff1a;分享笔记的好人儿的思维导图与王道考研课程&#xff0c;感谢大佬的开源精神&#xff0c;习题来自老师划的重点以及考研真题。此前我尝试了完全使用Python或是结合大语言模型对考研真题进行数据清洗与可视化分析&#xff0c;本人技术…