【Linux】从多线程同步到生产者消费者模型:多线程编程实践

news2025/1/12 15:57:20

目录

1.线程的同步

1.1.为什么需要线程的同步?

2.2.条件变量的接口函数

2.生产消费模型

2.1 什么是生产消费模型

2.2.生产者消费者模型优点

2.3.为何要使用生产者消费者模型

3.基于BlockingQueue的生产者消费者模型

3.1为什么要将if判断变成while?

3.2.pthread_cond_wait函数调用的作用:

代码:

4.POSIX信号量

4.1.POISX信号量是什么?

4.2.POISX信号量常见接口

4.3.POSIX信号量的核心PV操作

4.3.1、P操作(等待信号量)

4.3.2、V操作(释放信号量)

5.环形队列

5.1.生产消费模型搭建的原理

5.2.环形队列的具体实现

5.3.代码:

5.4.多生产和多消费的并发性体现在:

1.线程的同步

1.1.为什么需要线程的同步?

上面我们讲解了线程的互斥问题,但此时我们又发现了一个问题!

如果某一个线程抢票能力过于强大,把所有的票一个人都抢走了,比如上面的线程4,一个人就抢到了8088张票,而线程2和线程3一张票都没有抢到,这就造成了线程2和线程3的饥饿问题!

在现实世界里,这肯定是不行的,秉持着公平公正的原则,我们应该让这4个线程抢到的票都差不多,才有实际意义。

所以互斥能解决抢票抢到负数的问题,但是不能解决饥饿问题,饥饿问题就需要线程同步去解决!

通过条件变量我们可以实现线程的同步!

2.2.条件变量的接口函数

int pthread_cond_init(pthread_cond_t *restrict cond , const pthread_condattr_t *restrictattr);:初始化接口
int pthread_cond_destroy(pthread_cond_t *cond):销毁接口
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);:在条件不满足时阻塞等待
int pthread_cond_broadcast(pthread_cond_t *cond);:条件满足,唤醒所有线程,开始竞争。
int pthread_cond_signal(pthread_cond_t *cond);:条件满足,唤醒一个线程。

条件变量需要一个线程队列和相应的通知机制,才能保证线程同步!

2.生产消费模型

2.1 什么是生产消费模型

总结一句话就是“321”原则:

  1. 一个交易场所(特定数据结构形式存在的一段内存空间)
  2. 两种角色(生产角色,消费角色):生产线程,消费线程
  3. 三种关系:生产与生产(互斥关系) , 消费与消费(互斥关系),生产与消费。

1个交易场指的就是共享资源(临界资源),有多个厂商(生产者)和多个用户(消费者),所以这就是我们常说的多线程的同步和互斥问题。

超市是什么?临时保存数据的“内存空间”——某种数据结构对象。

商品是什么?就是数据!

2.2.生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

2.3.为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

3.基于BlockingQueue的生产者消费者模型

 

3.1为什么要将if判断变成while?

如果生产者只生产了一份,但是叫醒了5个消费者,当一个消费者竞争锁结束取走仅有的一份商品,那接下来的4个消费者就会看到空的队列,如果是if,因为之前已经判断过,所以会直接执行下面取空的队列,因此会直接报错,但是如果是while的话,仍需要判断队列是否已经满了,因为当等待的线程被唤醒的时候,继续从当前的位置进行执行代码!

3.2.pthread_cond_wait函数调用的作用:

a. 让调用线程等待

b. 自动释放曾经持有的_mutex锁

c. 当条件满足,线程唤醒,pthread_cond_wait要求线程必须重新竞争_mutex锁,竞争成功,方可返回!!!

代码:

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

template <typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap) : _cap(cap)
    {
        _productor_wait_num = 0;
        _consumer_wait_num = 0;
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consum_cond, nullptr);
    }
    void Enqueue(T &in) // 生产者用的接口
    {
        pthread_mutex_lock(&_mutex);
        while(IsFull()) // 保证代码的健壮性
        {
            // 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
            // 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的_mutex锁 c. 当条件满足,线程唤醒,pthread_cond_wait要求线性
            // 必须重新竞争_mutex锁,竞争成功,方可返回!!!
            // 之前:安全
            _productor_wait_num++;
            pthread_cond_wait(&_product_cond, &_mutex);  //  只要等待,必定会有唤醒,唤醒的时候,就要继续从这个位置向下运行!!
            _productor_wait_num--;
            // 之后:安全
        }
        // 进行生产
        // _block_queue.push(std::move(in));
        // std::cout << in << std::endl;
        _block_queue.push(in);
        // 通知消费者来消费
        if(_consumer_wait_num > 0)
            pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T *out) // 消费者用的接口 --- 5个消费者
    {
        pthread_mutex_lock(&_mutex);
        while(IsEmpty()) // 保证代码的健壮性
        {
            // 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
            // 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
            _consumer_wait_num++;
            pthread_cond_wait(&_consum_cond, &_mutex);  // 伪唤醒
            _consumer_wait_num--;
        }

        // 进行消费
        *out = _block_queue.front();
        _block_queue.pop();
        // 通知生产者来生产
        if(_productor_wait_num > 0)
            pthread_cond_signal(&_product_cond);
        pthread_mutex_unlock(&_mutex);
        // pthread_cond_signal(&_product_cond);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consum_cond);
    }

