【Linux】线程同步分析:什么是条件变量?生产者消费者模型是什么?POSIX信号量怎么用?阻塞队列和环形队列模拟生产者消费者模型

news2024/11/19 14:54:21


上一篇文章我们分析了什么是线程互斥, 以及线程互斥的特点和使用.

说白了, 线程互斥就是多线程在争抢使用临界资源, 谁抢到了谁就用, 抢不到的就等.

这样不会因为多线程同时访问临界资源而造成错误.

虽然没有错误, 但是, 思考另外一个问题:这样合理吗?


文章目录

  • 只互斥的问题: 饥饿
  • 线程同步
    • 条件变量
      • `cond` 及接口
      • `cond` 及接口的使用演示
      • 为什么条件变量需要与互斥锁一起使用?
    • 生产者消费者模型介绍 **
        • 消费者与消费者之间是什么关系?
        • 生产者与生产者之间是什么关系?
        • 消费者与生产者之间需要什么关系?
      • 生产者消费者模型的优点
    • 以阻塞队列模拟生产者消费者模型 **
      • 问题1:条件判断的语句
      • 问题2:什么时候唤醒 或者 什么时候解锁?
    • 理解生产者消费者模型的 `并发`
    • POSIX信号量
      • 什么是信号量?
      • 信号量的接口
    • 以环形队列模拟生产者消费者模型**
      • 环形队列
      • 模拟模型

只互斥的问题: 饥饿

我们以最极端的例子来分析.

当多线程互斥地争抢临界资源时. 如果存在一部分 优先级非常高 的线程, 也存在一部分 优先级非常低(与高优先级线程存在断层) 的线程, 其他线程的优先级也不太低. 那么可能会出现什么问题?

假设 高优先级线程A先抢夺到了临界资源, 然后上了互斥锁. 其他线程就只能在临界区外等.

线程A用完临界资源, 解了锁之后. 所有线程又开始抢临界资源、抢锁. 又有线程抢到了临界资源和锁, 也是高优先级的线程.

那么 此线程用完临界资源, 解了锁之后. 所有线程又会抢临界资源、抢锁. 一直如此.

但是, 由于有一部分线程的优先级非常低. 与其他线程的优先级已经出现断层了.

那么, 在这种争抢的机制下, 这部分线程就可能永远抢不到临界资源、抢不到锁. 也就永远无法被调度, 永远无法分配到资源.

在这种情况下, 就可能出现 饥饿 或 饿死 的问题, 即, 执行流长时间无法获得某种资源的情况, 被称为饥饿或饿死

这种争抢临界资源的机制, 虽然没有错误, 但是很可能存在类似的线程饥饿的情况, 所以 是不太合理的.

线程同步

在线程只使用互斥的方式去访问临界资源时, 就有可能造成线程饥饿的情况.

那么 有没有一种可能, 可以让多线程在访问临界资源时, 还是在某个时刻只能有一个线程访问临界资源. 但是 可以让所有的线程按照一定的顺序访问临界资源.

即, 所有线程像排队一样, 一个一个地访问临界资源. 当一个线程访问完临界资源后, 再重新去队尾排队.

这样就不会出现多线程争夺临界资源地情况, 而可能导致线程饥饿.

确实存在这样的机制, 即 在保证临界资源安全的前提下, 让执行流访问临界资源具有一定的顺序性, 这种机制被称为同步. 也就是本篇文章主要介绍的内容.

虽然同步是指让执行流访问临界资源有一定顺序性的机制, 但是 互斥其实也是同步机制的一种.

虽然只采用互斥 执行流访问资源还是乱序的. 但它还是在一定程度上协调了多个线程的执行, 因为 互斥锁可以保证同一时刻 只有一个执行流访问临界资源.

不过本篇文章介绍时会将同步和互斥区别开, 即 同步不包括互斥, 不然非常容易混淆.

条件变量

同步机制的实现, 一般离不开一个东西:条件变量

那么什么是条件变量呢?

条件变量 是一种可以实现线程同步的机制. 通过条件变量, 可以实现让线程有序的访问临界资源.

线程需要访问临界资源时, 有时候如果临界资源不满足一定的条件, 是不允许线程执行一定的操作的.

比如, 如果线程需要访问一个队列, 但此时队列为空, 那么线程就无法访问, 就必须等待队列中出现新的内容之后, 此线程再访问队列.

而如果 想要实现 如果某条件不满足时, 需要让线程等待, 并且如果条件满足时, 可以让线程恢复继续执行的机制, 就需要用到 条件变量.

解释了这么多, 究竟什么是条件变量呢?

其实, 代码中的条件变量 与 互斥锁很像. 就是 pthread库提供的 一个结构体类型(pthread_cond_t)的变量, 并且 pthread 库中也提供的操作条件变量的一些接口.

cond 及接口

condcondition, 是条件的意思.

pthread_cond_t 即为 定义条件变量的类型.

条件变量的使用接口 与 互斥锁基本相似:

条件变量, 是由 pthread_cond_t 类型定义的.

可以通过宏来初始化, 与互斥锁一样, 通过宏初始化的变量 就不需要去手动destroy

也可以通过 pthread_cond_init() 接口, 来初始化, 第一个参数是条件变量的地址, 第二个参数是条件变量的属性(可以不考虑).

通过 init 接口初始化的条件变量, 在不需要使用时, 需要调用 pthread_cond_destroy() 接口进行销毁.

除了这两个接口外.

还有提供有使用条件变量的接口:

pthread_cond_wait()pthread 库提供的 使用条件变量等待的接口. 线程调用此接口, 线程就会立即进入等待.

pthread_cond_timedwait() 也是 pthread 库提供的 使用条件变量等待的接口, 只不过 此接口是一种定时让线程等待的接口. 即, 可以通过此接口设置一定的时间, 在此时间内让线程等待. 如果此时间内 条件满足了, 则线程会自动被唤醒, 继续执行代码.

这两个接口的参数除了需要条件变量, 还都需要一个互斥锁. 从接口就可以反映出来, 条件变量一般是和互斥锁一起使用的. 具体如何一起使用, 由于还没有使用过条件变量, 我们后面再介绍.

这两个接口, 可以通过条件变量 让线程等待.

有通过条件变量 让线程等待的接口, 就有通过条件变量唤醒线程的接口.

pthread_cond_signal(), 调用此接口, 可以让某个 通过指定条件变量陷入等待的 线程被唤醒.

pthread_cond_broadcast(), 调用此接口, 则可以让通过指定条件变量陷入等待的所有线程唤被醒

cond 及接口的使用演示

我们介绍了几个有关条件变量的使用接口.

先来演示一下, 条件变量是如何使用的:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
using std::cin;
using std::cout;
using std::endl;

// 定义并初始化全局互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 定义全局条件变量
pthread_cond_t cond;

void* waitCommand(void* args) {
    pthread_detach(pthread_self()); // 先让线程自己分离自己, 我们就不在主线程中回收线程了
    // 在此例中, 如果不分离, 线程回收会是个问题. 但具体问题后面再解释和解决.
    // 这里我们只是展示一下 接口的最基本的用法和现象
    const char* name = (const char*)args;

    while (true) {
        pthread_cond_wait(&cond, &mutex);
        cout << name << ", tid: " << pthread_self() << ", run……" << endl;
    }

    return nullptr;
}

