Chapter7-吞吐量优先的使用场景

news2025/1/15 19:40:54

7.1 在 Broker 端进行消息过滤

        在 Broker 端进行消息过滤,可以减少无效消息发送到 Consumer ,少占用网络带宽从而提高吞吐量。 Broker 端有三种方式进行消息过滤 。

        7.1.1 消息的 Tag 和 Key 

        对一个应用来说,尽可能只用一个 Topic ,不同的消息子类型用 Tag 来标识(每条消息只能有一个 Tag ),服务器端基于 Tag 进行过滤,并不需要读取消息体的内容,所以效率很高。 发送消息设置了 Tag 以后,消费方在订阅消息时,才可以利用 Tag 在 Broker 端做消息过滤。

        其次是消息的 Key 。 对发送的消息设置好 Key ,以后可以根据这个 Key 来查找消息 。 所以这个 Key 一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。 Broker 会创建专门的索引文件,来存储 Key 到消息的映射,由于是哈希索引,应尽量使 Key 唯一 ,避免潜在的哈希冲突 。

        Tag 和 Key 的主要差别是使用场景不同, Tag 用在 Consumer 的代码中,用来进行服务端消息过滤, Key 主要用于通过命令行查询消息 。 

        7.1.2 通过 Tag 进行过滤

        用 Tag 方式进行过滤的方法是传人感兴趣的 Tag 标签, Tag 标签是一个普通字符串,是在创建 Message 的时候添加的, 一个 Message 只能有一个 Tag 。使用 Tag 方式过滤非常高效,Broker 端可以在 ConsumeQueue 中做这种过滤 ,只从 CommitLog 里读取过滤后被命中的消息。 看一下 ConsumerQueue 的存储格式。

        Consume Queue 的第 三部分存储的是 Tag 对应的 hash code ,是一个定长的字符串,通过 Tag 过滤的过程就是对比定长的 hash code 。 经过 hash code 对比 ,符合要求的消息被从 CommitLog 读取出来,不用担心 Hash 冲突问题,消息在被消费前会对比完整的 Message Tag 字符串,消除 Hash 冲突造成的误读。

7.1.3 用 SQL 表达式的方式进行过滤 

        使用 Tag 方式过滤虽然高效,但是支持的逻辑比较简单,在构造 Message的时候,还可以通过 putU serProperty 函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑。

         类似 SQL 的过滤表达式, 支持如下语法 :

  • 数字对比 , 比如 >、>=、<、<= 、 BETWEEN 、 =;
  • 字符串对比,比如=、<>、 IN;
  • IS NULL or IS NOT NULL; 
  • 逻辑符号 AND 、 OR 、 NOT。

支持的数据类型:

  • 数字型,比如 123 、 3.1415;
  • 字符型 ,比如 'abc' 、注意必须用单引 号 ;
  • NULL ,这个特殊字符;
  • 布尔型, TRUE or FALSE 。

        SQL 表达式方式的过滤需 要 Broker 先读出消息里的属性内容, 然 后做SQL 计算,增大磁盘压力,没有 Tag 方式高效。 

        7.1.4 Filter Server 方式过滤 

         Filter Server 是 一 种比 SQL 表 达式更灵活的过滤方式,允许用户自定义Java 函数,根据 Java 函数的逻辑对消息进行过滤 。

        要使用 Filter Server , 首先要在启动 Broker 前在配置文件里加上 filterServer­Nums = 3 这样的配置 , Broker 在 启动的时候 , 就会在本机启动 3 个 FilterServer 进程。 FilterServer 类 似 一 个 RocketMQ 的 Consumer 进程,它从本机Broker 获取消息,然后根据用户上传过来的 Java 函数进行过滤,过滤后的消息再传给远端的 Consumer 。 这种方式会占用很多 Broker 机器的 CPU 资源,要根据实际情况谨慎使用 。 上传的 java 代码也要经过检查 ,不能有申请大内存、创建线程等这样的操作,否则容易造成 Broker 服务器宕机 。 

RocketMQ Filtersrv详解https://my.oschina.net/bieber/blog/492988

 7.2 提高 Consumer 处理能力

         当 C onsumer 的处理速度眼不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有

( 1 )提高消费并行度

        在同一个 ConsumerGroup 下( Clustering 方式),可以通过增加 Consumer实例的数量来提高并行度,通过加机器,或者在已有机器中启动多个 Consumer进程都可以增加 Consumer 实例数。注意总的 Consumer 数量不要超过 Topic 下Read Queue 数量,超过的 Consumer 实例接收不到消息 。 此外,通过提高单个Consumer 实例中的并行处理的线程数 可以在同一个 Consumer 内增加并行度来提高吞吐量(设置方法是修改 consumeThreadMin 和 consumeThreadMax ) 。

