RocketMQ事务消息 超时重发还是原来的消息吗?

news2024/12/25 0:23:34

以下面的一个demo例子来分析一下,探索RocketMQ事务消息原理。

    public static final String PRODUCER_GROUP = "tran-test";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "Test";

    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println(String.format("executeLocalTransaction: %s", msg.getTransactionId()));
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(String.format("checkLocalTransaction: tranId=%s, commitLogOffset=%s, queueOffset=%s, msgId=%s",
                        msg.getTransactionId(), msg.getCommitLogOffset(),
                        msg.getQueueOffset(), msg.getMsgId()));
                return LocalTransactionState.UNKNOW;
            }
        };
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        Message msg = new Message(TOPIC, "test".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println(String.format("sendResult: tranId=%s, offsetMsgId=%s, queueOffset=%s msgId=%s",
                sendResult.getTransactionId(), sendResult.getOffsetMsgId(),
                sendResult.getQueueOffset(), sendResult.getMsgId()));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
executeLocalTransaction: C0DE00428BEC18B4AAC27F377B6E0000
sendResult: tranId=C0DE00428BEC18B4AAC27F377B6E0000, offsetMsgId=null, queueOffset=82 msgId=C0DE00428BEC18B4AAC27F377B6E0000
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315411, queueOffset=83, msgId=C0DE004200002A9F0000000000141253
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315805, queueOffset=84, msgId=C0DE004200002A9F00000000001413DD
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316199, queueOffset=85, msgId=C0DE004200002A9F0000000000141567
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316593, queueOffset=86, msgId=C0DE004200002A9F00000000001416F1
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316987, queueOffset=87, msgId=C0DE004200002A9F000000000014187B
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317381, queueOffset=88, msgId=C0DE004200002A9F0000000000141A05
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317775, queueOffset=89, msgId=C0DE004200002A9F0000000000141B8F
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318169, queueOffset=90, msgId=C0DE004200002A9F0000000000141D19
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318563, queueOffset=91, msgId=C0DE004200002A9F0000000000141EA3
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318957, queueOffset=92, msgId=C0DE004200002A9F000000000014202D
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319352, queueOffset=93, msgId=C0DE004200002A9F00000000001421B8
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319747, queueOffset=94, msgId=C0DE004200002A9F0000000000142343
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320142, queueOffset=95, msgId=C0DE004200002A9F00000000001424CE
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320537, queueOffset=96, msgId=C0DE004200002A9F0000000000142659
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320932, queueOffset=97, msgId=C0DE004200002A9F00000000001427E4

通过上述例子的输出结果可以发现,checkLocalTransaction中queueOffset、msgId都发生的变化。那么在broker中到底发生了什么呢。

事务消息原理

当客户端发送一个事务消息时,MessageConst.PROPERTY_TRANSACTION_PREPARED="true" 标记这个消息是一个事务消息。

        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

broker在收到消息时会取出traFlag,如果traFlag=true消息将交给TransactionalMessageService处理

        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        boolean sendTransactionPrepareMessage = false;
        if (Boolean.parseBoolean(traFlag)
            && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            sendTransactionPrepareMessage = true;
        }

        long beginTimeMillis = this.brokerController.getMessageStore().now();

        if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
            CompletableFuture<PutMessageResult> asyncPutMessageFuture;
            if (sendTransactionPrepareMessage) {
                //处理事务消息
                asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
            } else {
                asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            }

TransactionalMessageService在保存消息时会将原来的topic使用RMQ_SYS_TRANS_HALF_TOPIC来替换,原topic信息存放在properties中。这样在是先把消息保存下来,而不让Consumer立刻就能收到。 当收到TransactionMQProducer发来的COMMIT_MESSAGE时,再将消息从RMQ_SYS_TRANS_HALF_TOPIC取出替换成原来的topic写入。同时再向RMQ_SYS_TRANS_OP_HALF_TOPIC的topic中也写一份。 broker通过对比RMQ_SYS_TRANS_OP_HALF_TOPIC和RMQ_SYS_TRANS_HALF_TOPIC中是否同时存在来判断事务消息是否结束了。 当收到的不是COMMIT_MESSAGE而是UNKNOW时,TransactionalMessageCheckService会定时回调TransactionMQProducer#checkLocalTransaction查询本地事务状态,默认最多检查15次。

TransactionalMessageCheckService TransactionalMessageCheckService是一个运行在broker中的一个线程,线程默认每1分钟执行一次来检测系统中超时的half事务消息并发起重试。

    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                //每个half queue都有一个对应的op queue
                MessageQueue opQueue = getOpQueue(messageQueue);
                //获取当前未完成的half queue的offset
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                //获取当前已完成的op queue的offset
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                ......
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                long nextOpOffset = pullResult.getNextBeginOffset();
                int putInQueueCount = 0;
                int escapeFailCnt = 0;

                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    if (removeMap.containsKey(i)) {
                        ......
                    } else {
                        //从RMQ_SYS_TRANS_HALF_TOPIC取出half消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                    	......
                        ......
                        //是否需要丢弃消息
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        ......
                        //判断上次check是否超时
                        boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                            || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                            || valueOfCurrentMinusBorn <= -1;

                        if (isNeedCheck) {
                            //超时
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            putInQueueCount++;
                            log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
                                    msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
                                    msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                                    msgExt.getQueueOffset(), msgExt.getCommitLogOffset());
                            //重新给TransactionListener发起check请求
                            listener.resolveHalfMsg(msgExt);
                ......
                ......
                ......
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }                            

 

上述代码中有三个比较重要的细节,needDiscard、putBackHalfMsgQueue和listener.resolveHalfMsg。 needDiscard:从half queue取出来后判断消息的TRANSACTION_CHECK_TIMES属性是否大于15次。 小于15次,则TRANSACTION_CHECK_TIMES属性值+1。 大于15次,则从RMQ_SYS_TRANS_HALF_TOPIC中丢弃,通过listener.resolveDiscardMsg保存在TRANS_CHECK_MAX_TIME_TOPIC中交由人工处理。 putBackHalfMsgQueue:将消息重新插入一份到RMQ_SYS_TRANS_HALF_TOPIC,因为CommitLog的applyOnly特性不能修改原消息。所以需要重新apply消息导致queueOffset、commitLogOffset、msgId都发生了变化。

    private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
        PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
        if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            msgExt.setQueueOffset(
                putMessageResult.getAppendMessageResult().getLogicsOffset());
            msgExt.setCommitLogOffset(
                putMessageResult.getAppendMessageResult().getWroteOffset());
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            log.debug(
                "Send check message, the offset={} restored in queueOffset={} "
                    + "commitLogOffset={} "
                    + "newMsgId={} realMsgId={} topic={}",
                offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                msgExt.getTopic());
            return true;
listener.resolveHalfMsg:通过回调resolveHalfMsg方法向TransactionMQProducer重发check。
    public void resolveHalfMsg(final MessageExt msgExt) {
        if (executorService != null) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendCheckMessage(msgExt);
                    } catch (Exception e) {
                        LOGGER.error("Send check message error!", e);
                    }
                }
            });
        } else {
            LOGGER.error("TransactionalMessageCheckListener not init");
        }
    }

    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            //取出与broker相连的netty channel发送check消息
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }

