生产者消费者模型和线程同步问题

news2024/10/2 12:36:49

文章目录

  • 线程同步概念
  • 生产者消费者模型
  • 条件变量
    • 使用条件变量
    • 唤醒条件变量
  • 阻塞队列

线程同步概念

互斥能保证安全,但是仅有安全不够,同步可以更高效的使用资源

生产者消费者模型

下面就基于生产者消费者来深入线程同步等概念:
在这里插入图片描述在这里插入图片描述如何理解生产消费者模型:
以函数调用为例:
在这里插入图片描述
在这里插入图片描述

两个线程之间要进行信息交互就需要引入一段内存空间(交易场所)
线程a将数据放入缓冲区(交易场所),线程b从缓冲区进行读取
这样线程a完成数据存放后就继续做自己的事情,线程b去读取数据
这样就能很好的实现多执行流之间的执行解耦
特点:很好的提高了处理数据的能力
支持忙闲不均

条件变量

条件变量:为了不让消费者的每次消费为无效消费.
所以对于生产者,在每次完成自己的任务之后对条件做出改变,当条件的变量达到一定条件后,消费者才进行有效消费
无效消费过程: 消费前(加锁)----尝试消费(无效消费)—消费结束(解锁)

条件变量的目的:
1.不做无效的锁申请
2.假设消费者很多,让他们有执行顺序
相当于条件变量给各个线程在调度他之前给一个提醒

条件变量本质是数据:可以理解为:

在这里插入图片描述

使用条件变量

认识接口
在这里插入图片描述
与互斥锁的创建和使用非常相似

pthread_cond_destroy();//创建布局条件变量要后要进行销毁
pthread_cond_init();//对局部的条件变量进行初始化
pthread_cond_t;//关键字 创建布局变量
全局就要提供PTHREAD_COND_INITIALIZER的宏来进行初始化

条件变量创建的前提是有线程安全,所以条件变量的接口和互斥锁的接口大致类似.

条件创建了还要有一个接口来等待条件成立:

在这里插入图片描述

pthread_cond_wait();//等待条件成立,参数为条件变量和互斥锁
上述所有的参数返回值都是在成功时返回0
失败返回错误原因

唤醒条件变量

在这里插入图片描述

pthread_cond_signal();//唤醒指定的条件变量,并唤醒一个线程
pthread_cond_broadcast();//是条件变量成立,并唤醒所有的线程

在没有条件变量的时候,打印信息如图:

在这里插入图片描述
可以看到线程的调度是不确定的,我们想让这个线程按照我们想要的顺序(如:一次Thread-1,Thread-2,Thread-3,这样)进行打印,那么就需要用到条件变量.
代码:

#include <iostream>
#include <unistd.h>
#include <string>
#include <pthread.h>

//创建条件变量和互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *ThreadRoutine(void* args)
{
    std::string name = static_cast<const char *>(args);
    while(true)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);
        std::cout << "I am a new Thread, my name is " << name << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}
int main()
{

    pthread_t t1, t2, t3;
    pthread_create(&t1, nullptr, ThreadRoutine, (void*)"Thread-1");
    pthread_create(&t2, nullptr, ThreadRoutine, (void*)"Thread-2");
    pthread_create(&t3, nullptr, ThreadRoutine, (void*)"Thread-3");


    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond);
    }
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_join(t3, nullptr);



    return 0;
}

我们让代码按照t1, t2, t3的线程顺序来执行子线程的任务
在这里插入图片描述
换一个顺序验证也是如此
如果没有条件变量,这个也是按照顺序打印,不过是批次进行,和CPU时钟机制有关,所有使用条件变量更好

使用pthread_cond_broadcast();//唤醒全部线程
就跟加锁机制一样,不过每次是各个线程只执行一次后就会等待,并不想无锁那样批次打印

在这里插入图片描述

单纯的互斥能保证线程的安全, 但不一定合理或者高效.
pthread_cond_wait();//
在等待的时候,会释放这把锁(等待是在临界区内,释放锁是为了资源高效利用,再次加锁是不允许在有锁的临界区内有无锁的线程存在)

再被唤醒的时候,又会再次加锁
当被唤醒的时候,重新申请也是需要参与锁的竞争的(未解决这个问题, 看下main阻塞队列部分的讲解)

阻塞队列

这个队列只有为空,为满两种状态

为空:消费线程不能再消费
为满:生产线程不能在生产

这个场景也满足上述说明的所需的321原则
(3种关系,生产–生产,消费–消费
2中角色:生产者,消费者
1个环境(这个阻塞队列就是一个临界区))
单生产,单消费:
基于队列实现,阻塞队列的操作(消费者生产者实例):

伪唤醒:
在这里插入图片描述