int main() {
    pthread_cond_init(&cond, nullptr);

    pthread_t tid1, tid2, tid3;
    pthread_create(&tid1, nullptr, waitCommand, (void*)"Thread_1");
    pthread_create(&tid2, nullptr, waitCommand, (void*)"Thread_2");
    pthread_create(&tid3, nullptr, waitCommand, (void*)"Thread_3");
    while (true) {
        char c = 'a';
        cout << "请输入你的命令(N/Q):: ";
        cin >> c;
        if (c == 'N' | c == 'n') {
            pthread_cond_signal(&cond);
        }
        else
            break;
        usleep(1000);       // 让主线程usleep一下, 防止线程之间在屏幕上打印干扰
    }

    pthread_cond_destroy(&cond);
    return 0;
}

此代码中, 先定义并初始化了互斥锁 和 条件变量. 然后创建线程, 线程执行的函数会使线程循环由条件变量进入等待.

然后在主线程中通过输入 n 和 N 来调用唤醒函数, 唤醒线程, 观察现象:

show_cond

在其他线程通过条件变量等待时, 我们在主线程内通过 输入 N 和 n 来唤醒等待的线程.

观察线程的唤醒现象, 其实可以发现 线程的唤醒是以一定顺序来执行的.

除了 使用 pthread_cond_signal() 来单个唤醒等待的线程.

还可以使用 pthread_cond_broadcast() 来广播唤醒所有等待的线程:

使用 broadcast 唤醒所有等待的线程

这里演示的是, cond 条件变量的没有场景的用法.

而我们介绍条件变量时说, 当前有条件不满足时 会使用条件变量让线程等待.

我们可以设置一个退出条件 quit, 为真时即为满足, 为假时即为不满足

#include <iostream>
#include <unistd.h>
#include <pthread.h>
using std::cin;
using std::cout;
using std::endl;

// 定义并初始化全局互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 定义全局条件变量
pthread_cond_t cond;

// // 定义一个全局退出变量, 用于判断条件
volatile bool quit = false;

void* waitCommand(void* args) {
    pthread_detach(pthread_self()); // 先让线程自己分离自己, 我们就不在主线程中回收线程了
    // 在此例中, 如果不分离, 线程回收会是个问题. 但具体问题后面再解释和解决.
    // 这里我们只是展示一下 接口的最基本的用法和现象
    const char* name = (const char*)args;

    while (!quit) {
        //  不满足退出条件, 就进来等待
        pthread_cond_wait(&cond, &mutex);
        cout << name << ", tid: " << pthread_self() << ", run……" << endl;
    }
    pthread_mutex_unlock(&mutex);       // 暂时不解释 这里解锁的原因
    cout << name << ", tid: " << pthread_self() << ", end……" << endl;

    return nullptr;
}

int main() {
    pthread_cond_init(&cond, nullptr);

    pthread_t tid1, tid2, tid3;
    pthread_create(&tid1, nullptr, waitCommand, (void*)"Thread_1");
    pthread_create(&tid2, nullptr, waitCommand, (void*)"Thread_2");
    pthread_create(&tid3, nullptr, waitCommand, (void*)"Thread_3");
    while (true) {
        char c = 'a';
        cout << "请输入你的命令(N/Q):: ";
        cin >> c;
        if (c == 'N' | c == 'n') {
            pthread_cond_broadcast(&cond);
        }
        else {
            quit = true;			// 修改条件为满足
            pthread_cond_broadcast(&cond); 		// 然后唤醒线程, 再让线程判断条件是否满足
            break;
        }
        usleep(1000);       // 让主线程usleep一下, 防止线程之间在屏幕上打印干扰
    }

    pthread_cond_destroy(&cond);

    return 0;
}

这段代码中, 我们随便定义了一个条件 quit. quit 为真时即为满足条件, quit 为假时即为不满足条件.

不满足条件, 就让线程等待. 满足条件就唤醒线程.

我们输入 N 和 n时, 唤醒一下线程让线程在判断一下条件是否满足, 非 N 和 n 时, 让退出条件被满足, 并唤醒线程.

执行结果为:

这就是条件和条件变量的最简单的使用.

使用条件变量可以让多线程的执行具有一定的顺序性, 即可以实现同步.

而同步和互斥是什么关系呢?

很明显, 是互补的关系.

为什么条件变量需要与互斥锁一起使用?

上面展示的例子中, pthread_cond_wait() 的使用需要同时用到 条件变量和互斥锁.

这里为什么需要使用互斥锁呢?

首先, 条件等待是使用条件变量实现同步的一种机制. 如果只有一个线程, 当条件不满足时, 线程就会一直等下去, 因为唯一的线程在等待, 没有其他线程修改条件, 条件也不可能满足.

所以需要一个线程使条件变得满足, 然后再唤醒等待的线程.

这里的条件, 其实就是 线程对应的需要访问的临界资源的状态. 就像我们介绍互斥时的抢票动作, 需要保证票是>0的 才能抢票.

而, 条件是不可能的无缘无故地没有变化就自己满足的, 所以条件满足势必会存在临界资源数据的变化. 所以 需要用互斥锁来保护临界资源.

所以, 线程判断条件满足之前 需要先上锁, 然后再判断是否满足, 如果不满足条件等待并解锁

然后 让其他可以让条件满足的线程获取锁. 条件满足之后, 再唤醒刚才等待的线程 并 解锁.

让刚被唤醒的线程再次获取锁 判断条件是否满足, 满足就去执行操作, 否则再次陷入等待.

整个过程的重点就是, 谁需要访问临界资源就上锁, 谁不需要了就解锁. 即 保证整个过程中临界资源是被保护着的

而 整个过程中, 除了第一次对临界资源上锁和最后一次对临界资源解锁, 中间所有的上锁和解锁的操作 都是由 pthread_cond_wait() 完成的.

在 线程需要等待时要调用 pthread_cond_wait() 解锁并等待. 在线程被唤醒时, 会自动再去竞争锁. 解锁和上锁的操作都是在 pthread_cond_wait() 接口内部实现的.

这也是为什么, 上面例子中, 我们想让多线程退出时需要在条件满足时先释放锁, 然后再让线程退出. :

 |wide

第2行, 我们让线程分离自己, 不用回收.

第13行, 我们执行了解锁操作. 因为 pthread_cond_wait() 陷入等待时, 会释放锁. 然后被唤醒的时候, 会竞争锁. 如果退出条件满足了. 也就意味着线程将要退出了.

而 此时 线程是处于对临界资源上了锁的状态. 所以在退出之前要先解锁. 不然后面会出现死锁的状态(如果我们不分离线程的话):


那么为什么 pthread_cond_wait() 需要条件变量和互斥锁一起使用?

因为 pthread_cond_wait() 接口需要执行释放锁和竞争锁的操作. 所以 需要先看到锁.

生产者消费者模型介绍 **

生产者消费者模型, 是一种编程模型. 不过如何理解的话, 可以举一个生活中的例子.

这里的生产者和消费者, 我们 不以生物学的角度 看待.

以生活中的超市为例:

超市是供我们所有人来购买商品的, 那么 可以将我们看作消费者, 以下 以学生来代表消费者.

而超市中的商品, 是由工厂供应商来生产供应的, 那么 可以 将工厂看作生产者.

那么, 学生购买商品, 工厂供应商品. 其实都是 通过超市 这个渠道的.


为什么学生不直接通过工厂来购买商品, 工厂不根据学生的具体需求来生产供应呢?

为什么?来想象一件事情. 有一位学生想要吃一包辣条. 他就直接去了生产辣条的工厂.

学生说:我想吃一包辣条, 你们给我生产一包. 工厂听到有人需要一包辣条, 就开动机器只生产了一包辣条, 卖给了学生.

这合理吗?很明显不合理啊!

学生想要买一包辣条, 需要先去工厂告诉工厂, 然后等~