half消息示意图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1142836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何理解Quadratic Weighted Kappa?

Motivation 假定我们现在有 N N N个作文样例&#xff0c;以及它们对应的人类评分和GPT评分。评分一共有 C C C个互斥类别&#xff0c;分别是{0,1,2,3}。现在我们要衡量人类评分和GPT评分的一致性。 一个很直观的想法是&#xff0c;画出混淆矩阵&#xff0c;然后将对角线上的值…

Linux Centos7安装后,无法查询到IP地址,无ens0,只有lo和ens33的解决方案

文章目录 前言1 查看network-scripts目录2 创建并配置 ifcfg-ens33 文件3 禁用NetworkManager4 重新启动网络服务总结 前言 在VMware中&#xff0c;安装Linux centos7操作系统后&#xff0c;想查询本机的IP地址&#xff0c;执行ifconfig命令 ifconfig结果如下&#xff1a; 结…

吴恩达《机器学习》1-5:模型描述

一、单变量线性回归 单变量线性回归是监督学习中的一种算法&#xff0c;通常用于解决回归问题。在单变量线性回归中&#xff0c;我们有一个训练数据集&#xff0c;其中包括一组输入特征&#xff08;通常表示为&#x1d465;&#xff09;和相应的输出目标&#xff08;通常表示为…

UVa140 Bandwidth(带宽)

1、题目 2、题意 给出一个 n &#xff08; n ≤ 8 &#xff09; n&#xff08;n≤8&#xff09; n&#xff08;n≤8&#xff09;个结点的图G和一个结点的排列&#xff0c;定义结点 i i i 的带宽 b ( i ) b(i) b(i) 为 i i i 和相邻结点在排列中的最远距离&#xff0c;而所…

Ansible上通过roles简化playbook演示介绍

目录 一.roles介绍 1.作用 2.role的目录结构 3.role和tasks的执行优先级顺序 二.自定义一个httpd的角色 1.完整目录结构展示 2.主要的各个目录配置 &#xff08;1&#xff09;vars目录和templates目录 &#xff08;2&#xff09;tasks目录和handlers目录 &#xff08…

操作系统中套接字和设备独立性软件的关系

网络编程就是编写程序让两台联网的计算机相互交换数据。在我们不需要考虑物理连接的情况下&#xff0c;我们只需要考虑如何编写传输软件。操作系统提供了名为“套接字”&#xff0c;套接字是网络传输传输用的软件设备。 这是对软件设备的解释&#xff1a; 在操作系统中&#…

Unity ScrollView最底展示

Unity ScrollView最底展示 问题方案逻辑 问题 比如在做聊天界面的时候我们肯定会使用到ScrollView来进行展示我们的聊天内容&#xff0c;那么这个时候来新消息的时候就需要最底展示&#xff0c;我认为这里有两种方案&#xff1b; 一种是通过算法每一条预制体的高度*一共多少…

轮转数组(Java)

