RocketMQ快速入门:如何保证消息不丢失|保证消息可靠性(九)

news2024/10/23 18:21:21

0. 引言

在金融、电商等对数据完整性要求极高的行业,消息的丢失可能会导致数据不一致,严重影响业务逻辑和数据统计,也影响客户体验,所以在很多业务场景下,我们都要求数据不能丢失。而rocketmq中,如何对消息防丢失进行处理的呢?

1. 原理

1.2 产生消息丢失的场景

首先我们要理解消息的传递过程,在哪些阶段会导致消息丢失,才能知道如何进行防控。

我们之前分析过rabbitmq如何保证消息不丢失, rabbitmq内部有交换机这一转发步骤,所以相对比rocketmq更加复杂,但是两者的分析方法是一致的。
在这里插入图片描述

rocketmq的消息传递分为3个阶段:
(1)生产者发送消息到broker的队列中
(2)broker存储消息
(3)消费者到队列获取消息进行消费
在这里插入图片描述
而这三个阶段可能会导致消息丢失的场景是什么呢?其实由rabbitmq的分析我们可以得到启发。
(1)生产者发送消息到broker的队列中

生产者在发出消息后,可能因为网络异常、broker宕机,导致发出的消息实际并没有到达broker

(2)broker存储消息

broker的存储机制是将消息先存储到内存,存储完成后再发送回执给生产者,然后再异步将数据刷到磁盘,但如果在这个刷盘这个过程中broker宕机了,也会导致消息丢失

(3)消费者到队列获取消息进行消费

broker在将消息发出后,同样可能因为网络异常、消费者宕机或者消息者消费到一半产生错误等因素,导致消息实际并没有被消费者消费,但broker又扣除了这条消息,就会导致消息丢失

1.2 防丢失措施

阶段一:生产者发送消息到broker的队列中

1、因为发送到broker期间网络因素我们很难干预,也很难百分比保证。第一点我们能做的,如果其中一个broker宕机,那能有备用节点顶上,保证可用性。于是第一项就是多节点部署broker

2、但万一节点都挂了呢,或者整个机房网络瘫痪了,如何保证消息不丢失,我们只要从上游控制,如果下游不通时,就不要发了,待会再发。于是也衍生出消息发送失败时的重试机制

3、但如果一直重发不成功怎么办呢,那就需要下游告知上游,这次发送没成功,你记录好状态,这就是broker要有返回状态告知,否则生产者也不知道到底发送成功没有。broker中提供了3种发送方式:同步、异步、单向(详见之前的文章: RocketMQ快速入门:集成java客户端实现各类消息发送|异步、同步、顺序、单向、延迟、事务(五)附带源码)。

这三种方式中单向肯定不行,他是不管返回结果的,最容易丢失消息。而异步需要设置回调函数,在回调函数中处理发送失败时的逻辑,如果对于一些场景回调里很能补救,最常见的就是回调里进行重发,所以最优先保证消息可靠的就是同步发送的方式,一旦获取到发送失败,就进行补救处理,或者不再继续后续的业务逻辑,整个流程直接报错打回

总结一下,生产阶段保证消息可靠的手段包括多节点部署broker, 消息重发、同步发送。这几种方式实际上是可以配合使用的, 比如多节点部署,通过同步发送,发送失败时进行3次重发,都重发失败则记录状态。
在这里插入图片描述

阶段二:broker存储消息

存储阶段导致丢失的原因就是因为broker默认的是异步刷盘机制,如果改成同步刷盘呢,先存储到内存,然后刷新到磁盘,刷新成功后才给生产者返回成功收到的回执,以此保证消息可靠。rocketmq中也是支持同步刷盘的。

但如果只有一个节点的话,即使同步刷盘,当broker宕机后,没有备用的,还是会导致服务不可用,相对可靠性就没有保障了。所以同步刷盘,也可以配合着多节点部署使用

当然如果你的场景对可用性要求不高,即使宕机,只要报错会生产者就行,那同步刷盘也足够了

总结一下,存储阶段主要依赖同步刷盘和多节点部署来保障可靠性,当然多节点部署可以根据业务情况和成本预算选择。
在这里插入图片描述

阶段三:消费者到队列获取消息进行消费

