Day855.生产者-消费者模式 -Java 并发编程实战

news2024/11/24 16:26:53

生产者-消费者模式

Hi,我是阿昌,今天学习记录的是关于生产者-消费者模式的内容。

Worker Thread 模式类比的是工厂里车间工人的工作模式。

但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就是生产者 - 消费者模式。

生产者 - 消费者模式在编程领域的应用也非常广泛,前面曾经提到,Java 线程池本质上就是用生产者 - 消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者 - 消费者模式。

当然,除了在线程池中的应用,为了提升性能,并发编程领域很多地方也都用到了生产者 - 消费者模式,例如 Log4j2 中异步 Appender 内部也用到了生产者 - 消费者模式。


一、生产者 - 消费者模式的优点

生产者 - 消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。

下面是生产者 - 消费者模式的一个示意图,可以结合它来理解。

生产者 - 消费者模式示意图

从架构设计的角度来看,生产者 - 消费者模式有一个很重要的优点,就是解耦

解耦对于大型系统的设计非常重要,而解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。

在生产者 - 消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者 - 消费者模式是一个不错的解耦方案。

除了架构设计上的优点之外,生产者 - 消费者模式还有一个重要的优点就是支持异步,并且能够平衡生产者和消费者的速度差异。

在生产者 - 消费者模式中,生产者线程只需要将任务添加到任务队列而无需等待任务被消费者线程执行完,也就是说任务的生产和消费是异步的,这是与传统的方法之间调用的本质区别,传统的方法之间调用是同步的。

或许会有这样的疑问,异步化处理最简单的方式就是创建一个新的线程去处理,那中间增加一个“任务队列”究竟有什么用呢?觉得主要还是用于平衡生产者和消费者的速度差异

假设生产者的速率很慢,而消费者的速率很高,比如是 1:3,如果生产者有 3 个线程,采用创建新的线程的方式,那么会创建 3 个子线程,而采用生产者 - 消费者模式,消费线程只需要 1 个就可以了。

Java 语言里,Java 线程和操作系统线程是一一对应的,线程创建得太多,会增加上下文切换的成本,所以 Java 线程不是越多越好,适量即可。

生产者 - 消费者模式恰好能支持你用适量的线程。


二、支持批量执行以提升性能

前面在Thread-Per-Message 模式中讲过轻量级的线程,如果使用轻量级线程,就没有必要平衡生产者和消费者的速度差异了,因为轻量级线程本身就是廉价的,那是否意味着生产者 - 消费者模式在性能优化方面就无用武之地了呢?

当然不是,有一类并发场景应用生产者 - 消费者模式就有奇效,那就是批量执行任务。例如,要在数据库里 INSERT 1000 条数据,有两种方案:

  • 第一种方案是用 1000 个线程并发执行,每个线程 INSERT 一条数据;
  • 第二种方案是用 1 个线程,执行一个批量的 SQL,一次性把 1000 条数据 INSERT 进去。

这两种方案,显然是第二种方案效率更高,其实这样的应用场景就是上面提到的批量执行场景。在 两阶段终止模式中,提到一个监控系统动态采集的案例,其实最终回传的监控数据还是要存入数据库的(如下图)。

但被监控系统往往有很多,如果每一条回传数据都直接 INSERT 到数据库,那么这个方案就是上面提到的第一种方案:每个线程 INSERT 一条数据。

很显然,更好的方案是批量执行 SQL,那如何实现呢?

这就要用到生产者 - 消费者模式了。

动态采集功能示意图

利用生产者 - 消费者模式实现批量执行 SQL 非常简单:

将原来直接 INSERT 数据到数据库的线程作为生产者线程,生产者线程只需将数据添加到任务队列,然后消费者线程负责将任务从任务队列中批量取出并批量执行。在下面的示例代码中,创建了 5 个消费者线程负责批量执行 SQL,这 5 个消费者线程以 while(true){} 循环方式批量地获取任务并批量地执行。

需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务

之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。