( 2 )以批量方式进行消费

        某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及 update 某个数据库, 一次 update IO 条的时间会大大小于十次 update 1 条数据的时间 。 这时可以通过批量方式消费来提高消费的吞吐量 。 实现方法是设置 Consumer 的 consume-MessageBatchMaxSize 这个参数 ,默认是 1 ,如果设置为 N,在消息多 的时候每次收到的是个长度为 N 的消息链表。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

Rocketmq源码分析: Rocketmq push 模式下 consumeMessageBatchMaxSize 和 pullBatchSize 两个参数的意义https://blog.csdn.net/cliuyang/article/details/109221653( 3 )检测延时情况,跳过非重要消息

        Consumer 在消 费的过程中, 如果发现由于某种原因发生严重的消息堆积,短 时间无法消除堆积,这个时候可以选择丢弃不重要 的消息,使 Consumer 尽快追上 Producer 的进度没有优化空间,除此之外还有三种方法可 以提高 Consumer 的处理能力 。 

        当某个队列的消息数堆积到 90000 条以上,就直接丢弃,以便快速追上发送消息的进度。

 7 .3 Consumer 的负载均衡

         上一节中讲到,想要提高 Consumer 的处理速度,可以启动多个 Consumer并发处理,这个时候就涉及如何在多个 Consumer 之间负载均衡的问题,接下来结合源码分析 Consumer 的负载均衡实现。

        要做负载均衡,必须知道一些全局信息,也就是一个 ConsumerGroup 里到底有多少个 Consumer , 知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个 Consumer 。 在 RocketMQ 中,负载均衡或者消息分配是在Consumer 端代码中完成的, Consumer 从 Broker 处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息 。

        7.3.1 DefaultMQPushConsumer 的负载均衡

        DefaultMQPushConsumer 的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个 DefultMQPushConsumer 启动后,会马上会触发一个doRebalance 动作;而且在同一个 ConsumerGroup 里加入新的 DefaultMQPush­Consumer 时,各个 Consumer 都会被触发 do Rebalance 动作。

        如图 7-2 所示,具体的负载均衡算法有五种,默认用的是第一种AllocateMessageQueueAveragely 。 负载均衡的结果与 Topic 的 Message Queue 数量,以及 ConsumerGroup 里的 Consumer 的数量有关。 负载均衡的分配粒度只到 Message Queue ,把 Topic 下的所有 Message Queue 分配到不同的Consumer 中,所以 Message Queue 和 Consumer 的数量关系,或者整除关系影响负载均衡结果。

        以 AllocateMessageQueueAve ragely 策略为例,如果创建 Topic 的时候,把Message Queue 数设为 3 , 当 Consumer 数量为 2 的时候,有一个 Consumer 需要处理 Topic 三分之二的消息,另一个处理三分之一的消息;当 Consumer 数量为 4 的时候,有 一个 Consumer 无法收到消息,其他 3 7.3.2 DefaultMQPullConsumer 的负载均衡个 Consumer 各处理Topic 三分之一 的消息 。可见 Message Queue 数量设置过小不利于做负载均衡,通常情况下,应把一个 Topic 的 Message Queue 数设置为 16 。

RocketMQ负载均衡icon-default.png?t=N2N8https://blog.csdn.net/Weixiaohuai/article/details/123898841

        7.3.2 DefaultMQPullConsumer 的负载均衡

        Pull Consumer 可以看到所有的 Message Queue , 而且从哪个 MessageQueue 读取消息,读消息时的 Offset 都由使用者控制,使用者可以实现任何特殊方式的负载均衡。DefaultMQPullConsumer 有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener 函数

         registerMessageQueueListener 函数在有新的 Consumer 加入或退出时被触发。 另一个辅助工具是 MQPullConsumerScheduleService 类,使用这个 Class类 似使用 DefaultMQPushConsumer ,但是它把 Pull 消息的主动性留给了使用者。