消费阶段的丢失可能性主要来源于消费者没处理好之前就宕机或则异常了,首先一点能想到的那就多个消费者呢,但实际上多点部署在消费阶段并不能解决问题,因为rocketmq消费模式有广播模式和集群模式,广播模式下每个节点都会收到消息,这个模式下的天然就是多节点部署。而集群模式本身也是基于多消费者的情况,但两者都无法保障当消息发送给某一节点后,这个节点拉去了消息,但没实际处理完就异常的场景。

所以就考虑当消费者消费完成后,再给broker发送成功消费的回执,这时broker才更新消息偏移量,将消息标识为被消费。如此才能保障消息的可靠性。

在这里插入图片描述

2. 实现

2.1 生产阶段

1、多节点部署,可以部署主从或集群模式,这里不讲解详细的搭建流程,后续单独讲解

2、同步发送,主要通过producer.send来实现同步发送

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 声明group
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 声明namesrv地址
        producer.setNamesrvAddr("localhost:9876");

        // 设置重试次数
        producer.setRetryTimesWhenSendFailed(2);
        // 启动实例
        producer.start();

        // 设置消息的topic,tag以及消息体
        Message msg = new Message("topic_test", "tag_test", "消息内容1".getBytes(StandardCharsets.UTF_8));

        // 发送消息,并设置10s连接超时
        SendResult send = producer.send(msg, 10000);
        System.out.println("发送结果:"+send);

        // 关闭实例
        producer.shutdown();
    }

如果是springboot集成的,可以通过SendResult sendResult = rocketMQTemplate.syncSend("topic_test:tag_test", message);来实现。通过发送结果SendResult对象,来判断发送失败后的处理逻辑

3、发送重试
(1)首先手动重发的实现很简单,只需要根据send.getSendStatus()状态来判断,如果需要重发多次的,可以结合guava-retry 等重发组件来更方便的实现

// 发送消息,并设置10s连接超时
        SendResult send = producer.send(msg, 10000);
        System.out.println("发送结果:"+send);
        
        if(!send.getSendStatus().equals(SendStatus.SEND_OK)){
            // 发送失败,手动重发
            send = producer.send(msg, 10000);
            
        }

(2)当然rocketmq也封装好了重试机制给我们使用,其重试机制采用衰减的形式,也就是重试间隔时间会逐渐增加
在这里插入图片描述
我们只需要通过producer.setRetryTimesWhenSendFailed(2);方法即可设置,会在发送失败时自动触发重新发送,同时如果超过设置的超时时间还未接收到成功的结果也会触发重发机制,就不需要我们手动书写重发逻辑了,更加推荐这种方式。

        // 声明group
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 声明namesrv地址
        producer.setNamesrvAddr("localhost:9876");

        // 设置重试次数
        producer.setRetryTimesWhenSendFailed(2);
        // 启动实例
        producer.start();

2.2 存储阶段

1、主要将broker的刷盘策略设置为同步刷盘,需要修改broker.conf配置文件

# 设置为同步刷盘模式
flushDiskType = SYNC_FLUSH

2、如果配置的是多节点,一般是主从模式,为了防止主节点有数据,从节点没刷到数据的情况,就需要开启从节点刷盘后再返回ACK回执给生产者,需要修改从节点broker配置文件

# 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER

broker提供了两种主从同步模式:ASYNC_MASTER异步 和 SYNC_MASTER同步

  • ASYNC_MASTER:

消息发送到master节点后,开启一个异步线程更新给从节点,这个过程中有消息同步丢失的风险,优点是性能高

  • SYNC_MASTER:

消息发送到master节点后,同步更新到从节点,当从节点更新完再返回成功的ACK回执给生产者,表示消息发送成功,可靠性高,但性能会有所下降

3、关于多节点部署,我们在后续单独讲解

2.3 消费阶段

