RocketMQ_高级功能

news2024/11/15 8:45:36

目录

一、消息存储

1、存储介质以及性能对比

2、消息的存储和发送

3、消息存储结构

4、刷盘机制

二、高可用性机制

1、消息消费高可用

2、消息发送高可用

3、消息主从复制

三、负载均衡

1、Producer负载均衡

2、Consumer负载均衡

四、消息重试

1、顺序消息的重试

2、无需消息的重试

五、死信队列

1、死信特性

2、查看死信信息

六、消息幂等

1、消息幂等的必要性

2、处理方式


一、消息存储

消息队列由于要考虑到高可靠性的要求,相应的数据需要进行持久化存储。相应处理流程如下:

1、消息生成者发送消息

2、MQ收到消息,将消息进行持久化,在存储中新增一条记录

3、返回ACK给生产者

4、MQ push 消息给对应的消费者,然后等待消费者返回ACK

5、如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤

6、MQ删除消息

1、存储介质以及性能对比

关系型数据库:Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障

文件系统:目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题

性能对比:文件系统>关系型数据库DB

2、消息的存储和发送

消息存储:RocketMQ采用的是文件存储消息,而磁盘操作需要使用得当,其速度可以匹配上网络上传的传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度

消息发送:文件的消息发送到接受方的过程,相应的流程如下,Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  1. 从内核态内存复 制到用户态内存;
  1. 然后从用户态 内存复制到网络驱动的内核态内存;
  1. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的

RocketMQ充分利用了上述特性,也就是所谓的零拷贝技术,提高消息存盘和网络发送的速度

3、消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

  1. CommitLog:存储消息的元数据
  1. ConsumerQueue:存储消息在CommitLog的索引
  1. IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

4、刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

1)同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

2)异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

3)配置

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

二、高可用性机制

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。

MasterSlave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

1、消息消费高可用

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

2、消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

3、消息主从复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

1)同步复制

同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;

在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。

2)异步复制

异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;

3)配置

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。

4)总结

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。

三、负载均衡

1Producer负载均衡

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:

图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。

2Consumer负载均衡

1)集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

默认的分配算法是AllocateMessageQueueAveragely,如下图:

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

2)广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。

四、消息重试

1、顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

2、无需消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

2)配置方式

A、消费失败后,重试配置方式

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

返回 Action.ReconsumeLater (推荐)

返回 Null

抛出异常

public class MessageListenerImpl implements MessageListener {

    @Override

    public Action consume(Message message, ConsumeContext context) {

        //处理消息

        doConsumeMessage(message);

        //方式1:返回 Action.ReconsumeLater,消息将重试

        return Action.ReconsumeLater;

        //方式2:返回 null,消息将重试

        return null;

        //方式3:直接抛出异常, 消息将重试

        throw new RuntimeException("Consumer Message exceotion");

    }

}

B、消费失败后,不重试配置方式

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

public class MessageListenerImpl implements MessageListener {

    @Override

    public Action consume(Message message, ConsumeContext context) {

        try {

            doConsumeMessage(message);

        } catch (Throwable e) {

            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;

            return Action.CommitMessage;

        }

        //消息处理正常,直接返回 Action.CommitMessage;

        return Action.CommitMessage;

    }

}

自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

最大重试次数小于等于 16 次,则重试时间间隔同上表描述。

最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。

Properties properties = new Properties();

//配置对应 Group ID 的最大消息重试次数为 20

properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

Consumer consumer =ONSFactory.createConsumer(properties);

注意:

消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。

如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。

配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

获取消息重试次数

消费者收到消息后,可按照如下方式获取消息的重试次数:

public class MessageListenerImpl implements MessageListener {

    @Override

    public Action consume(Message message, ConsumeContext context) {

        //获取消息的重试次数

        System.out.println(message.getReconsumeTimes());

        return Action.CommitMessage;

    }

}

五、死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

1、死信特性

死信消息具有以下特性:

  1. 不会再被消费者正常消费。
  1. 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性

  1. 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  1. 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  1. 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

2、查看死信信息

在控制台查询出现死信队列的主题信息

在消息界面根据主题查询死信消息

选择重新发送消息

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

六、消息幂等

消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。

1、消息幂等的必要性

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

A、发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

B、投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

C、负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

2、处理方式

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:

Message message = new Message();

message.setKey("ORDERID_100");

SendResult sendResult = producer.send(message);

订阅方收到消息时可以根据消息的 Key 进行幂等处理:

consumer.subscribe("ons_test", "*", new MessageListener() {

    public Action consume(Message message, ConsumeContext context) {

        String key = message.getKey()

        // 根据业务唯一标识的 key 做幂等处理

    }

});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/656712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信无人托管智能客服系统

随着人工智能技术的不断发展,大语言模型、智能客服、垂直化场景应用和微信聊天等三方终端系统已经成为了企业营销的重要工具。这些技术的结合可以帮助企业更好地与客户进行沟通,提高客户满意度和忠诚度,从而实现营销目标。 大语言模型可以帮…

Android:绘制自定义View人脸识别框

一.绘制矩形框实现 项目开发需要自定义View实现一个人脸框,代码实现很平常,一些细节记录一下,方便以后查阅。 代码实现: 1.1 自定义人脸识别框: FaceView.java package com.android.example.ui.view;import andro…

018:vue中自定义el-table 表头和单元格的样式

第018个 el-table 用于展示多条结构类似的数据,可对数据进行排序、筛选、对比或其他自定义操作。 vue在使用element UI table的是经常要用到的,由于原有的表头和单元格的样式不能满足项目的需要,需要自己来自定义样式。同时这里也做了个overf…