工厂听到学生要买一包辣条, 然后就开动机器去制作了一包, 制作完成卖给学生.

这样的成本可太高了吧.

工厂 为生产一包辣条 机器就开动了, 那这机器、电力等各种损耗一包辣条肯定没法弥补.

还有学生, 像简简单单买包辣条还要告诉工厂, 让工厂现场生产. 这不是浪费时间嘛.

所以, 工厂一般情况下是无法直接与消费者买卖商品的.


工厂无法直接与消费者买卖商品, 就可以通过超市这个媒介.

超市因为要供所有的消费者来购买商品, 所以一般需要大量的商品. 这时候 工厂就可以火力全开生产商品了. 而消费者 也可以直接找到现成的商品购买.

那么, 超市的存在 第一个作用就是 提高了效率

除此之外, 工厂也不需要直接根据学生的需求再来生产商品.

假设只要有商品就能卖出去. 那么 工厂就可以不停的生产商品, 为的是给超市供货. 而超市只要保持有商品, 那么 学生过来直接购买就可以了, 也不用再去跟工厂说. 这就把上面例子中 学生与工厂之间的强耦合关系就没有了, 变成了松耦合.

消费者没有去购买商品, 工厂也可以一直生产. 工厂没有生产商品, 只要超市里有, 消费者也可以直接去购买.

这 也就是超市的第二个作用 解耦.

那么, 超市有没有很像 工厂与消费者之间的缓冲区一样的存在?


将这样的模型拿到我们编程中, 将 消费者 看作消费线程, 将 生产者 看作生产线程.

那么, 消费线程需要从超市中拿东西, 生产线程需要向超市中放东西. 超市又可以看做什么呢?

超市其实就可以看作是 临界资源.

而在生活中, 消费者不可能只有一个, 生产者也不可能只有一个.

那么:

  1. 消费者与消费者之间是什么关系?

    消费者与消费者之间好像是没有关系, 你买你的我买我的, 互不干扰. 但是 仔细想一想, 消费者与消费者之间实际是一种 竞争关系.

    互不干扰是因为商品充足, 如果商品不足的话, 是需要竞争的. 其实是 竞争关系

    而竞争关系, 其实就是 一种 互斥关系

  2. 生产者与生产者之间是什么关系?

    生产者与生产者之间, 其实也是一种竞争关系. 竞争超市内的空间资源. 竞争谁可以给超市供货. 即 互斥关系

  3. 消费者与生产者之间需要什么关系?

    消费者和生产者, 看似是没有之间的关系的. 但是思考一个问题, 既然超市是临界资源. 那么消费者和生产者是可能在同一时间访问临界资源的.

    如果 供应商再给超市供货的时候, 货还没有供完, 货架上的东西还没有放完. 在生活中我们可以直接拿走一个, 然后去超市结账.

    但是如果从计算机的角度来看, 生产线程还没有向临界资源内写完数据, 消费线程可以从临界资源中拿走数据吗? 很明显是不可以的. 因为 消费线程可能拿不到完整的资源.

    所以, 以计算机的角度来说, 消费者和生产者首先 要保持一个互斥关系

    而除了互斥之外, 其实还需要 保持同步.

    因为消费者不能在超市没有商品的时候购买商品, 需要等待, 让生产者先向超市供货. 生产者也不能在超市的空间资源已满的情况下继续向超市供货, 需要等待, 让消费者先购买商品. 等到超市有商品了, 再通知消费者来购买. 等到超市有空间了, 再通知生产者来供货.

以计算机的角度看待这个生产者消费者模型:

生产者消费者模型存在的关系:生产者之间(互斥关系), 消费者之间(互斥关系), 生产者和消费者之间(互斥、同步关系), 共 3

生产者和消费者:线程承担的:2种角色

超市:为生产者和消费者之间提供缓冲、解耦, 类似 缓冲区、临界资源 的 1 个交易场所

交易场所是让不同的线程之间进行 "交易"

即, 可以将生产者消费者模型以这种:3、2、1的思想来理解

而在我们编程中, 就可以围绕这个 3、2、1 的模型, 来编写和解决问题. 即, 多线程访问临界资源, 生产线程之间保持互斥、消费线程之间也保持互斥、生产线程和消费线程之间保持互斥、同步. 这样来整理思路, 可以方便解决很多问题.

而最重要的就是. 如何让生产线程和消费线程之间保持互斥和同步.

同步需要根据条件让生产线程和消费线程等待和唤醒. 那么 如何让生产线程或消费线程等待?又如何让生产线程和消费线程被唤醒?又如何判断所需条件是否被满足?

而 让线程等待和唤醒线程的功能 其实 条件变量 就可以为我们提供的

生产者消费者模型的优点

生产者消费者模型的优点, 可以有三个:

  1. 解耦, 可以将两个角色之间的 强耦合关系 变为 松耦合关系.
  2. 支持并发.
  3. 支持忙闲不均.

这就是生产者消费者模型的优点

以阻塞队列模拟生产者消费者模型 **

阻塞队列:当队列为空时, 从队列获取元素的操作将会被阻塞, 直到队列中被放入了元素; 当队列满时, 往队列里存放元素的操作也会被阻塞, 直到有元素被从队列中取出 (以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

阻塞队列的特点像一个管道

我们可以使用阻塞队列, 模拟一个生产者消费者模型.

那么, 阻塞队列的大致结构为:

|wide

成员变量:

  1. uint32_t _cap 记录阻塞队列的容量
  2. queue<T> _bq, 即为阻塞队列本身
  3. _mutex_conCond_proCond 一个互斥锁, 两个线程分别用的条件变量

构造函数负责初始化容量、互斥锁和条件变量, 析构函数负责摧毁 互斥锁和条件变量

结构定义完成之后, 就需要根据我们需要实现的功能 封装一些私有的成员函数:

上锁、解锁、条件等待、唤醒等待、判空、判满、生产任务、消费任务

|wide

这些都是私有的接口, 实际还需要两个公共的接口.

完整的从阻塞队列中消费的接口 以及 完整的向阻塞队列中生产的接口:

|wide

实现了之后, 就可以测试一下了:

blockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
using std::queue;
using std::cout;
using std::endl;

const uint32_t gDefultCap = 5;

template <class T>
class blockQueue {
public:
    // 构造函数
    blockQueue(uint32_t cap  = gDefultCap) 
        :_cap(cap) {
        pthread_mutex_init(&_mutex, nullptr);           // 初始化锁
        pthread_cond_init(&_proCond, nullptr);          // 初始化生产线程使用的条件变量
        pthread_cond_init(&_conCond, nullptr);          // 初始化消费线程使用的条件变量
    }
    // 析构函数
    ~blockQueue() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_conCond);
        pthread_cond_destroy(&_proCond);
    }
    // 生产接口
    void push(const T &in) {
        // 生产的全过程为
        // 1. 上锁
        // 2. 判满. 满不生产 条件等待, 不满则生产.
        // 3. 生产之后, 解锁
        // 4. 唤醒消费接口

        lockQueue();        // 上锁
        while(isFull()) {
            // 满 进入条件等待
            condWait(_proCond);        // 传入生产线程所用的条件变量, 让生产线程等待
        }
        // 不满 则生产
        pushCore(in);
        // 解锁
        unlockQueue();
        condWakeUp(_conCond);          // 传入消费线程所用的条件变量, 唤醒消费线程
    }

    T pop() {
        // 消费的全过程为
        // 1. 上锁
        // 2. 判空. 空则不消费 条件等待, 不空 则消费
        // 3. 消费之后, 解锁
        // 4. 唤醒生产接口

        lockQueue();
        while(isEmpty()) {
            condWait(_conCond);
        }
        T tmp = popCore();
        unlockQueue();
        condWakeUp(_proCond);

        return tmp;
    }