import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService; //导入依赖的package包/类
public static void main(String[] args) throws MQClientException {
    final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

    scheduleService.setMessageModel(MessageModel.CLUSTERING);
    scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {

        @Override
        public void doPullTask(MessageQueue mq, PullTaskContext context) {
            MQPullConsumer consumer = context.getPullConsumer();
            try {
                long offset = consumer.fetchConsumeOffset(mq, false);
                if (offset < 0)
                    offset = 0;

                PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                System.out.println(offset + "\t" + mq + "\t" + pullResult);
                switch (pullResult.getPullStatus()) {
                case FOUND:
                    break;
                case NO_MATCHED_MSG:
                    break;
                case NO_NEW_MSG:
                case OFFSET_ILLEGAL:
                    break;
                default:
                    break;
                }

                consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

                context.setPullNextDelayTimeMillis(100);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    scheduleService.start();
}

        然后我们 看一看在 MQPullConsumerScheduleService 类的实现里,实现负载均衡的代码PullScheduleService.javaicon-default.png?t=N2N8https://gitee.com/apache/rocketmq/blob/develop/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java

        从源码中可以看出,用户通过更改 MessageQueueListenerimpl 的实现来做自己的负载均衡策略。

7.4 提高 Producer 的发送速度 

        发送一条消息出去要经过三步,一是客户端发送请求到服务器,二是服务器处理该请求, 三是服务器向客户端返回应答, 一次消息的发送耗时是上述三个步骤的总和 。在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用 ,可以采用 Oneway 方式发送, O neway 方式只发送请求不等待应答,即将数据写入客户端的 Socket 缓冲区就返回,不等待对方返回结果,用这种方式发送消息的耗时可以缩短到微秒级。

        另一种提高发送速度的方法是增加 Producer 的并发量,使用多个 Producer 同时发送,我们不用担心多 Producer 同时写会降低消息写磁盘的效率,RocketMQ 引人了 一个并发窗口,在窗口内消息可以并发地写入 DirectMem 中 ,然后异步地将连续一段无空洞的数据刷入文件系统当中 。 顺序写 CommitLog 可让 RocketMQ 无论在 HDD 还是 SSD 磁盘情况下都能保持较高的写入性能。 目前在阿里内部经过调优的服务器上,写人性能达到 90 万+的 TPS ,我们可以参考这个数据进行系统优化。

        在 Linux 操作系统层级进行调优,推荐使用 EXT4 文件系统, IO 调度算法使用 deadline 算法。

        如图 7-3 所示, EXT4 创建/删除文件的性能比 EXT3 及其他文件系统要好, RocketMQ 的 CommitLog 会有频繁的创建/删除动作 。

        另外, IO 调度算法也推荐调整为 deadline 。 deadline 算法大致思想如下 :实现四个队列,其中两个处理正常的 read 和 write 操作,另外两个处理超时的read 和 write 操作。 正常的 read 和 write 队列中,元素按扇区号排序,进行正常的 IO 合并处理以提高吞吐量。 因为 IO 请求可能会集中在某些磁盘位置,这样会导致新来的请求一直被合并,可能会有其他磁盘位置 的 IO 请求被饿死。 超时的 read 和 write 的队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时(达到最终期限时间)的队列中的IO 请求会优先被处理。

7.5 系统性能调优的一般流程

         这里讨论的系统是指能完成某项功能的软硬件 整 体,比如我们用RocketMQ ,加上自己 写 的 Producer 、 Consumer 程序,部署到一台服务器上,组成一个消息处理系统。

        首先是搭建测试环境, 查看硬件利用率。 把测试系统搭建好以后,要想办法模拟实际使用时的情况,并且逐步增大请求量 ,同时检测系统的 TPS 。 在请求量增大到一定程度时·,系统的 QPS 达到峰值,这个时候维持这种请求量,保持系统在峰值状态下运行。 然后查看此时系统的硬件使用情况:

       ( 1 )使用 TOP 命令查看 CPU 和内存的利用率

        上面的数据显示, CPU 有 99 . 8% 空闲;内存总共 8G ,有大约 1.5G 空闲 。

        ( 2 ) 使用 Linux 的 sar 命令查看网卡使用情况

         如果想进一步验证网卡是否达到了极限值,可以使用 iperf3 命令查看。 还可以用 nets tat 一t 查看 网卡的连接情况, 看是否有大量连接造成堵塞 。

        然后用 iostat 查看磁盘的使用情况 :

        经过上面的一 系列检查 ,应该能够找到系统的瓶颈。 比如瓶颈是在 CPU 、网卡还是磁盘? 可以先确定网卡和磁盘是否繁忙,这两个中如果有一个被占满了, 问题就可以被直接定位了 。 比如网 卡打满了,我们可以判断是发送的数据量超出了网卡的带宽 ,可以 考虑更换高速网卡,或者更新程序减少数据发送量。

        还有一种情况是这三者都没有到使用极限, 这也是一种比较常见而且有优化空 间的情况,这种情况说明 CPU 利用率没有发挥出来, 比如可能是锁的机制有 bug ,造成线程阻塞 。对于 Java 程序来说,接下来可以用 Java 的 profiling 工具来找出程序的具体问题,比如 jvisualvm 、 jstack 、 perfJ 等 。

        通过上面这些工具,可以逐步定位出是哪些 Java 线程比较慢,|那个函数占用的时间多,是否因为存在锁造成了忙等的情况,然后通过不断的更改测试,找到影响性能的关键代码,最终解决问题。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/428668.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构学习3】线性表-链表、单链表

目录链式存储结构链表概念头结点的意义单链表的定义和表示单链表的基本操作链式存储结构 链表概念 概念 结点在存储器中的位置是任意的&#xff0c;即逻辑上相邻的数据元素在物理上不一定相邻。线性表的链式表示又称为非顺序映像或链式映像用一组物理位置任意的存储单元来存…

三公经费用泛微全过程数字化管理,使用有记录,付款有依据

公开透明是现代财政制度的重要准则和基本特征。组织要以公开、透明、科学的预算制度确定财政支出&#xff0c;贯穿预算编制、执行、监督全过程。 组织常见的费用管理——“三公”经费&#xff0c;通常指因公出国&#xff08;境&#xff09;费、公务用车购置及运行费、公务接待…

富士康转移3000亿产能,iPhone的印度产能倍增,不再“赏饭吃”

日前消息指今年三月份印度的iPhone产量已经是去年的四倍之多&#xff0c;占比将近7&#xff05;&#xff0c;显示出苹果和富士康都在加速提升印度的iPhone产能&#xff0c;凸显出他们的决心&#xff0c;这对中国制造业将带来深远影响。一、富士康对中国制造影响巨大2021年的数据…

leetcode每日一题:数组篇(1/2)

&#x1f61a;一个不甘平凡的普通人&#xff0c;日更算法学习和打卡&#xff0c;期待您的关注和认可&#xff0c;陪您一起学习打卡&#xff01;&#xff01;&#xff01;&#x1f618;&#x1f618;&#x1f618; &#x1f917;专栏&#xff1a;每日算法学习 &#x1f4ac;个人…

key的作用和原理、列表过滤

一、key的作用与原理 虚拟DOM对象的标识。当状态中的数据发生变化时&#xff0c;vue会根据新数据生成新的虚拟DOM。随后vue进行新虚拟DOM与旧虚拟DOM的差异比较&#xff0c;规则如下&#xff1a;对比规则&#xff1a; 旧虚拟DOM中找到了与新虚拟DOM相同的key 若虚拟DOM中内容没…

Iceberg 数据湖是什么?数据湖能解决什么问题?独立于计算层和存储层之间的表格层?

Iceberg 数据湖是什么&#xff1f;数据湖能解决什么问题&#xff1f;独立于计算层和存储层之间的表格层&#xff1f;0. 导读1. Hive数仓遇到的问题2. 一种开放的表格式3. 自下而上的元数据4. 高性能的查询4.1 分区剪裁4.2 文件过滤4.3 RowGroup过滤参考&#xff1a;https://ice…

kafka-4 生产者和消费者

kafka的生产者和消费者四、 生产者4.1 分区分配策略4.2 副本和消息消费4.2.1 副本&#xff08;AR、ISR、OSR&#xff09;4.2.2 HW与LEO4.2.3 ISR 集合和 HW、LEO的关系五、消费者5.1 分区分配策略5.2 消费者offset的存储四、 生产者 4.1 分区分配策略 &#xff08;1&#xff…

【20】linux进阶——linux的数据流和重定向

大家好&#xff0c;这里是天亮之前ict&#xff0c;本人网络工程大三在读小学生&#xff0c;拥有锐捷的ie和红帽的ce认证。每天更新一个linux进阶的小知识&#xff0c;希望能提高自己的技术的同时&#xff0c;也可以帮助到大家 另外其它专栏请关注&#xff1a; 锐捷数通实验&…

一、LED子系统框架分析

个人主页&#xff1a;董哥聊技术我是董哥&#xff0c;嵌入式领域新星创作者创作理念&#xff1a;专注分享高质量嵌入式文章&#xff0c;让大家读有所得&#xff01;文章目录1、前言2、LED裸机处理3、LED子系统框架4、LED子系统目录结构及核心文件5、sysfs目录结构5.1 确保LED子…

低代码开发重要工具:私有化部署的jvs-logic的设计与价值

逻辑引擎介绍 逻辑引擎是一种能够处理逻辑表达式的程序&#xff0c;它能够根据用户输入的表达式计算出表达式的值。在实际应用中&#xff0c;逻辑引擎通常被用于处理规则引擎、决策系统、业务规则配置等领域&#xff0c;具有广泛的应用前景。 逻辑引擎如下图所示&#xff0c;在…

Hive中SQL基本操作

文章目录1. Hive 中DDL1) 数据库操作2) 创建表 Create table3&#xff09;内部外部表区别4) Create Table As Select (CTAS)4&#xff09;分区表 partition2 .Hive中 DML1&#xff09;从文件加载数据 Loading files into tables2&#xff09;插入 INSERT3&#xff09;更新 UPDA…

《疯狂Java讲义》读书笔记7

Navicat快捷键 刷新&#xff1a;FnF5 表内容页面打开表设计页面&#xff1a;CtrlD 打开 MYSQL 命令行窗口&#xff1a;FnF6 MYSQL 增减查改 create table student(stuid int not null auto_increment primary key,stuname varchar(200),stustatus varchar(100) ); create …

TCP/IP协议工作原理与工作流程

一、TCP/IP协议工作原理 1、OSI模型 使用OSI模型来描述一个网络中的各个协议层&#xff0c;如下&#xff1a; 2、理解TCP/IP协议 TCP/IP协议&#xff0c;英文全称Transmission Control Protocol/Internet Protocol&#xff0c;包含了一系列构成互联网基础的网络协议&#xff0…

本地部署ChatGLM-6B模型(使用JittorLLMs大模型推理库)

简介 网上冲浪时发现的这两个国产模型&#xff0c;重点是对硬件的要求并不高&#xff0c;有2GB内存就可以跑&#xff0c;觉得有趣就弄来玩了下。 https://github.com/Jittor/JittorLLMs https://github.com/THUDM/ChatGLM-6B 简单介绍下用到的仓库 ChatGLM-6B ChatGLM-6B 是…

有偏估计的发展及其在球谐函数拟合中的应用研究

有偏估计的发展及其在球谐函数拟合中的应用研究 阚昊宇 专业&#xff1a;大地测量与测量工程 学号&#xff1a;XXX 邮箱&#xff1a;XXX 电话&#xff1a;XXX 摘要&#xff1a;球谐函数在大地测量等领域具有重要意义&#xff0c;然而在观测量不多或空间分布不均匀时进行高阶…

视频编解码(一)之virtio-gpu环境搭建

一、基础概念 VA-API Video Acceleration API 是一组开源应用API接口&#xff0c;赋能应用&#xff08;比如VLC播放器、GStreamer等&#xff09;使用hardware video acceleration&#xff08;一般是GPU提供硬件视频加速功能&#xff09;&#xff0c;VA-API主要由开源库libva和…

机器学习算法系列(三)

机器学习算法之–对数几率回归&#xff08;逻辑斯蒂回归&#xff09;算法 上个算法&#xff08;算法系列二&#xff09;介绍了如何使用线性模型进行回归学习&#xff0c;但若要做的是分类任务&#xff0c;则需要找一个单调可微函数将分类任务的真实标记y与线性回归模型的预测值…

Scons自动大型系统构建工具

1、先了解一下make 在Linux系统上做过c/c++开发的同学们都知道一个项目工程管理工具:make和Makefile。 make 这个工具自上个世纪 70 年代 Stuart Feldman 在贝尔实验室开发出以来,就一直是类 UNIX 程序员的最爱之一。 通过检查文件的修改时间,make 工具可以知道编译目标文…

关于原子服务的思考

在我们以往的工作中经常会遇到“原子服务”或“原子化服务”的概念&#xff0c;如果没有一个清晰的概念边界&#xff0c;会使大家的沟通和讨论无法在同一个共识上展开&#xff0c;大家不在同一频道上&#xff0c;割裂于不同的领域空间&#xff0c;难以产生一致性的进展。那么&a…

SAR ADC系列26:系统设计

目录 串一下知识点&#xff1a; 设计一个高速异步SAR ADC 设计目标&#xff1a; 电路架构&#xff1a;&#xff08;具体电路看前文&#xff09; 采样网络&#xff1a; 采样电容&#xff1a; CDAC开关切换策略&#xff1a; CDAC阵列冗余设计&#xff1a; SAR 逻辑 比…