RocketMQ学习(五):分布式事务

news2025/1/12 0:55:16

一、分布式事务

事务(Transaction),一般是指要做的或所做的事情。在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元(unit)。事务通常由高级数据库操纵语言或编程语言(如SQL,C++或Java)书写的用户程序的执行所引起,并用形如begin transactionend transaction语句(或函数调用)来界定。事务由事务开始(begin transaction)和事务结束(end transaction)之间执行的全体操作组成。
摘自百度百科

提到事务我们很容易会想到事务的四大特性ACID,但是在分布式的情况下想实现和单机下一样的事务并不是一件容易的事情,目前一些常见的分布式事务解决方案有如下几种,**消息队列+本地事件表、2PC、3PC、TCC、基于RocketMQ的半消息机制等等。**这些方案都有各自的使用场景,个人理解在高并发的情况下基于RocketMQ的半消息机制来实现分布式事务是一种不错的解决方案,其他的方案例如2PC、3PC或多或少存在锁定资源的情况,所以今天的内容就是介绍以RocketMQ的半消息为基础来实现分布式事务。

二、原理分析

image.png 这里借用一下RocketMQ官方的图,从图中可以看出当订单支付后有对应4个分支的操作分别是:更新订单状态、更新物流、更新用户积分、清空购物车。这4个步骤应该在同一个事务中,但是由于分布式的情况通常我们很难做到一致性,所以我们会采用一种折中的手段:最终一致性。接下来我们学习一下如何使用RocketMQ来解决这一问题。
首先在上一篇文章中我们知道了RocketMQ有一种独特的机制,半消息,当消息处于“半事务消息”的状态时,消费者是无法获取到消息的,利用这一特性我们可以轻松的实现分布式事务。
在这里插入图片描述


流程如下:1、首先生产者向RocketMQ服务端投递一条半事务消息并等待服务端的响应
2、投递成功则开始执行本地事务(如果失败可以尝试重新投递,如果多次投递失败则需要人工介入)
3、根据本地事务执行的结果来告知RocketMQ服务端是否提交消息,如果执行成功则投递否则回滚
4、RocketMQ服务端将成功的消息投递给消费者,消费者进行消费。

这里需要注意的几点:
1、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
2、消费者端最好对消费做好幂等性处理,防止重复消费。

三、代码演示

1、业务描述

支付成功修改订单流水状态为支付成功,订单修改为已支付。其中订单和支付处于两个微服务中使用不同的数据库。

2、生产者

2.1、依赖

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
  </dependency>

  <dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
  </dependency>

  
  <dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1</version>
  </dependency>

  <dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1</version>
  </dependency>

</dependencies>

2、配置文件

rocketmq:
  name-server: 192.168.111.152:9876
  producer:
    group: test-group
server:
  port: 8080

3、核心代码1

package com.cmxy.producerdemo.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.cmxy.producerdemo.entity.LocalTransaction;
import com.cmxy.producerdemo.entity.PayFlow;
import com.cmxy.producerdemo.mapper.LocalTransactionMapper;
import com.cmxy.producerdemo.mapper.PayFlowMapper;
import com.cmxy.producerdemo.service.PayService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

@Service
    @Slf4j
    public class PayServiceImpl implements PayService {

        @Autowired
        private PayFlowMapper payFlowMapper;
        @Autowired
        private LocalTransactionMapper localTransactionMapper;
        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        @Override
        public String paySuccess(String orderNo) {
            //判断支付流水是否存在
            PayFlow payFlow = payFlowMapper.selectOne(new LambdaQueryWrapper<PayFlow>().eq(PayFlow::getOrderNo, orderNo));
            if (payFlow == null) {
                log.error("支付流水不存在");
                return "fail";
            }
            //校验完成,发送半事务消息;
            String transactionId = UUID.randomUUID().toString().replace("-", "");
            Message<String> message = MessageBuilder.withPayload("rocketMQTemplate transactional message ").
                setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();
            rocketMQTemplate.sendMessageInTransaction("TestTransaction", message, orderNo);
            return "success";
        }

        /**
* 修改支付流水状态
*
* @param orderNo
*/
        @Override
        @Transactional
        public boolean updatePayFlow(String orderNo, String transactionId) {
            //将支付流水更新为已支付,并且插入一条本地事务数据表示当前事务执行完成
            PayFlow payFlow = new PayFlow();
            payFlow.setStatus(1);
            int update = payFlowMapper.update(payFlow, new LambdaQueryWrapper<PayFlow>().eq(PayFlow::getOrderNo, orderNo));
            int insert = localTransactionMapper.insert(new LocalTransaction(transactionId));
            return update > 0 && insert > 0;
        }
    }

核心代码2

/**
 * 由于在Springboot rocket mq start 2.1.0版本之后 @RocketMQTransactionListener移出了txProducerGroup属性
 * 所以如果存在多个不同事务,需要从代码层面来区分。(之前是一个Listener对应一个)
 */