private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;                     // 总上限
    pthread_mutex_t _mutex;       // 保护_block_queue的锁
    pthread_cond_t _product_cond; // 专门给生产者提供的条件变量
    pthread_cond_t _consum_cond;  // 专门给消费者提供的条件变量

    int _productor_wait_num;
    int _consumer_wait_num;
};

#endif

我们之前学习了基于条件变量和阻塞队列实现(空间可以动态分配)的生产消费者模型,今天我们来用POSIX信号量基于固定大小的环形队列重写这个程序。

4.POSIX信号量

4.1.POISX信号量是什么?

信号量本质是一个计数器,可以在初始化时对设置资源数量,进程 / 线程 可以获取信号量来对资源进行操作和结束操作可以释放信号量!

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

4.2.POISX信号量常见接口

信号量初始化:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数分别为:
sem_t *sem:传入信号量的地址
pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
value:信号量的初始值(计数器的初始值)

信号量销毁:

#include <semaphore.h>
int sem_destroy(sem_t *sem)

4.3.POSIX信号量的核心PV操作

POSIX信号量的PV操作是信号量机制中的核心,它们分别代表了对信号量的等待(P操作)和释放(V操作)。

4.3.1、P操作(等待信号量)

P操作,也称为“申请资源”或“等待信号量”操作,用于尝试减少信号量的值。当线程或进程需要访问某个临界资源时,它会执行P操作来申请信号量。

  1. 函数原型

    int sem_wait(sem_t *sem);

    其中,sem是指向要等待的信号量的指针。

  2. 操作过程

    • 如果信号量的当前值大于0,那么P操作会将信号量的值减1,并立即返回,表示申请资源成功。
    • 如果信号量的当前值为0,那么执行P操作的线程或进程将被阻塞,直到信号量的值变为大于0(即有其他线程或进程释放了信号量)。此时,被阻塞的线程或进程会重新尝试P操作,如果成功,则信号量的值再次减1。
  3. 返回值

    • 成功时,返回0。
    • 失败时,返回-1,并设置errno来指示错误类型。

4.3.2、V操作(释放信号量)

V操作,也称为“释放资源”或“发布信号量”操作,用于增加信号量的值。当线程或进程完成对临界资源的访问后,它会执行V操作来释放信号量。

  1. 函数原型

    int sem_post(sem_t *sem);

    其中,sem是指向要释放的信号量的指针。

  2. 操作过程

    • V操作会将信号量的值加1
    • 如果有线程或进程因为信号量的值为0而被阻塞在P操作上,那么V操作会唤醒其中一个被阻塞的线程或进程,使其能够继续执行P操作并访问临界资源。
  3. 返回值

    • 总是返回0,表示成功。V操作永远不会阻塞。

注意PV操作是原子的,这意味着它们在执行过程中不会被其他线程或进程的打断。这保证了信号量机制的正确性和可靠性。

5.环形队列

5.1.生产消费模型搭建的原理

环形队列底层也是普通数组,

生产者和消费者指向同一位置有两种情况:

  1. 队列为空(让生产者先跑)
  2. 队列为满(让消费者先跑)

环形队列当队列不为空或者满的时候,真正实现了多线程同步。当然生产者不能把消费者套一个圈,消费者不能超过生产者。这些都可以通过POSIX信号量的特性实现~

5.2.环形队列的具体实现

首先需要区分生产者和消费者,生产者只关注空间,消费者只关注资源。生产者和消费者都需要进行PV操作,生产者对应的将任务加入队列,消费者对应的取出队列里的任务。

  1. Consumer线程不断从环形队列中取出Task对象,执行其操作,并打印消费结果。
  2. Productor线程则持续生成新的Task对象并将其放入队列中,同时打印出生产信息。

并且还需要两把锁,分别给生产者和消费者,保证多线程并发的线程安全。

5.3.代码:

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>