private:
    // 队列上锁
    void lockQueue() {
        pthread_mutex_lock(&_mutex);
    }
    // 队列解锁
    void unlockQueue() {
        pthread_mutex_unlock(&_mutex);
    }
    // 判空
    bool isEmpty() {
        return _bq.empty();
    }
    // 判满
    bool isFull() {
        return _bq.size() == _cap;
    }
    // 条件等待
    void condWait(pthread_cond_t &cond) {
        pthread_cond_wait(&cond, &_mutex);
    }
    // 唤醒等待
    void condWakeUp(pthread_cond_t &cond) {
        pthread_cond_signal(&cond);
    }
    // 生产任务
    void pushCore(const T &in) {
        // 即为向队列中添加任务
        _bq.push(in);
    }
    // 消费任务
    T popCore() {
        // 即从队列中拿出任务
        T tmp = _bq.front();
        _bq.pop();
        return tmp;
    }

private:
    uint32_t _cap;                  // 队列容量
    queue<T> _bq;                   // 阻塞队列
    pthread_mutex_t _mutex;         // 互斥锁
    pthread_cond_t _conCond;        // 消费线程使用的条件变量
    pthread_cond_t _proCond;        // 生产线程使用的条件变量
};

blockQueue.cc:

#include <iostream>
#include <ctime>
#include "blockQueue.hpp"
using std::cout;
using std::endl;

void* productor(void* args) {
    blockQueue<int>* pBq = static_cast<blockQueue<int>*>(args);
    while (true) {
        // 制作数据
        int data = rand() % 10;
        // 向队列中生产数据
        pBq->push(data);
        cout << "productor 生产数据完成……" << data << endl;
        sleep(2);
    }

    return nullptr;
}

void* consumer(void* args) {
    blockQueue<int>* pBq = static_cast<blockQueue<int>*>(args);
    while (true) {
        int data = pBq->pop();
        cout << "consumer 消费数据完成……" << data << endl;
    }

    return nullptr;
}

int main() {
    // 设置一个随机数种子
    srand((unsigned long)time(nullptr) ^ getpid());
    // 定义阻塞队列
    // 创建两个线程

    blockQueue<int> bq;

    pthread_t pro, con;
    pthread_create(&pro, nullptr, productor, &bq); // 生产线程
    pthread_create(&con, nullptr, consumer, &bq);  // 消费线程

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    return 0;
}

我们 使用阻塞队列,

创建productor生产者 函数, 创建随机数并 每2s push入队列中.

创建 consumer消费者 函数, 从队列中取数据, 不做间隔限制.

执行结果为:

productor生产线程 每2s, 生产一个数据. consumer消费线程跟随生产的节奏来消费数据.

如果我们修改一下生产和消费的间隔, 或许更能说明条件变量的作用:

|wide

消费线程2s一消费, 生产线程1s一生产:

可以看到, 刚开始因为 队列未满, 所以1s生成一个, 顺序为:5 4 3 3 4 0 1 6

而 消费2s一次的顺序为 5 4 3 3.

之后 队列满, 所以 生产数据的速度会跟随消费数据的速度.


问题1:条件判断的语句

上面我们实现 生产和消费 接口的时候, 我们判断条件是否满足使用的是 while() 而不是 if() 为什么 ?

首先思考一个问题:当线程被唤醒的时候, 一定代表条件被满足了吗?

即 执行过 pthread_cond_wait() 之后, 接着向后执行代码, 一定表示条件已经被满足了吗?

其实不一定. 第一种情况就是, 这个函数因为某种情况调用失败了. 调用失败很可能会继续执行之后的代码. 此时 条件很大概率是没有满足的.

其次, 就是 当多线程一起生产或一起消费的时候, 可能也会造成 伪唤醒 的情况.

伪唤醒即为条件还未被满足时, 线程被唤醒了.

由于这种情况的存在, 所以, 不能只用一个 if() 判断. 唤醒之后 需要再次判断才能保证是否正确的唤醒

问题2:什么时候唤醒 或者 什么时候解锁?

我们上面实现的接口, 队列的解锁是在唤醒线程之前的, 即先解锁, 再唤醒线程

那么, **可不可以 先唤醒线程, 再解锁呢?**有没有什么影响呢?

其实是没有多少影响的. 我们先解锁, 再唤醒线程. 也就意味着 唤醒线程之前 锁已经准备好了, 线程可以直接竞争锁.

而 如果是先唤醒线程, 再解锁. 其实也就是 线程可以先准备着竞争锁, 等锁被解开之后, 再竞争锁就可以了.

没有数据安全的影响

理解生产者消费者模型的 并发

经过对条件变量的介绍和使用, 以及对生产者消费者模型的介绍和模拟.

其实我们已经对条件变量 差不多是很熟悉了. 也可以理解 生产者消费者模型的 解耦 支持忙闲不均 的优点.

但是, 我们好像并没有看到, 生产者消费者模型的 支持并发的这一特点

消费者从超市消费商品是互斥的, 生产者给超市生产商品也是互斥的. 消费者与生产者也不能同时从超市消费或给超市生产.

并发 体现在哪里呢?

我们上面模拟了一个生产者消费者模型, 但是生产数据和消费数据的过程其实并不明显. 因为只有定义和返回 只有一个语句,

下面我们使用另外一个类, 当作阻塞队列的数据类型.

Task.hpp:

#pragma once
#include <iostream>
#include <string>

class Task {
public:
    Task(int one = 0, int two = 0, char op = 0)
        : elemOne_(one)
        , elemTwo_(two)
        , operator_(op) {}

    // 仿函数定义
    int operator()() {
        return run();
    }

    int run() {
        int result = 0;
        switch (operator_) {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/': 
            // 除0处理
            if (elemTwo_ == 0) {
                std::cout << "div zero, abort" << std::endl;
                result = -1;
            }
            else {
                result = elemOne_ / elemTwo_;
            }
            break;
        case '%': 
            // 除0处理
            if (elemTwo_ == 0) {
                std::cout << "mod zero, abort" << std::endl;
                result = -1;
            }
            else {
                result = elemOne_ % elemTwo_;
            }
            break;
        default:
            std::cout << "非法操作: " << operator_ << std::endl;
            break;
        }

        return result;
    }

    int get(int* e1, int* e2, char* op) {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;

        return 0;
    }

private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};

blockQueue.cc:

#include <iostream>
#include <ctime>
#include "blockQueue.hpp"
#include "Task.hpp"
using std::cout;
using std::endl;

const std::string ops = "+-*/%";

// 生产任务接口
void* productor(void* args) {
    blockQueue<Task>* pBq = static_cast<blockQueue<Task>*>(args);
    while (true) {
        // 制作任务
        int elemOne = rand() % 50;
        int elemTwo = rand() % 10;
        char oper = ops[rand() % 4];        // 操作符
        Task t(elemOne, elemTwo, oper);
        // 生产任务
        pBq->push(t);
        cout << "producter[" << pthread_self() << "] " <<
            (unsigned long)time(nullptr) << " 生产了一个任务: " <<
            elemOne << oper << elemTwo << "=?" << endl;
        sleep(1);
    }

    return nullptr;
}

