012 rocketmq事务消息

news2025/3/3 14:21:48

文章目录

  • 事务消息
    • 概念介绍
    • 交互流程
    • 事务消息原理
    • TransactionListener接⼝
    • TransactionProducer.java
    • TransactionConsumer.java

事务消息

内置topic中的消息对消费者不可见
本地事务+mq消息=事务消息

消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。

概念介绍

事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列RocketMQ 事务消息能达到分布式事务的最终⼀致。

半事务消息:暂不能投递的消息,发送⽅已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到⽣产者对该消息的⼆次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于⽹络闪断、⽣产者应⽤重启等原因,导致某条事务消息的⼆次确认丢失,消息队列RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息⽣产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

交互流程

事务消息交互流程如下图所示。
事务消息
事务消息发送步骤如下:

  1. 发送⽅将半事务消息发送⾄消息队列 RocketMQ 版服务端。
  2. 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送⽅返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
  3. 发送⽅开始执⾏本地事务逻辑。
  4. 发送⽅根据本地事务执⾏结果向服务端提交⼆次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅⽅最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅⽅将不会接受该消息。

事务消息回查步骤如下:

  1. 在断⽹或者是应⽤重启的特殊情况下,上述步骤 4 提交的⼆次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送⽅收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  3. 发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤 4 对半事务消息进⾏操作。

注意事项

  1. 事务消息不⽀持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次⽽导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是⽤户可以通过 Broker 配置⽂件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个⾏为。
  3. 事务消息将在 Broker 配置⽂件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时,⽤户还可以通过设置⽤户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查
  5. 提交给⽤户的⽬标主题消息可能会失败,⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过 RocketMQ本身的⾼可⽤性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使⽤同步的双重写⼊机制。
  6. 事务消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的⽣产者 ID 查询到消费者。

事务消息原理

HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
事务消息替换主题,保存原主题和队列信息
半消息对Consumer不可⻅,不会被投递
OP消息: RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作)
Rollback:只做记录
Commit:根据备份信息重新构造消息并投递
回查:
对⽐HALF消息和OP消息进⾏回查

TransactionListener接⼝

要使⽤RocketMQ的事务消息,要实现⼀个TransactionListener的接⼝,这个接⼝中有两个⽅法,如下:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will
     * be invoked to execute local transaction.
     * 执⾏本地事务
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final
    Object arg);

    /**
     * When no response to prepare(half) message. broker will send check
     * message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     * 消息回查后,需要检查对应消息的本地事务执⾏的最终结果
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执⾏完send⽅法后,进⼊的prepared状态,进⼊prepared状态以后,就要执⾏executeLocalTransaction⽅法,这个⽅法的返回值有3个,也决定着这个消息的命运,1.LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进⼊到commited状态,消费者可以消费这个消息;
2.LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
3.LocalTransactionState.UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,⽽最终决定这个消息命运的,是checkLocalTransaction这个⽅法。
当executeLocalTransaction⽅法返回UNKNOW以后,RocketMQ会每隔⼀段时间调⽤⼀次checkLocalTransaction,这个⽅法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个⽅法多⻓时间调⽤⼀次呢?我们在BrokerConfig类中可以找到,

/**
 * Transaction message check interval.
 */
 @ImportantField
 private long transactionCheckInterval = 60 * 1000;

这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能⽆休⽌的检查吧,我们在BrokerConfig类中可以找到

/**
 * The maximum number of times the message was checked, if exceed this
value, this message will be discarded.
 */
 @ImportantField
 private int transactionCheckMax = 15;

这个是检查的最⼤次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。

TransactionProducer.java

