Rocketmq如何保证消息不丢失

news2025/1/11 10:46:43

如果想要保证消息不丢失就要知道,消息可能出现丢失得地方。

1.producer发送消息

2.Broker存储消息

3.Consumer消费消息

4.Broker主从切换

下面一共有9个维度可以保证消息不丢失。

目录

维度一:同步发送

维度二.异步发送

维度三.刷盘策略

维度四.Broker多副本和高可用

维度五.消息确认

维度七.事务消息

维度 9:极端情况


维度一:同步发送

 

public void send() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");
    SendResult sendResult = null;

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    try {
        sendResult = producer.send(sendMessage);
    } catch (Exception e) {
        e.printStackTrace();
    }
    if (sendResult != null) {
        System.out.println(sendResult.getSendStatus());
    }
}

同步发送回返回四个状态码

1.SEND_OK 消息发送成功,需要主要得是,消息发送到Broker之后,还需要有两个操作,一个刷盘,一个同步给Slave节点,默认这两个操作都是异步得,只有把这两个操作都改成同步的,才会保证这条消息真正得发送成功。

2.FLUSH_DISK_TIMEOUT 消息发送成功,但是刷盘超时

3.FLUSH_SLAVE_TIMEOUT 消息发送成功,但是同步到slave节点超时

4.SLAVE_NOT_AVAILABLE 消息发送成功,但是broker的Slave节点不可用

根据状态码,可以做消息重试,这里设置的是3次

注意:消息重试的时候,消费端一定要保证消息幂等性

维度二.异步发送

public void sendAsync() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    producer.send(sendMessage, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            
        }

        @Override
        public void onException(Throwable e) {
            // TODO 可以在这里加入重试逻辑
        }
    });
}

异步发送,在发送的时候会多传递一个SendCallback()方法,需要重写其中的onSuccess和onException方法

维度三.刷盘策略

刷盘策略默认的是异步刷盘,就是消息在写入commitlog中,并不会直接写入到磁盘,而是先写到PageCache缓存后返回成功,然后后台线程异步把消息刷入到磁盘中,异步刷盘提高了消息的吞吐量,但是有可能造成消息丢失的情况,比如断电导致机器停机,PageCache中还没来得及刷盘的消息就会丢失

同步刷盘,就是消息在写入内存后,立刻请求刷盘线程进行刷盘,如果消息没有在约定时间内(默认是5s)刷盘成功,就会返回上面所说的FLUSH_DISK_TIMEOUT,生产者收到这个响应后,可以进行重试,同步刷盘保证了消息的可靠性,同时降低了吞吐量,增加了延迟。修改刷盘策略,就是在broker配置中添加 flushDiskType=SYNC_FLUSH

维度四.Broker多副本和高可用

Borker为了保证高可用,采用一主多从的方式部署。

 

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

  1. slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  2. master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  3. slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  4. master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

维度五.消息确认

Consumer的消费消息的代码如下:
 

