Rocketmq面试(二)Rocketmq如何保证消息不丢失

news2024/11/24 14:45:52

如果想要保证消息不丢失就要知道,消息可能出现丢失得地方。

1.producer发送消息

2.Broker存储消息

3.Consumer消费消息

4.Broker主从切换

下面一共有9个维度可以保证消息不丢失。

目录

维度一:同步发送

维度二.异步发送

维度三.刷盘策略

维度四.Broker多副本和高可用

维度五.消息确认

维度七.事务消息

维度 9:极端情况


维度一:同步发送

public void send() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");
    SendResult sendResult = null;

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    try {
        sendResult = producer.send(sendMessage);
    } catch (Exception e) {
        e.printStackTrace();
    }
    if (sendResult != null) {
        System.out.println(sendResult.getSendStatus());
    }
}

同步发送回返回四个状态码

1.SEND_OK 消息发送成功,需要主要得是,消息发送到Broker之后,还需要有两个操作,一个刷盘,一个同步给Slave节点,默认这两个操作都是异步得,只有把这两个操作都改成同步的,才会保证这条消息真正得发送成功。

2.FLUSH_DISK_TIMEOUT 消息发送成功,但是刷盘超时

3.FLUSH_SLAVE_TIMEOUT 消息发送成功,但是同步到slave节点超时

4.SLAVE_NOT_AVAILABLE 消息发送成功,但是broker的Slave节点不可用

根据状态码,可以做消息重试,这里设置的是3次

注意:消息重试的时候,消费端一定要保证消息幂等性

维度二.异步发送

public void sendAsync() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    producer.send(sendMessage, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            
        }

        @Override
        public void onException(Throwable e) {
            // TODO 可以在这里加入重试逻辑
        }
    });
}

异步发送,在发送的时候会多传递一个SendCallback()方法,需要重写其中的onSuccess和onException方法

维度三.刷盘策略

刷盘策略默认的是异步刷盘,就是消息在写入commitlog中,并不会直接写入到磁盘,而是先写到PageCache缓存后返回成功,然后后台线程异步把消息刷入到磁盘中,异步刷盘提高了消息的吞吐量,但是有可能造成消息丢失的情况,比如断电导致机器停机,PageCache中还没来得及刷盘的消息就会丢失

同步刷盘,就是消息在写入内存后,立刻请求刷盘线程进行刷盘,如果消息没有在约定时间内(默认是5s)刷盘成功,就会返回上面所说的FLUSH_DISK_TIMEOUT,生产者收到这个响应后,可以进行重试,同步刷盘保证了消息的可靠性,同时降低了吞吐量,增加了延迟。修改刷盘策略,就是在broker配置中添加 flushDiskType=SYNC_FLUSH

维度四.Broker多副本和高可用

Borker为了保证高可用,采用一主多从的方式部署。

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

  1. slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  2. master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  3. slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  4. master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

维度五.消息确认

Consumer的消费消息的代码如下:
 

