电商项目-分布式事务(四)基于消息队列实现分布式事务

news2025/2/6 12:06:25

基于消息队列实现分布式事务,实现消息最终一致性

如何基于消息队列实现分布式事务?

通过消息队列实现分布式事务的话,可以保证当前数据的最终一致性。实现思路:将大的分布式事务,进行拆分,拆分成若干个小的本地事务,只要每一个小的本地事务是执行成功的,那就代表当前的分布式事务是执行成功的。下面以用户下单增加积分,来进行消息队列实现分布式事务的操作。

功能实现流程图如下:
在这里插入图片描述
主要包含三个角色:订单服务,消息队列和用户服务。
步骤(1-4):订单服务
1.首先,在订单服务中,会生成订单相关数据,并添加到数据库中。
2.接着会在订单数据库中,创建任务表 ,会向任务表添加数据,(Username,orderID,point积分)

3.设置定时任务,扫描任务表,获取相关数据,如每隔7S扫描一次
4.发送任务数据到MQ

步骤(5-12):用户服务
5.6.7.用户服务会接收消息,保证消息的不可重复消费,判断在Redis和 在数据库中是否存在。防止重复处理,不要重复添加积分。
用户服务正在执行消息
如果不存在,继续业务功能。
添加本地事务
12.通知订单服务

步骤(13):订单服务
13.删除任务表数据。

一、 准备工作

1.1 shangcheng_order库新增数据表

任务表:

DROP TABLE IF EXISTS `tb_task`;
CREATE TABLE `tb_task` (
  `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '任务id',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `delete_time` datetime DEFAULT NULL,
  `task_type` varchar(32) DEFAULT NULL COMMENT '任务类型',
  `mq_exchange` varchar(64) DEFAULT NULL COMMENT '交换机名称',
  `mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
  `request_body` varchar(512) DEFAULT NULL COMMENT '任务请求的内容',
  `status` varchar(32) DEFAULT NULL COMMENT '任务状态',
  `errormsg` varchar(512) DEFAULT NULL COMMENT '任务错误信息',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

历史任务表:

DROP TABLE IF EXISTS `tb_task_his`;
CREATE TABLE `tb_task_his` (
  `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '任务id',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `delete_time` datetime DEFAULT NULL,
  `task_type` varchar(32) DEFAULT NULL COMMENT '任务类型',
  `mq_exchange` varchar(64) DEFAULT NULL COMMENT '交换机名称',
  `mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
  `request_body` varchar(512) DEFAULT NULL COMMENT '任务请求的内容',
  `status` varchar(32) DEFAULT NULL COMMENT '任务状态',
  `errormsg` varchar(512) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

1.2 shangcheng_service_order_api添加相关实体类

@Table(name="tb_task")
public class Task {@Id
    private Long id;@Column(name = "create_time")
    private Date createTime;@Column(name = "update_time")
    private Date updateTime;@Column(name = "delete_time")
    private Date deleteTime;@Column(name = "task_type")
    private String taskType;@Column(name = "mq_exchange")
    private String mqExchange;@Column(name = "mq_routingkey")
    private String mqRoutingkey;@Column(name = "request_body")
    private String requestBody;@Column(name = "status")
    private String status;@Column(name = "errormsg")
    private String errormsg;
    
    //getter,setter略
}
@Table(name="tb_task_his")
public class TaskHis {@Id
    private Long id;@Column(name = "create_time")
    private Date createTime;@Column(name = "update_time")
    private Date updateTime;@Column(name = "delete_time")
    private Date deleteTime;@Column(name = "task_type")
    private String taskType;@Column(name = "mq_exchange")
    private String mqExchange;@Column(name = "mq_routingkey")
    private String mqRoutingkey;@Column(name = "request_body")
    private String requestBody;@Column(name = "status")
    private String status;@Column(name = "errormsg")
    private String errormsg;//getter,setter略
}

1.3 shangcheng_user新增积分日志表

DROP TABLE IF EXISTS `tb_point_log`;
CREATE TABLE `tb_point_log` (
  `order_id` varchar(200) NOT NULL,
  `user_id` varchar(200) NOT NULL,
  `point` int(11) NOT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 

1.4 shangcheng_service_user_api添加实体类 PointLog

@Table(name="tb_point_log")
public class PointLog {private String orderId;
    private String userId;
    private Integer point;
    //getter,setter略
}

1.5 shangcheng_service_order添加rabbitMQ配置类

@Configuration
public class RabbitMQConfig {
    //添加积分任务交换机
    public static final String EX_BUYING_ADDPOINTUSER = "ex_buying_addpointuser";//添加积分消息队列
    public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";//完成添加积分消息队列
    public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";//添加积分路由key
    public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";//完成添加积分路由key
    public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";/**
     * 交换机配置
     * @return the exchange
     */
    @Bean(EX_BUYING_ADDPOINTUSER)
    public Exchange EX_BUYING_ADDPOINTUSER() {
        return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
    }
    //声明队列
    @Bean(CG_BUYING_FINISHADDPOINT)
    public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
        Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
        return queue;
    }
    //声明队列
    @Bean(CG_BUYING_ADDPOINT)
    public Queue QUEUE_CG_BUYING_ADDPOINT() {
        Queue queue = new Queue(CG_BUYING_ADDPOINT);
        return queue;
    }
    /**
     * 绑定队列到交换机 .
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
    }
    @Bean
    public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
    }
}
 

二、 订单服务添加任务并发送

步骤1: 修改添加订单方法
当添加订单的时候,添加任务表中相关数据, 代码如下:

//增加任务表记录
Task task = new Task();
task.setCreateTime(new Date());
task.setUpdateTime(new Date());
task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE);
task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY);Map map = new HashMap();
map.put("userName",order.getUsername());
map.put("orderId",order.getId());
map.put("point",order.getPayMoney());
task.setRequestBody(JSON.toJSONString(map));
taskMapper.insertSelective(task);

步骤2: 定时扫描任务表最新数据
订单服务新增定时任务类,获取小于系统当前时间的所有任务数据

(1) 修改订单服务启动类,添加开启定时任务注解

@EnableScheduling

(2) 定义定时任务类
查询最新数据
更新taskMapper新增方法,查询所有小于系统当前时间的数据

public interface TaskMapper extends Mapper<Task> {@Select("SELECT * from tb_task WHERE update_time<#{currentTime}")
    @Results({@Result(column = "create_time",property = "createTime"),
            @Result(column = "update_time",property = "updateTime"),
            @Result(column = "delete_time",property = "deleteTime"),
            @Result(column = "task_type",property = "taskType"),
            @Result(column = "mq_exchange",property = "mqExchange"),
            @Result(column = "mq_routingkey",property = "mqRoutingkey"),
            @Result(column = "request_body",property = "requestBody"),
            @Result(column = "status",property = "status"),
            @Result(column = "errormsg",property = "errormsg")})
    List<Task> findTaskLessTanCurrentTime(Date currentTime);
}

任务类实现

@Component
public class QueryPointTask {@Autowired
    private RabbitTemplate rabbitTemplate;@Autowired
    private TaskMapper taskMapper;@Scheduled(cron = "0 0/2 * * * ?")
    public void queryTask(){//1.获取小于系统当前时间数据
        List<Task> taskList = taskMapper.findTaskLessTanCurrentTime(new Date());if (taskList!=null && taskList.size()>0){
            //将任务数据发送到消息队列
            for (Task task : taskList) {
 rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(task));
            }
        }
    }
}

三、 用户服务更改积分

步骤1: 添加rabbitmq配置类(与订单服务相同)

@Configuration
public class RabbitMQConfig {
    //添加积分任务交换机
    public static final String EX_BUYING_ADDPOINTURSE = "ex_buying_addpointurse";//添加积分消息队列
    public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";//完成添加积分消息队列
    public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";//添加积分路由key
    public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";//完成添加积分路由key
    public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";/**
     * 交换机配置
     * @return the exchange
     */
    @Bean(EX_BUYING_ADDPOINTURSE)
    public Exchange EX_DECLARE() {
        return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTURSE).durable(true).build();
    }
    //声明队列
    @Bean(CG_BUYING_FINISHADDPOINT)
    public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
        Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
        return queue;
    }
    //声明队列
    @Bean(CG_BUYING_ADDPOINT)
    public Queue QUEUE_CG_BUYING_ADDPOINT() {
        Queue queue = new Queue(CG_BUYING_ADDPOINT);
        return queue;
    }
    /**
     * 绑定队列到交换机 .
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
    }
    @Bean
    public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
    }
}

步骤2: 定义消息监听类:

@Component
public class AddPointListener {@Autowired
    private UserService userService;@Autowired
    private RedisTemplate redisTemplate;@Autowired
    private RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitMQConfig.CG_BUYING_ADDPOINT)
    public void receiveMessage(String message){Task task = JSON.parseObject(message, Task.class);if (task == null || StringUtils.isEmpty(task.getRequestBody())){
            return;
        }//判断redis中是否存在内容
        Object value = redisTemplate.boundValueOps(task.getId()).get();
        if (value != null){
            return;
        }//更新用户积分
        int result = userService.updateUserPoints(task);if (result<=0){
            return;
        }//返回通知
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_FINISHADDPOINT_KEY,JSON.toJSONString(task));}
}
 

步骤3: 定义修改用户积分实现
实现思路:

1)判断当前订单是否操作过

2)将任务存入redis

3)修改用户积分

4)添加积分日志表记录

5)删除redis中记录

@Autowired
private PointLogMapper pointLogMapper;/**
     * 修改用户积分
     * @param task
     * @return
     */
    @Override
    @Transactional
    public int updateUserPoints(Task task) {
        Map info = JSON.parseObject(task.getRequestBody(), Map.class);
        String userName = info.get("userName").toString();
        String orderId = info.get("orderId").toString();
        int point = (int) info.get("point");//判断当前订单是否操作过
        PointLog pointLog = pointLogMapper.findLogInfoByOrderId(orderId);
        if (pointLog != null){
            return 0;
        }//将任务存入redis
        redisTemplate.boundValueOps(task.getId()).set("exist",1,TimeUnit.MINUTES);//修改用户积分
        int result = userMapper.updateUserPoint(userName, point);
        if (result<=0){
            return result;
        }//添加积分日志表记录
        pointLog = new PointLog();
        pointLog.setOrderId(orderId);
        pointLog.setPoint(point);
        pointLog.setUserId(userName);
        result = pointLogMapper.insertSelective(pointLog);
        if (result<=0){
            return result;
        }//删除redis中的记录
        redisTemplate.delete(task.getId());return 1;
    }

步骤4: 定义根据订单id查询积分日志表
定义PointLogMapper,实现根据订单id查询:

public interface PointLogMapper extends Mapper<PointLog> {@Select("select * from tb_point_log where order_id=#{orderId}")
    PointLog findLogInfoByOrderId(@Param("orderId") String orderId);
}

四、 订单服务删除原任务

步骤1: 定义监听类
在订单服务中定义监听类,用于监听队列,如果队列中有消息,则删除原任务防止消息重复发送,并对任务信息进行记录

@Component
public class DelTaskListener {@Autowired
    private TaskService taskService;@RabbitListener(queues = RabbitMQConfig.CG_BUYING_FINISHADDPOINT)
    public void receiveMessage(String message){Task task = JSON.parseObject(message, Task.class);
​
        taskService.delTask(task);
    }
}

步骤: 定义任务service

public interface TaskService {void delTask(Task task);
}
@Service
@Transactional
public class TaskServiceImpl implements TaskService {@Autowired
    private TaskMapper taskMapper;@Autowired
    private TaskHisMapper taskHisMapper;
​
​
    @Override
    public void delTask(Task task) {
        //1. 设置删除时间
        task.setDeleteTime(new Date());
        Long id = task.getId();
        task.setId(null);//bean复制
        TaskHis taskHis = new TaskHis();
        BeanUtils.copyProperties(task,taskHis);//记录任务信息
        taskHisMapper.insertSelective(taskHis);//删除原任务
        task.setId(id);
        taskMapper.deleteByPrimaryKey(task);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2293771.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode_双指针 160.相交链表

160.相交链表 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 思路: 本题中&#xff0c;交点不是数值相等&#xff0c;而是指针相等 双指针遍历两遍后必定相遇&#xff0c…

深入理解浮点数:单精度、双精度、半精度和BFloat16详解

文章目录 深入理解浮点数&#xff1a;单精度、双精度、半精度和BFloat16详解 &#x1f522;简介 &#x1f31f;1. 单精度&#xff08;Single Precision&#xff09;&#x1f3af;应用场景 &#x1f680; 2. 双精度&#xff08;Double Precision&#xff09;&#x1f4aa;应用场…

Verilog基础(三):过程

过程(Procedures) - Always块 – 组合逻辑 (Always blocks – Combinational) 由于数字电路是由电线相连的逻辑门组成的,所以任何电路都可以表示为模块和赋值语句的某种组合. 然而,有时这不是描述电路最方便的方法. 两种always block是十分有用的: 组合逻辑: always @(…

拍照对比,X70 PRO与X90 PRO+的细节差异

以下是局部截图&#xff08;上X70P下X90PP&#xff09; 对比1 这里看不出差异。 对比2 X90PP的字明显更清楚。 对比3 中下的字&#xff0c;X90PP显然更清楚。

Node.js与嵌入式开发:打破界限的创新结合

文章目录 一、Node.js的本质与核心优势1.1 什么是Node.js?1.2 嵌入式开发的范式转变二、Node.js与嵌入式结合的四大技术路径2.1 硬件交互层2.2 物联网协议栈2.3 边缘计算架构2.4 轻量化运行时方案三、实战案例:智能农业监测系统3.1 硬件配置3.2 软件架构3.3 核心代码片段四、…

使用java调用deepseek,调用大模型,处理问题。ollama

废话不多&#xff0c;直接上代码 Testpublic void test7171111231233(){// url:放请求地址String url "http://localhost:11434/api/generate";HttpRequest request HttpUtil.createPost(url);Map<String, String> headers new HashMap<>();String a…

Linux驱动---字符设备

目录 一、基础简介 1.1、Linux设备驱动分类 1.2、字符设备驱动概念 二、驱动基本构成 2.1、驱动模块的加载和卸载 2.2、添加LICENNSE以及其他信息 三、字符设备驱动开发步骤 3.1、分配主次设备号 3.1.1 主次设备号 3.1.2静态注册设备号 3.1.3动态注册设备号 3.1.4释…

php7.3安装php7.3-gmp扩展踩坑总结

环境&#xff1a; 容器里面为php7.3.3版本 服务器也为php7.3.3-14版本&#xff0c;但是因为业务量太大需要在服务器里面跑脚本 容器里面为 alpine 系统&#xff0c;安装各种扩展 服务器里面开发服为 ubuntu 16.04.7 LTS (Xenial Xerus) 系统 服务器线上为 ubuntu 20.04.6 LTS (…

javaEE-8.JVM(八股文系列)

目录 一.简介 二.JVM中的内存划分 JVM的内存划分图: 堆区:​编辑 栈区:​编辑 程序计数器&#xff1a;​编辑 元数据区&#xff1a;​编辑 经典笔试题&#xff1a; 三,JVM的类加载机制 1.加载: 2.验证: 3.准备: 4.解析: 5.初始化: 双亲委派模型 概念: JVM的类加…

大语言模型轻量化:知识蒸馏的范式迁移与工程实践

大语言模型轻量化&#xff1a;知识蒸馏的范式迁移与工程实践 &#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 摘要 在大型语言模型&#xff…

数据结构:时间复杂度

文章目录 为什么需要时间复杂度分析&#xff1f;一、大O表示法&#xff1a;复杂度的语言1.1 什么是大O&#xff1f;1.2 常见复杂度速查表 二、实战分析&#xff1a;解剖C语言代码2.1 循环结构的三重境界单层循环&#xff1a;线性时间双重循环&#xff1a;平方时间动态边界循环&…

[创业之路-276]:从燃油汽车到智能汽车:工业革命下的价值变迁

目录 前言&#xff1a; 从燃油汽车到智能汽车&#xff1a;工业革命下的价值变迁 前言&#xff1a; 燃油汽车&#xff0c;第一次、第二次工业革命&#xff0c;机械化、电气化时代的产物&#xff0c;以机械和电气自动化为核心价值。 智能汽车&#xff0c;第三次、第四次工业革…

vue页面和 iframe多页面无刷新方案和并行 并接入 micro 微前端部分思路

前: 新进了一家公司,公司是做电商平台的, 用的系统竟然还是jsp的网站,每次修改页面还需要我下载idea代码,作为一个前端, 这可不能忍,于是向上申请,意思你们后台做的太辣鸡,我要重做,经领导层商议从去年6月开始到今年12月把系统给重构了 公司系统采用的是每个jsp页面都是一个ifr…

Python 自学秘籍:开启编程之旅,人生苦短,我用python。

从2009年&#xff0c;用了几次python后就放弃了&#xff0c;一直用的php&#xff0c;现在人工智能时代&#xff0c;完全没php什么事情。必须搞python了&#xff0c;虽然已经40多岁了。死磕python了。让滔滔陪着你一起学python 吧。 开启新世界 在当今人工智能化的时代&#xff…

每日一题洛谷P5721 【深基4.例6】数字直角三角形c++

#include<iostream> using namespace std; int main() {int n;cin >> n;int t 1;for (int i 0; i < n; i) {for (int j 0; j < n - i; j) {printf("%02d",t);t;}cout << endl;}return 0; }

解决DeepSeek服务器繁忙问题:本地部署与优化方案

deepseek服务器崩了&#xff0c;手把手教你如何在手机端部署一个VIP通道&#xff01; 引言 随着人工智能技术的快速发展&#xff0c;DeepSeek等大语言模型的应用越来越广泛。然而&#xff0c;许多用户在使用过程中遇到了服务器繁忙、响应缓慢等问题。本文将探讨如何通过本地部…

【后端开发】系统设计101——通信协议,数据库与缓存,架构模式,微服务架构,支付系统(36张图详解)

【后端开发】系统设计101——通信协议&#xff0c;数据库与缓存&#xff0c;架构模式&#xff0c;微服务架构&#xff0c;支付系统&#xff08;36张图&#xff09; 文章目录 1、通信协议通信协议REST API 对比 GraphQL&#xff08;前端-web服务&#xff09;grpc如何工作&#x…

Java基础——分层解耦——IOC和DI入门

目录 三层架构 Controller Service Dao ​编辑 调用过程 面向接口编程 分层解耦 耦合 内聚 软件设计原则 控制反转 依赖注入 Bean对象 如何将类产生的对象交给IOC容器管理&#xff1f; 容器怎样才能提供依赖的bean对象呢&#xff1f; 三层架构 Controller 控制…

武汉火影数字|VR虚拟现实:内容制作与互动科技的奇妙碰撞

VR虚拟现实是一种利用计算机技术生产三维虚拟世界的技术&#xff0c;通过头戴式显示器、手柄等设备&#xff0c;用户可以身临其境地感受虚拟世界&#xff0c;与其中的物体进行自然交互。 当内容制作遇上 VR&#xff0c;会发生什么&#xff1f; 当内容制作遇上VR&#xff0c;就像…

SpringBoot扩展篇:@Scope和@Lazy源码解析

SpringBoot扩展篇&#xff1a;Scope和Lazy源码解析 1. 研究主题及Demo2. 注册BeanDefinition3. 初始化属性3.1 解决依赖注入3.2 创建代理 ContextAnnotationAutowireCandidateResolver#getLazyResolutionProxyIfNecessary3.3 代理拦截处理3.4 单例bean与原型bean创建的区别 4. …