@Slf4j
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private PayService payService;
    @Autowired
    private LocalTransactionMapper localTransactionMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务");
        String orderNo = (String) o;
        String transactionId = message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class);
        log.info("本地事务执行完成");
        return payService.updatePayFlow(orderNo, transactionId) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
    }

    /**
     * 检查本地事务表 注意这里不要去查具体业务,只需要查本地事务表是否插入成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transactionId = message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class);
        LocalTransaction localTransaction = localTransactionMapper.selectOne(new LambdaQueryWrapper<LocalTransaction>().eq(LocalTransaction::getTransactionId, transactionId));
        return localTransaction == null ? RocketMQLocalTransactionState.UNKNOWN : RocketMQLocalTransactionState.COMMIT;
    }
}

3、消费者

消费者相对来说要简单的多,和普通消息一样。需要注意的是一定要处理好幂等性问题!

@Slf4j
@Component
@RocketMQMessageListener(topic = "TestTransaction", consumerGroup = "test-group") // topic、tag保持一致
public class OrderListener implements RocketMQListener<String> {
    
    @Autowired
    private OrderMapper orderMapper;

    /**
     * 重点:对于消费来说一定要做好幂等性!!!(当前demo偷个懒就不做了),重复消费可能会导致很严重的问题
     * 笔者之前就因为没有注意幂等性,导致给客户多发了3张优惠券!!!
     * @param message
     */
    @Override
    public void onMessage(String message) {
       log.info("收到信息:{}",message);
        Order order = orderMapper.selectOne(new LambdaQueryWrapper<Order>().eq(Order::getOrderNo, message));
        if(order == null){
            //这里如果有业务上的异常,需要记录日志,通知开发人员等等
            throw new RuntimeException("订单不存在");
        }
        order.setStatus(1);
        orderMapper.updateById(order);
    }
}

四、总结

总的来说RocketMQ来实现分布式事务相对来说不难,个人感觉基于消息队列来实现分布式事务适用于业务上对于数据一致性要求没有那么高的,允许中间有一段时间数据不一致的场景;相较于2PC、3PC使用RocketMQ来实现分布式事务效率更高,且代码编写也不复杂。本案例中的代码笔者后续会放到github、gitee上。希望对你有所帮助

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/125339.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

『Java课设』JavaSwing+MySQL实现医院智慧点餐系统

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位喜欢写作&#xff0c;计科专业大三菜鸟 &#x1f3e1;个人主页&#xff1a;starry陆离 如果文章有帮到你的话记得点赞&#x1f44d;收藏&#x1f497;支持一下哦 『Java课设』JavaSwingMySQL实现医院智慧点餐系统1.功能介…

设计模式:责任链模式的应用场景及源码应用

一、概述 责任链模式&#xff08;Chain of Responsibility Pattern&#xff09;是将链中每一个节点看作是一个对象&#xff0c;每个节点处理的请求均不同&#xff0c;且内部自动维护一个下一节点对象。当一个请求从链式的首端发出时&#xff0c;会沿着链的路径依次传递给每一个…

AI代码实时生成工具teleportHQ

来源&#xff1a;投稿 作者&#xff1a;ΔU 编辑&#xff1a;学姐 今天给大家分享一款AI代码实时生成工具teleportHQ&#xff0c;teleportHQ本质上是一个低代码开发平台&#xff0c;但是首次将计算机视觉应用到低代码开发上&#xff0c;teleportHQ允许用户通过熟悉的设计工具界…

JSX的基本使用

JSX的基本使用1.JSX简介1.1 JSX是react的核心内容1.2 createElement的问题1.3 createElement的问题1.4 JSX注意点2 使用prettier插件格式化react代码3 JSX中嵌入JavaScript表达式4 条件渲染5 列表渲染6 样式处理1.JSX简介 JSX是JavaScript XML的简写&#xff0c;表示了在Javas…

Linux 下 rpm管理包

一、 .rpm的文件格式 以.rpm格式发布的软件里面封装的都是经过编译过的二进制形式的软件&#xff0c;可以直接安装。.rpm格式的文件又称为rpm软件包&#xff0c;简称rpm包。 二、 rpm文件名的格式 三、 rpm命令的使用与软件的安装 Linux中安装rpm软件包有3种方法&#xff1…

新手小白入门之泛型

一、背景 JAVA推出泛型以前&#xff0c;程序员可以构建一个元素类型为Object的集合&#xff0c;该集合能够存储任意的数据类型对象&#xff0c;而在使用该集合的过程中&#xff0c;需要程序员明确知道存储每个元素的数据类型&#xff0c;否则很容易引发ClassCastException异常…

嵌入式书籍推荐

现在嵌入式软件工程师的数量需求方面是越来越旺盛&#xff0c;但是在人才供给方面却出现了缺口&#xff0c;个大公司对于嵌入式开发工程师职位出现供不应求的局面&#xff0c;正是有很多人看到这了大好的环境&#xff0c;纷纷选择开始学习嵌入式开发&#xff0c;学习的方式也是…

第十六讲:神州交换机访问控制列表的配置

