RocketMQ中的线程池是如何创建的?

news2025/1/24 8:29:07

前言

大家好,今天主要来和大家聊一聊RocketMQ中的线程池是如何创建的,如何设置线程池数量,同时也可以从中去学习到一些线程池的实践和需要注意的一些细节。

RocketMQ在哪些地方使用到了线程池?

在RocketMQ中存在了大量的对线程池的使用,从消息的生产到投递Broker中,到最后的消息消费每一个环节中都大量使用到线程池的地方,下面我们拿出几个不同类型的线程池来看一看。

在 NameServer的路由注册和剔除中,多次使用到了定时线程池

定时线程池

private final ScheduledExecutorService scheduledExecutorService =
	Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
		"NSScheduledThread"));
复制代码
// 定时任务 每10s扫描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	​
	@Override
	public void run() {
		NamesrvController.this.routeInfoManager.scanNotActiveBroker();
	}
}, 5, 10, TimeUnit.SECONDS);

//定时任务,每隔30s向集群中所有NameServer发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	​
	@Override
	public void run() {
		try {
			BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
		} catch (Throwable e) {
			log.error("registerBrokerAll Exception", e);
		}
	}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
复制代码

线程池newFixedThreadPool

FixedThreadPool常用于创建一个固定大小的线程池,

它的特点就是核心线程数量与最大线程数量一致,采用无界的阻塞队列 LinkedBlockingQueue,并且没有设置队列的大小默认是Integer.MAX_VALUE,适用于负载较重的场景

private ExecutorService remotingExecutor;

this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 用来设置接收到消息后的处理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
复制代码

消息发送初始化默认异步发送者线程池

核心线程数与最大线程数设置均为 Runtime.getRuntime().availableProcessors() ,可用的计算资源

阻塞队列设置为一个初始化50000长度的阻塞队列

keepAliveTime设置60s,超过则时间空闲的线程将被终止

private final ExecutorService defaultAsyncSenderExecutor;

private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);

this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
	Runtime.getRuntime().availableProcessors(),
	Runtime.getRuntime().availableProcessors(),
	1000 * 60,
	TimeUnit.MILLISECONDS,
	this.asyncSenderThreadPoolQueue,
	new ThreadFactory() {
		private AtomicInteger threadIndex = new AtomicInteger(0);

		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
		}
	});
复制代码

消费端拉取消息线程池

我们重点来看一下消费端的线程池是如何创建,它可以说是整个RocketMQ中最关键的一个线程池

为了提高消费速度,我们通常有两种方式来提高消费并行度

  1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度
  2. 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

如何创建?

在消息监听的时候,利用线程池进行不断的拉取消息

提交消费请求,消息提交到内部的线程池

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
复制代码

参数设置

创建内部线程池,核心参数核心线程数和最大线程数,主要是根据配置来进行设置

设置线程池名称以 ConsumeMessageThread_ 开头的,利于排查问题

阻塞队列是一个无界的阻塞队列LinkedBlockingQueue

private final BlockingQueue<Runnable> consumeRequestQueue;

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();


this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));
复制代码

通过RocketMQ的源码,我们看到 consumeExecutor 线程池的创建也是非常简单的

如果想要修改线程池参数,需要注意什么?

根据线程池的原理我们知道,只有阻塞队列为满的情况下,不会创建临时线程

所以线程池内部持有的队列为一个无界队列,导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量

什么时候需要修改?

在正常的业务场景中,启动应用之后,我们就不会再修改消费者线程数,但有可能突发业务高峰导致消息堆积,这时候我们就需要调整单个 Consumer 的消费并行线程数。

如何修改线程数?

  1. 修改线程池后,重新启动消费者,缺点是参数不易评估,随着业务的并发提升,需要频繁的重启服务来更改线程数,这势必会带来一定的造成影响。
  2. 官方也为我们提供了修改线程数的方法,当更新的线程数大于0且小于 Short.MAX_VALUE 且小于最大线程数,则更新核心线程数。

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略