在这份代码中,将来如果因为productor慢不满足生产, 多个线程在一个阻塞队列中等待,而有一个Push达到(有一个生产刚产出), 假设此时的代码是将全部线程都唤醒,那么除了第一个线程得到条件变量的满足和锁的满足,其他线程会在条件变量下的等待转化为竞争锁等待的情况, 假设此时若第一个线程完成pop且unlock速度快,那么这时后续的线程会在得到锁之后直接对空队列进行Pop操作,这是就会出现错误,这个状态就是伪唤醒(条件不满足,但是线程被唤醒了)

(虽然上述只是假设, 但是cpu的运行速度很快, 我们不防会有这样的情况发生)

所以修改Pop内的if(IsFull)代码和Push内的(IsEmpty)代码,还可以使用之前的锁封装的代码

在这里插入图片描述
此时的消费者生产者代码:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"

int defaultcap = 5; // for test

template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap)
        : _capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }
    bool IsEmpty()
    {
        return _q.size() == 0;//查看队列状态
    }
    bool IsFull()
    {
        return _q.size() == _capacity;//此时为满,
    }
    bool Pop(T *out)
    {
        LockGuard lg(&_mutex);
        while(IsEmpty())
        {
            //为空, 进行等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        //可以生产//可增加水准线进行响应的操作
        pthread_cond_signal(&_p_cond);
        //pthread_mutex_unlock(&_mutex);
        return true;
    }
    bool Push(const T &in)
    {
        // 当前变量进行加锁
        LockGuard lg(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(IsFull())
        {
            // 为满,进行阻塞等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 进行生产等待
        _q.push(in);
        //可以进行消费
        pthread_cond_signal(&_c_cond);
        //pthread_mutex_unlock(&_mutex);
        return true;
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

private:
    std::queue<T> _q;
    int _capacity; // 为空时,不能再消费,为满时,不能再生产,状态是capacity与size进行比较
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;
    pthread_cond_t _c_cond;
};

main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void *productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data = rand() % 10 + 1;//[1,10];
        bq->Push(data);

        std::cout << "consumer data: " << data << std::endl;

        sleep(1);
    }
}
void *consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data = 0;
        bq->Pop(&data);

        std::cout << "product data: " << data << std::endl;
    }
}
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());
    BlockQueue<int> * bq = new BlockQueue<int>();
    pthread_t c, p;//两个线程
    pthread_create(&p, nullptr, productor, bq);
    pthread_create(&c, nullptr, consumer, bq);


    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

利用生产者消费者模型实现分派任务的操作()
在此基础上构建一个任务类型:

Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
enum //设置退出码
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};
const std::string opts = "+-*/%)[()]"; //设置随机运算
class Task
{
public:
    Task()//无参构造函数用于生成无参临时对象,如果有参构造是全缺省那么可以不用写这个无参构造函数
    {}
    Task(int x, int y, char op)
    :data_x(x), data_y(y), opt(op), result(0), code(ok)
    {}
    void Run()//任务主题内容
    {
        switch(opt)
        {
            case '+':
                result = data_x + data_y;
                break;
            case '-':
                result = data_x - data_y;
                break;
            case '*':
                result = data_x * data_y;
                break;
            case '/':
            {
                if(data_y == 0)
                {
                    code = div_zero;
                }
                else
                {
                    result = data_x / data_y;
                }
                break;
            }
            case '%':
            {
                if(data_y == 0)
                {
                    code = mod_zero;
                }
                else
                {
                    result = data_x % data_y;
                }
                break;
            }
            default:
                code = unknow;
                break;
        }
    }
    void operator()()
    {
        Run();
    }
    ~Task()
    {}
    //打印任务,用于更清晰的认识
    std::string print_task()
    {
        std::string s;
        s = std::to_string(data_x) + opt + std::to_string(data_y) + "=?\n";
        return s;
    }
    std::string print_result()
    {
        std::string s;
        s = std::to_string(data_x) + opt + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]" + "\n";
        return s;
    }
private:
    int data_x;
    int data_y;
    char opt;

    int result;
    int code;
};

对上述代码的修改:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void *productor(void *args)//生产者
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);//阻塞队列的对象
    while(true)
    {
        int x = rand() % 11;//[0,10];
        int y = rand() % 11;//[0,10];
        char opt = opts[rand() % opts.size()];
        Task t(x, y, opt);
        std::cout << t.print_task() << std::endl;;
        bq->Push(t);//放入队列,队列size+1

        //usleep(1000);
    }
}
void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        sleep(1);
        Task t;
        //1.拿到消费数据
        bq->Pop(&t);

        //2.执行任务
        t();
        
        //3.打印任务信息
        std::cout << t.print_result() << std::endl;;
    }
}
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//伪随机种子
    BlockQueue<Task> * bq = new BlockQueue<Task>();//创建一个阻塞队列
    pthread_t c, p;//两个线程
    pthread_create(&p, nullptr, productor, bq);//两个线程模拟消费者生产者模型
    pthread_create(&c, nullptr, consumer, bq);


    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