package com.example.rocketmq.demo.transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.TimeUnit;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("GroupTransaction");
        //2.指定nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //3.添加事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法执行本地事务
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                //db ,本地事务 + mq消息 = 事务消息
                System.out.println("executeLocal:"+msg.getTags());
                if(StringUtils.equals("TAGA",msg.getTags())){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if(StringUtils.equals("TAGB",msg.getTags())){
                    //B消息本地事务返回rollback
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }else if(StringUtils.equals("TAGC",msg.getTags())){
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 该方法用于MQ进行消息的回查
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("checkLocalTransaction:"+msg.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        //4.启动producer
        producer.start();
        String[] tags = {"TAGA","TAGB","TAGC"};
        //发送三条消息
        for (int i = 0; i < 3; i++) {
            // 发送A、B、C三条
            Message msg = new Message("TransactionTopic", tags[i],
                    ("Hello RocketMQ: " + tags[i]).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            //6.发送消息
            SendResult sendResult = producer.sendMessageInTransaction(msg,null);
            //7.获取发送状态
            SendStatus sendStatus = sendResult.getSendStatus();
            System.out.printf("发送结果:%s%n", sendStatus);
            TimeUnit.SECONDS.sleep(1);
        }
        //8.关闭生产者producer
//        producer.shutdown();
    }
}

TransactionConsumer.java

package com.example.rocketmq.demo.transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 1.创建消费者consumer、制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupTransaction");

        // 2.指定nameserver地址
        consumer.setNamesrvAddr("localhost:9876");

        // 3.订阅主题Topic和Tag
        consumer.subscribe("TransactionTopic", "*");

        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for(MessageExt msg:msgs){
                    String key = msg.getKeys();
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));

                }
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5.启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2308412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot原理-02.自动配置-概述

一.自动配置 所谓自动配置&#xff0c;就是Spring容器启动后&#xff0c;一些配置类、bean对象就自动存入了IOC容器当中&#xff0c;而不需要我们手动声明&#xff0c;直接从IOC容器中引入即可。省去了繁琐的配置操作。 我们可以首先将spring项目启动起来&#xff0c;里面有一…

知识图谱+智能问诊预诊系统vue+django+neo4j架构、带问诊历史

文章结尾部分有CSDN官方提供的学长 联系方式名片 文章结尾部分有CSDN官方提供的学长 联系方式名片 关注B站&#xff0c;有好处&#xff01; &#x1f90d;编号&#xff1a;D032 &#x1f90d;智能问答&#xff1a;智能问答自诊、预诊功能&#xff0c;同时可以保存问答历史 &…

DeepSeek 助力 Vue3 开发:打造丝滑的悬浮按钮(Floating Action Button)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

Java数据结构_一篇文章了解常用排序_8.1

本文所有排序举例均默认为升序排列。 目录 1. 常见的排序算法 2. 常见排序算法的实现 2.1 插入排序 2.1.1 基本思想&#xff1a; 2.1.2 直接插入排序 2.1.3 希尔排序&#xff08;缩小增量排序&#xff09; 2.2 选择排序 2.2.1 基本思想&#xff1a; 2.2.2 直接选择排…

(南京观海微电子)——倍压设计与应用

在电路设计过程中&#xff0c;当后级需要的电压比前级高出数倍而所需要的电流并不是很大时&#xff0c;就可以使用倍压整流电路。倍压整流&#xff1a;可以将较低的交流电压&#xff0c;用耐压较高的整流二极管和电容器&#xff0c;“整”出一个较高的直流电压。 01 倍压整流电…

网络安全-使用DeepSeek来获取sqlmap的攻击payload

文章目录 概述DeepSeek使用创建示例数据库创建API测试sqlmap部分日志参考 概述 今天来使用DeepSeek做安全测试&#xff0c;看看在有思路的情况下实现的快不快。 DeepSeek使用 我有一个思路&#xff0c;想要测试sqlmap工具如何dump数据库的&#xff1a; 连接mysql数据库&#…

【含文档+PPT+源码】基于SpringBoot+Vue医药知识学习与分享平台的设计与实现

项目介绍 本课程演示的是一款 基于SpringBootVue医药知识学习与分享平台的设计与实现&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署…

coze生成的工作流,发布后,利用cmd命令行执行。可以定时发日报,周报等。让他总结你飞书里面的表格。都可以

coze生成的工作流&#xff0c;发布后&#xff0c;利用cmd命令行执行。可以定时发日报&#xff0c;周报等。让他总结你飞书里面的表格。都可以。 很简单。 准备工作&#xff0c;先发布你的工作流&#xff0c;和发布应用。 然后&#xff0c;点击扣子API 。 申请一个&#xff0…

