RocketMQ 与 Spring Cloud Stream之事务消息配置

news2024/9/21 0:34:22

1 引言

RocketMQ的事务消息设计是为了解决分布式系统中数据一致性的问题。在分布式系统中,由于数据可能分布在不同的服务或节点上,因此需要一种机制来确保数据的最终一致性。事务消息通过引入本地事务和消息状态的关联,确保了消息的发送与本地事务的执行结果紧密相关,从而避免了数据不一致的问题。

2 事务消息步骤

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    4.1 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    4.2 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
    5.1 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    5.2 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
  6. 注意:服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。

3 项目结构

本章内容以Spring Cloud Alibaba 快速学习之 RocketMQ中的项目为基础稍作修改。下面列出修改的文件。

3.1 项目rocketmq-producer

在这里插入图片描述

  • application.properties
    这里添加了事务相关的配置
 #spring应用程序监听的端口号
server.port=8080
#spring应用程序的名称
spring.application.name=rocketmq-producer
#spring当前激活的配置文件
spring.profiles.active=dev

#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-out-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-out-0.content-type=application/json

#rocketmq 事务消息配置
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.producerType=Trans
#rocketmq 事务消息分组
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 事务消息监听
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.transactionListener=RocketMQTransactionListener
  • rocketmq-producer/src/main/java/org/example/controller/TestController.java
 package org.example.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;


@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/send")
    public String send() {
        for (int i = 0; i < 5; i++) {
            Map<String, String> map = new HashMap<>();
            String id = i + "";
            map.put("id", id);
            map.put("msg", "测试消息");
            MessageBuilder<Map<String, String>> builder = MessageBuilder.withPayload(map);
            streamBridge.send("testchannel-out-0", builder.build());
        }
        return "消息发送成功!";
    }


}
  • rocketmq-producer/src/main/java/org/example/conf/RocketMQTransactionListener.java
    这里添加了事务监听器,注意@Component名称与配置文件中对应