针对上述代码,生产者和消费者本身就是互斥的,也就是串行执行,怎么会高效呢?

探究这个问题,首先从消费者消费后去哪里?生产在生产之前从哪来?来考虑.
生产者的数据,在产生时是花费时间,消费者消费也要花时间.
在消费者处理数据时花时间的同时生产者在某个时刻刚好将数据传给临界区,生产者只需要保证自己完成传送就可以做其他自己的事,消费者自己继续处理数据
所以高效,并发不体现在同步互斥,而是在拿数据,处理数据这里.

多线程任务下的消费者生产者模型多对多:
在这里插入图片描述将bq和线程名字一起封装可以更好的观察:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本篇结束,下篇更精彩,关注我,带你飞~~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1912641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VBA实现Excel数据排序功能

前言 本节会介绍使用VBA如何实现Excel工作表中数据的排序功能。 本节会通过下表数据内容为例进行实操&#xff1a; 1. Sort 单列排序 语法&#xff1a;Sort key1,Order1 说明&#xff1a; Key1&#xff1a;表示需要按照哪列进行排序 Order1&#xff1a;用来指定是升序xlAsce…

AI提示词:AI辅导「数学作业」

辅导孩子作业对许多家长来说可能是一件头疼的事&#xff0c;但这部分工作可以在一定程度上交给AI来完成。 打开ChatGPT4,输入以下内容&#xff1a; # Role 数学辅导专家## Profile - author: 姜小尘 - version: 02 - LLM: Kimi - language: 中文 - description: 专门为小学生…

53-4 内网代理6 - frp搭建三层代理

前提:53-3 内网代理5 - frp搭建二级代理-CSDN博客 三级网络代理 在办公区入侵后,发现需要进一步渗透核心区网络(192.168.60.0/24),并登录域控制器的远程桌面。使用FRP在EDMZ区、办公区与核心区之间建立三级网络的SOCKS5代理,以便访问核心区的域控制器。 VPS上的FRP服…

力扣hot100 -- 动态规划(上)

目录 ❄技巧 &#x1f33c;爬楼梯 &#x1f354;杨辉三角 &#x1f30a;打家劫舍 &#x1f40e;完全平方数 &#x1f33c;零钱兑换 &#x1f33c;单词拆分 ❄技巧 动态规划dp-CSDN博客 &#x1f446;花 5 分钟快速刷一遍 花 10 分钟浏览一下 线性DP 背包DP&#x1f447…

算法金 | 12 个最佳 Python 代码片段,帮我完成工作自动化,香~

​大侠幸会幸会&#xff0c;我是日更万日 算法金&#xff1b;0 基础跨行转算法&#xff0c;国内外多个算法比赛 Top&#xff1b;放弃 BAT Offer&#xff0c;成功上岸 AI 研究院 Leader&#xff1b; Python是一种多功能的编程语言&#xff0c;它提供了各种功能和库来有效地自动化…

STM32F103RB多通道ADC转换功能实现(DMA)

目录 概述 1 硬件 1.1 硬件实物介绍 1.2 nucleo-f103rb 1.3 软件版本 2 软件实现 2.1 STM32Cube配置参数 2.2 项目代码 3 功能代码实现 3.1 ADC功能函数 3.2 函数调用 4 测试 4.1 DMA配置data width&#xff1a;byte 4.2 DMA配置data width&#xff1a;Half wor…

Qt常用基础控件总结—输入部件(QComboBox类和QLineEdit)

输入部件 下拉列表控件QComboBox 类 QComboBox 类是 QWidget 类的直接子类,该类实现了一个下拉列表(组合框)。 QComboBox 类中的属性函数 1)count:const int 访问函数:int count() const; 获取组合框中的项目数量,默认情况下,对于空组合框或未设置当前项目的组合框,…

Java版Flink使用指南——合流

大纲 新建工程无界流奇数Long型无界流偶数Long型无界流奇数String型无界流 合流UnionConnect 测试工程代码 在《Java版Flink使用指南——分流导出》中&#xff0c;我们通过addSink进行了输出分流。本文我们将介绍几种通过多个无界流输入合并成一个流来进行处理的方案。 新建工…

ArcGIS实战—等高线绘制

