结合RocketMQ 源码,带你了解并发编程的三大神器

news2024/12/23 1:17:15
摘要:本文结合 RocketMQ 源码,分享并发编程三大神器的相关知识点。

本文分享自华为云社区《读 RocketMQ 源码,学习并发编程三大神器》,作者:勇哥java实战分享。

这篇文章,笔者结合 RocketMQ 源码,分享并发编程三大神器的相关知识点。

1 CountDownLatch 实现网络同步请求

CountDownLatch 是一个同步工具类,用来协调多个线程之间的同步,它能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。

下图是 CountDownLatch 的核心方法:

我们可以认为它内置一个计数器,构造函数初始化计数值。每当线程执行 countDown 方法,计数器的值就会减一,当计数器的值为 0 时,表示所有的任务都执行完成,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。

举例,数据库有100万条数据需要处理,单线程执行比较慢,我们可以将任务分为5个批次,线程池按照每个批次执行,当5个批次整体执行完成后,打印出任务执行的时间 。

 long start = System.currentTimeMillis();
 ExecutorService executorService = Executors.newFixedThreadPool(10);
 int batchSize = 5;
 CountDownLatch countDownLatch = new CountDownLatch(batchSize);
 for (int i = 0; i < batchSize; i++) {
 final int batchNumber = i;
 executorService.execute(new Runnable() {
 @Override
 public void run() {
 try {
 doSomething(batchNumber);
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 countDownLatch.countDown();
 }
 }
 });
}
countDownLatch.await();
System.out.println("任务执行耗时:" + (System.currentTimeMillis() - start) + "毫秒");

温习完 CountDownLatch 的知识点,回到 RocketMQ 源码。

笔者在没有接触网络编程之前,一直很疑惑,网络同步请求是如何实现的?

同步请求指:客户端线程发起调用后,需要在指定的超时时间内,等到响应结果,才能完成本次调用如果超时时间内没有得到结果,那么会抛出超时异常。

RocketMQ 的同步发送消息接口见下图:

追踪源码,真正发送请求的方法是通讯模块的同步请求方法 invokeSyncImpl 。

整体流程:

  1. 发送消息线程 Netty channel 对象调用 writeAndFlush 方法后 ,它的本质是通过 Netty 的读写线程将数据包发送到内核 , 这个过程本身就是异步的;
  2. ResponseFuture 类中内置一个 CountDownLatch 对象 ,responseFuture 对象调用 waitRepsone 方法,发送消息线程会阻塞 ;

3.客户端收到响应命令后, 执行 processResponseCommand 方法,核心逻辑是执行 ResponseFuture 的 putResponse 方法。

该方法的本质就是填充响应对象,并调用 countDownLatch 的 countDown 方法 , 这样发送消息线程就不再阻塞。

CountDownLatch 实现网络同步请求是非常实用的技巧,在很多开源中间件里,比如 Metaq ,Xmemcached 都有类似的实现。

2 ReadWriteLock 名字服务路由管理

读写锁是一把锁分为两部分:读锁和写锁,其中读锁允许多个线程同时获得,而写锁则是互斥锁。

它的规则是:读读不互斥,读写互斥,写写互斥,适用于读多写少的业务场景。

我们一般都使用 ReentrantReadWriteLock ,该类实现了 ReadWriteLock 。ReadWriteLock 接口也很简单,其内部主要提供了两个方法,分别返回读锁和写锁 。

 public interface ReadWriteLock {
 //获取读锁
 Lock readLock();
 //获取写锁
 Lock writeLock();
}

读写锁的使用方式如下所示:

1.创建 ReentrantReadWriteLock 对象 , 当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用 lock / unlock 方法 ;

private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

2.读取共享数据 ;

Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
 // TODO 查询共享数据
} finally {
 readLock.unlock();
}

3.写入共享数据;

Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
 // TODO 修改共享数据
} finally {
 writeLock.unlock();
}