//任务队列
BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000);
//启动5个消费者线程
//执行批量任务  
void start() {
  ExecutorService es=executors
    .newFixedThreadPool(5);
  for (int i=0; i<5; i++) {
    es.execute(()->{
      try {
        while (true) {
          //获取批量任务
          List<Task> ts=pollTasks();
          //执行批量任务
          execTasks(ts);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
//从任务队列中获取批量任务
List<Task> pollTasks() 
    throws InterruptedException{
  List<Task> ts=new LinkedList<>();
  //阻塞式获取一条任务
  Task t = bq.take();
  while (t != null) {
    ts.add(t);
    //非阻塞式获取一条任务
    t = bq.poll();
  }
  return ts;
}
//批量执行任务
execTasks(List<Task> ts) {
  //省略具体代码无数
}

三、支持分阶段提交以提升性能

利用生产者 - 消费者模式还可以轻松地支持一种分阶段提交的应用场景。

知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,往往采用异步刷盘的方式。

一个项目,其中的日志组件是自己实现的,采用的就是异步刷盘方式,刷盘的时机是:

  1. ERROR 级别的日志需要立即刷盘;
  2. 数据积累到 500 条需要立即刷盘;
  3. 存在未刷盘数据,且 5 秒钟内未曾刷盘,需要立即刷盘。

这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面具体看看用生产者 - 消费者模式如何实现。

在下面的示例代码中,可以通过调用 info()和error() 方法写入日志,这两个方法都是创建了一个日志任务 LogMsg,并添加到阻塞队列中,调用 info()和error() 方法的线程是生产者;

真正将日志写入文件的是消费者线程,在 Logger 这个类中,只创建了 1 个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。


class Logger {
  //任务队列  
  final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
  //flush批量  
  static final int batchSize=500;
  //只需要一个线程写日志
  ExecutorService es = Executors.newFixedThreadPool(1);
  //启动写日志线程
  void start(){
    File file=File.createTempFile(
      "foo", ".log");
    final FileWriter writer= new FileWriter(file);
    this.es.execute(()->{
      try {
        //未刷盘日志数量
        int curIdx = 0;
        long preFT=System.currentTimeMillis();
        while (true) {
          LogMsg log = bq.poll(
            5, TimeUnit.SECONDS);
          //写日志
          if (log != null) {
            writer.write(log.toString());
            ++curIdx;
          }
          //如果不存在未刷盘数据,则无需刷盘
          if (curIdx <= 0) {
            continue;
          }
          //根据规则刷盘
          if (log!=null && log.level==LEVEL.ERROR ||
              curIdx == batchSize ||
              System.currentTimeMillis()-preFT>5000){
            writer.flush();
            curIdx = 0;
            preFT=System.currentTimeMillis();
          }
        }
      }catch(Exception e){
        e.printStackTrace();
      } finally {
        try {
          writer.flush();
          writer.close();
        }catch(IOException e){
          e.printStackTrace();
        }
      }
    });  
  }
  //写INFO级别日志
  void info(String msg) {
    bq.put(new LogMsg(
      LEVEL.INFO, msg));
  }
  //写ERROR级别日志
  void error(String msg) {
    bq.put(new LogMsg(
      LEVEL.ERROR, msg));
  }
}
//日志级别
enum LEVEL {
  INFO, ERROR
}
class LogMsg {
  LEVEL level;
  String msg;
  //省略构造函数实现
  LogMsg(LEVEL lvl, String msg){}
  //省略toString()实现
  String toString(){}
}

四、总结

Java 语言提供的线程池本身就是一种生产者 - 消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。

但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。生产者 - 消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,你可以借助分布式消息队列(MQ)来实现生产者 - 消费者模式。

MQ 一般都会支持两种消息模型:

  • 一种是点对点模型
  • 一种是发布订阅模型。

这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);

而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,你可以结合观察者模式实现广播功能。


在日志组件异步刷盘的示例代码中,写日志的线程以 while(true){} 的方式执行,你有哪些办法可以优雅地终止这个线程呢?

this.writer.execute(()->{
  try {
    //未刷盘日志数量
    int curIdx = 0;
    long preFT=System.currentTimeMillis();
    while (true) {
    ......
    }
  } catch(Exception e) {}
}    

在应用系统中,日志系统一般都是最后关闭的吧,因为它要为其他系统关闭提供写日志服务。所以日志系统关闭时需要把队列中所有日志都消费掉才能关闭。
可能需要在关闭日志系统时投入一个毒丸,表示没有新的日志写入。线程池在消费到毒丸时知道没有日志写入,将所有的日志刷盘,break循环体。

  1. 使用线程池的shutdown或者shutdownNow关闭线程池
  2. 可以使用interrupt,但是线程是线程池管理的,没有消费者线程的引用中断不了
  3. 通过设置标志位来让线程优雅终止
    • 通过判断线程的中断状态Thread.currentThread.isInterrupted()
    • 设置自己的线程终止标志位,该标志位volatile修饰的共享变量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/154522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RoboMaster EP 实用功能开发(三): 基于树莓派的ROS2机器人系统搭建

功能&#xff1a;在树莓派4b上安装ros2系统&#xff0c;引入robomaster sdk&#xff0c;搭建一个基于ROS2的机器人系统&#xff0c;用于ROS系统的学习、开发和实践。 硬件&#xff1a;RobotMaster EP、树莓派4b 系统平台&#xff1a;Ubuntu 20.04、ROS2&#xff08;Foxy&…

Selenium【Selenium环境搭建与Junit5】

Selenium【Selenium环境搭建与Junit5】&#x1f34e;一. 自行下载谷歌浏览器或者火狐浏览器&#x1f352;1.1 安装好之后需要去掉谷歌(火狐)浏览器自动更新(建议)&#x1f352;1.2下载谷歌(火狐)驱动&#x1f34e;二.Selenium下载与配置&#x1f352;2.1Selenium下载&#x1f3…

服务案例|SQL Server数据库监控反复重启问题

监控平台对主流数据库的监控&#xff0c;能够及时发现异常&#xff0c;快速响应&#xff0c;保障业务系统的稳定。平台通过对SQL Server数据库监控&#xff0c;帮助用户在数据库出现异常时事件处理。 SQL Server数据库监控内容如下 1 、数据库服务器基本性能监控。包括&#…

Hive/MaxCompute SQL性能优化(三):数据倾斜优化实战

SQL性能优化系列&#xff1a;Hive/MaxCompute SQL性能优化(一)&#xff1a;什么是数据倾斜Hive/MaxCompute SQL性能优化(二)&#xff1a;如何定位数据倾斜前面介绍了如何定位数据倾斜&#xff0c;本文介绍如果遇到各种数据倾斜的情况该怎样优化代码。Map长尾优化一、Map读取数据…

ArcGIS如何将Excel表格转换为SHP格式

概述数据的获取渠道是多种多样的&#xff0c;获取的数据格式也是多种多样&#xff0c;作为一名GISer&#xff0c;需要熟练掌握各种格式的数据之间的转换&#xff0c;例如本文要介绍的Excel格式的数据&#xff0c;经常会遇到&#xff0c;这里为大家介绍一下转换方法&#xff0c;…

区块链基础知识(二)

密码学与安全技术 参考书籍 《区块链原理、设计与应用》 Hash算法 加解密算法 混合加密机制 离散对数与Diffie-Hellman秘钥交换协议 消息认证码 数字签名 PKI体系 PKI基本组件 证书签发 证书的撤销 Merkle tree结构 默克尔树逐层记录哈希值的特点&#xff0c;让它具有了一些独特…

【我的渲染技术进阶之旅】关于C++轻量级界面开发框架Dear ImGui介绍

文章目录一、怎么知道ImGui的1.1 Filament中有使用ImGui1.2 其他很多渲染框架都有使用ImGui二、ImGui介绍2.1 ImGui风格2.2 Imgui介绍2.2.1 Imgui简介2.2.2 Imgui用法2.2.3 Demo示例2.2.4 集成2.2.5 更多案例2.3 查看Imgui实例源代码2.3.1 运行demo2.3.2 项目结构分析2.3.3 示例…

TCP/IP网络编程(2)——套接字类型与协议设置

文章目录二、套接字类型与协议设置2.1 套接字协议及数据传输特性2.1.1 创建套接字2.1.2 协议族&#xff08;Protocol Family&#xff09;2.1.3 套接字类型&#xff08;Type&#xff09;2.1.4 套接字类型1&#xff1a;面向连接的套接字&#xff08;SOCK_STREAM&#xff09;2.1.5…

RHCE学习笔记-133-2

rpm and kickstart The RPM Way 不会有互动事件 可以适用在所有软件,如kernerl和其他额外的软件都可以以rpm的形式 不需要安装前面的版本才能安装后面的版本 RPM Packge manager RPM components local database /var/lib/rpm rpm and related executables package files primar…

大数据NiFi(十三):NiFi监控

文章目录 NiFi监控 一、处理器状态指示有如下几种情况 二、对于每个组的监控情况如下

CMMI之客户验收

客户验收&#xff08;Customer Acceptance, CA&#xff09;是指客户依据合同对产品进行审查和测试&#xff0c;确保产品满足客户需求。客户验收过程域是SPP模型的重要组成部分。本规范阐述了客户验收的规程&#xff0c;该规程的“目标”、“角色与职责”、“启动准则”、“输入…

Spring 源码解析~13、Spring 中的钩子函数汇总

Spring 中的钩子函数汇总 一、生命周期总览 二、BeanDefinition 生成与注册阶段 钩子执行顺序与博文顺序一致&#xff0c;即 1->n 1、EmptyReaderEventListener#defaultsRegistered 触发点&#xff1a;创建 BeanDefinitionParserDelegate 委派类时触发解释&#xff1a;通知…

本立道生:必备的基础知识

通过前面两节课的内容&#xff0c;我带领大家熟悉了一下 Visual Studio C 开发环境的必备知识&#xff0c;虽然还有很多关于 Visual Studio 的重要知识没有介绍&#xff0c;但为了让你尽快进入 C 开发环节&#xff0c;及早获得开发程序的愉悦&#xff0c;我们暂时只介绍这些必备…

【数据结构】5.5 遍历二叉树和线索二叉树

5.5.1 遍历二叉树 遍历定义 顺着某一条搜索路径巡访二叉树中的每个结点&#xff0c;使得每个结点均被访问依次&#xff0c;而且仅被访问一次&#xff08;又称周游&#xff09;。访问的含义很广&#xff0c;可以是对结点作各种处理&#xff0c;如&#xff1a;输出结点的信息&a…

Centos7开启SSH连接配置

1、查看是否已安装openssh-server&#xff1a; [rootlocalhost ~]# yum list installed | grep openssh-server 如果有信息说明已安装了openssh-server&#xff0c;如果输出没有任何结果&#xff0c;说明没有安装。 2、安装openssh-server&#xff08;如果已安装&#xff0c…

微信小程序(学习笔记篇)

基本项目结构 pages用来存放所有小程序的页面utils 用来存放工具性质的模块&#xff08;例如:格式化时间的自定义模块)app.js小程序项目的入口文件app.json 小程序项目的全局配置文件app.wXss小程序项目的全局样式文件project.config.json项目的配置文件sitemap.json用来配置小…

买卖股票的最佳时机 II -数学推导证明贪心思路 -leetcode122

问题说明来源leetcode 一、问题描述: 122. 买卖股票的最佳时机 II 难度中等1941 给你一个整数数组 prices &#xff0c;其中 prices[i] 表示某支股票第 i 天的价格。 在每一天&#xff0c;你可以决定是否购买和/或出售股票。你在任何时候 最多 只能持有 一股 股票。你也可…

Spark Core----RDD详解

为什么需要RDD 分布式计算需要&#xff1a; 分区控制&#xff08;多台机器并行计算&#xff0c;将一份数据分成多份&#xff0c;在不同机器上执行&#xff09;Shuffle控制&#xff08;不同分区数据肯定需要进行相关的关联&#xff0c;不同分区进行数据传输叫Shuffle控制&…

分享77个NET源码,总有一款适合您

NET源码 分享77个NET源码&#xff0c;总有一款适合您 NET源码下载链接&#xff1a;https://pan.baidu.com/s/1vhXwExVAye5YrB77Vxif8Q?pwdzktx 提取码&#xff1a;zktx 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xf…

Html 3D旋转相册制作

程序示例精选 Html 3D旋转相册制作 如需安装运行环境或远程调试&#xff0c;见文章底部微信名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对<<Html 3D旋转相册制作>>编写代码&#xff0c;代码整洁&#xff0c;规则&#xff0c;易读。 学习…