void* consumer(void* args) {
    blockQueue<Task>* pBq = static_cast<blockQueue<Task>*>(args);
    while (true) {
        // 消费任务
        Task t = pBq->pop();
        // 处理任务
        int result = t();
        int elemOne, elemTwo;
        char oper;
        t.get(&elemOne, &elemTwo, &oper);
        cout << "consumer[" << pthread_self() << "] " <<
            (unsigned long)time(nullptr) << " 消费了一个任务: " <<
            elemOne << oper << elemTwo << "=" << result << endl;
    }

    return nullptr;
}

int main() {
    // 设置一个随机数种子
    srand((unsigned long)time(nullptr) ^ getpid());
    // 定义阻塞队列
    // 创建两个线程
    blockQueue<Task> bq;

    pthread_t pro, con;
    pthread_create(&pro, nullptr, productor, &bq); // 生产线程
    pthread_create(&con, nullptr, consumer, &bq);  // 消费线程

    pthread_join(pro, nullptr);
    pthread_join(con, nullptr);

    return 0;
}

blockQueue.hpp 还是上面的内容.

那么, 这段代码的执行结果:

上面我们实现的代码, 是使用阻塞队列 模拟生产者消费者模型, 生产和消费加减乘除的任务


但是上面这个例子, 好像还是没有表现出 生产者消费者模型对并发的支持.

依旧是 消费者从超市消费商品是互斥的, 生产者给超市生产商品也是互斥的. 消费者与生产者也不能同时从超市消费或给超市生产

那么. 此模型支持并发, 究竟体现在哪里呢?

生产者消费者模型支持并发, 其实并不是指 消费者和生产者可以并发的向"超市"消费或生产数据.

而是指, 生产者制作商品\任务消费者处理商品\任务 的这种过程, 其实是可以并发执行的.

在上例中, 就是生产者制作任务 和 消费者处理任务的过程 :

|wide

虽然 还是不太明显, 但是 多行的语句其实已经可以说明, 制作任务和处理任务的过程其实是需要消耗一定的资源的, 比如时间.

并且, 并行的制作任务 在串行的向 队列中生产. 串行的从队列中消费任务, 然后并行的处理任务.

向队列中生产任务, 只是将已经制作好的任务 push 入队列的过程. 从队列中消费任务, 只是从队列中将任务 pop 出来的过程.

在生产到队列中之前, 还有一个可能会非常漫长的制作任务的过程. 在消费任务之后, 也可能有一个非常漫长的处理任务的过程.

**制作任务是可以并行制作的, 处理任务也是可以并行处理的, 制作任务和处理任务也是可以并行操作的. 这些过程互不影响 **

也就是说, 生产者消费者模型支持并发的情况 一般情况下并不是在临界区并发, 而是在**生产前和消费后支持并发**

生产者消费者模型的支持忙闲不均, 同样体现在 生产前和消费后的制作和处理上.

POSIX信号量

信号量 也是同步的一种机制.

什么是信号量?

那么什么是信号量呢?信号量可以看作是一个计数器.

以我们生活中的一个例子 来简单的解释一下信号量: 看电影.

如果我们已经有了想要看的电影, 我们去电影院首先要做的事是什么?选放映厅然后选座.

电影院的每一个放映厅的座位都是有限的, 不过每一个空座位都可以被任何人选择.

我们要看电影就需要选座位买票, 每买一张票选一个座位, 放映厅内的空座位就会少一个.

买到了电影票, 实际上就是选了放映厅内指定的座位, 让空座位减了1, 即 预定了放映厅内的座位.

那么, 以编程的角度:

就可以将 放映厅看作为一个 临界资源, 每一个座位都是临界资源的一小部分资源, 这所有的座位就可以 看作是信号量. 当有人买票时选中了座位, 接下来可选择的座位就少了一个, 可以看作是 信号量--; 如果有人退票, 就可以看作是 **信号量++ **

当放映厅里没有了空座位, 就表示 信号量减到了0, 其他人再想要买票, 就需要等有人退票.

也就是说, 信号量减1 也就表示着临界资源中的一部分被选中了. 也就表示着之后只能选择临界资源的其他部分.

这样一个看电影选票的例子, 其实就可以很好的解释信号量.

那么, 提问:如果一个放映厅只有一个座位, 也就是说信号量最大为1. 那么 信号量可以表示什么?

互斥锁! 如果信号量只有1, 那么此信号量就可以当互斥锁用.

此时, 信号量 1 –> 0 就是上锁的过程. 信号量 0 –> 1 就是解锁的过程.

信号量为1时, 此信号量被称为 二元信号量


上面我们举的例子, 是将放映厅当作了一个临界资源, 每个座位都是临界资源得一部分.

那么, 我们编程中的 临界资源也可以分为一小部分一小部分的吗

是可以的.

临界资源分为一小部分一小部分的, 通过 信号量操作 来让线程选中.

另外一个问题:申请信号量, 实际就是对一部分临界资源的申请. 那么 如果申请到了信号量, 就表示一定获得了一部分临界资源吗?

这个问题的答案也是肯定的. 只要申请到了信号量, 就一定获得了一部分临界资源. 因为, 你申请到了, 只要你不释放, 别人就无法申请, 从原则上来说, 已经获得了这部分资源.

信号量可以被所有线程申请和释放, 即 --++. 即 信号量也是一个临界资源. 即 信号量的申请和释放需要时原子性的.

而实际的实现也确实如此. 信号量的申请-- 和 信号量的释放++ 都是 原子性的

信号量的接口

介绍过互斥锁和条件变量之后, 信号量的接口和使用其实就显得很简单了.

首先, 信号量的类型为 sem_t. 其常用的基本接口有:

sem_init()初始化:

sem_destroy()销毁:

sem_wait()等待, 即申请信号量:

sem_post()释放信号量:

这些接口, 也都是 pthread 库提供的. 不过需要使用的是 semphore.h 头文件

以环形队列模拟生产者消费者模型**

上面我们使用条件变量 以阻塞队列模拟了生产者消费者模型.

现在, 我们通过信号量 以环形队列模拟生产者消费者模型.

不过首先要介绍一下什么是环形队列以及环形队列的特点:

环形队列

环形队列的物理实现肯定不是环形的. 只不过我们可以使用普通的数组模拟出环形队列的感觉.

比如: 一个有限的数组[0, 7] 一共8个空间. 要模拟出一个环形的队列.

我们知道队列的特点是 先进先出. 而环形队列实现先进先出的方法是, 可变的队头

用数组实现的普通队列的先进先出一般是固定的队头, 如果以 [0, 7] 来实现, 那么队头恒为0.

先进先出总是 从 0位置出, 然后将后面的元素向前移动一位.

环形队列不同. 环形队列可以看作将数组卷了起来:

|inline

环形队列 使用两个"指针"来表示队头和队尾. 并且, 不同于普通的队列, 环形队列的队头是可以变化的. 什么意思呢?

举个例子:

如果环形队列中, 0、1、2、3位置存储有数据. 那么 队头指针指向0, 队尾指针指向3 或是 4.

出队列时, 需要取到0位置的数据, 然后将 队头指针++, 移动到 1. 新的队头就是1位置

入队列就是 队尾指针++在存放数据, 或者存放数据后再++

这就是环形队列的特点. 出队列时, 队头改变, 即 ++. 那么也就是说, 环形队列的任意位置都可能是队头.

队头移动了一位, 也就表示 队尾可以多向后走一位. 就像是在转圈一样.


数组实现环形队列可以使出队列的操作更快, 因为不用移动数据.

但是, 环形队列还有一个缺点, 就是 不容易直接判断队列是否 为满或为空.

环形队列, 队列为空 队头和队尾指针指向同一位置, 队列为满 对头和队尾指针也指向同一位置. 所以, 环形队列判断为空或未满, 一般通过入队列出队列计数器, 或者恒在队头的前一个位置留空.


