Linux(十一)生产者与消费者模型

news2024/12/26 2:59:25

引言

一、实现一个网关来过滤流经网关的数据

二、农忙时节收割麦子

生产者与消费者模型

模型实现

完整源码:


引言

阐述这个模型之前先引入俩个例子:

一、实现一个网关来过滤流经网关的数据

网关会捕捉大量的数据然后进行分析处理,之后再把我们分析处理后的数据送至磁盘中,来进行存储,不过在这一过程中,流经网关的信息量非常巨大,如果数据不能及时进行处理就会被丢弃(俗称丢包),那如何来进行实现呢?

 我们在学习了多线程之后,知道了可以利用多线程来提高cpu的处理效率,用到这里的话也是特别符合条件

 但是即使是使用了多线程的思想处理了这个问题,还是会存在一些数据丢包的情况,因为在数据存入磁盘这一过程是特别的低效,一种极端的情况,我所有的分析处理线程都卡在了等待磁盘存入这一步,一大堆线程看着数据慢悠悠的送往磁盘。然后新产生的数据就会因为无法捕捉导致数据丢失。

二、农忙时节收割麦子

父亲叫来了一个收割机在地里收麦子,麦子需要被装入一个蛇皮袋子里然后搬运到车上(不能放地上),所以父亲只能不断的执行灌袋、装车这俩个动作,可是就算父亲动作再利落,收割机源源不断收取的麦子还是会在父亲装车的时候掉落在地上,那么采取多线程思想,父亲叫儿子过来帮忙,于是父亲和儿子一起执行灌袋、装车的动作,父子俩的配合非常紧密,可还是会有麦子掉到了地上。

那么如何解决这种问题呢?

如果父亲和儿子一个人只负责灌袋、一个人只负责装车,那么掉在地上的麦子就会更少一些。

生产者与消费者模型

在大量的数据处理任务的产生时:

        采用单执行流的缺陷:

                1、效率低下

                2、资源利用不一定合理

                3、耦合度比较强

                        如果一个线程处出现问题,必须要对整个线程体系进行检查

解决思想:

        将任务的产生与处理分离开来,通过一个任务队列来进行交互

 这个任务队列就相当于儿子将麦子装满然后直接放在地上,剩下的装车任务交给父亲来做即可。

在网关的例子中也是使用一个任务队列来进行保存网关接收到的信息,网关不断的接收数据交给各个线程,各个线程取出数据来进行分析,分析结果放入任务队列中,磁盘在进行不断的存储。

就是这样一个        ① 生产与消费动作解耦合

                           ② 支持忙闲不均

                           ③ 支持并发

模型实现

生产者与生产者的关系: 互斥(一个生产完了,才能轮到下一个生产)

消费者与消费者的关系: 互斥(一个消费完了,才能轮到下一个消费)

生产者与消费者的关系:同步 + 互斥

                                (满了就不能放,空了就不能取)条件控制为了访问资源的合理性

                                (生产者生产出来了,消费者才能进行消费)资源的唯一访问

实现一个线程安全的阻塞队列,接下来创建多个线程入队数据,多个线程出队数据

 完成类中构造与析构函数

#include<iostream>
#include<pthread.h>
#include<queue>

template<class T>
class BlockQueue
{
  private:
    std::queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_pro;
    pthread_cond_t _cond_con;
  public:
    BlockQueue(int capacity)
      : _capacity(capacity)
    {
      pthread_mutex_init(&_mutex, NULL);
      pthread_cond_init(&_cond_pro, NULL);
      pthread_cond_init(&_cond_con, NULL);
    }
    ~BlockQueue()
    {
      pthread_mutex_destroy(&_mutex);
      pthread_cond_destroy(&_cond_pro);
      pthread_cond_destroy(&_cond_con);

    }
    bool Push(const T& data);
    bool Pop(T* data);
}

 完成类中入队出队操作

    bool Push(const T& data)
    {
      pthread_mutex_lock(&_mutex);   // 1、加锁
      while(_capacity == _q.size()) // 2、当队列中元素满了,进行等待阻塞
      {
        pthread_cond_wait(&_cond_pro, &_mutex);
      }
      _q.push(data);  // 3、入队数据
      pthread_cond_signal(&_cond_con);  // 4、唤醒消费者
      pthread_mutex_unlock(&_mutex);  // 5、解锁
    }
    bool Pop(T* data)
    {
      pthread_mutex_lock(&_mutex); // 1、加锁
      while(_q.size() == 0){    // 2、如果队列为空,无法进行读取 则阻塞
        pthread_cond_wait(&_cond_con, &_mutex);
      }
      *data = _q.front();   // 3、获取数据
      _q.pop(); // 4、记得要pop出队
      pthread_cond_signal(&_cond_pro); // 5、唤醒生产者
      pthread_mutex_unlock(&_mutex);  // 6、解锁
    }

 完成main函数中实现四个线程作为生产者、四个线程作为消费者