RocketMQ架构上主要分为四部分,如下图所示 :

  1. Producer :消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  2. Consumer :消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。
  3. BrokerServer :Broker主要负责消息的存储、投递和查询以及服务高可用保证。
  4. NameServer :名字服务是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的zookeeper,支持Broker的动态注册与发现。

NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包(路由信息),NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。

那么 NameServer 如何保存路由信息呢?

路由信息通过几个 HashMap 来保存,当 Broker 向 Nameserver 发送心跳包(路由信息),Nameserver 需要对 HashMap 进行数据更新,但我们都知道 HashMap 并不是线程安全的,高并发场景下,容易出现 CPU 100% 问题,所以更新 HashMap 时需要加锁,RocketMQ 使用了 JDK 的读写锁 ReentrantReadWriteLock 。

1.更新路由信息,操作写锁

2.查询主题信息,操作读锁

读写锁适用于读多写少的场景,比如名字服务,配置服务等。

3 CompletableFuture 异步消息处理

RocketMQ 主从架构中,主节点与从节点之间数据同步/复制的方式有同步双写异步复制两种模式。

异步复制是指消息在主节点落盘成功后就告诉客户端消息发送成功,无需等待消息从主节点复制到从节点,消息的复制由其他线程完成。

同步双写是指主节点将消息成功落盘后,需要等待从节点复制成功,再告诉客户端消息发送成功。

同步双写模式是阻塞的,笔者按照 RocketMQ 4.6.1 源码,整理出主节点处理一个发送消息的请求的时序图。

整体流程:

  1. 生产者将消息发送到 Broker , Broker 接收到消息后,发送消息处理器 SendMessageProcessor 的执行线程池 SendMessageExecutor 线程池来处理发送消息命令;
  2. 执行 ComitLog 的 putMessage 方法;
  3. ComitLog 内部先执行 appendMessage 方法;
  4. 然后提交一个 GroupCommitRequest 到同步复制服务 HAService ,等待 HAService 通知 GroupCommitRequest 完成;
  5. 返回写入结果并响应客户端 。

我们可以看到:发送消息的执行线程需要等待消息复制从节点 , 并将消息返回给生产者才能开始处理下一个消息

RocketMQ 4.6.1 源码中,执行线程池的线程数量是 1 ,假如线程处理主从同步速度慢了,系统在这一瞬间无法处理新的发送消息请求,造成 CPU 资源无法被充分利用 , 同时系统的吞吐量也会降低。

那么优化同步双写呢 ?

从 RocketMQ 4.7 开始,RocketMQ 引入了 CompletableFuture 实现了异步消息处理 。

  1. 发送消息的执行线程不再等待消息复制到从节点后再处理新的请求,而是提前生成 CompletableFuture 并返回 ;
  2. HAService 中的线程在复制成功后,调用 CompletableFuture 的 complete 方法,通知 remoting 模块响应客户端(线程池:PutMessageExecutor ) 。

我们分析下 RocketMQ 4.9.4 核心代码:

1.Broker 接收到消息后,发送消息处理器 SendMessageProcessor 的执行线程池 SendMessageExecutor 线程池来处理发送消息命令;

2.调用 SendMessageProcessor 的 asyncProcessRequest 方法;

3.调用 Commitlog 的 aysncPutMessage 方法写入消息 ;

这段代码中,当 commitLog 执行完 appendMessage 后, 需要执行刷盘任务同步复制两个任务。

但这两个任务并不是同步执行,而是异步的方式。

4.复制线程复制消息后,唤醒 future ;

5.组装响应命令 ,并将响应命令返回给客户端。

为了便于理解这一段消息发送处理过程的线程模型,笔者在 RocketMQ 源码中做了几处埋点,修改 Logback 的日志配置,发送一条普通的消息,观察服务端日志。