1、其实现就是消费后返回成功状态即可

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_test", "*");

        // 设置重试次数
        consumer.setMaxReconsumeTimes(2);

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : list) {
                    String topic = msg.getTopic();
                    try {
                        String messageBody = new String(msg.getBody(), "utf-8");
                        System.out.println(topic+":"+messageBody);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者实例
        consumer.start();

2、如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER状态,消费者就会触发稍后重试机制进行重新消费,同样的可以通过consumer.setMaxReconsumeTimes设置最大重试次数

// 设置重试次数
consumer.setMaxReconsumeTimes(2);

其重试次数和时延等级与生产重试是一致的
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1843146.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jar包运行脚本

start&#xff1a; # 启动项目 #!/bin/bash nohup java -jar audit-2.1.0.jar > app.log 2>&1 & quit&#xff1a; # 关闭程序 #!/bin/bash PID$(pgrep -f audit-2.1.0.jar) # 根据应用程序名称查找进程ID kill -9 $PID # 结束进程使用 sh命令运行

泛微开发修炼之旅--22泛微实现免登陆得解决方案之一

文章链接&#xff1a;22泛微实现免登陆得解决方案之一

SSM旅游系统

摘要 旅游业正处于快速发展阶段&#xff0c;旅游系统的建设已经成为了旅游业发展的重要核心问题。在这样的背景下&#xff0c;SSM框架正逐步发展为一种主要的架构。但目前青海省旅游业信息化的发展仍面临诸多问题&#xff0c;包括系统功能不完善、用户体验不佳、数据管理不规范…

【记录】使用远程SSH配置d2l环境(含装pytorch,同时适用于本地anaconda)

文章目录 前言一、从创建新环境开始二、使用步骤1.安装pytorch2.安装 d2l 包3.安装其他包4.使用jupyter notebook 前言 记录一下如何利用使用命令行进行anaconda配置 d2l环境、pytorch并进行训练深度学习模型。 一、从创建新环境开始 如果是本地直接装一个 anaconda 软件就行…

Matlab数学建模实战应用:案例4 - 图像处理

目录 前言 一、图像处理基础 二、Matlab图像处理工具箱 三、案例&#xff1a;图像锐化、去噪和分割 步骤 1&#xff1a;读取和显示图像 步骤 2&#xff1a;图像锐化 步骤 3&#xff1a;图像去噪 步骤 4&#xff1a;图像分割 完整代码示例 四、实际应用 实例总结 总…

多路输出调光无频闪36V48V60V/300W恒流舞台灯调光芯片FP7126 LED舞台帕灯/激光灯控制IC,无频闪无噪音,多路共阳,调光深度0.1%

方案背景 在舞台演出中&#xff0c;灯光扮演着非常重要的角色&#xff0c;它不仅可以烘托氛围&#xff0c;营造氛围&#xff0c;更能够为表演者增添光彩&#xff0c;塑造形象。在博物馆场所中&#xff0c;突出展品细节。根据灯光用途和适用类型&#xff0c;舞台灯可以细分为聚光…

软硬件节水“组合拳”,助力智慧灌区信息化建设!

水资源短缺已成为全球共同面临的挑战&#xff0c;尤其在农业灌溉领域&#xff0c;其影响尤为显著。农业作为水资源消耗的主要行业之一&#xff0c;在日益严峻的水资源形势下&#xff0c;构建节水型灌区的紧迫性日益凸显。 节水型灌区的建设&#xff0c;旨在通过优化灌溉方式、减…

第三期【Demo教程】教你使用SeaTunnel把数据从MySQL导到Hive

随着数据技术的快速发展&#xff0c;了解并掌握各种工具和技术变得尤为重要。为此&#xff0c;我们准备在Apache SeaTunnel社区发起如何使用连接器的Demo演示计划&#xff0c;邀请所有热爱数据同步技术的同学分享他们的知识和实操经验! 我们第三期主题是&#xff1a;如何使用Se…

Python和OpenCV图像分块之图像边长缩小比率是2

import cv2 import numpy as npimg cv2.imread("F:\\mytupian\\xihuduanqiao.jpg") # 低反光 cv2.imshow(image, img) # # 图像分块 # dst np.zeros(img.shape, img.dtype) ratio 2 #图像边长缩小比率是2&#xff0c;也就是一张图片被分割成四份 height, wi…

Python学习笔记6:pychram相关知识及安装教程,后续需要学习的入门知识

上篇文章说了&#xff0c;今天去公司重新装一下IDE&#xff0c;最后也是把过程这边再记录一下&#xff0c;有需要的可以参考一下。 关于pychram pychram是什么&#xff1f; PyCharm是由JetBrains公司开发的一款流行的Python集成开发环境&#xff08;IDE&#xff09;。它专为…

[创业之路-120] :全程图解:软件研发人员如何从企业的顶层看软件产品研发?

目录 一、企业全局 二、供应链 三、团队管理 四、研发流程IPD 五、软件开发流程 六、项目管理 七、研发管理者的自我修炼 一、企业全局 二、供应链 三、团队管理 四、研发流程IPD 五、软件开发流程 六、项目管理 七、研发管理者的自我修炼

RabbitMQ的部署

一、前言 演示的为RabbitMQ的单机部署&#xff0c;在Centos7虚拟机中使用Docker来安装&#xff0c;需要掌握相应的docker命令 二、下载镜像 启动Docker: systemctl start docker 在线拉取&#xff1a;docker pull docker pull rabbitmq:3-management 三、安装MQ 运行容器&…

华媒舍:8个让你东南亚媒体发稿事半功倍的方法

本文将为您介绍8个方法&#xff0c;可以帮助您在东南亚地区的媒体发稿过程中事半功倍。无论您是一名公关人员、市场营销专家还是普通的新闻工作者&#xff0c;这些方法都将对您极具帮助。 1. 了解目标受众 在东南亚地区发布媒体稿件时&#xff0c;首要的步骤是了解目标受众。不…

Matlab数学建模实战应用:案例3 - 投资组合优化

目录 前言 一、问题分析 二、模型建立 三、Matlab代码实现 完整代码示例 四、模型验证 五、模型应用 实例示范&#xff1a;投资组合优化 步骤 1&#xff1a;导入数据并计算统计量 步骤 2&#xff1a;建立优化模型并求解 步骤 3&#xff1a;绘制有效前沿&#xff08;…

汇聚荣做拼多多口碑怎么样?

汇聚荣做拼多多口碑怎么样?汇聚荣作为拼多多平台上的一个商家&#xff0c;其口碑的好坏直接关联到消费者的购买决策和品牌信誉。在电商平台上&#xff0c;良好的口碑是吸引顾客的重要因素之一&#xff0c;尤其是对于竞争激烈的拼多多平台而言。那么&#xff0c;汇聚荣在拼多多…

iOS APP内存泄漏的问题

iOS APP内存泄漏是指应用程序不再使用内存&#xff0c;但内存却没有被释放&#xff0c;导致应用程序占用过多的内存&#xff0c;甚至崩溃。内存泄漏是iOS开发中常见的问题&#xff0c;会严重影响应用程序的性能和稳定性。北京木奇移动技术有限公司&#xff0c;专业的软件外包开…

《车载以太网通信测试》课程来袭!!!

本课程包含教程和脚本两部分内容。 教程 详细介绍以太网&#xff0c;如何理解TCP/IP协议&#xff0c;CAPL中涉及以太网的代码&#xff0c;以太网测试环境如何搭建&#xff0c;从物理层、链路层、网络层、传输层到应用层多种协议测试点的测试原理和测试方法介绍&#xff0c;中…

【PL理论】(34) 类型系统:不完备性 | 为什么推导树推导失败? | 实现类型系统 | 调整到类型系统 | 思考:强制程序员写类型还是自动推断类型?

&#x1f4ac; 写在前面&#xff1a;回顾我们的目标是为 F- 语言设计一个完备但不完全的类型系统&#xff0c;本章我们探讨的主题是类型系统的完备性。 目录 0x00 类型系统的不完备性 0x01 为什么推导树推导失败&#xff1f; 0x02 实现类型系统 0x03 调整到类型系统 0x04…

Java面试八股之什么是mybatis流式查询

什么是mybatis流式查询 Mybatis流式查询是一种处理大量数据的有效方法&#xff0c;它允许你以低内存消耗的方式来处理查询结果。传统的查询操作会一次性将所有数据加载到内存中&#xff0c;如果数据量非常大&#xff0c;可能会导致OutOfMemoryError&#xff08;OOM&#xff09…

js语法---weakMap和weakSet:弱映射和弱集合

weakMap weakMap是Map的一种&#xff0c;但它有更多的限制&#xff0c; 1. WeakMap 和 Map 的第一个不同点就是&#xff0c;WeakMap 的键必须是对象&#xff0c;不能是原始值(number,string,symbol...) 2. WeakMap 不支持迭代以及 keys()&#xff0c;values() 和 entries() …