【Python】基础语法三

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;了解Python的函数、列表和数组。 > 毒鸡汤&#xff1a;有些事情&#xff0c;总是不明白&#xff0c;所以我不会坚持。早安! > 专栏选自&#xff…

使用 Spring Boot 和 Keycloak 的 OAuth2 快速指南

1. 概述 本教程是关于使用 Spring Boot 和 Keycloak 通过 OAuth2 配置后端的。 我们将使用 Keycloak 作为 OpenID 提供程序。我们可以将其视为负责身份验证和用户数据&#xff08;角色、配置文件、联系信息等&#xff09;的用户服务。它是最完整的 OpenID Connect &#xff0…

第三十六:6.6. 【$refs、$parent】

概述&#xff1a; $refs用于 &#xff1a;父→子。 $parent用于&#xff1a;子→父。 // 向外部提供数据 defineExpose({house}) 原理如下&#xff1a; 属性说明$refs值为对象&#xff0c;包含所有被ref属性标识的DOM元素或组件实例。$parent值为对象&#xff0c;当前组件…

【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f44d; 欢迎点赞、收藏、关注&#xff0c;跟上我的更新节奏 &#x1f4da;欢迎订阅专栏…

【开源免费】基于SpringBoot+Vue.JS网络海鲜市场系统(JAVA毕业设计)

本文项目编号 T 222 &#xff0c;文末自助获取源码 \color{red}{T222&#xff0c;文末自助获取源码} T222&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

Go中slice和map引用传递误区

背景 关于slice和map是指传递还是引用传递&#xff0c;很多文章都分析得模棱两可&#xff0c;其实在Go中只有值传递&#xff0c;但是很多情况下是因为分不清slice和map的底层实现&#xff0c;所以导致很多人在这一块产生疑惑&#xff0c;下面通过代码案例分析slice和map到底是…

C++ ++++++++++

初始C 注释 变量 常量 关键字 标识符命名规则 数据类型 C规定在创建一个变量或者常量时&#xff0c;必须要指定出相应的数据类型&#xff0c;否则无法给变量分配内存 整型 sizeof关键字 浮点型&#xff08;实型&#xff09; 有效位数保留七位&#xff0c;带小数点。 这个是保…

【北京迅为】iTOP-RK3568OpenHarmony系统南向驱动开发-第1章 GPIO基础知识

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

探秘《矩阵之美》:解锁矩阵的无限魅力

在这个数据驱动的时代&#xff0c;矩阵作为数学中的瑰宝&#xff0c;不仅在理论研究中占据核心地位&#xff0c;更在工程技术、计算机科学、物理学、经济学等众多领域发挥着不可替代的作用。今天&#xff0c;让我们通过中科院大学耿修瑞老师&#xff08;中科院空天信息研究院研…

进行性核上性麻痹患者的生活护理指南

进行性核上性麻痹是一种神经系统退行性疾病&#xff0c;合理的生活护理能有效改善症状&#xff0c;提高生活质量。 居家环境要安全。移除地面杂物&#xff0c;铺设防滑垫&#xff0c;安装扶手&#xff0c;降低跌倒风险。在浴室、厨房等湿滑区域要特别加强防护措施。建议在床边、…

pyside6学习专栏(八):在PySide6中使用matplotlib库绘制三维图形

本代码原来是PySide6官网的一个示例程序&#xff0c;我对其进行的详细的注释&#xff0c;同时增加了一个功能&#xff1a;加载显示cass的地形图坐标数据示例&#xff0c;示例可显示以下几种三维图形 程序运行界面如下&#xff1a; 代码如下&#xff1a; # -*- coding: utf-8 -…

松灵机器人地盘 安装 ros 驱动 并且 发布ros 指令进行控制

安装驱动 $ cd ~/catkin_ws/src $ git clone https://github.com/agilexrobotics/ugv_sdk.git $ git clone https://github.com/agilexrobotics/scout_ros.git $ cd .. $ catkin_make安装 ● 使能 gs_usb 内核模块 ● 设置 500k 波特率和使能 can-to-usb 适配器 sudo modp…