@Override
public void updateCorePoolSize(int corePoolSize) {
    if (corePoolSize > 0
        && corePoolSize <= Short.MAX_VALUE
        && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}
复制代码

这两种方式都存在一定的痛点

  1. 线程数量随着业务的变动,需要修改代码
  2. 在springBoot和SpringCloud Stream下,对线程池参数变更不是很友好
  3. 不能通过管理界面,直接动态修改线程池参数

针对上面的痛点问题,我们可以考虑封装线程池动态参数调整,首先肯定原来代码是毫无侵入性的,

同时通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/91389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学籍信息网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 学籍信息管理&#xff1a;添加信息、修改信息、删除信息、查询信息 添加信息&#xff0c;管理员根据学生的将信息导入系…

[附源码]Python计算机毕业设计高校师资管理系统Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

行业分析| 智慧头盔在快对讲上的应用与实践

快对讲综合调度系统是基于移动通信网络&#xff0c; 整合集群对讲、视频监控、实时音视频技术、PSTN、GIS定位、IM和调度业务的产品&#xff0c;为客户提供专业对讲、视频会议、可视化融合指挥调度等功能为一体的音视频实时交互平台。 快对讲和智慧头盔 智慧头盔&#xff0c;…

PHP实验室管理系统mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP实验室管理系统 是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为PHP APACHE&#xff0c;数据库为 mysql5.0&#xff0c;使用php语言开发。 PHP…

第10讲:vue脚手架集成axios

一、创建项目并添加axios支持 创建项目请参考&#xff1a;使用脚手架创建vue项目 创建路由项目请参考&#xff1a;路由开发 1.1、添加axios支持 使用如下命令添加axios支持 npm install axios //vue-cli2.0安装方式1.2、在main.js中引用并使用axios 使用如下命令 impor…

git初识(三)

分支 顾名思义&#xff0c;分支就是从主线上分离出来进行另外的操作&#xff0c;而又不影响主线&#xff0c;主线又可以继续干它的事&#xff0c;&#xff0c;最后分支做完事后合并到主线上而分支的任务完成可以删掉了。为了不受其他开发人员的影响&#xff0c;你可以在主分支…

数据看板可视化

前言 这段时间一直在做可视化&#xff0c;在我的项目中有一部分是电力巡检的数据可视化。其中的数据看板比较简单&#xff0c;我将其单独抽离出来形成一个demo&#xff0c;为保密demo中数据非真实数据。先看效果。 具体效果 链接相关 浏览链接&#xff1a;http://xisite.top…

【人工智能与机器学习】——聚类(学习笔记)

&#x1f4d6; 前言&#xff1a;我们之前学习的算法均是有监督学习&#xff08;supervised learning&#xff09;&#xff0c;它的一个鲜明特征是通过给定的标签来学习从数据特征&#xff08;如图像&#xff09;到语义标签的映射关系。但在很多实际问题中&#xff0c;数据并没有…

vuex笔记

Vuex Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式。 调试工具&#xff1a;vue devtools Vuex就像眼镜&#xff1a;您自会知道什么时候需要它。 1、state 在store中定义数据&#xff0c;在组件中直接使用&#xff1a; 目录&#xff1a;store/index.js export defau…

相关分析与回归分析

相关与回归分析就是了解变量之间相关关系的统计方法 一.相关分析 具有相关关系的变量之间&#xff0c;如果不区分原因和结果&#xff0c;我们称之为相关分析 相关分析是看两个因素之间的相关性&#xff0c;不需要确定哪个是自变量&#xff0c;哪个是因变量&#xff0c;两个因…

RK3568 GT911触摸屏调试

屏幕规格书 需要主要硬件通信电压为&#xff1a;1.8V或者3.3V I2C通信的地址&#xff1a;0x5D 和0x40 系统上电时序&#xff1a;不同的地址&#xff0c;稍微有些差异 对应代码中如下&#xff1a; 与RK3568的硬件接口电路 DTS 配置 驱动&#xff1a;RK自带的驱动程序就可以正确工…

音视频学习 -- 弱网对抗技术相关实践

背景介绍 实时音视频通话在当前的生活中是无时不刻存在的&#xff0c;包括社交、安防、交通等等各个方面都需要。用户场景复杂多变、要求严苛、网络环境不一致等给实时音视频通话带来很大条件。我们在这方向稍微做了一些工作&#xff0c;虽然和其他大厂的优化工作相比&#xf…

Commons Collections3

省流 SerialKiller 可以通过⿊名单与⽩名单的⽅式来限制反序列化时允许通过的 类&#xff0c;其中限制了cc1和cc2中命令执行的类&#xff0c;InvokerTransformer cc3就是为了绕过对其的限制&#xff0c;这里使用的是com.sun.org.apache.xalan.internal.xsltc.trax.TrAXFilter来…

基于改进的DBN降水预测方法(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 精确高效的降水预测模型可以更好地反映未来的气候&#xff0c;为管理决策提供重要参考&#xff0c;帮助人们为未来的恶劣天气做…

首个元宇宙国家?!# Tuvalu

当我们还在谈论如何设计和构建一个未来城市的时候&#xff0c;首个入驻元宇宙的国家也诞生了。太平洋岛国「图瓦卢」开始 在虚拟世界创建一个数字国家。这是个怎样的国家&#xff1f;图瓦卢是位于南太平洋的一个岛国&#xff0c;人口约为 1.2 万&#xff0c;由 9 个环形小珊瑚岛…

FKM规范静强度和疲劳强度计算分析

1. 概述 WB-FKM/WB-FKM-Weld工具包是德国CADFEM公司基于FKM规范&#xff08;德国机械协会主持和通过的机械产品强度评估规范&#xff09;的基础上&#xff0c;在ANSYS WB内开发的针对结构母材及焊缝进行静强度和疲劳强度评估的工具包。 该工具包的最大优势是&#xff1a;基于AN…

艾美捷CpG ODN系列——ODN 2006 (TLRGRADE)说明

艾美捷CpG ODN系列——ODN 2006 (TLRGRADE)&#xff1a;具有硫代磷酸酯骨架的CpG寡脱氧核苷酸&#xff08;B型&#xff09;。人和小鼠TLR9&#xff08;Toll样受体9&#xff09;的特异性配体。 艾美捷CpG ODN 丨ODN 2006 (TLRGRADE)化学性质&#xff1a; 序列&#xff1a;5-tcg…

减少win11核显占用的内存怎么操作

减少win11核显占用的内存如何操作是很多小伙伴反应的问题&#xff0c;当我们的电脑新安装完win11的时候会发现系统的内存占用比较高&#xff0c;但是自己却没有开任何的占用高的软件&#xff0c;下面小编给大家分享一下减少win11核显占用的内存操作方法吧&#xff0c;以便解决大…

音视频编解码 -- 编码参数 CRF

之前多多少少接触过一些编解码参数&#xff0c;CRF 参数也用过&#xff0c;但是最近在和朋友们聊天时&#xff0c;说到使用 FFMPEG 过程中碰到 CRF 参数&#xff0c;以及具体作用流程&#xff0c;这个之前一直没有跟踪过&#xff0c;也没有详细记录过&#xff0c;所以吊起了自己…

【笔记】计算机组成原理复习重点——篇四

计算机组成原理复习重点笔记 第&#xff14;章 存 储 器 4.1 概述 存储一个二进制位的物理器件叫存储元。地址码相同的多个存储元构成一个存储单元。若干个存储单元构成存储体。多个存储体构成存储器。多个存储器构成存储体系。存储元→存储单元→存储体→存储器→存储体系 4…