#define MAX_SIZE 4
int main()
{
    BlockQueue<int> q;
    pthread_t pro_tid[MAX_SIZE], con_tid[MAX_SIZE];
    int ret;
    for(int i = 0; i < MAX_SIZE; i++)
    {
        ret = pthread_create(&pro_tid[i], NULL, productor, (void*)&q);
        if(ret != 0){
          std::cout<<"create error"<<std::endl;
          return -1;
        }
        ret = pthread_create(&con_tid[i], NULL, consumer, (void*)&q);
        if(ret != 0){
          std::cout<<"create error"<<std::endl;
          return -1;
        }      
    }
    for(int i = 0; i < MAX_SIZE; i++)
    {
      pthread_join(pro_tid[i], NULL);
      pthread_join(con_tid[i], NULL);
    }
    return 0;
}

 完成生产者与消费者的入口函数书写

void *productor(void *arg)
{
    BlockQueue<int> *q = (BlockQueue<int>*)arg;
    int data = 0;
    while(1)
    {
      q->Push(data);
      printf("+++++%p: Push data :%d\n", pthread_self(), data++);
    }
    return NULL;
}

void *consumer(void *arg)
{
    BlockQueue<int> *q = (BlockQueue<int>*)arg;
    while(1)
    {
      int data;
      q->Pop(&data);
      printf("------%p: Push data :%d\n", pthread_self(), data);
    }
    return NULL;
}

 完整源码:

#include<iostream>
#include<pthread.h>
#include<queue>
#include<stdio.h>

#define MAX_CAPACITY 5
template<class T>
class BlockQueue
{
  private:
    std::queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_pro;
    pthread_cond_t _cond_con;
  public:
    BlockQueue(int capacity = MAX_CAPACITY)
      : _capacity(capacity)
    {
      pthread_mutex_init(&_mutex, NULL);
      pthread_cond_init(&_cond_pro, NULL);
      pthread_cond_init(&_cond_con, NULL);
    }
    ~BlockQueue()
    {
      pthread_mutex_destroy(&_mutex);
      pthread_cond_destroy(&_cond_pro);
      pthread_cond_destroy(&_cond_con);

    }
    bool Push(const T& data)
    {
      pthread_mutex_lock(&_mutex);   // 1、加锁
      while(_capacity == _q.size()) // 2、当队列中元素满了,进行等待阻塞
      {
        pthread_cond_wait(&_cond_pro, &_mutex);
      }
      _q.push(data);  // 3、入队数据
      pthread_cond_signal(&_cond_con);  // 4、唤醒消费者
      pthread_mutex_unlock(&_mutex);  // 5、解锁
    }
    bool Pop(T* data)
    {
      pthread_mutex_lock(&_mutex); // 1、加锁
      while(_q.size() == 0){    // 2、如果队列为空,无法进行读取 则阻塞
        pthread_cond_wait(&_cond_con, &_mutex);
      }
      *data = _q.front();   // 3、获取数据
      _q.pop(); // 4、记得要pop出队
      pthread_cond_signal(&_cond_pro); // 5、唤醒生产者
      pthread_mutex_unlock(&_mutex);  // 6、解锁
    }
};

void *productor(void *arg)
{
    BlockQueue<int> *q = (BlockQueue<int>*)arg;
    int data = 0;
    while(1)
    {
      q->Push(data);
      printf("+++++%p: Push data :%d\n", pthread_self(), data++);
    }
    return NULL;
}

void *consumer(void *arg)
{
    BlockQueue<int> *q = (BlockQueue<int>*)arg;
    while(1)
    {
      int data;
      q->Pop(&data);
      printf("------%p: Push data :%d\n", pthread_self(), data);
    }
    return NULL;
}