从日志中,我们可以观察到:

  1. 发送消息的执行线程(图中红色)在执行完创建刷盘 Future 和同步复制 future 之后,并没有等待这两个任务执行完成,而是在结束 asyncProcessRequest 方法后就可以处理发送消息请求了 ;
  2. 刷盘线程和复制线程执行完各自的任务后,唤醒 future,然后通过刷盘线程组装存储结果,最后通过 PutMessageExecutor 线程池(图中黄色)将响应命令返回给客户端。

笔者一直认为:异步是更细粒度的使用系统资源的一种方式,在异步消息处理的过程中,通过 CompletableFuture 这个神器,各个线程各司其职,优雅且高效的提升了 RocketMQ 的性能。

点击关注,第一时间了解华为云新鲜技术~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/53918.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WRF模式行业应用问题解析及辅助学习

>>> 高精度气象模拟软件WRF(Weather Research Forecasting)技术及案例应用 今天小编给大家整理了WRF模式行业应用问题解析&#xff0c;不管是正在应用WRF还是入门小白&#xff0c;都建议多听一些行业问题&#xff0c;借鉴及深入了解&#xff0c;以下摘抄了个别问题&a…

5. 线性回归的从零开始实现

1.生成数据集 # num_examples 表示样本数量&#xff0c;也就是房屋数量 # w是权重向量 def synthetic_data(w, b, num_examples): #save"""生成yXwb噪声"""# X是一个从独立的正态分布中抽取的随机数的张量&#xff0c;正态分布的平均值为0、标…

双十二怎么入手,几款性能好物分享

过完了双十一&#xff0c;接下来就应该面临今年最后一个大优惠力度的双十二了&#xff0c;而且双十二的时间刚好靠近在过年&#xff0c;所以在这期间相信很多人购买的物品是更加偏向于家居用品方面&#xff0c;那么就不能够错过本篇文章了&#xff0c;本篇文章将为你们分享一些…

[附源码]计算机毕业设计springboot松林小区疫情防控信息管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

sonarqube安装

Sonarqube安装文档 1. 环境准备 参照官方文档Prerequisites and Overview | SonarQube Docs 安装符合sonarqube版本的JDK和数据库 目前服务器上JDK版本为11.0.2 sonarqube版本为9.1.0 postgresql版本为13.7 2. 安装JDK11.0.2 将openjdk-11.0.2_linux-x64_bin.tar.gz放到/usr/…

spring详解(一)

今天我们来学习一个新的框架spring!!! spring是什么呢? spring是2003年兴起的,是一款轻量级、非侵入式的IOC和AOP的一站式的java开发框架&#xff0c;为简化企业即开发而生。 轻量级&#xff1a;spring核心功能的jar包不大 非侵入式&#xff1a;我们的业务代码不需要继承或…

linux parted 方式挂盘,支持大于4T盘扩容

此 内容与之前的linux mbr转gpt格式有些重复&#xff0c;但为了便于查询&#xff0c;还是单抽出相关内容&#xff0c;进行操作&#xff1a; 1.查询要挂的有磁盘路径, 输入 parted -l 。 2 . 进入parted对/dev/vdb盘的交互方式&#xff1a;输入&#xff1a; parted /dev/vdb&am…

Spring Cloud Gateway 网关组件及搭建实例

Spring Cloud Gateway 是 Spring Cloud 团队基于 Spring 5.0、Spring Boot 2.0 和 Project Reactor 等技术开发的高性能 API 网关组件。Spring Cloud Gateway 旨在提供一种简单而有效的途径来发送 API&#xff0c;并为它们提供横切关注点&#xff0c;例如&#xff1a;安全性&am…

Linux 线程控制 —— 线程取消 pthread_cancel

线程退出pthread_exit只能终止当前线程&#xff0c;也就是哪个线程调用了pthread_exit&#xff0c;哪个线程就会退出&#xff1b;但是线程取消pthread_cancel &#xff0c;不光可以终止自己&#xff0c;还可以终止其他线程。 》自己终止自己&#xff0c;没问题&#xff01; 》…

Android ViewPager2 + TabLayout + BottomNavigationView