那么, 总结一下

什么是环形队列?

首先环形队列的长度是一定的. 然后 队头和队尾用两个"指针"表示. 数据入队列, 队尾指针会向后++. 数据出队列, 队头指针会向后++.

指针移动到数组的最后一个元素时, 再++ 就会回到0 位置. (此操作, 一般由取模控制)

两指针在同一个位置时, 队列可能为空, 也可能为满:

  1. 如果刚创建的队列, 那就为空.
  2. 如果队尾指针刚追上队头指针, 那就为满
  3. 如果队头指针刚追上队尾指针, 那就为空

那么, 环形队列如何模拟生产者消费者模型呢?

模拟模型

其实, 模拟的思路也很简单.

还是将线程分为生产者和消费者. 生产者生产数据, 即为 将数据入队列. 消费者消费数据, 即为 将数据出队列.

队列为空时, 生产者可以生产, 消费者不能消费. 当队列为满时, 生产者不能生产, 消费者可以消费. 此时的生产线程和消费线程需要访问同一个位置. 是互斥与同步的关系.

当队列为其他情况时, 生产者和消费者可以并发的生产和消费. 因为, 此时生产线程和消费线程访问的不是同一个位置.

并且, 在队列中 生产者不能超越消费者, 消费者也不能超越生产者. 即, 队列满时, 不能再生产, 队列空时, 不能再消费.

这些在实现时, 都是由信号量来保证的

信号量如何保证呢?

首先要知道, 在队列中:

  1. 生产者需要的是什么资源?

    需要空间资源, 因为需要向队列中, 入数据

  2. 消费者需要的是什么资源?

    需要数据资源, 因为需要从队列中, 出数据

那么, 就可以针对不同的资源, 创建两个信号量. 一个是 表示空间资源量的 roomSem, 另一个是 表示数据资源量 dataSem

生产者需要 等待 roomSem, 即申请 空间资源信号量, 申请成功 则空间资源信号量--. 并且, 申请成功就表示获得了一块空间资源, 别人就无法获取. 然后 等到生产数据入队列之后, 数据资源信号量还需要++, 因为 有数据入队列了.

消费者同样如此, 需要等待 dataSem, 申请 数据资源信号量, 申请成功 则数据资源信号量--. 等到数据出队列之后, 空间资源信号量也是需要++的.

那么, 初始情况下 roomSem 应该为多少? dataSem 应该为多少?

初始情况, 队列中没有数据, 所以 roomSem 应该为 N, dataSem 应该为 0.

只要控制好, roomSem++时, 对应的dataSem需要--; dataSem++时, 对应的roomSem需要--. 就可以保证生产者、消费者不会互相超越.

队列中没有数据时, roomSem 为 N, dataSem 为 0. 需要生产者先生产; 队列中满数据时, roomSem 为 0, dataSem 为 N, 需要消费者先消费.
这两种情况, 生产者和消费者访问的是同一块空间. 所以是需要互斥、同步的.

而在其他情况时, 生产者和消费者不是指向同一空间, 那么 生产和消费的动作就可以并发 的执行.


上面我们说了这么多模拟生产者消费者模型的思路. 其实都是在一个前提下:不同的线程访问的是临界资源的不同部分

而这个前提, 是由编写者实现的.

下面, 我们就来正式以环形队列模拟一下, 生产者消费者模型:

ringQueue.hpp:

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using std::cout;
using std::endl;
using std::vector;

const int gDefultCap = 10; 

template <class T>
class ringQueue {
public:
    // 构造函数
    ringQueue(const int cap = gDefultCap) 
        : _ringQueue(cap) 
        , _pIndex(0)
        , _cIndex(0) {
        sem_init(&_roomSem, 0, _ringQueue.size());
        sem_init(&_dataSem, 0, 0);
        // sem_init() 接口的 
        // 第一个参数是 需要初始化的信号量, 
        // 第二个参数是 
        // 第三个参数是 需要初始化为多少
    }
    // 析构函数
    ~ringQueue() {
        sem_destroy(&_roomSem);
        sem_destroy(&_dataSem);
    }
    // 生产接口
    void push(const T &in) {
        // 生产数据
        // 先申请空间信号量
        sem_wait(&_roomSem);            // 申请成功则 _roomSem--, 否则等待
        _ringQueue[_pIndex] = in;       // 将数据放入 数组
        sem_post(&_dataSem);            // 数组中数据+1, 那么 dataSem 需要++
        _pIndex++;                      // 生产者下一次生产数据的位置 ++
        _pIndex %= _ringQueue.size();   // 跟新下标, 保证环形特性
    }
    // 消费接口
    T pop() {
        // 消费数据
        // 先申请数据信号量
        sem_wait(&_dataSem);            // 申请成功则 _dataSem--, 否则等待
        T tmp = _ringQueue[_cIndex];    // 存储应拿到的数据
        sem_post(&_roomSem);            // 拿出了数据, 空间+1, 那么 _roomSem ++
        _cIndex++;
        _cIndex %= _ringQueue.size();

        return tmp;
    }


private:
    vector<T> _ringQueue;   // 模拟循环队列的数组
    sem_t _roomSem;         // 空间资源信号量, 生产者申请
    sem_t _dataSem;         // 数据资源信号量, 消费者申请
    uint32_t _pIndex;       // 生产者生产数据的索引下标, 即插入数据的下标
    uint32_t _cIndex;       // 消费者消费数据的索引下标, 即获取数据的下标
};

环形队列模拟生产者消费者模型的封装实现, 没有什么需要特别注意的点.

首先成员是:

  1. 一个数组, 用来模拟环形队列
  2. 一个空间信号量、一个数据信号量. 分别用来控制生产者对空间的申请, 消费者对数据的申请
  3. 一个生产数据的索引下标, 一个消费数据的索引下标, 分别用来表示插入数据的下标, 和拿出数据的下标

其次, 就是初始化信号量的操作. sem_init() 的使用:

int sem_init(sem_t *sem, int pshared, unsigned int value);

此接口需要传入三个参数:

  1. sem_t* sem, 需要初始化的信号量
  2. int pshared, 信号量的类型. 我们暂不考虑, 传入0
  3. unsigned int value, 信号量的初始值

注意: 此接口不能对一个信号量重复使用

最后, 需要注意的就是:

需要使用 _cIndex %= _ringQueue.size()_pIndex %= _ringQueue.size() 来控制环形特性

因为 在生产数据和消费数据后, _pIndex_cIndex 需要 ++. 如果不控制, 迟早超出数组. 所以需要 取模控制一下, 并控制环形特性

ringQueue.cc:

#include "ringQueue.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
using std::cout;

// 消费线程调用函数
void* consumer(void* args) {
    ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
    while (true) {
        sleep(3);
        int data = ringQp->pop();
        cout << "consumer_pthread[" << pthread_self() << "]"
             << " 消费了一个数据: " << data << endl;
    }
}

// 生产线程调用函数
void* productor(void* args) {
    ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
    while (true) {
        int data = rand() % 20;
        ringQp->push(data);
        cout << "productor_pthread[" << pthread_self() << "]"
             << " 生产了一个数据: " << data << endl;
        sleep(1);
    }
}

int main() {
    srand((unsigned long)time(nullptr) ^ getpid());

    ringQueue<int> ringQ;

    pthread_t con, pro;
    pthread_create(&con, nullptr, consumer, &ringQ);
    pthread_create(&pro, nullptr, productor, &ringQ);

    pthread_join(con, nullptr);
    pthread_join(pro, nullptr);

    return 0;
}

主函数相关的代码, 就没有需要注意的地方了.