package org.example.conf;

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Component("RocketMQTransactionListener")
public class RocketMQTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String msg = new String(message.getBody());
        System.out.println("execute:" + msg);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        if (jsonObject.getIntValue("id") == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (jsonObject.getIntValue("id") == 1) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("check:" + new String(messageExt.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3.2 项目rocketmq-consumer-b

在这里插入图片描述

  • application.properties
    这里注释了接收广播消息
#spring应用程序监听的端口号
server.port=8082
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev

#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING

#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json

3.3 项目rocketmq-consumer-a

在这里插入图片描述

  • application.properties
    这里也是注释了接收广播消息
#spring应用程序监听的端口号
server.port=8081
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev

#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING

#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json

4 测试

4.1 同时启动三个子项目

在这里插入图片描述

4.2 发送消息

  • 打开浏览器访问:http://localhost:8080/test/send,可以看到5条消息都进入了executeLocalTransaction方法。

在这里插入图片描述 - 其中消息id为0二次确认结果为Commit,被consumerA正常接收,这与监听器中代码功能一致。
在这里插入图片描述- 其中消息id为1二次确认结果为Unknown,触发回查,在回查中正常Commit,被consumerB正常接收,这与监听器中代码功能一致。
在这里插入图片描述在这里插入图片描述- 其他消息二次确认结果为Rollback,服务端将回滚事务,不会将半事务消息投递给消费者,这与监听器中代码功能一致。

5 完整代码

Gitee代码链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2073138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【什么是“Binary“二进制文件?】

“Binary”二进制文件是计算机文件的一种形式。部件文件是开发人员编写的源代码文件&#xff0c;还未被编译成可执行的机器代码&#xff0c;通常具有如.c、.cpp、.java 等扩展名。对象文件是部件文件经过编译器编译生成的中间文件&#xff0c;包含了部件文件的机器代码和符号表…

链表OJ题——环形链表2

文章目录 一、题目链接二、解题思路三、解题代码 一、题目链接 环形链表2 题目描述&#xff1a;在链表有环的基础上&#xff0c;找出环的入口点。 二、解题思路 三、解题代码

移动端爬虫学习记录

免责声明 本文旨在探讨移动端爬虫技术的应用和挑战&#xff0c;仅供教育和研究用途。请确保在合法合规的框架内使用爬虫技术&#xff0c;遵循相关法律法规和网站的使用条款。作者不对因使用本文内容而产生的任何法律或安全问题承担责任。 1、初识移动端爬虫 学习移动端爬虫的原…

.NET 开发的高性能内网穿透工具

目录 前言 什么是NSmartProxy&#xff1f; 项目特点 运行原理 客户端安装 服务端安装 使用案例 项目地址 最后 前言 在许多情况下&#xff0c;我们需要从外部网络访问内部网络中的服务&#xff0c;比如家里的服务器或者公司的内部资源。这时内网穿透工具就可以帮助我们…

【吊打面试官系列-Memcached面试题】什么是二进制协议,我该关注吗?

大家好&#xff0c;我是锋哥。今天分享关于 【什么是二进制协议&#xff0c;我该关注吗&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; 什么是二进制协议&#xff0c;我该关注吗&#xff1f; 关于二进制最好的信息当然是二进制协议规范&#xff1a; 1000道 互…

【AI+编程】只需1句提示词0代码生成前端展示效果

最近被Vercel发布的V0 编程效果惊艳到了&#xff0c; 不管是前端开发 还是立志成为全栈工程师的 同学&#xff0c;不可错过。 官网地址&#xff1a;https://v0.dev/chat/ 代码生成工具很多&#xff0c;不管是github copilot、阿里的通义灵码&#xff0c; 腾讯云的AI代码助手…

python 多进程 多线程 程序

这个纯粹为了增加理解&#xff0c;将很多比较好的资料进行归纳总结。 1、理论汇总 并发和并行 image.png 多进程和多线程 同步和异步 同步&#xff1a;所谓同步&#xff0c;就是在发出一个功能调用时&#xff0c;在没有得到结果之前&#xff0c;该调用就不会返回。 异步…

027集—CAD中批量删除多段线重复点、距离过近点——vba代码实现

cad图中多段线存在重复点、或距离过近点&#xff0c;可通过vba插件一键删除。 &#xff08;精度可人工设定&#xff0c;例如精度设置0.001&#xff1a;小于0.001 的点视为重复点&#xff0c;删除此点。&#xff09; 如下图&#xff1a; 如下图&#xff1a; 大量重复点和距离…

【Gaussian splatting系列学习】(三)

3DGS系列&#xff08;一&#xff09; 3DGS系列&#xff08;二&#xff09; 3DGS系列&#xff08;三&#xff09; 3D高斯球的颜色 基函数&#xff1a; 任何一个周期性函数可以分解为正弦和余弦的线性组合 球谐函数&#xff1a; 任何一个球面坐标的函数可以用多个球谐函数来近…

FPGA开发——在线调试工具Signal Tap的使用

一、简介 在我们进行FPGA进行开发时通常都会经历代码编写&#xff0c;仿真&#xff0c;下板验证等过程。使用FPGA进行开发的小伙伴都知道&#xff0c;在代码编写时往往花费不了太长的时间&#xff0c;下板验证更是。在开发中占绝大部分时间的是仿真&#xff0c;有时候编写代码只…

C++类和对象(下):初始化列表、explicit关键字、友元函数、友元类

文章目录 C类和对象9、初始化列表9.1构造函数体赋值9.2初始化列表9.3 explicit&#xff08;显示&#xff09;关键字 10、友元10.1友元函数10.2友元类 C类和对象 9、初始化列表 一个类的构造函数要初始化成员变量有两种方式&#xff0c;一种是构造函数体赋值&#xff0c;另一种…

【C++二分查找】2817. 限制条件下元素之间的最小绝对差

本文涉及的基础知识点 C二分查找 LeetCode2817. 限制条件下元素之间的最小绝对差 给你一个下标从 0 开始的整数数组 nums 和一个整数 x 。 请你找到数组中下标距离至少为 x 的两个元素的 差值绝对值 的 最小值 。 换言之&#xff0c;请你找到两个下标 i 和 j &#xff0c;满…

python应用之内置hashlib库的哈希算法介绍

hashlib 是 Python 的一个内置模块&#xff0c;提供了像 SHA1, SHA256, MD5 等哈希算法。可以接受任意长度的字节数据作为输入&#xff0c;并输出一个固定长度的“哈希值”&#xff0c;通常用于校验数据的完整性。而且该算法是不可逆的&#xff0c;不能通过哈希值反算出原始数据…

zookeeper服务搭建

zookeeper服务搭建 前言1. 前置准备2. 下载和解压Zookeeper3. 配置环境变量4. 编辑Zookeeper配置文件5. 配置Zookeeper节点ID6. 配置好的Zookeeper分发到其他节点7. 启动Zookeeper集群参考博客 前言 Zookeeper是一个开源的分布式协调服务&#xff0c;主要用于解决分布式应用中的…

【Excal】OR 函数

语法&#xff1a; OR&#xff08;判断条件1&#xff0c;判断条件2&#xff0c;判断体件3&#xff0c;****&#xff09; 评优条件&#xff1a; 语文成绩高于90 数学成绩高于90 英语成绩高于85 物理成绩高于85 点击回车键 选中填充 回车 选中填充

echart legend 的使用及离开界面图表全局销毁

父组件 <template><AbnormalAlarmStatistics ref"abnormalAlarmStatistics" /> </template> <script setup> import {ref,reactive,computed,onMounted,getCurrentInstance,watch } from "vue";const { proxy } getCurrentInsta…

CentOS Docker搭建Mysql5.7集群

MySQL Replication MySQL提供了Replication功能&#xff0c;可以实现将一个数据库的数据同步到多台其他数据库。前者通常称之为主库&#xff08;master&#xff09;&#xff0c;后者则被称从库&#xff08;slave&#xff09;。MySQL复制过程采用异步方式&#xff0c;但延时非常…

图解Redis五大数据类型

五种数据类型的不同之处&#xff0c;是value在存储时的形式不同。 hash类型 value类型是<key,value>键值对。如果发生hash冲突&#xff0c;用开放定址法解决&#xff0c;不拉链&#xff01; key值重复&#xff0c;则新值覆盖旧值 List类型 Set类型 与List的类似&…

嵌入式Keil工具【微库】和【标准库】的对比

我们在学习或者用单片机做开发的时候,输出信息以及打印调试基本都会有用的 printf 函数,那么,这个时候基本都会用到【微库】。 如果使用 Keil 软件,就会勾选配置中的微库(MicroLib),如下图: 同样,在IAR、 e2 studio等开发单片机的集成开发环境中,也有类似的配置选项。…

电脑远程监控桌面软件集锦|(2024全网优秀资源整理!)

电脑远程监控桌面软件在企业管理和个人应用中扮演着重要的角色。 这些工具可以帮助企业提升工作效率、保护数据安全&#xff0c;同时也能在个人使用时提供便利。 以下是几款优秀的电脑远程监控桌面软件及其主要功能介绍&#xff1a; 1中科安企 特点&#xff1a;一款备受企业…