public void consume() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.subscribe("topic1", "tag1");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        try{
            System.out.printf("Receive New Messages: %s", msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
}

如果消费成功就会返回CONSUMER_SUCCESS;提交offset并从Broker拉取下一批消息

维度六.Cosumer重试

Consumer消费失败有三种情况

返回RECONSUMER_LATER

返回null

抛出异常

Broker收到这个响应后,会把这条消息放入重试队列,重新发送给Consumer.

注意:1.只有在集群模式下(也就是只要有一个消费者消费成功就行)才会生效,广播模式下(就是每个消费者都需要消费这个消息)是不生效的

        2.Consumer一定要做好幂等性处理

       3.Broker默认最多重试16次,如果重试16次后都失败,就会放入到死信队列中,Consumer可以订阅死信队列进行消费

   重试三次都失败就可以说明出现问题了,这时候我们可以把消息放到本地,给Broker返回Consumer_success来结束重试

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
    //TODO 把消息写入本地存储
    System.out.println("重试次数超过3次");
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

维度七.事务消息

可以参考官网的事务消息: 

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

整个事务消息的详细交互流程如下图所示:

 

 如下是官网的示例代码

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

维度8.消息索引

Rocketmq核心的数据文件有三个:CommitLog,index,consumerQueue

其中Index就是索引文件

 查找消息的时候,首先会根据key的hashcode计算出hash槽的位置,根据这个位置计算Index条目的位置,从Index条目位置读取到消息在CommitLog文件中的offset,从而查找到消息。

在 Producer 发送消息时,可以指定一个 key,代码如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

维度 9:极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/598983.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库期末复习(2)

关系数据库 图1 上图为思考题1的答案 \d student #查看完整性约束 模式和实例 关系的模式:关系有哪些、关系又什么类型、关系的约束时什么&#xff0c;一般来说关系的模式一般比较稳定&#xff0c;不会随着动态的变化而变化。 关系的实例:关系的实例一般随着变化的次数比较…

体验 InsCode AI,原来 AI 也扛不住互联网黑话

CSDN AI写作助手上线了&#xff01;InsCode AI 创作助手不仅能够帮助用户高效创作文章&#xff0c;而且能够作为对话式AI回答你想知道的问题。成倍提高生产力&#xff01;以下是我的体验分享 一、你平时会使用这类AI工具吗&#xff1f;你对这类型的工具有什么看法&#xff1f;…

RPC(2):RPC简介

1 RFC RFC(Request For Comments) 是由互联网工程任务组(IETF)发布的文件集。文件集中每个文件都有自己唯一编号&#xff0c;例如&#xff1a;rfc1831。目前RFC文件由互联网协会(Internet Society&#xff0c;ISOC)赞助发行。 RPC就收集到了rfc 1831中。可以通过下面网址查看…

微信小程序websocket使用protobuf,发送arraybuffer

❤️砥砺前行&#xff0c;不负余光&#xff0c;永远在路上❤️ 目录 前言一、如何在小程序websocket中使用 Protobuf 发送buffer二、使用过程遇到的坑&#xff08;版本问题&#xff09;1、需要注意下Protobuf版本 使用 protobufjs6.8.6最好&#xff0c;我在使用的时候安装7.多 …

1_7后端优化

后端优化是指将一段时间内相机所有关键帧的位姿、内参、每个点3维坐标作为参数进行优化&#xff0c;得到最优的内、外参&#xff1b;利用的方法主要是Bundle Adjustment。 所谓Bundle Adjustment可以理解为从任意特征点发射出来的几束光线&#xff0c;它们会在几个相机的成像平…

寄存器-汇编复习(2)

通过阅读本文小节内容&#xff0c;可以清楚的明白汇编承接的能力和机器语言&#xff0c;高级语言之间的表达关系。文中虽然讨论16位cpu&#xff0c;最新的64或以后的128理论都一样的&#xff0c;类推就好了。 继续将 通用寄存器-汇编复习(1)_luozhonghua2000的博客-CSDN博客 …

很多人打商标的主意,悄悄埋伏

很多人在打商标的主意&#xff0c;等着抢劫呢 等你的品牌结了果实&#xff0c;然后出手勒索 趣讲大白话&#xff1a;鬼子进村&#xff0c;打枪的不要 【趣讲信息科技183期】 **************************** 有些公司申请几千件商标 有公司一个月申请100件 春江水暖贼先知 有一条…

WSL2网络配置

WSL2越来越好用了&#xff0c;但是在windows下使用clash时&#xff0c;配置WSL2的网络很麻烦&#xff0c;通常使用设置环境变量export ALL_PROXY"http://127.0.0.1:8000"的方式有很大的弊端&#xff0c;例如pip和conda就不会走代理。 经过我亲身体验&#xff0c;最好…

基于小脑模型神经网络的轨迹跟踪研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

模拟退火算法(附简单案例及详细matlab源码)

作者&#xff1a;非妃是公主 专栏&#xff1a;《智能优化算法》 博客地址&#xff1a;https://blog.csdn.net/myf_666 个性签&#xff1a;顺境不惰&#xff0c;逆境不馁&#xff0c;以心制境&#xff0c;万事可成。——曾国藩 文章目录 专栏推荐序一、概论二、物理退火1. 加温…

MyBatis - CRUD 操作

文章目录 1.环境配置1.1 导入相关依赖1.2 基本配置1.3 数据模型 2.基于 XML 开发2.1 创建 Mapper 接口2.2 创建 XML 映射文件2.3 insert2.4 select2.5 delete2.6 update2.7 编写单元测试 3.基于注解开发3.1 常用注解3.2 创建 Mapper 接口 MyBatis 支持通过 XML 和注解两种方式来…

chatgpt赋能python:Python运行程序没反应怎么办?

Python运行程序没反应怎么办&#xff1f; Python作为一种高级编程语言&#xff0c;已经成为了很多开发者的首选语言。然而&#xff0c;在使用Python编写程序时&#xff0c;有时候会出现运行程序却没有任何反应的情况。这是什么原因导致的呢&#xff1f;本文将为大家介绍Python…

单例模式的饿/懒汉模式

目录 1. 什么是单例模式2. 饿汉模式2.1 饿汉模式概念2.2 饿汉模式代码 3. 懒汉模式3.1 懒汉模式概念3.2 单线程情况下的懒汉模式3.3 单例模式的写法(保证线程安全) 4. wait 和 sleep 的区别 1. 什么是单例模式 保证某个类在程序中只存在一份实例&#xff0c;而不会创建多个实例…

Apache Kafka - 跨集群数据镜像 MirrorMaker

文章目录 概述跨集群数据镜像的原理MirrorMaker配置小结 概述 在分布式系统中&#xff0c;数据镜像是一项重要的功能&#xff0c;它可以将数据从一个集群复制到另一个集群&#xff0c;以保证数据的高可用性和容错性。Apache Kafka是一个流处理平台&#xff0c;它提供了一种跨集…

程序设计综合实习(C语言):学生成绩单制作

一、目的 1&#xff0e;掌握结构体变量及数组的定义、赋值、初始化、输入、输出 2&#xff0e;结构体数组的操作。 二、实习环境 Visual Stdio 2022 三、实习内容、步骤与要求 1&#xff0e;定义一个结构体数组&#xff0c;存放10个学生的学号&#xff0c;姓名&#xff0c;三…

Linux 设备树文件手动编译的 shell 脚本

前言 前面通过 Makefile 实现手动编译 Linux 设备树 dts 源文件及其 设备树依赖 dtsi、.h 头文件&#xff0c;如何写成一个 shell 脚本&#xff0c;直接编译呢&#xff1f; 其实就是 把 Makefile 重新编写为 shell 脚本即可 编译设备树 shell 脚本 脚本内容如下&#xff1a…

【六一 iKun】Happy LiuYi, iKuns

六一了&#xff0c;放松下。 Python iKun from turtle import * screensize(1000,1000) speed(6)#把衣服画出来&#xff0c;从右肩膀开始#领子 penup() goto(-141,-179) pensize(3) fillcolor("black") pencolor("black") begin_fill() pendown() left(1)…

【Python实战】Python采集高校信息

前言 大家好,我们今天来爬取某站的高校名单,把其高校名单,成员和内容数获取下来,不过,我们发现这个网站比我们平时多了一个验证,下面看看我是怎么解决的。 环境使用 python 3.9pycharm模块使用 requests模块介绍 requests requests是一个很实用的Python HTTP客户端…

【线性dp必学四道题】线性dp四道经典例题【最长上升子序列】、【最长公共子序列】、【最长公共上升子序列(maxv的由来)】【最长公共子串】

【最长上升子序列】、【最长公共子序列】、【最长公共上升子序列】 最长上升子序列f[i] 表示以i结尾的最长子序列 最长公共子序列f[i][j] 表示 a前i 和 b前j个 最长公共长度 最长公共上升子序列f[i][j]代表所有a[1 ~ i]和b[1 ~ j]中以b[j]结尾的公共上升子序列的集合 最长公共子…

Spring Boot如何实现分布式追踪和监控

Spring Boot如何实现分布式追踪和监控 在分布式系统中&#xff0c;由于服务数量的增加和服务之间的相互调用&#xff0c;会出现跨服务的请求链路较长&#xff0c;难以追踪问题和定位性能瓶颈等问题。因此&#xff0c;分布式追踪和监控变得越来越重要。本文将介绍如何使用 Spri…