实现之后, 编译执行这段代码:

我们在代码中设置, 1s生产一次数据, 3s消费一次数据.

所以 从代码的执行结果中, 我们可以看到. 刚开始 生产3个数据, 消费1个数据, 并且按照生产的顺序消费. 到后面, 由于消费速度不快, 所以队列很快被占满, 但是生产者也并没有超越消费者去生产数据, 而是等着消费者消费之后, 在生产.

这就是由于信号量在控制着. 队列被占满时, sem_wait(_roomSem) 是无法申请成功空间信号量的, 因为此时 _roomSem 为0. 队列为空时, 则相反.

不过, 此时我们实现的代码是 单生产线程和单消费线程的.

如果是 多生产线程和多消费线程, 我们当前模拟的生产者消费者模型的代码会出现错误吗?

毫无疑问, 会出现错误.

因为, 如果是多线程生产和消费, 那么我们 对索引下标的保护就不合格.

在多线程生产时, 只要队列中存在足够的空间, 多线程就会并发的去访问 索引下标, 而一个对象内只有一个索引下标, 如果不对索引下标添加保护, 就一定会造成错误.

所以, 我们需要用锁来对索引下标进行保护.

那么, ringQueue封装的成员就需要改进一下:

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using std::cout;
using std::endl;
using std::vector;

const int gDefultCap = 30; 				// 实现了多线程, 适当的将队列放大 

template <class T>
class ringQueue {
public:
    // 构造函数
    ringQueue(const int cap = gDefultCap) 
        : _ringQueue(cap) 
        , _pIndex(0)
        , _cIndex(0) {
        sem_init(&_roomSem, 0, _ringQueue.size());
        sem_init(&_dataSem, 0, 0);
        // sem_init() 接口的 
        // 第一个参数是 需要初始化的信号量, 
        // 第二个参数是 
        // 第三个参数是 需要初始化为多少

        // 初始化锁
        pthread_mutex_init(&_pMutex, nullptr);
        pthread_mutex_init(&_cMutex, nullptr);
    }
    // 析构函数
    ~ringQueue() {
        sem_destroy(&_roomSem);
        sem_destroy(&_dataSem);

        pthread_mutex_destroy(&_pMutex);
        pthread_mutex_destroy(&_cMutex);
    }
    // 生产接口
    void push(const T &in) {
        // 生产数据
        // 先申请空间信号量
        sem_wait(&_roomSem);                // 申请成功则 _roomSem--, 否则等待
        pthread_mutex_lock(&_pMutex);       // 申请信号量成功后, 加锁
        _ringQueue[_pIndex] = in;           // 将数据放入 数组
        _pIndex++;                          // 生产者下一次生产数据的位置 ++
        _pIndex %= _ringQueue.size();       // 跟新下标, 保证环形特性
        pthread_mutex_unlock(&_pMutex);     // 访问完临界资源, 解锁
        sem_post(&_dataSem);                // 数组中数据+1, 那么 dataSem 需要++
    }
    // 消费接口
    T pop() {
        // 消费数据
        // 先申请数据信号量
        sem_wait(&_dataSem);            // 申请成功则 _dataSem--, 否则等待
        pthread_mutex_lock(&_cMutex);
        T tmp = _ringQueue[_cIndex];    // 存储应拿到的数据
        _cIndex++;
        _cIndex %= _ringQueue.size();
        pthread_mutex_unlock(&_cMutex);
        sem_post(&_roomSem); // 拿出了数据, 空间+1, 那么 _roomSem ++
        return tmp;
    }


private:
    vector<T> _ringQueue;   // 模拟循环队列的数组
    sem_t _roomSem;         // 空间资源信号量, 生产者申请
    sem_t _dataSem;         // 数据资源信号量, 消费者申请
    uint32_t _pIndex;       // 生产者生产数据的索引下标, 即插入数据的下标
    uint32_t _cIndex;       // 消费者消费数据的索引下标, 即获取数据的下标

    // 保护索引下标的锁
    pthread_mutex_t _cMutex;    // 消费数据索引下标的锁
    pthread_mutex_t _pMutex;    // 生产数据索引下标的锁
};

我们在类内, 添加了两个锁来分别保护 生产和消费数据的索引下标.

为什么要用两个锁? 因为需要保证 生产和消费在不同位置的并发, 所以不能只用一把锁.

并且, 在生产接口和消费接口中, 都是在 申请到信号量之后上的锁. 这是为什么?

因为, 我们说过 申请到信号量 就表示其实可以看成已经获取到资源了. 所以 申请到信号量之后再上锁, 可以实现让线程先预定资源 再等待的功能. 防止出现, 线程先因为锁 阻塞了一会, 终于抢到锁了 却申请不到资源的情况.

那么, 改进封装之后, 我们多线程试验一下.

为了测试多线程是否做好了保护, 我们可以先让多个生产线程一直生产, 看一看会不会出错. 然后再消费线程:

#include "ringQueue.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
using std::cout;

// 消费线程调用函数
void* consumer(void* args) {
    sleep(10);
    ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
    while (true) {
        sleep(1);
        int data = ringQp->pop();
        cout << "consumer_pthread[" << pthread_self() << "]"
             << " 消费了一个数据: " << data << endl;
    }
}

// 生产线程调用函数
void* productor(void* args) {
    ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
    while (true) {
        int data = rand() % 20;
        ringQp->push(data);
        cout << "productor_pthread[" << pthread_self() << "]"
             << " 生产了一个数据: " << data << endl;
        usleep(500000);
    }
}

int main() {
    srand((unsigned long)time(nullptr) ^ getpid());

    ringQueue<int> ringQ;

    pthread_t con1, con2, con3, pro1, pro2, pro3;
    pthread_create(&con1, nullptr, consumer, &ringQ);
    pthread_create(&con2, nullptr, consumer, &ringQ);
    pthread_create(&con3, nullptr, consumer, &ringQ);
    pthread_create(&pro1, nullptr, productor, &ringQ);
    pthread_create(&pro2, nullptr, productor, &ringQ);
    pthread_create(&pro3, nullptr, productor, &ringQ);

    pthread_join(con1, nullptr);
    pthread_join(con2, nullptr);
    pthread_join(con3, nullptr);
    pthread_join(pro1, nullptr);
    pthread_join(pro2, nullptr);
    pthread_join(pro3, nullptr);

    return 0;
}

我们让单线程每 0.5s 生产一个数据, 让10s后, 单消费线程再以 1s 消费一个数据的速度消费.

代码的执行结果为:

虽然打印的结果很混乱, 但是还是可以看出没有出现生产或消费出错的.

打印首先是因为屏幕也是临界资源, 但我们没有保护.

其次是因为 cout

这就是 以环形队列模拟生产者消费者模型.


本片文章到这里就结束啦~

感谢阅读

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/444718.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android Studio连接使用第三方模拟器

使用Android Studio自带的模拟器&#xff0c;第一会比较卡&#xff0c;第二配置容易出错&#xff0c;第三&#xff0c;自带的模拟器很吃电脑配置。如果电脑配置较差&#xff0c;会比较耽误事。所以为例解决上面三个问题&#xff0c;可以在电脑上按照第三方手机模拟器&#xff0…

陶泓达:4.18午间欧盘黄金原油最新精准操作建议!

黄金方面&#xff1a; 黄金消息面解析&#xff1a;周一&#xff08;4月17日&#xff09;美市盘中&#xff0c;美国公布的4月纽约联储制造业指数和4月NAHB房产市场指数均超出预期&#xff0c;提振了美联储在5月继续加息的预期。数据公布之后&#xff0c;美元指数加速上扬&#x…