大家好我是苏麟 , 这篇文章是凑数的 ... 轮转数组 描述 : 给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 题目 : 牛客 NC110 旋转数组: 这里牛客给出了数组长度我们直接用就可以了 . LeetCode 189.轮转数组 : 189. 轮…

Python---break关键字对for...else结构的影响

for循环中添加else结构 循环可以和else配合使用&#xff0c; else下方缩进的代码指的是当循环正常结束之后要执行的代码。 强调&#xff1a; 循环 正常结束&#xff0c;else之后要执行的代码。 非正常结束&#xff0c;其else中的代码是不会执行的。&#xff08;如遇到br…

类和对象(1):类,对象,this指针

面向过程和面向对象初步认识&#xff1a; C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出问题求解的步骤&#xff0c;用函数调用逐步解决。C是基于面向对象的&#xff0c;关注的是对象&#xff0c;将一件事情拆分成不同的对象&#xff0c;靠对象之间的交互完成。…

【.NET Core】创建一个在后台运行的控制台程序(ConsoleApp)

文章目录 1. 添加Nuget包2. 修改Program.cs3. 添加TestService 借助.NET的通用主机&#xff08;IHostBuilder&#xff09;可以轻易创建一个可以执行后台任务的程序 1. 添加Nuget包 Microsoft.Extensions.Hosting 2. 修改Program.cs 通过Host获取IHostService&#xff0c;然…

SSD: Single Shot MultiBox Detector(2016.11)

文章目录 AbstractIntroduction此前本文贡献总结如下: The Single Shot Detector (SSD)SSD ModelMulti-scale feature maps for detectionConvolutional predictors for detectionDefault boxes and aspect ratiosTrainingMatching strategyTraining objectiveChoosing scales …

python---for循环结构中的else结构(是同级关系)

为什么需要在for循环中添加else结构 循环可以和else配合使用&#xff0c; else下方缩进的代码指的是当循环正常结束之后要执行的代码。 强调&#xff1a; 循环 正常结束&#xff0c;else之后要执行的代码。 非正常结束&#xff0c;其else中的代码是不会执行的。&#xf…

GienTech动态|入选软件和信息技术服务名牌企业;荣获城市数字化转型优秀案例;参加第四届深圳国际人工智能展

中电金信入选“2023第二届软件和信息技术服务名牌企业” 近日&#xff0c;中国电子信息行业联合会发布了“2023第二届软件和信息技术服务名牌企业”名单&#xff0c;中电金信入选。此名单发布原则&#xff0c;重点突出技术创新力。突出市场影响力&#xff0c;品牌建设良好&…

Leetcode刷题笔记--Hot81--90

1--打家劫舍III 主要思路&#xff1a; 基于从下到上的 dp 回溯法&#xff0c;每一个节点只有两种状态&#xff0c;dp[0]表示被打劫&#xff0c;dp[1]表示不被打劫&#xff1b; 当前节点被打劫时&#xff0c;其孩子只能都不被打劫&#xff1b;dp[0] left[1] right[1] cur->…

redis集群理论和搭建

目录 环境 一&#xff0c;安装和部署redis 1&#xff0c;安装 2&#xff0c;部署 ​编辑 3&#xff0c;允许非本机连接redis 二、主从模式 主从模式搭建&#xff1a; 三&#xff0c;哨兵模式 哨兵模式搭建 四&#xff0c;集群模式 架构细节: 心跳机制 集群模式搭建&#xff1a…

【NLP】word复制指定内容到新的word文档

目录 1.python代码 2.结果 需求&#xff1a; 复制word文档里的两个关键字&#xff08;例如“起始位置”到“结束位置”&#xff09;之间的内容到新的word文档。 前提&#xff1a;安装win32包&#xff0c;通过pip install pywin32命令直接安装。话不多说&#xff0c;直接上代码…

底层全部重构,小米澎湃OS完整系统架构公布

上周&#xff0c;雷军发文称小米全新操作系统澎湃 OS 正式版已完成封包&#xff0c;将逐步接替 MIUI。而后&#xff0c;又有网友曝光小米澎湃 OS 界面。 今日&#xff0c;雷军再度发表长文预热小米澎湃 OS&#xff0c;正式公布了完整系统架构。 据介绍&#xff0c;从架构设计之…

CSS基础入门03

目录 1.圆角矩形 1.1基本用法 1.2生成圆形 1.3生成圆角矩形 1.4展开写法 2.Chrome 调试工具--查看 CSS 属性 2.1打开浏览器 2.2标签页含义 2.3elements 标签页使用 3.元素的显示模式 3.1块级元素 3.2行内元素/内联元素 3.3行内元素和块级元素的区别 3.4改变显示模…

Leetcode刷题详解——寻找峰值

1. 题目链接&#xff1a;162. 寻找峰值 2. 题目描述&#xff1a; 峰值元素是指其值严格大于左右相邻值的元素。 给你一个整数数组 nums&#xff0c;找到峰值元素并返回其索引。数组可能包含多个峰值&#xff0c;在这种情况下&#xff0c;返回 任何一个峰值 所在位置即可。 你可…