B/S版医院检验科lis系统源码 云lis系统

LIS系统为实验室服务对象提供检验申请、采集标本、结果查询等功能;为实验室工作人员的核收标本、分送标本、传送资料、分析前处理、质量控制、单向或双向通讯、分析后处理、结果审核、打印报告、结果查询等标本检测过程提供全面的技术支持。 .Net Core LIS系统源码…

python代码性能分析

基准测试可以发现程序变慢了,那么是因为什么原因导致性能变慢的,需要进一步做代码性能分析。python同样提供了性能分析工具。 cProfile cProfile是python默认的性能分析器,他只测量CPU时间,并不关心内存消耗和其他与内存相关联的…

逻辑越权之找回机制及接口安全(35)

会涉及到这三个内容 验证会涉及到,暴力测试,绕过测试 找回会涉及到,客户端回显,respponse状态值,找回流程绕过 接口会涉及到,调用便利 找回就像是忘记密码那种,然后会有验证,手机…

知识图谱实战应用16-知识图谱在化学物质结构上的应用,快速查找化学分子式与结构

大家好,我是微学AI,今天给大家介绍一下知识图谱实战应用16-知识图谱在化学物质结构上的应用,快速查找化学分子式与结构。在化学领域,知识图谱可以应用于化学物质结构上。化学物质结构主要指分子结构和化学键的组成情况。知识图谱可以将化学物质结构的相关数据以图谱的形式展…

oVirt 4.4.10三节点超融合集群安装配置及集群扩容(三)

本篇主要记录安装及使用过程中遇到的问题<包含4.4.x, 4.5.x> 设置engine管理页面可以通过IP访问ssh连接engine服务器并在/etc/ovirt-engine/engine.conf.d新建99-custom-sso-setup.conf,添加engine节点的IP或出口IPSSO_ALTERNATE_ENGINE_FQDNS="engine103.cluster.…

MySQL 索引与事务

MySQL 索引相关知识详解与事务的详解 一、索引的概念二、索引的作用索引的副作用 三、索引是如何实现的四、创建索引的原则依据五、索引的优缺点1、优点2、缺点 六、索引的分类和创建1、普通索引2、唯一索引3、主键索引4、组合索引5、全文索引 七、索引的查看八、索引的删除1、…

操作系统2:进程的描述与控制

目录 1、什么是前驱图&#xff1f; 2、进程的定义和描述 &#xff08;1&#xff09;什么是进程&#xff1f; &#xff08;2&#xff09;进程的基本状态及转换 &#xff08;3&#xff09;挂起操作和进程状态的转换 3、进程管理中的数据结构 &#xff08;1&#xff09;进程…

楼宇照明系统在图书馆的应用介绍 安科瑞 许敏

【摘要】EIB总线作为楼宇家居自动化控制技术的主流&#xff0c;具有适应性好、功能强大与可靠性高等多方面优点&#xff0c;能很 好地满足定时、合成照度、人体检测和手控等不同的照明控制需求。通过智能化的自动控制实现了楼宇的舒适照 明和节能照明两大目标&#xff0c;克服传…

一、DSMP/OLS等夜间灯光数据贫困地区识别——理论

一、前言 对于贫困的定量研究,前人多实用传统的社会经济统计数据构建模型,但是该数据存在统计口径多源、样本获取受限等不足,不能较好的反映区域贫困的时间按序列编号。随着遥感技术的不断发展,DMSP/OLS等夜间灯光数据的广泛应用为了大范围、动态的区域贫困监测提供一种新…

c++系列之string类的常用接口函数

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; string string时表示字符串的字符类 //使用 string类包含#include 头文件 以及…

基于Java人事管理信息系统设计实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a; ✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精…

fast admin 使用百度富文本编辑器添加赋值

这篇文章为大家介绍一下fastadmin框架如何引入并使用百度富文本 文章目录 前言下载文件编辑文件配置上传图片添加代码总结 前言 在学习fastadmin的时候需要使用到富文本编辑器&#xff0c;于是查阅了一下资料&#xff0c;实现后将我的经验分享给大家 一、下载文件并放入自己的…

es6的模块化 import()方法进行动态加载模块

import 语句和import()方法是不一样的 import 语句是在编译的时候起作用&#xff0c;if&#xff08;&#xff09;{import 语句} 这句话在编译的时候不会执行if语句&#xff0c;会报错 import 语句无法在运行时加载模块&#xff0c;在语法上&#xff0c;按条件加载模块是不行…

Andriod开发 fragment

1.fragment fragment是一个可以嵌入到Activity中的可重用UI组件。它可以让你在一个Activity中展示多个界面&#xff0c;并且可以在运行时动态地添加、移除、替换和组合不同的fragment&#xff0c;从而实现复杂的UI交互效果。 与Activity类似&#xff0c;Fragment也有自己的生…

Linux监控Raid磁盘健康状态

Raid卡型号与操作 Raid卡市场主要是LSI、Adaptec、Highpoint、Promise等厂商提供。Adaptac被PMC收购后&#xff0c;提供的Raid卡即为PMC,简称为P卡。LSI公司提供的Raid卡&#xff0c;即为L卡。 Raid卡配置操作方式 Raid配置可以通过BIOS启动后进入Raid的配置页面进行配置&#…

【Proteus仿真】74HC192功能验证

前言 74HC192是一种四位可向上或向下计数的计数器芯片&#xff0c;可用于电子设备中的计数器、定时器和频率计等应用。74HC192的模式可以分为4种&#xff0c;向上计数&#xff0c;向下计数&#xff0c;并行输入&#xff0c;重置。还有就是&#xff0c;仿真中一些引脚的名称可能…