public void consume() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.subscribe("topic1", "tag1");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        try{
            System.out.printf("Receive New Messages: %s", msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
}

如果消费成功就会返回CONSUMER_SUCCESS;提交offset并从Broker拉取下一批消息

维度六.Cosumer重试

Consumer消费失败有三种情况

返回RECONSUMER_LATER

返回null

抛出异常

Broker收到这个响应后,会把这条消息放入重试队列,重新发送给Consumer.

注意:1.只有在集群模式下(也就是只要有一个消费者消费成功就行)才会生效,广播模式下(就是每个消费者都需要消费这个消息)是不生效的

        2.Consumer一定要做好幂等性处理

       3.Broker默认最多重试16次,如果重试16次后都失败,就会放入到死信队列中,Consumer可以订阅死信队列进行消费

   重试三次都失败就可以说明出现问题了,这时候我们可以把消息放到本地,给Broker返回Consumer_success来结束重试

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
    //TODO 把消息写入本地存储
    System.out.println("重试次数超过3次");
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

维度七.事务消息

可以参考官网的事务消息: 

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

整个事务消息的详细交互流程如下图所示:

 如下是官网的示例代码

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

维度8.消息索引

Rocketmq核心的数据文件有三个:CommitLog,index,consumerQueue

其中Index就是索引文件

 查找消息的时候,首先会根据key的hashcode计算出hash槽的位置,根据这个位置计算Index条目的位置,从Index条目位置读取到消息在CommitLog文件中的offset,从而查找到消息。

在 Producer 发送消息时,可以指定一个 key,代码如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

维度 9:极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/622392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

后端——平台登录功能实战

这里写目录标题 一、登录接口设计示意图二、后端设计三、创建用户表四、后端鉴权逻辑五、登录接口实现六、使用 JWT 生成 token七、路由鉴权八、登录与测试用例服务结合九、跨域一、登录接口设计示意图 二、后端设计 三、创建用户表 db=SQLAlchemy(app

华尔街新风向:多基金失英伟达机会

在过去一年多的美联储暴力加息周期中&#xff0c;科技成长股一直不怎么受到主流投资者待见&#xff0c;但面对今年美股“人工智能涨个不停”的局面后&#xff0c;过去两周里大量的知名基金都在撒开脚丫子狂追高速狂飙的“英伟达列车”。 根据监管文件显示&#xff0c;包括道富…

视频与AI,与进程交互(一)

目的 正在写一个视频与AI的工具&#xff0c;从接入&#xff0c;算法处理&#xff0c;转发&#xff0c;存储&#xff0c; 到调用AI进程&#xff0c;并且与AI进程进行交互&#xff0c;插件化&#xff0c;脚本化&#xff0c;做得比较辛苦&#xff0c;期间的进程和线程交互以及结果…

基于奥比中光深度相机进行虹膜识别处理

MATLAB仿真实现效果展示 图1 奥比中光红外深度相机拍摄效果 MATLAB仿真红外效果的图片&#xff0c;使用奥比中光的Astra_Pro深度相机和Astra进行拍摄&#xff0c;效果很好。 声明&#xff1a;本文的虹膜识别系统模型参考了西澳大利亚大学&#xff0c;计算机科学与软件工程学…

Spark 优化

1.RDD分区数 Task是作用在每个分区上的&#xff0c;每个分区至少需要一个Task去处理 改变分区数可间接改变任务的并行度&#xff0c;类似手动指定Reduce数量 第一个RDD的分区数由切片的数量决定 默认情况下子RDD的分区数等于父RDD的分区数 Shuflle类算子可手动指定RDD分区数 设…

chatgpt赋能python:Python屏幕截图并保存:简单易用的库

Python屏幕截图并保存&#xff1a;简单易用的库 屏幕截图是程序员们在软件开发中常用到的一个小技巧&#xff0c;对于调试、记录Bug、编写文档等方面有极大的帮助。而其中&#xff0c;Python成为了众多程序员的利器之一。 在Python中&#xff0c;大量的库提供了屏幕截图的方法…

PX4-机架选取(基于QG地面站)

因为我的无人机是F450&#xff0c;所以我选用F450的机架 点击应用后&#xff0c;要稍等一会 应用完成后在概述会标识

经纬度坐标为中心点生成米距离长度半径的圆形面,含java js源码+在线绘制,代码简单零依赖

文章目录 java版源码js版源码在线绘制预览效果关于计算的精确度 前些时间在更新我的坐标边界查询工具的时候&#xff0c;需要用到经纬度坐标点的距离计算&#xff0c;和以坐标点为中心生成一个指定距离为半径的圆&#xff0c;搜了一下没有找到现成简单又合适的代码&#xff0c;…

基于OpenCV 和 Dlib 进行头部姿态估计

写在前面 工作中遇到&#xff0c;简单整理博文内容涉及基于 OpenCV 和 Dlib头部姿态评估的简单Demo理解不足小伙伴帮忙指正 庐山烟雨浙江潮&#xff0c;未到千般恨不消。到得还来别无事&#xff0c;庐山烟雨浙江潮。 ----《庐山烟雨浙江潮》苏轼 https://github.com/LIRUILONGS…

2023智源大会议程公开 | 大模型新基建与智力运营论坛

6月9日&#xff0c;2023北京智源大会&#xff0c;将邀请这一领域的探索者、实践者、以及关心智能科学的每个人&#xff0c;共同拉开未来舞台的帷幕&#xff0c;你准备好了吗&#xff1f;与会知名嘉宾包括&#xff0c;图灵奖得主Yann LeCun、OpenAI创始人Sam Altman、图灵奖得主…

【模型评估】混淆矩阵(confusion_matrix)之 TP、FP、TN、FN;敏感度、特异度、准确率、精确率

你这蠢货&#xff0c;是不是又把酸葡萄和葡萄酸弄“混淆”啦&#xff01;&#xff01;&#xff01;这里的混淆&#xff0c;我们细品&#xff0c;帮助我们理解名词“混淆矩阵” 上面日常情况中的混淆就是&#xff1a;是否把某两件东西或者多件东西给弄混了&#xff0c;迷糊了。把…

数据隐私保护的最佳实践:全面了解数据脱敏方案

1、数据脱敏 数据脱敏是一种保护敏感信息的安全措施&#xff0c;通常会将真实数据替换成模拟数据或者经过处理后的数据。下面是常见的数据脱敏实现方案&#xff1a; 字符串替换&#xff1a;将需要脱敏的字符串中指定位置的字符替换为“****”或其他符号。例如&#xff0c;将银…

MySQL数据库误删恢复

前言 经常听说删库跑路这真的不只是一句玩笑话,若不小心删除了数据库,事情很严重。你一个不小心可能会给公司删没。建议研发不要直连生成环境,一般的话都会分配账号权限,生产环境的账号尽量是只读,以防你一个不经意给库或表删除。一定要备份,这很重要,这是一个血的教训。…

iTOP3568开发板-Buildroot 系统设置待机和锁屏

Weston 的超时待机时长可以在启动参数中配置,也可以在 weston.ini 的 core 段配置。 方法一&#xff1a; 修改文件系统中/etc/init.d/S50launcher 文件&#xff0c;如下图所示的红框&#xff0c;0 代表禁止待机&#xff0c;可自行设置待机时间&#xff0c;单位是秒。 方法二&a…

深浅拷贝各种实现方式性能

拷贝方式 拷贝方式类型原理备注Object.clone()默认 浅拷贝&#xff0c;可以自定义实现深拷贝对象内存复制constructor可以实现深拷贝自定义实现BeanUtil.copyProperties()浅拷贝利用 getter/setter 实现属性拷贝反射&#xff0c;spring utilCollectionUtils.clone()深拷贝本质…

强化学习驱动的低延迟视频传输

随着视频会议、视频直播的流行以及未来AR/VR业务的发展&#xff0c;低延迟视频传输服务被广泛使用&#xff0c;但视频质量&#xff08;QoE&#xff09;还不能满足用户要求。那么近年来新兴的AI神经网络是否能为视频传输带来智能化的优化&#xff1f;今天LiveVideoStack大会北京…

macos m1 pip install lightgbm error

MacOS M1电脑&#xff0c;执行 pip install lightgbm 错误如下&#xff1a; 尝试如下操作&#xff1a; 参考链接如下&#xff1a; https://github.com/Microsoft/LightGBM/issues/1324 brew install cmake brew install gcc git clone --recursive https://github.com/Micro…

Unity之OpenXR+XR Interaction Toolkit接入HTC Vive解决手柄无法使用的问题

前言 随着Unity版本的不断进化,VR的接口逐渐统一,现在大部分的VR项目都开始使用OpenXR开发了。基于OpenXR,我们可以快速适配HTC,Pico,Oculus,等等设备。 今天我们要说的问题就是,当我们按照官方的标准流程配置完OpenXR后(参考:Unity之OpenXR+XR Interaction Toolkit…

西门子S7-200 CPU输入/输出接线说明

总结来看&#xff0c;S7-200系列PLC提供4个不同的基本型号的8种CPU&#xff0c;其接线方式也可大致分为6种&#xff1a; 1.CPU SR20接线 2.CPU SR40接线 3.CPU CR40接线 4.CPU ST40接线 5. CPU SR60接线 6. CPU ST60接线 除了CPU外&#xff0c;我们还需要了解200smart PLC的数…

排序算法大总结(插入、希尔、选择、堆、冒泡、快速、归并、计数)

1. 排序概要2. 插入排序直接插入排序希尔排序&#xff08;缩小增量排序&#xff09; 3.选择排序直接选择排序堆排序 4. 交换排序冒泡排序快速排序霍尔版本&#xff08;hoare&#xff09;挖坑法双指针版本快排优化快速排序非递归 5. 归并排序归并递归版本归并非递归版本 6.计数排…