【RocketMQ】(七)事务实现原理

news2025/1/23 2:04:41

RocketMQ事务的使用场景

单体架构下的事务

在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。以创建订单为例,假设下单后需要做两个操作:

  1. 在订单表生成订单
  2. 在积分表增加本次订单增加的积分记录

在单体架构下只需使用@Transactional开启事务,就可以保证数据的一致性:

    @Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 生成订单
        orderService.createOrder(orderId);
        // 增加积分
        creditService.addCredits(orderId);
    }

然而现在越来越多系统开始使用分布式架构,在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中,在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚,所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。

分布式架构下的事务

分布式架构下如果需要保证事务的一致性,需要使用分布式事务,分布式事务的实现方式有多种,这里我们先看通过RocketMQ事务的实现方式。

同样以下单流程为例,在分布式架构下的处理流程如下:

  1. 订单服务生成订单
  2. 发送订单生成的MQ消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录

普通MQ消息存在的问题

如果使用@Transactional + 发送普通MQ的方式,看下存在的问题:

  1. 假如订单创建成功,MQ消息发送成功,但是order方法在返回的前一刻,服务突然宕机,由于开启了事务,事务还未提交(方法结束后才会正常提交),所以订单表并未生成记录,但是MQ却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况
  2. 假如先发送MQ消息再创建订单呢,此时问题就更明显了,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态
    @Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 创建订单
        Order order = orderService.createOrder(orderDTO.getOrderId());
        // 发送订单创建的MQ消息
        sendOrderMessge(order);
        return;
    }

解决上述问题的方式就是使用RocketMQ事务消息。

RocketMQ事务消息的使用

使用事务消息需要实现自定义的事务监听器,TransactionListener提供了本地事务执行和状态回查的接口,executeLocalTransaction方法用于执行我们的本地事务,checkLocalTransaction是一种补偿机制,在异常情况下如果未收到事务的提交请求,会调用此方法进行事务状态查询,以此决定是否将事务进行提交/回滚:

public interface TransactionListener {
    /**
     * 执行本地事务
     *
     * @param msg Half(prepare) message half消息
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * 本地事务状态回查
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

这里我们实现自定义的事务监听器OrderTransactionListenerImpl:

  • executeLocalTransaction方法中创建订单,如果创建成功返回COMMIT_MESSAGE,如果出现异常返回ROLLBACK_MESSAGE
  • checkLocalTransaction方法中回查事务状态,根据消息体中的订单ID查询订单是否已经创建,如果创建成功提交事务,如果未获取到认为失败,此时回滚事务。
public class OrderTransactionListenerImpl implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String body = new String(msg.getBody(), Charset.forName("UTF-8"));
            OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
            // 模拟生成订单
            orderService.createOrder(orderDTO.getOrderId());
        } catch (Exception e) {
            // 出现异常,返回回滚状态
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 创建成功,返回提交状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String body = new String(msg.getBody(), Charset.forName("UTF-8"));
        OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
        try {
            // 根据订单ID查询订单是否存在
            Order order = orderService.getOrderByOrderId(orderDTO.getOrderId());
            if (null != order) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

接下来看如何发送事务消息,事务消息对应的生产者为TransactionMQProducer,创建TransactionMQProducer之后,设置上一步自定义的事务监听器OrderTransactionListenerImpl,然后将订单ID放入消息体中, 调用sendMessageInTransaction发送事务消息:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建下单事务监听器
        TransactionListener transactionListener = new OrderTransactionListenerImpl();
        // 创建生产者
        TransactionMQProducer producer = new TransactionMQProducer("order_group");
        // 事务状态回查线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 设置线程池
        producer.setExecutorService(executorService);
        // 设置事务监听器
        producer.setTransactionListener(transactionListener);
        // 启动生产者
        producer.start();
        try {
            // 创建订单消息
            OrderDTO orderDTO = new OrderDTO();
            // 模拟生成订单唯一标识
            orderDTO.setOrderId(UUID.randomUUID().toString());
            // 转为字节数组
            byte[] msgBody = JSON.toJSONString(orderDTO).getBytes(RemotingHelper.DEFAULT_CHARSET);
            // 构建消息
            Message msg = new Message("ORDER_TOPIC", msgBody);
            // 调用sendMessageInTransaction发送事务消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf(sendResult.toString());
            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

事务的执行流程:

  1. 在订单服务下单后,向Borker发送生成订单的事务消息,投递到ORDER_TOPIC主题中
  2. Broker收到事务消息之后,不会直接投递到ORDER_TOPIC主题中,而是先放在另外一个主题中,也叫half主题,half主题对消费者不可见
  3. half主题加入消息成功之后,会回调事务监听器的的executeLocalTransaction方法,执行本地事务,也就是订单创建,如果创建成功返回COMMIT状态,如果出现异常返回ROLLBACK状态
  4. 根据上一步的返回状态,进行结束事务的处理
    • 提交:从half主题中删除消息,然后将消息投送到ORDER_TOPIC主题中,积分服务订阅ORDER_TOPIC主题进行消费,生成积分记录
    • 回滚:从half主题中删除消息即可
  5. 如果本地事务返回的执行结果状态由于网络原因或者其他原因未能成功的发送给Broker,Broker未收到事务的执行结果,在补偿机制定时检查half主题中消息的事务执行状态时,会回调事务监听器checkLocalTransaction的接口,进行状态回查,判断订单是否创建成功,然后进行结束事务的处理

使用事务消息不会存在订单创建失败但是消息发送成功的情况,不过你可能还有一个疑问,假如订单创建成功了,消息已经投送到队列中,但是积分服务在消费的时候失败了,这样数据还是处于不一致的状态,个人感觉,积分服务可以在失败的时候进行重试或者进行一些其他的补偿机制来保证积分记录成功的生成,在极端情况下积分记录依旧没有生成,此时可能就要人工接入处理了。

RocketMQ事务实现原理

事务消息发送

一、 生产者发送事务消息
生产者在发送事务消息的时候,会在消息属性中设置PROPERTY_TRANSACTION_PREPARED属性,然后向Broker发送消息。

Broker收到消息后,会判断消息是否含有PROPERTY_TRANSACTION_PREPARED属性,如果没有该属性,表示是普通消息,按照普通消息的写入流程执行即可,如果有该属性
表示开启事务,还不能直接加入到实际的消息队列中,否则一旦加入就会被消费者消费,所以需要先对消息暂存,等收到消息提交请求时才可以添加到实际的消息队列中,RocketMQ设置了一个RMQ_SYS_TRANS_HALF_TOPIC主题来暂存事务消息,放入这个主题中的消息被称为half消息,它的处理逻辑如下:

  1. 设置消息实际的主题和队列ID,待收到事务提交请求后恢复实际的主题和队列ID,向实际的队列中添加消息(类似于延迟消息的实现);
  2. 更改消息的主题为half消息主题RMQ_SYS_TRANS_HALF_TOPIC
  3. 更改消息队列,默认使用RMQ_SYS_TRANS_HALF_TOPIC主题下ID为0的那个消息队列,将消息先投递到此队列中;

二、 执行本地事务
在上一步中,生产者向Broker发送了事务消息,发送之后生产者会根据返回的响应结果来判断消息是否发送成功:
(1)发送成功,此时执行本地事务,并返回本地事务执行结果状态,执行结果一般有以下三种;
* COMMIT_MESSAGE:表示执行成功;
* ROLLBACK_MESSAGE:执行失败需要回滚事务;
* UNKNOW:未知状态;
(2)未发送成功,比如FLUSH_DISK_TIMEOUT刷盘超时、FLUSH_SLAVE_TIMEOUTSLAVE_NOT_AVAILABLE从节点不可用等状态,此时意味着消息发送失败,本地事务状态置为ROLLBACK_MESSAGE准备回滚事务;

三、结束事务

经过了前两步骤之后,消息暂存在Broker的half主题中,也得到了本地事务的执行结果状态,接下来就需要根据本地事务的执行结果状态来决定回滚还是提交事务,首先会构建一个结束事务的请求头EndTransactionRequestHeader,请求头中会设置消息的偏移量等信息,然后根据事务的执行结果来设置不同的标识,上面知道事务执行结果一般有三种状态:

  1. COMMIT_MESSAGE:表示执行成功,可以提交事务,请求头中设置TRANSACTION_COMMIT_TYPE标识表示提交事务;
  2. ROLLBACK_MESSAGE:表示需要回滚事务,请求头中设置TRANSACTION_ROLLBACK_TYPE标识进行事务回滚;
  3. UNKNOW:事务执行结果未知状态,请求头中设置TRANSACTION_NOT_TYPE标识未知状态的事务;

之后会向Broker发送这个结束事务的请求,Broker收到请求后会做如下处理:

  1. 判断自己是否是从节点,从节点没有结束事务的权限,如果是从节点返回SLAVE_NOT_AVAILABLE状态;
  2. 从请求头中获取事务的提交类型,如果是TRANSACTION_NOT_TYPE打印warn信息,然后返回NULL,如果是其他类型,做如下处理:
    (1)TRANSACTION_COMMIT_TYPE标识:表示提交事务,请求信息中携带了消息的偏移量,会根据偏移量先查找消息是否存在,如果存在与请求头中携带的消息信息进行对比校验是否一致,校验通过才可以提交事务,此时会恢复消息原本的主题和队列,将消息投递到对应的队列中,然后将对应的half消息进行删除;
    (2)TRANSACTION_ROLLBACK_TYPE标识:表示回滚事务,同样会先根据请求中的消息偏移量进行查找并校验,通过之后,将对应的half消息进行删除;

消息删除

由于CommitLog追加写的性质,RocketMQ并不会直接将half消息从CommitLog中删除,而是使用了另外一个主题RMQ_SYS_TRANS_OP_HALF_TOPIC以下简称OP主题/队列),将已经删除的half消息记录在OP主题队列中,在事务状态检查时,需要通过这个OP队列来判断消息是否被标记了删除。

事务状态检查

由于各种原因有可能未成功收到提交/回滚事务的请求,所以RocketMQ需要定期检查half消息,检查事务的执行结果。在检查的时候会获取half主题(RMQ_SYS_TRANS_HALF_TOPIC)下的所有消息队列,遍历所有的half消息队列,对队列中的消息进行处理。

每个half消息队列,会有一个对应的OP队列,里面记录了被删除的half消息,首先需要从这个OP队列中拉取消息(因为不知道每条消息在OP队列中的哪个位置,所以需要不断拉取进行查找,每次会拉取32条),并放入到一个集合removeMap中,用于判断当前消息是否已经被标记了删除。

Broker记录了每个half队列的消费进度,每次检查时会获取上一次处理的位置,从这个位置之后继续处理队列中的每一条消息:

  1. 时间校验,如果当前时间减去检查开始时间大于最大处理时间,表示此次检查超时,终止循环等待下一次;
  2. 如果removeMap中包含当前消息,表示消息已经被删除,不需要进行处理;
  3. 如果removeMap不包含当前half消息,会根据消息偏移量获取half消息,如果消息获取不为空继续下一步;
  4. 判断当前half消费是否需要被丢弃或者跳过:
    • 丢弃:每次检查会记录本条消息的检查次数,并记在属性中,如果超过了最大的次数,表示消息需要做丢弃处理;
    • 跳过:如果消息在队列中的存留时间是否超过了设置的最大的保留时间,表示需要跳过,不进行处理;
  5. 判断消息的的存入时间是否大于本次开始检查的时间,如果大于说明是新加入的消息,由于事务消息发送后不会立刻提交,所以此时暂不需要进行检查,中断当前处理等待下一次检查;
  6. 如果消息属性中设置了PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS(事务最晚回查时间),判断half消息的存留时间是否超过了个值,如果未超过说明此时还未到回查的时间,并且当前消息未被删除,会将当前的消息重新加入half队列中,因为需要继续往后处理并在结束时更新进度,如果不重新将消息加入到队列中,这条消息就没办法再次处理;
  7. 判断是否需要进行事务回查,发送回查请求(处理回查请求时会调用checkLocalTransaction进行状态检查),回查请求通过线程池异步实现的,所以需要将half消息重新加入到队列中等待下次检查;
  8. 更新half队列的处理进度和OP队列的消费进度;

事务相关源码:【RocketMQ】【源码】事务的实现原理

参考

RocketMQ事务官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1038935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

6.5 图的遍历

前言&#xff1a; 主要内容&#xff1a; 这段文字描述的是图的遍历。图的遍历与树的遍历有些相似&#xff0c;都是从某个点开始&#xff0c;按照一定的规则遍历其他的点&#xff0c;但是图的遍历更加复杂&#xff0c;因为图中存在循环和回路&#xff0c;这意味着可能会多次访…

JavaWeb开发-09-MyBatis

官网&#xff1a;https://mybatis.org/mybatis-3/zh/index.html 一.Mybatis入门 1.快速入门 2.JDBC介绍 3.数据库连接池 官方地址&#xff1a;https://github.com/alibaba/druid/tree/master/druid-spring-boot-starter 4.lombok 二.Mybatis基础增删改查 1.准备 2.删除 3.新增…

AT24C02芯片

AT24C02简介&#xff1a; AT24C01/02/04/08/16...是一个 1K/2K/4K/8K/16K 位串行 CMOS内部有9个字节&#xff1b; 该器件通过 I2C 总线接口进行 操作&#xff0c;它有一个专门的写保护功能&#xff1b; 基于51 他有这个芯片操作 时序&#xff1a; AT24C02软件编程&#xff1a; …

darknet yolov3 模型训练步骤整理

1. 把需要训练的图片拷贝到 D:\模型训练\darknet-master\build\darknet\x64\data\VOCdevkit\VOC2012\JPEGImage 2. 执行imageName.py 文件&#xff0c;重命名JPEGImage下的图片。 D:\模型训练\darknet-master\build\darknet\x64\data\VOCdevkit\VOC2012\imageName.py 3. 用…

Termius 8 for Mac(多协议服务器连接软件)

Termius是一款远程访问和管理工具&#xff0c;旨在帮助用户轻松地远程连接到各种服务器和设备。它适用于多种操作系统&#xff0c;包括Windows、macOS、Linux和移动设备。 该软件提供了一个直观的界面&#xff0c;使用户可以通过SSH、Telnet和Mosh等协议连接到远程设备。它还支…

微表情识别API + c++并发服务器系统

微表情识别API系统 该项目只开源前后端程序&#xff0c;模型不开源 地址&#xff1a;https://github.com/lin-lai/-API- 更新功能 4.1版本 改用epoll实现IO多路复用并发服务器去除标志位设定—正在考虑用协议实现 项目介绍 本项目用于检测并识别视频中人脸的微表情 目标任…

【插件】页面引导库driver.js:

文章目录 一、效果图:二、实现思路:三、实现代码:【1】Driver.js 的技术特性【2】安装依赖【3】代码实现【4】 配置相关参数 一、效果图: 二、实现思路: 【官网】https://driverjs.com/docs/installation 【npm】https://www.npmjs.com/package/driver.js 【案例】改造driver.j…

用Python在XML和Excel表格之间实现互转

XML是一种超文本标记语言和文件格式&#xff0c;具有可自定义标签&#xff0c;易于扩展&#xff0c;便于编辑&#xff0c;传输便捷等优点。XML已成为应用数据交换的常用方式。虽然XML格式易于传输和开发者操作&#xff0c;但对于普通用户来说&#xff0c;数据以xls或xlsx的形式…

二维穿墙雷达CW112 的 优势

TFN CW112加固型二维定位穿墙雷达是一款综合UWB雷达和生物医学工程技术研制而成的人体目标探测装备。该产品可穿透建筑墙体等障碍物&#xff0c;实时获取其后方人体目标位置及数量等信息&#xff0c;具有穿透性强、轻质便携、可双手操控等特点&#xff0c;广泛应用于反恐处突、…

vue+element项目创建步骤

一、创建vue项目步骤 要创建一个Vue Element UI的项目&#xff0c;你可以按照以下步骤进行操作&#xff1a; 1.确保你已经安装了Node.js和npm&#xff08;Node.js的包管理器&#xff09;。你可以在命令行中运行以下命令来检查它们是否已经安装&#xff1a; node -vnpm -v2.使…

视频超过1g怎么压缩?分享简单的视频压缩方法

随着科技的发展&#xff0c;视频已经成为了我们日常生活中不可或缺的一部分。然而&#xff0c;有时候我们会遇到视频文件过大&#xff0c;无法顺利传输或存储的问题。那么&#xff0c;如何解决视频超过1g压缩的问题呢&#xff1f; #长假读书清单# 首先我们可以借助专业的压缩…

华为云云耀云服务器L实例评测|运行python脚本

目录 一、往期回顾二、华为云云耀云服务器L实例使用流程1、账号注册2、购买并配置云耀云服务器L实例3、登录并使用云耀云服务器L实例 三、为什么Python越来越火&#xff1f;四、使用宝塔运行Python脚本1、下载Python项目管理器2、上传项目到服务器3、添加项目4、安装requests启…

一文读懂Llama 2(从原理到实战)

简介 Llama 2&#xff0c;是Meta AI正式发布的最新一代开源大模型。 Llama 2训练所用的token翻了一倍至2万亿&#xff0c;同时对于使用大模型最重要的上下文长度限制&#xff0c;Llama 2也翻了一倍。Llama 2包含了70亿、130亿和700亿参数的模型。Meta宣布将与微软Azure进行合…

便携式水质采样设备助力毒情监测

便携式水质采样设备助力毒情监测 污水涉毒采样检测工作是运用科技手段准确评估监测辖区内毒情形势的重要手段。期间&#xff0c;民警详细了解了生活和工业污水的处理、排放以及服务范围、人口数量等情况&#xff0c;并就污水涉毒采样检测工作达成共识。随后&#xff0c;民警严格…

【车载开发系列】ECU Application Software程序刷新步骤

【车载开发系列】ECU Application Software程序刷新步骤 ECU Application Software程序刷新步骤 【车载开发系列】ECU Application Software程序刷新步骤一. Boot Software&#xff08;引导软件&#xff09;1&#xff09;boot manager&#xff08;启动管理器&#xff09;2&…

Flink1.12.7 Standalone版本安装

官网下载版本&#xff1a;https://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz 可以从首页找到Downloads | Apache Flink&#xff0c;一直往下拉 安装&#xff1a;下载后直接解压即可 添加全局参数&#xff1a; #vi /etc/profile FLINK_HO…

云服务器免费体检

三丰云提供了免费云服务器与免费虚拟主机服务&#xff0c; 个人学习、搭建个人网站或者微信小程序调试等可以申请一台。 免费申请网址为&#xff1a; https://www.sanfengyun.com/ 还是挺方便的&#xff0c;大家可以体验体验。

【校招VIP】前端操作系统之I/O调度算法

考点介绍 I/O 调度算法在各个进程竞争磁盘I/O的时候担当了裁判的角色。他要求请求的次序和时机做最优化的处理&#xff0c;以求得尽可能最好的整体I/O性能。 前端操作系统之I/O调度算法-相关题目及解析内容可点击文章末尾链接查看&#xff01; 一、考点题目 1. 某文件占10个…

Kafka的消息存储机制

前面咱们简单讲了K啊开发入门相关的概念、架构、特点以及安装启动。 今天咱们来说一下它的消息存储机制。 前言&#xff1a; Kafka通过将消息持久化到磁盘上的日志文件来实现高吞吐量的消息传递。 这种存储机制使得Kafka能够处理大量的消息&#xff0c;并保证消息的可靠性。 1…