今天分享一个使用ArcGIS Pro制作等高线地图的教程&#xff0c;等高线是用来表达地形最常见的形式之一。那么如何制作一个效果比较好的等高线地形图呢&#xff1f;让我们开始今天的教程。 1 DEM数据 第一步&#xff1a;获取DEM地形数据&#xff0c;网址&#xff08;https://dwt…

贴脸细看Mixtral 8x7B- 稀疏混合专家模型(MoE)的创新与推动

贴脸细看Mixtral 8x7B- 稀疏混合专家模型&#xff08;MoE&#xff09;的创新与推动 原创 一路到底孟子敬 上堵吟 2024年01月15日 20:05 美国 I. 引言 A. Mixtral 8x7B的背景和目的 • 背景&#xff1a;随着大型语言模型在自然语言处理&#xff08;NLP&#xff09;领域的广泛…

本地 HTTP 文件服务器的简单搭建 (deno/std)

首发日期 2024-06-30, 以下为原文内容: 在本地局域网搭建一个文件服务器, 有很多种方式. 本文介绍的是窝觉得比较简单的一种. 文件直接存储在 btrfs 文件系统之中, 底层使用 LVM 管理磁盘, 方便扩容. 使用 btrfs RAID 1 进行镜像备份 (一个文件在 2 块硬盘分别存储一份), 防止…

es是如何处理索引数据的变动的?

1 概述 es是如何处理索引数据的变动的&#xff1f; 或者说索引数据变动时&#xff0c;es会执行哪些操作&#xff1f; refresh、fsync、merge 和 flush 操作有何作用&#xff1f; es是如何确保即使es发生宕机数据也不丢失的&#xff1f; 在回答上述问题前&#xff0c;可以先…

【Linux】多线程_1

文章目录 九、多线程1. 线程概念2. 线程的控制 未完待续 九、多线程 1. 线程概念 我们知道&#xff1a;进程 内核数据结构 进程代码和数据 。那什么是线程呢&#xff1f;线程是进程内部的一个执行分支。一个进程内部可以有多个执行流&#xff08;内核数据结构&#xff09;&…

[高频 SQL 50 题(基础版)]第一千七百五十七题,可回收且低脂产品

题目&#xff1a; 表&#xff1a;Products ---------------------- | Column Name | Type | ---------------------- | product_id | int | | low_fats | enum | | recyclable | enum | ---------------------- product_id 是该表的主键&#xff08;具有唯…

大数据专业创新人才培养体系的探索与实践

一、引言 随着大数据技术的迅猛发展&#xff0c;其在各行各业中的应用日益广泛&#xff0c;对大数据专业人才的需求也日益增长。我国高度重视大数据产业的发展&#xff0c;将大数据作为国家战略资源&#xff0c;推动大数据与各行业的深度融合。教育部也积极响应国家战略&#…

C语言编程4:复合赋值,递增递减运算符,局部变量与全局变量,本地变量,转义字符

一篇文章带你玩转C语言基础语法4&#xff1a;复合赋值&#xff0c;递增递减运算符&#xff0c;局部变量与全局变量&#xff0c;本地变量&#xff0c;转义字符 一、复合赋值&#x1f33f; 1.1&#x1f4a0;定义 赋值就是给任意一个变量或者常量赋一个值&#xff0c;这个值可以…

在亚马逊云科技AWS上利用SageMaker机器学习模型平台搭建生成式AI应用(附Llama大模型部署和测试代码)

项目简介&#xff1a; 接下来&#xff0c;小李哥将会每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。本次介绍的是如何在Amazon …

Jmeter在信息头中设置Bearer与 token 的拼接值

思路&#xff1a;先获取token&#xff0c;将token设置成全局变量&#xff0c;再与Bearer拼接。 第一步&#xff1a;使用提取器将token值提取出来&#xff0c;使用setProperty函数将提取的token值设置成全局变量&#xff0c;在登录请求后面添加BeanShell取样器 或者 BeanShell后…

嘉立创EDA学习笔记

嘉立创EDA学习笔记 PCB引线一、设计规则间距安全间距其他间距 物理导线网络长度差分对过孔尺寸 平面铺铜 PCB布线 作为一个嵌入式开发潜力工程师&#xff0c;咱们必须得学会如何绘制开发板以满足顾客各种功能的需求&#xff0c;因此小编去学习了一下嘉立创&#xff0c;写这篇文…

配网行波故障预警与定位装置:配电线路安全性与可靠性的保障

配网行波故障预警与定位装置&#xff1a;配电线路安全性与可靠性的保障 一、传统配网故障排查的困境 1. 巡检效率低下&#xff1a;在二十世纪80年代及以前&#xff0c;电力线路故障的排查主要依赖于人工巡检&#xff0c;这种方式效率低下&#xff0c;特别是在故障区间较大的情…