#define MAX_SIZE 4
int main()
{
    BlockQueue<int> q;
    pthread_t pro_tid[MAX_SIZE], con_tid[MAX_SIZE];
    int ret;
    for(int i = 0; i < MAX_SIZE; i++)
    {
        ret = pthread_create(&pro_tid[i], NULL, productor, (void*)&q);
        if(ret != 0){
          std::cout<<"create error"<<std::endl;
          return -1;
        }
        ret = pthread_create(&con_tid[i], NULL, consumer, (void*)&q);
        if(ret != 0){
          std::cout<<"create error"<<std::endl;
          return -1;
        }      
    }
    for(int i = 0; i < MAX_SIZE; i++)
    {
      pthread_join(pro_tid[i], NULL);
      pthread_join(con_tid[i], NULL);
    }
    return 0;
}

 底行模式下替换命令:

        : 22,26s/hello/hi/g    将22行到26行中的hello替换为hi   

        :14,34s/abcdef//g     将14行到34行中的abcdef替换为(空)(即为删除)

        :noh        去掉高亮显示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/337158.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VHDL语言基础-状态机设计-时序电路与状态机的关系

目录 时序电路的概念&#xff1a; 下面以一个简单的三位计数器为例&#xff0c;说明时序电路的结构&#xff1a; 三位计数器的结构&#xff1a; 次态逻辑关系的推导&#xff1b;画出卡诺图如下&#xff1a; 电路图&#xff1a; 时序电路与状态机的关系&#xff1a; 状态机…

PyTorch学习笔记:nn.Tanh——Tanh激活函数

PyTorch学习笔记&#xff1a;nn.Tanh——Tanh激活函数 torch.nn.Tanh()功能&#xff1a;逐元素应用Tanh函数&#xff08;双曲正切&#xff09;对数据进行激活&#xff0c;将元素调整到区间(-1,1)内 函数方程&#xff1a; Tanh(x)tanh(x)ex−e−xexe−x\text{Tanh}(x)\text{ta…

每天10个前端小知识 【Day 11】

前端面试基础知识题 1. 浏览器的垃圾回收机制有哪些&#xff1f; JS会在创建变量时自动分配内存&#xff0c;在不使用的时候会自动周期性的释放内存&#xff0c;释放的过程就叫 “垃圾回收”。 一方面自动分配内存减轻了开发者的负担&#xff0c;开发者不用过多的去关注内存…

单链表--C语言版(从0开始,超详细解析,小白一看就会)

目录 一、前言 &#x1f34e; 为什么要学习链表 &#x1f4a6;顺序表有缺陷 &#x1f4a6; 优化方案&#xff1a;链表 二、链表详解 &#x1f350;链表的概念 &#x1f349;链表的结构组成&#xff1a;节点 &#x1f353;链表节点的连接&#xff08;逻辑结构与物理结构的区…

java spring注解方式 实现基本类型属性注入

之前 我们看了几个注入属性的注解 但他们都是注入对象类型的 那么 下面我们就看一个 给基本属性注入值的注解 value 我们直接代码快速演示一下 创建一个项目 然后引入 spring 所需要的依赖 然后在src下创建包 Bean 在 Bean目录下创建一个包 叫 UserData 然后在src下创建 bean…

leaflet 上传geojson文件,在地图上显示图形(示例代码053)

第053个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet示例中上传geojson文件,通过L.geojson解析,在地图上显示图形。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共97行)相关API参考:专栏目标…

微服务项目【mybatis-plus与微服务注册】

Mybatis与微服务注册 一、SpringBoot整合MybatisPlus 创建自动生成代码子模块 基于maven方式创建子模块zmall-generator&#xff0c;用于结合mybatis-plus生成代码。 在公共模块zmall-common中注释掉mybatis的依赖引入&#xff0c;改换成mybatis-plus依赖引入 <!-- myba…

【大数据Hadoop】Hadoop 3.x 新特性总览

Hadoop 3.x 新特性剖析系列11. 概述2. 内容2.1 JDK2.2 EC技术2.3 YARN的时间线V.2服务2.3.1 伸缩性2.3.2 可用性2.3.3 架构体系2.4 优化Hadoop Shell脚本2.5 重构Hadoop Client Jar包2.6 支持等待容器和分布式调度2.7 支持多个NameNode节点2.8 默认的服务端口被修改2.9 支持文件…

MAC Pro 安装 VS Code 配置 C/C++ 开发环境

目录 文章目录目录安装 VS Code配置 C/C 开发环境Hello World1、创建项目和源码2、编译运行3、调试C/C configuration安装 VS Code 下载安装包&#xff1a;https://code.visualstudio.com/Download解压并将文件放入 “应用程序"。 配置 C/C 开发环境 官方文档&#xff1…

linux 服务器线上问题故障排查