Android ViewPager2 TabLayout BottomNavigationView 实际案例 本篇主要介绍一下 ViewPager2 TabLayout BottomNavigationView 的结合操作 概述 相信大家都看过今日头条的的样式 如下: 顶部有这种tab 并且是可以滑动的, 这就是本篇所介绍的 ViewPager2 TabLayout 的组合…

【C++】C++实战项目机房预约管理系统

前言 这是C总结性练习&#xff0c;主要以一个综合案例对以前学过的知识进行复习巩固&#xff0c;为以后编程打下基础。 1. 机房预约系统需求 1.1 系统简介 学校有几个规格不同的机房&#xff0c; 由于使用时经常出现“撞车”现象&#xff0c;现开发一套机房预约系统&#x…

[附源码]JAVA毕业设计会议室租赁管理系统(系统+LW)

[附源码]JAVA毕业设计会议室租赁管理系统&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技…

EMC原理 传导(共模 差模) 辐射(近场 远场) 详解

第一章、EMC概念介绍 EMC&#xff08;electromagnetic compatibility&#xff09;作为产品的一个特性&#xff0c;译为电磁兼容性&#xff1b;如果作为一门学科&#xff0c;则译为电磁兼容。它包括两个概念&#xff1a;EMI和EMS。EMI(electromagneticinterference) 电磁干扰&a…

从Github上整理下来的《Java面试神技》

该文档曾在Github上线6天&#xff0c;共收获55Kstar的Java面试神技&#xff08;这赞数&#xff0c;质量多高就不用我多说了吧&#xff09;非常全面&#xff0c;包涵Java基础、Java集合、JavaWeb、Java异常、OOP、IO与NIO、反射、注解、多线程、JVM、MySQL、MongoDB、Spring全家…

通俗易懂帮你理清操作系统(Operator System)

文章目录概念&#xff08;是什么&#xff09;设计OS的目的&#xff08;为什么&#xff09;如何理解 "管理"&#xff08;怎么办&#xff09;总结系统调用和库函数概念概念&#xff08;是什么&#xff09; 任何计算机系统都包含一个基本的程序集合&#xff0c;称为操作…

照亮无尽前沿之路:华为正成为科技灯塔的守护者

20世纪中叶&#xff0c;著名科学家、工程师&#xff0c;被誉为“信息时代之父”的范内瓦布什&#xff0c;在《科学&#xff1a;无尽的前沿》中讨论了科学战略与科学基础设施对科技发展的重要性。其中提出&#xff0c;人类科技发展已经从以个人、学校为单位&#xff0c;来到了以…

【能效管理】关于学校预付费水电系统云平台应用分析介绍

概述 安科瑞 李亚俊 壹捌柒贰壹零玖捌柒伍柒 当下智慧校园、平安校园的建设越来越普及&#xff0c;作为智慧校园建设的重要一环&#xff0c;学生宿舍的用电预付费和用电管理措施是必不可少的。学生宿舍预付费电控系统可以解决使用传统电表人工抄表费时费力&#xff0c;不方便统…

[附源码]JAVA毕业设计基于MVC框架的在线书店设计(系统+LW)

[附源码]JAVA毕业设计基于MVC框架的在线书店设计&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 …

光源基础(2)——光的强度、波长、颜色合成与互补关系

光源基本参数 光的度量 辐射能和光能 以辐射形式发射、传播或接收的能量称为辐射能&#xff0c;其计量单位为焦耳(J)。光能是光通量在可见光范围内对时间的积分&#xff0c;其计量单位为流明秒(lms)。 辐射通量和光通量 辐射通量或辐射功率是以辐射形式发射、传播或接收的功率…

【servelt原理_4_Http协议】

Http协议 1.认识url url被称为统一资源定位符&#xff0c;用来表示从互联网上得到的资源位置和访问这些资源的方法。 他的表示方法一般为&#xff1a; <协议>://<主机>:<端口>/<路径>如下我们启动一个servlet程序&#xff0c;来看一下我们的url表示 …