【wireshark】Ubuntu 安装 wireshark 以及 wireshark 过滤器的使用

目录 1、安装wireshark 2、wireshark 过滤器比较符号 3、wireshark 过滤方式 (1) 根据 IP 地址过滤 (2) 根据端口号过滤 (3) 根据报文长度过滤 (4) HTTP协议过滤 参考文章链接&#xff1a;Wireshark 过滤器使用 1、安装wireshark 在命令行输入如下命令安装 wireshark …

Flutter与Android开发:构建跨平台移动应用的新选择

Flutter与Android开发&#xff1a;构建跨平台移动应用的新选择 本文内容提纲如下&#xff1a; 介绍Flutter技术&#xff1a;Flutter是一种由Google推出的开源UI工具包&#xff0c;用于构建高性能、跨平台的移动应用。文章将介绍Flutter的基本概念、特点和优势&#xff0c;包括其…

计算机设置定时任务及自动开关机

目录 创建定时任务 自动开关机 创建定时任务 1、右击桌面计算机&#xff0c;点击管理&#xff0c;打开计算机管理或通过控制面板打开[控制面板-管理工具-计算机管理] 2、依次选择&#xff1a;系统工具->任务计划程序->任务计划程序库->Microsoft->Windows&#…

MOD8ID 加密芯片的 AES-GCM 模式使用

一&#xff1a;什么是 AES-GCM 加密&#xff1f; AES-GCM是一种高级加密标准&#xff08;AES&#xff09;的加密模式&#xff0c;同时使用加密和身份验证&#xff08;AEAD&#xff09;功能。它使用加密算法AES和Galois Counter Mode&#xff08;GCM&#xff09;计数器模式&…

5行Python代码采集3000+上市公司信息,很爽

嗨害大家好鸭&#xff01;我是爱摸鱼的芝士❤ 毕业季也到了找工作的季节了&#xff0c; 很多小伙伴都会一家一家的公司去看&#xff0c; 这得多浪费时间啊。 今天用Python教大家怎么采集公司的信息&#xff0c; 相信大家会很喜欢这个教程的&#xff0c;nice&#xff01; pyth…

中介者设计模式(Mediator Design Pattern)[论点:概念、组成角色、相关图示、示例代码、适用场景]

文章目录 概念组成角色相关图示示例代码适用场景 概念 中介者设计模式是一种行为型设计模式&#xff0c;它通过引入一个中介对象来封装一组对象之间的交互&#xff0c;使得对象之间不需要显式地相互引用&#xff0c;从而降低它们之间的耦合。通过将对象间的通信封装到中介者对象…

Ubuntu20.4利用httpd(Apache2)源码搭建web服务器

Apache取自“a patchy server”的读音&#xff0c;源于NCSAhttpd服务器&#xff0c;经过多次修改&#xff0c;成为世界上最流行的Web服务器软件之一&#xff0c;Apache的特点是简单、速度快、性能稳定&#xff0c;并可做代理服务器来使用。 本来它只用于小型或试验Internet网络…

TinyOS 配置教程

文章目录 前言1. 安装1.1. 实验环境1.2. TinyOS基础工作1.3. TinyOS 的配置1.4. 安装 java1.5. 安装编译器 2. 测试仿真程序总结 前言 本文主要用于记录在 WSN 课程中&#xff0c;配置大作业所需使用的 TinyOS 仿真环境 1. 安装 1.1. 实验环境 本实验以如下版本为例&#xf…

Python面向对象详解(非常详细)

非常详细的讲解&#xff08;爆肝1w字&#xff09;&#x1f44f;&#x1f3fb;&#x1f44f;&#x1f3fb;&#x1f44f;&#x1f3fb; 零基础一样学得会&#x1f44c;&#x1f3fb; 干货满满不看后悔&#x1f44d;&#x1f44d;&#x1f44d; &#x1f4dd;个人主页→数据…

函数重载注意事项

C为什么支持函数重载&#xff0c;C语言不支持函数重载&#xff1f; C代码产生函数符号时&#xff0c; 是函数名参数列表类型组成的&#xff01;如_Z3sumii C代码产生函数符号时&#xff0c;只由函数名决定 什么是函数重载&#xff1f; 一组函数&#xff0c;其中函数名相同&…

读书笔记-《ON JAVA 中文版》-摘要14[第十四章 流式编程]

文章目录 第十四章 流式编程1. 流支持2. 流创建2.1 流创建2.2 随机数流2.3 int 类型的范围2.4 generate()2.5 iterate()2.6 流的建造者模式2.7 Arrays2.8 正则表达式 3. 中间操作3.1 跟踪和调试3.2 流元素排序3.3 移除元素3.4 应用函数到元素3.5 在 map() 中组合流 4. Optional…

电脑文件恢复怎么做?3个方法快速恢复文件!

案例&#xff1a;电脑文件恢复怎么操作&#xff1f; 【我的电脑已经好久没用了&#xff0c;最近因为需要查看一些相关的文件才用到电脑&#xff0c;但是我打开后发现里面很多重要的文件都不见了&#xff0c;请问电脑文件应该怎么恢复呢&#xff1f;感谢回答&#xff01;】 电…

AIGC从入门到精通

一键起飞 # 提前安装好python 3.10.9 ​git clone https://github.com/AUTOMATIC1111/stable-diffusion-webui cd stable-diffusion-webui ./webui.sh -f --api --listen --enable-insecure-extension-access 非常详细&#xff01;6000字详解AI绘画文生图干货、技巧&#xf…

【多线程】线程的状态

1. 等待一个线程 join 有一天张三与小美在家里吃饭&#xff0c;张三早早的把饭吃完了&#xff0c;对小美说&#xff0c;等你把饭吃完了&#xff0c;我就去洗碗了&#xff01; 此时张三就要等待小美吃完饭才能去洗碗&#xff0c;就好比线程 A 要等待线程 B 执行完&#xff0c;线…

Ansys Lumerical | 光子集成电路之PN 耗尽型移相器仿真工作流

01 说明 本文旨在介绍Ansys Lumerical针对有源光子集成电路中PN耗尽型移相器的仿真分析方法。通过FDE和CHARGE求解器模拟并计算移相器的性能指标&#xff08;如电容、有效折射率扰动和损耗等&#xff09;&#xff0c;并创建用于INTERCONNECT的紧凑模型&#xff0c;然后将其表征…

vue_03

文章目录 导航菜单功能的实现在Admin.vue中添加下列代码布局选择点击跳转事件 vuex的安装及配置安装配置新建store和index.js在index.js下写如下代码在main.js中引入store 封装菜单请求工具类新建menus.js编写menus.js文件 解决F5刷新数据丢失问题 导航菜单功能的实现 在Admin…

多项开发任务,如何做好任务分配和管理?

1、确定任务清单 任务精细化分解 需要将任务进行精细化分解&#xff0c;每个子任务时间最好不超过一周&#xff0c;明确子任务的目标、时间点和交付物。 多项开发任务&#xff0c;如何做好任务分配和管理&#xff1f; 2、优先级排序 需要将精细化好的任务&#xff0c;进行优先级…

TFTP+Filezilla文件双向传输(2)-ubuntu(VMware)-win10(host)

TFTP&#xff08;Trivial File Transfer Protocol,简单文件传输协议&#xff09;是TCP/IP协议族中的一个用来在客户机与服务器之间进行简单文件传输的协议&#xff0c;提供不复杂、开销不大的文件传输服务。端口号为69。 ftpd-hpa是tftp服务器 tftp-hpa是tftp客服端 inetd的全称…