一 线上故障排查概述 1.1 概述 线上故障排查一般从cpu,磁盘,内存,网络这4个方面入手; 二 磁盘的排查 2.1 磁盘排查 1.使用 df -hl 命令来查看磁盘使用情况 2.从读写性能排查:iostat -d -k -x命令来进行分析 最后一列%util可以看到每块磁盘写入的程度,而rrqpm/s以及…

C语言 | 预处理知识详解 #预处理指令有哪些?他们如何使用?宏和函数有哪些区别?...#

文章目录前言预定义符号介绍预处理指令#define#define替换规则预处理指令 #undef宏和函数的对比宏和函数的对比图命名约定命令行定义条件编译预处理指令 #include嵌套文件包含其他预处理指令写在最后前言 上篇文章介绍了一个程序运行的 编译与链接 &#xff0c;其中编译阶段有个…

python+django在线教学网上授课系统vue

随着科技的进步&#xff0c;互联网已经开始慢慢渗透到我们的生活和学习中&#xff0c;并且在各个领域占据着越来越重要的部分&#xff0c;很多传统的行业都将面临着巨大的挑战&#xff0c;包括学习也不例外。现在学习竞争越来越激烈&#xff0c;人才的需求量越来越大&#xff0…

Java高级-集合-Collection部分

本篇讲解java集合 集合 集合框架的概述 集合、数组都是对多个数据进行存储操作的结构&#xff0c;简称Java容器。 说明&#xff1a;此时的存储&#xff0c;主要指的是内存层面的存储&#xff0c;不涉及到持久化的存储&#xff08;.txt,.jpg,.avi&#xff0c;数据库中&#xf…

Java面试——MyBatis相关知识

目录 1.什么是MyBatis 2.MyBatis优缺点 3.MyBatis工作原理 4.MyBatis缓存模式 5.MyBatis代码相关问题 6.MyBatis和hibernate区别 1.什么是MyBatis MyBatis是一个半ORM持久层框架&#xff08;对象关系映射&#xff09;&#xff0c;基于JDBC进行封装&#xff0c;使得开发者…

【Python实战案例】Python3网络爬虫:“可惜你不看火影,也不明白这个视频的分量......”m3u8视频下载,那些事儿~

前言 哈喽&#xff01;上午好嘞&#xff0c;各位小可爱们&#xff01;有没有等着急了呀~ 由于最近一直在学习新的内容&#xff0c;所以耽搁了一下下&#xff0c;抱歉.jpg 双手合十。 所有文章完整的素材源码都在&#x1f447;&#x1f447; 粉丝白嫖源码福利&#xff0c;请移…

蓝海创意云获苏州电信2022年度“云业务优秀合作方”表彰

2月8日&#xff0c;中国电信苏州分公司召开产业数字化生态合作峰会&#xff0c;围绕“力量源于团结 奋斗创造奇迹”主题&#xff0c;凝聚合作伙伴合力&#xff0c;构建共生共赢的产业生态&#xff0c;蓝海创意云作为合作企业代表应邀出席峰会。会上&#xff0c;蓝海创意云荣获峰…

在阿里干了8年测试的表哥放假回来了,聊完之后大彻大悟

表哥是阿里某个项目组的测试开发&#xff0c;今年过年提前半个月放假回来了&#xff0c;一见面就给我们几个弟弟妹妹一人拿了部iPhone13pm。这一出手属实是阔绰&#xff0c;想想他的工作单位&#xff0c;也许对于他来说三四万也就是半个月工资而已。想想我那个小公司&#xff0…

第七节 平台设备驱动

在之前的字符设备程序中驱动程序&#xff0c;我们只要调用open() 函数打开了相应的设备文件&#xff0c;就可以使用read()/write() 函数&#xff0c;通过file_operations 这个文件操作接口来进行硬件的控制。这种驱动开发方式简单直观&#xff0c;但是从软件设计的角度看&#…

【Linux】操作系统进程概念

文章目录1. 冯诺依曼体系结构2. 操作系统3. 进程进程的基本概念查看进程和杀死进程父进程和子进程通过系统调用创建子进程1. 冯诺依曼体系结构 冯诺依曼结构也称普林斯顿结构&#xff0c;是一种将程序指令存储器和数据存储器合并在一起的存储器结构。数学家冯诺依曼提出了计算…

适配器模式(Adapter Pattern)

1.什么是适配器模式&#xff1f; 适配器模式&#xff08;Adapter Pattern&#xff09;是作为两个不兼容的接口之间的桥梁。这种类型的设计模式属于结构型模式&#xff0c;它结合了两个独立接口的功能。 这种模式涉及到一个单一的类&#xff0c;该类负责加入独立的或不兼容的接…