template<typename T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap): _ring_queue(cap), _cap(cap),  _productor_step(0), _consumer_step(0)
    {
        sem_init(&_room_sem, 0, _cap);
        sem_init(&_data_sem, 0, 0);

        pthread_mutex_init(&_productor_mutex, nullptr);
        pthread_mutex_init(&_consumer_mutex, nullptr);
    }
    void Enqueue(const T &in)
    {
        // 生产行为
        P(_room_sem);
        Lock(_productor_mutex);
        // 一定有空间!!!
        _ring_queue[_productor_step++] = in; // 生产
        _productor_step %= _cap;
        Unlock(_productor_mutex);
        V(_data_sem);
    }
    void Pop(T *out)
    {
        // 消费行为
        P(_data_sem);
        Lock(_consumer_mutex);
        *out = _ring_queue[_consumer_step++];
        _consumer_step %= _cap;
        Unlock(_consumer_mutex);
        V(_room_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_productor_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
    }
private:
    // 1. 环形队列
    std::vector<T> _ring_queue;
    int _cap; // 环形队列的容量上限

    // 2. 生产和消费的下标
    int _productor_step;
    int _consumer_step;

    // 3. 定义信号量
    sem_t _room_sem; // 生产者关心
    sem_t _data_sem; // 消费者关心

    // 4. 定义锁,维护多生产多消费之间的互斥关系
    pthread_mutex_t _productor_mutex;
    pthread_mutex_t _consumer_mutex;
};

5.4.多生产和多消费的并发性体现在:

消费者在处理任务的时候可以并发,

所以多生产和多消费的意义不在于向队列中生产,再从队列中拿走。而在于生产前我们可以多线程并发获取原始任务,生产后,被我们的消费者拿走任务后,可以多线程并发式的去执行各自的任务。这才是多生产多消费的意义

多生产,多消费的模型主要在于,多个生产者去竞争一个名额然后进行加锁,多个消费者竞争一个名额然后进行加锁,所以最终还是会变成单生产,单消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2219711.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

API的力量:解决编程技术问题的利器

在软件开发的世界里&#xff0c;编程技术问题无处不在。从数据获取到用户认证&#xff0c;从支付处理到地图服务&#xff0c;这些问题的解决方案往往需要深厚的专业知识和大量的开发时间。然而&#xff0c;应用程序编程接口&#xff08;API&#xff09;的出现&#xff0c;为开发…

架构师备考-背诵精华(系统架构设计)

软件架构风格 类型 子类型 说明 数据流风格 批处理 每个处理步骤是一个单独的程序&#xff0c;每一步必须在前一步结束后才能开始&#xff0c;而且数据必须是完整的&#xff0c;以整体的方式传递。 前面的构件处理完&#xff0c;后面构件才能处理&#xff1b;数据完整传输…

(五)若使用LQR控制小车倒立摆,该如何对小车和摆杆的动力学方程线性化?哪些变量是可以进行简化的,线性化后的状态空间方程应该怎么列写

写在前面&#xff1a; 关于lqr控制的讲解&#xff0c;可以观看如下三个视频&#xff1a; 2. LQR数学公式理解_哔哩哔哩_bilibili 如何感性地理解LQR控制&#xff1f;_哔哩哔哩_bilibili LQR简介与使用_哔哩哔哩_bilibili 正文&#xff1a; 在之前系列的文章中我们已经得出…

scala 抽象类

理解抽象类 抽象的定义 定义一个抽象类 &#xff1a;abstract class A {} idea实例 抽象类重写 idea实例 练习 1.abstract2.错3.abstract class A{}4.对

Redis应用高频面试题

Redis 作为一个高性能的分布式缓存系统,广泛应用于后端开发中,因此在后端研发面试中,关于 Redis 的问题十分常见。 本文整理了30个常见的 Redis 面试题目,涵盖了 Redis 的源码、数据结构、原理、集群模式等方面的知识,并附上简要的回答,帮助大家更好地准备相关的面试。 …

web前端--html 5---qq注册

<!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title>qq注册</title> <link rel"impo…

图像识别解决方案

图像识别解决方案是一种基于人工智能技术的图像处理和识别方法&#xff0c;能够实现对图像内容的自动分析和理解。以下是朗观视觉小编对图像识别解决方案的详细阐述&#xff1a; 一、技术原理 图像识别解决方案的核心原理是机器学习算法和深度学习网络。通过收集大量的图像数据…

nnUnet 大模型学习笔记(续):3d_fullres 模型的推理、切片推理、计算dice系数

目录 1. 前言 2. 更改epochs 3. 推理 3.1 nnUNet_predict 3.2 切成小的nii gz文件推理 切片代码 融合代码 3.3 可视化展示 3.4 评估指标 参考 1. 前言 训练了一天半&#xff0c;终于跑完了。。。。 训练的模型在这可以免费下载&#xff1a; 基于nnUnet3d-fullres训…

