六、RocketMQ发送事务消息

news2025/1/24 1:41:42

事务消息介绍

在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

当主分支订单系统状态更新失败后,物流、积分、购物车系统都不应该接收到消息

事务消息的发送流程

使用普通消息是做不到的,因为他会直接将消息发送到topic中

而事务消息参考了两阶段提交的原理,

  1. 先把消息发送broker中
  2. 当消息发送成功后,会执行本地事务
  3. 通过本地事务的执行情况,返回一个状态
  4. 状态对应三种情况
    • LocalTransactionState.UNKNOW:需要broker调用发送端的回查机制
    • LocalTransactionState.COMMIT_MESSAGE:broker将消息发送到指定的topic,此时消费端可以接收到消息
    • LocalTransactionState.ROLLBACK_MESSAGE:broker丢弃消息,不发送到指定的topic,消费端接收不到消息

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述

@Test
public void sendTrans() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
    // 创建事务消息发送客户端
    TransactionMQProducer transProducer = new TransactionMQProducer("test-trans-producer");

    transProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);

    // 指定回查事务消息时的线程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    transProducer.setExecutorService(executorService);

    // 设置事务监听器
    transProducer.setTransactionListener(new TransactionListener() {
        // 执行本地事务
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println(Thread.currentThread().getName() + ":执行本地事务");

            // 触发回查机制
            return LocalTransactionState.UNKNOW;
        }

        // 回查本地事务,如果执行本地事务返回UNKNOW状态或者生产者应用退出导致本地事务未提交任何状态
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            System.out.println(Thread.currentThread().getName() + ":触发事务回查");

            // 提交事务
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    });

    transProducer.start();

    Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes());
    // 发送事务消息
    SendResult send = transProducer.sendMessageInTransaction(message,null);
    System.out.println(send.getSendStatus());

    Thread.sleep(Integer.MAX_VALUE);
}

注:需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1094651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL有时候命中索引有时候又不命中

索引失效的情况 -----可能 索引主要看where 、group by 、order by 1.组合索引不遵循最佳左前缀法制。最佳左前缀法制&#xff1a;如果索引了多列&#xff0c;要遵循最左前缀法则&#xff0c;指的是查询从索引的最左前列开始并且不跳过索引中的列。如组合索引为A B C 只有ABC,A…

【蓝桥】数树数

一、题目 1、题目描述 给定一个层数为 n n n 的满二叉树&#xff0c;每个点编号规则如下&#xff1a; 具体来说&#xff0c;二叉树从上往下数第 p p p 层&#xff0c;从左往右编号分别为&#xff1a;1,2,3,4&#xff0c;…, 2p-1。 给你一条从根节点开始的路径&#xff0…

Node.js初体验

Node.js简介 node.js的运行环境 1.V8引擎对js代码进行解析与执行 2.内置API&#xff1a;fs、path、http...等&#xff0c;提供了一些能力&#xff0c;能够使得js调用这些API去做一些后端的事情 流程&#xff1a;我们在node.js的运行环境中编写待执行的JavaScript代码&#…

Spring Cloud Gateway 使用 Redis 限流使用教程

从本文开始&#xff0c;笔者将总结 spring cloud 相关内容的教程 版本选择 为了适应 java8&#xff0c;笔者选择了下面的版本&#xff0c;后续会出 java17的以SpringBoot3.0.X为主的教程 SpringBoot 版本 2.6.5 SpringCloud 版本 2021.0.1 SpringCloudAlibaba 版本 2021.0.1.…

单目3D目标检测——MonoCon 模型训练 | 模型推理

本文分享 MonoCon 的模型训练、模型推理、可视化3D检测结果、以及可视化BEV效果。 模型原理&#xff0c;参考我这篇博客&#xff1a;【论文解读】单目3D目标检测 MonoCon&#xff08;AAAI2022&#xff09;_一颗小树x的博客-CSDN博客 源码地址&#xff1a;https://github.com/2…

在vs code中创建一个名为 “django_env“ 的虚拟环境报错?!以下或许方法可以解决

# vs code 终端窗口中运行&#xff1a; mkvirtualenv django_env # 拓展&#xff1a; mkvirtualenv django_env 是一个命令&#xff0c;用于创建一个名为 "django_env" 的虚拟环境。虚拟环境是一种用于隔离不同Python项目所需依赖的工具。通过创建虚拟环境&#x…

【分布式计算】九、容错性 Fault Tolerance

分布式系统应当有一定的容错性&#xff0c;发生故障时仍能运行 一些概念&#xff1a; 可用性Availability&#xff1a;系统是否准备好立即使用 可靠性Reliability&#xff1a;系统连续运行不发生故障 安全性&#xff1a;衡量安全故障的指标&#xff0c;没有严重事件发生 可维护…

zabbix内置宏、自动发现与注册

一、zabbix内置宏 1、概念&#xff1a; 在Zabbix中&#xff0c;内置宏是一种特殊的变量&#xff0c;通常用在 Trigger 名称和表达式中&#xff0c;引用有关监控对象的信息。 2、种类&#xff1a; {HOST.NAME} 主机名 {HOST.IP} 主机 IP 地址 {TRIGGER.DESCRIPTION} 触…