访问控制列表ACL&#xff08;Access Control Lists&#xff09;数据定义工具&#xff0c;基于用户自行定义的数据的参数区分不同的数据流&#xff0c;是在交换机和路由器上经常采用的一种防火墙技术&#xff0c;它可以对经过网络设备的数据包根据一定规则进行过滤。它有以下一些…

CloudFlare 的路由拦截

因为腾讯需要对网站进行校验。 校验的方法是使用一个 tencent18250331897192314951.txt 文件&#xff0c;在这个文件中放入腾讯指定的内容。 我们使用的是 Discourse 这个社区系统&#xff0c;这个社区系统对这种问题的响应比较头痛。 解决方案 解决方案就是从域名服务商哪…

NX 系统环境 python3.6 部署 PPOCR 报错记录

NX 系统环境 python3.6 部署 PPOCR 报错记录 前言&#xff08;这环境&#xff0c;就硬配&#xff09; 问&#xff1a;为什么要用系统环境&#xff0c;不用 conda&#xff1f;答&#xff1a;因为 conda 的 ARM 端 python 最低只支持 3.7&#xff0c;而 paddlepaddle 提供的 Je…

c#入门-系统特性

特性 特性可以给成员添加元数据。这有两个作用&#xff1a; 这是一个元数据&#xff0c;可以利用反射获取到如果编译器认识这个特性&#xff0c;那么可以与特性进行交互。 第一点涉及到反射的内容&#xff0c;先略过。 而第二点要求的编译器认实这个特性&#xff0c;就仅限于…

在Linux上安装和使用ZFS

真正的文件系统终极者 ZFS 文件系统的英文名称为 ZettabyteFileSystem&#xff0c;也叫动态文件系统&#xff0c;是第一个 128 位文件系统。最初是由 Sun 公司为 Solaris10 操作系统开发的文件系统。作为 OpenSolaris 开源计划的一部分&#xff0c;ZFS 于 2005 年 11 月发布&a…

《MySQL 8从零开始学(视频教学版)》简介

#好书推荐##好书奇遇季#《MySQL 8从零开始学&#xff08;视频教学版&#xff09;》&#xff0c;定价89元&#xff0c;京东当当天猫都有发售。本书面向MySQL 8数据库初学者&#xff0c;是MySQL数据库畅销入门书。 配套资源 本书配套400个实例和14个综合案例的源码、PPT课件、近2…

/etc/passwd详解

目录 一、统一性和标准化 二、功能和权限 三、内容详解 1、/etc/passwd为按行记录的文本文件&#xff0c;每行记录一个用户的信息 2、每行信息内容 四、参考文献 一、统一性和标准化 各版本的Linux操作系统的/etc/passwd功能和内容格式基本相同。 &#xff08;1&#xf…

Spark环境搭建(Stand alone模式)

Sand alone 架构 Standalone模式是Spark自带的一种集群模式&#xff0c;不同于前面本地模式启动多个进程来模拟集群的环境&#xff0c;Standalone模式是真实地在多个机器之间搭建Spark集群的环境&#xff0c;完全可以利用该模式搭建多机器集群&#xff0c;用于实际的大数据处理…

红黑树的迭代器红黑树与AVL树的比较

&#x1f9f8;&#x1f9f8;&#x1f9f8;各位大佬大家好&#xff0c;我是猪皮兄弟&#x1f9f8;&#x1f9f8;&#x1f9f8; 文章目录一、红黑树泛型实现map&#xff0c;set对多出来的模板参数的解释二、map和set对红黑树迭代器的封装①迭代器operator②operator--三、红黑树…

Web3中文|全球首个中华武术收藏级卡牌系列发布,传武文化的未来在元宇宙?

谈及中华武术的传承与发展&#xff0c;大家首先能想到什么&#xff1f;小说、电影、动画、游戏……等等&#xff0c;都是曾经的载体。作为中华文化极其重要的一部分&#xff0c;武术是国人独有的标签&#xff0c;太多经典作品珠玉在前&#xff0c;如今武术的传承&#xff0c;需…

【JavaScript】跟着pink学习第二天部分案例

1.猜数字游戏 三次机会&#xff0c;猜1~50之间的一个整数 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"vi…

12月第4周榜单丨B站UP主排行榜(飞瓜数据B站)发布!

飞瓜轻数发布2022年12月19日-12月25日飞瓜数据UP主排行榜&#xff08;B站平台&#xff09;&#xff0c;通过充电数、涨粉数、成长指数三个维度来体现UP主账号成长的情况&#xff0c;为用户提供B站号综合价值的数据参考&#xff0c;根据UP主成长情况用户能够快速找到运营能力强的…

【Python百日进阶-数据分析】Day140 - plotly表:plotly.graph_objects.Table()

文章目录一、语法二、参数三、返回值四、实例4.1 基本表4.2 样式表4.3 使用 Pandas 数据框4.4 Dash中的表4.5 更改行和列大小4.6 交替行颜色4.7 基于变量的行颜色4.8 基于变量的单元格颜色一、语法 构造一个新的 Table 对象 用于查看详细数据的表格视图。数据以行和列的网格排…