深⼊理解指针(2)

目录 1. 数组名的理解 2. 使⽤指针访问数组 3. ⼀维数组传参的本质 4. ⼆级指针 5. 指针数组 6. 指针数组模拟⼆维数组 1. 数组名的理解 我们在使⽤指针访问数组的内容时&#xff0c;有这样的代码&#xff1a; int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[…

rancher安装并快速部署k8s 管理集群工具

主机准备 准备4台主机 3台用于k8s集群 &#xff0c;1台用于rancher 每台服务器新增配置文件 vi etc/sysctl.confnet.ipv4.ip_forward 1 刷新生效 sysctl –p 安装docker 安装的时候可以去github上检索rancher看看最新版本适配那个版本的docker&#xff0c;这里安装23.0.1…

Linux 线程概念及线程控制

1.线程与进程的关系 执行流&#xff08;Execution Flow&#xff09;通常指的是程序执行过程中的控制路径&#xff0c;它描述了程序从开始到结束的指令执行顺序。例如我们要有两个执行流来分别进行加法和减法的运算&#xff0c;我们可以通过使用 fork 函数来创建子进程&#xf…

智慧商城项目2-登录模块

登录页静态布局 1.先重置默认样式 找到styles/common.less文件,没有就新建 // 重置默认样式 * {margin: 0;padding: 0;box-sizing: border-box;}// 文字溢出省略号.text-ellipsis-2 {overflow: hidden;-webkit-line-clamp: 2;text-overflow: ellipsis;display: -webkit-box;-…

CentOS7安装RabbitMQ-3.13.7、修改端口号

本文安装版本&#xff1a; Erlang&#xff1a;26.0 官网下载地址 Erlang RabbitMQ&#xff1a;3.13.7 官网下载地址 RabbitMQ RabbitMQ和Erlang对应关系查看&#xff1a;https://www.rabbitmq.com/which-erlang.html 注&#xff1a;安装erlang之前先安装下依赖文件&#xff0…

无人机之放电速率篇

无人机的放电速率是指电池在一定时间内放出其储存电能的能力&#xff0c;这一参数对无人机的飞行时间、性能以及安全性都有重要影响。 一、放电速率的表示方法 放电速率通常用C数来表示。C数越大&#xff0c;表示放电速率越快。例如&#xff0c;一个2C的电池可以在1/2小时内放…

《知道做到》

整体看内容的信息密度较低。绿灯思维、积极心态、反复练习值得借鉴。 引言 行动是老子&#xff0c;知识是儿子&#xff0c;创造是孙子&#xff01;行是知之始&#xff0c;知是行之成。 前言 工作中最让你失望的事情是什么&#xff1f; 一个人行为的改变总是先从内心想法的转…

MySQL 【日期】函数大全(六)

目录 1、TIME_FORMAT() 按照指定的格式格式化时间。 2、TIME_TO_SEC() 将指定的时间值转为秒数。 3、TIMEDIFF() 返回两个时间之间的差值。 4、TIMESTAMP() 累加所有参数并将结果作为日期时间值返回。 5、TIMESTAMPADD() 将指定的时间间隔加到一个日期时间值上并返回结果。…

数据库->库的操作

目录 一、查看数据库 1.显示所有的数据库 二、创建数据库 1.创建数据库 2.查看警告信息 3.创建一个名为database的数据库 三、字符集编码和校验(排序)规则 1.查看数据库⽀持的字符集编码 2.查看数据库⽀持的排序规则 3.一条完整创建库的语句 4. 不同的字串集与排序规…

keepalived(高可用)+nginx(负载均衡)+web

环境 注意&#xff1a; (1) 做高可用负载均衡至少需要四台服务器&#xff1a;两台独立的高可用负载均衡器&#xff0c;两台web服务器做集群 (2) vip&#xff08;虚拟ip&#xff09;不能和物理ip冲突 (3) vip&#xff08;虚拟ip&#xff09;最好设置成和内网ip同一网段&#xf…

传感器驱动系列之PAW3212DB鼠标光电传感器

目录 一、PAW3212DB鼠标光电传感器简介 1.1 主要特点 1.2 引脚定义 1.3 传感器组装 1.4 应用场景 1.5 传感器使用注意 1.5.1 供电选择 1.5.2 SPI读写设置 1.5.3 MOTION引脚 1.6 寄存器说明 1.6.1 Product_ID1寄存器 1.6.2 MOTION_Status寄存器 1.6.3 Delta_X寄存器…

【论文笔记】X-Former: Unifying Contrastive and Reconstruction Learning for MLLMs

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: X-Former: Unifying Contr…