Unity中Shader的深度缓冲区

文章目录 前言一、什么是深度缓冲区深度缓冲区是和颜色缓冲区、模板缓冲区平行的一个缓冲区在这里插入图片描述 二、什么是深度信息三、深度缓冲区的作用 前言 Unity中的深度缓冲区 一、什么是深度缓冲区 深度缓冲区是和颜色缓冲区、模板缓冲区平行的一个缓冲区 深度缓冲区&a…

勒索软件组织声称它“损害了所有索尼系统”

新晋勒索软件组织 RansomedVC 声称已成功入侵娱乐巨头索尼的计算机系统。正如勒索软件团伙所做的那样&#xff0c;它在其暗网网站上发布了这一消息&#xff0c;并在那里出售从受害者计算机网络中窃取的数据。 该公告称索尼的数据正在出售&#xff1a; 索尼集团公司&#xff08…

数据结构与算法--并查集结构

数据结构与算法--并查集结构 1 岛问题 2 并查集结构 1 岛问题 一个矩阵中只有0和1两种值&#xff0c;每个位置都可以和自己的上、下、左、右 四个位置相连&#xff0c;如果有一片1连在一起&#xff0c;这个部分叫做一个岛&#xff0c;求一个矩阵中有多少个岛? 【举例】 001…

FutureTask的测试使用和方法执行分析

FutureTask类图如下 java.util.concurrent.FutureTask#run run方法执行逻辑如下 public void run() {if (state ! NEW ||!RUNNER.compareAndSet(this, null, Thread.currentThread()))return;try {Callable<V> c callable;if (c ! null && state NEW) {V res…

【软考】9.4 图的概念/存储/遍历/最小生成树/拓扑/查找

《图》 图的存储&#xff08;顶点和边&#xff09; 邻接矩阵&#xff1a;适合边数较多的图&#xff0c;不易造成浪费无向图&#xff1a;不分方向&#xff1b;对称矩阵 邻接链表&#xff1a;顶点&#xff0c;边——>&#xff08;编号&#xff0c;权值&#xff09;&#xff1b…

二维数组的行指针与列指针

二维数组的行指针与列指针 笔记来源&#xff1a;懒猫老师-C语言-用指针访问二维数组&#xff08;指针与二维数组&#xff09; 声明&#xff1a;本文笔记来自bili懒猫老师&#xff0c;仅供学习参考 回顾一维数组 int a[3];//其中a或a[0]是数组首地址 //a1指向第二个元素的地址…

The SDK location is inside Studio install location 解决

The SDK location is inside Studio install location 解决 安装 Android Studio SDK 时提示&#xff1a;The SDK location is inside Studio install location 解决 问题&#xff1a; 由于 SDK 与 编辑器(Android Studio)的安装在同一目录下所以报错。 解决 你需要在 Andro…

五、RocketMQ发送顺序消息

顺序消息的应用场景 在有序事件处理、撮合交易、数据实时增量同步等场景下&#xff0c;异构系统间需要维持强一致的状态同步&#xff0c;上游的事件变更需要按照顺序传递到下游进行处理。 例如需要保证一个订单的生成、付款和发货&#xff0c;这三件事情是被顺序执行的。 如…

C# OpenVINO Det 物体检测

效果 耗时 elephant:89% Preprocess: 0.00ms Infer: 47.21ms Postprocess: 11.63ms Total: 58.84ms 项目 代码 using OpenCvSharp; using Sdcb.OpenVINO; using Sdcb.OpenVINO.Natives; using System; using System.Diagnostics; using System.Drawing; using System.Text; …

淘宝店铺订单详情接口,淘宝店铺订单列表数据,淘宝店铺订单物流接口,淘宝店铺订单线下发货接口-

淘宝店铺订单详情接口可以理解为淘宝开放平台&#xff08;Taobao Open Platform&#xff09;提供的API接口1。 该接口可以获取淘宝店铺的订单详情&#xff0c;包括订单号、买家昵称、收货人姓名、收货地址、邮编、电话、商品描述、价格等信息。开发者可以在淘宝开放平台注册一…

使用winUSB进行USB开发

什么是winUSB WinUSB是Windows操作系统提供的一种通用USB驱动程序&#xff0c;用于简化USB设备的开发和使用。它是一个用户模式驱动程序&#xff0c;可以在Windows XP及更高版本的操作系统上使用。WinUSB提供了一组API和工具&#xff0c;使开发人员能够与USB设备进行通信&…

C++11可变参数模板

文章目录 1.可变参数模板的介绍1.1C语言中的可变参数1.2C98/C11的类模板和函数模板1.3可变参数的函数模板1.4展开参数包递归展开初始化列表展开 2.可变参数模板的应用 1.可变参数模板的介绍 1.1C语言中的可变参数 1.2C98/C11的类模板和函数模板 C98/03&#xff0c;类模版和函…