支付模块-基于消息队列发送支付通知消息

news2025/1/16 17:45:41

消息队列发送支付通知消息

需求分析

订单服务作为通用服务,在订单支付成功后需要将支付结果异步通知给其他对接的微服务,微服务收到支付结果根据订单的类型去更新自己的业务数据

在这里插入图片描述

技术方案

使用消息队列进行异步通知需要保证消息的可靠性即生产端将消息成功通知到服务端: 消息发送到交换机 --> 由交换机发送到队列 --> 消费者监听队列,收到消息进行处理,参考文章02- 使用Docker安装RabbitMQ-CSDN博客

  • 生产者确认机制: 发送消息前使用数据库事务将消息保证到数据库表中,成功发送到交换机将消息从数据库中删除

  • 配置MQ持久化(交换机、队列、发送消息):MQ收到消息持久化,当MQ重启时即使消息没有消费完也不会丢失

  • 消费者确认机制: 消费者消费成功,自动发送ACK,负责重试消费

发布订阅模式: 订单服务接收支付成功结果通知后创建一条消息发送给Fanout广播类型的交换机,学习中心服务绑定队列到交换机接收消息,参考文章04- 基于SpringAMQP封装RabbitMQ,消息队列的Work模型和发布订阅模型-CSDN博客

环境搭建

第一步: 在订单服务和学习中心服务中添加消息队列依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在Nacos的dev环境下添加RabbitMQ的配置信息rabbitmq-dev.yaml,设置group为xuecheng-plus-common

spring:
  rabbitmq:
    host: 192.168.101.128 # 主机
    port: 5672 # 端口名
    username: root # 用户名
    password: root # 密码
    virtual-host: / # 虚拟主机
    publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: true # 开启publish-return功能,同样是基于callback机制调用回调函数ReturnCallback
    template:
      mandatory: true # 定义消息路由失败时的策略,true表示调用ReturnCallback;false表示直接丢弃消息
    listener:
      simple:
      	# 每次只能获取一条消息,处理完成才能获取下一个消息
        prefetch: 1  
        # auto:出现异常时返回unack且消息回滚到mq,如果没有异常直接返回ack
        # manual:手动控制
        # none:丢弃消息不回滚到mq
        acknowledge-mode: auto 
        retry:
          enabled: false # 开启消费者失败重试
          initial-interval: 5000ms # 初始的失败等待时长为几秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态;如果业务中包含事务需要改为false

第三步:在订单服务和学习中心服务的接口工程中引入rabbitmq-dev.yaml配置文件

- data-id: rabbitmq-${spring.profiles.active}.yaml
  group: xuecheng-plus-common
  refresh: true

第四步: 在订单服务的service工程编写MQ配置类PayNotifyConfig创建交换机和队列

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {

    // 交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    // 支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";
    // 支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

    // 声明交换机且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }

    //支付通知队列且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue course_publish_queue() {
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    // 交换机和支付通知队列绑定
    @Bean
    public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
	
    // 交换机路由消息到队列的时候如果失败执行回调函数
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 消息处理service
        MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 消息发送失败记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 解析消息内容,将消息再添加到消息表
            MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
            mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3());

        });
    }
}

第五步: 在学习中心服务编写MQ配置类PayNotifyConfig创建交换机和队列,避免学习中心服务启动的时候监听的队列还没有创建,如果生产端已经创建就不再创建

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {
    // 声明交换机,支付通知队列,交换机和支付通知队列绑定关系
    
    // 不用设置回调函数,只有生产者才需要确认 
}

重启订单服务,登录rabbitmq查看交换机自动创建成功

在这里插入图片描述

生产者发送信息

在订单服务的OrderService中定义接口接收支付宝响应的通知消息结果并发送给学习中心服务

public interface OrderService {
    /**
 	* 接收通知结果并发送给学习中心服务
 	* @param mq	Message 消息
 	*/
    void notifyPayResult(MqMessage mqMessage);
}
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    MqMessageService mqMessageService;
    
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void notifyPayResult(MqMessage mqMessage) {
        // 1. 将消息体转为Json
        String jsonMsg = JSON.toJSONString(mqMessage);
        // 2. 设置消息的持久化方式为PERSISTENT,即消息会被持久化到磁盘上,确保即使在RabbitMQ服务器重启后也能够恢复消息
        Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        // 3. 封装CorrelationData,用于跟踪指定Id消息的相关信息
        CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());
        // 3.1 使用CorrelationData添加一个Callback对象指定回调方法,该对象用于在消息确认时处理消息的结果
        correlationData.getFuture().addCallback(result -> {
            if (result.isAck()) {
                // 3.2 消息成功发送到交换机,删除消息表中的记录
                log.debug("消息发送成功:{}", jsonMsg);
                mqMessageService.completed(mqMessage.getId());
            } else {
                // 3.3 消息发送失败
                log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result.getReason());
            }
        }, ex -> {
            // 3.4 消息异常可能是网络问题
            log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage());
        });
        // 4. 发送消息
        rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj, correlationData);
    }
}

订单服务收到第三方平台的支付结果时,在saveAliPayStatus方法中除了保存支付宝响应的结果信息还需要向数据库消息表添加消息记录将消息封装好后发送给消费端

	/**
     * 保存支付结果信息,向数据库中的消息表添加消息并发送给消费端
     * @param payStatusDto 支付结果信息
     */
@Transactional
@Override
public void saveAlipayStatus(PayStatusDto payStatusDto) {
    // 1. 获取支付流水号
    String payNo = payStatusDto.getOut_trade_no();
    // 2. 查询数据库订单状态
    XcPayRecord payRecord = getPayRecordByPayNo(payNo);
    if (payRecord == null) {
        XueChengPlusException.cast("未找到支付记录");
    }
    XcOrders order = xcOrdersMapper.selectById(payRecord.getOrderId());
    if (order == null) {
        XueChengPlusException.cast("找不到相关联的订单");
    }
    String statusFromDB = payRecord.getStatus();
    // 2.1 已支付,直接返回
    if ("600002".equals(statusFromDB)) {
        return;
    }
    // 3. 查询支付宝交易状态
    String tradeStatus = payStatusDto.getTrade_status();
    // 3.1 支付宝交易已成功,保存订单表和交易记录表,更新交易状态
    if ("TRADE_SUCCESS".equals(tradeStatus)) {
        // 更新支付交易表
        payRecord.setStatus("601002");
        payRecord.setOutPayNo(payStatusDto.getTrade_no());
        payRecord.setOutPayChannel("Alipay");
        payRecord.setPaySuccessTime(LocalDateTime.now());
        int updateRecord = xcPayRecordMapper.updateById(payRecord);
        if (updateRecord <= 0) {
            XueChengPlusException.cast("更新支付交易表失败");
        }
        // 更新订单表
        order.setStatus("600002");
        int updateOrder = xcOrdersMapper.updateById(order);
        if (updateOrder <= 0) {
            log.debug("更新订单表失败");
            XueChengPlusException.cast("更新订单表失败");
        }
    }
    // 4. 创建消息记录并保存到消息表中,参数1:支付结果类型通知;参数2:业务id;参数3:业务类型
    MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", order.getOutBusinessId(), order.getOrderType(), null);
    // 5. 封装消息记录并发送给消费端
    notifyPayResult(mqMessage);
}

消费者接收消息

在学习中心服务定义impl/ReceivePayNotifyService

  • 监听消息队列接收支付结果, 当接收到消息后更新选课记录表的选课状态为选课成功,同时向我的课程表中插入一条课程记录
@Slf4j
@Service
public class ReceivePayNotifyService {

    @Autowired
    MyCourseTablesService tablesService;

    @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
    public void receive(Message message) {
        // 1. 获取消息
        MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);
        // 2. 根据消息内容,更新选课记录,向我的课程表插入记录
        // 2.1 消息类型,学习中心只处理支付结果的通知
        String messageType = mqMessage.getMessageType();
        // 2.2 选课id
        String chooseCourseId = mqMessage.getBusinessKey1();
        // 2.3 订单类型,60201表示购买课程
        String orderType = mqMessage.getBusinessKey2();
        // 3. 学习中心只负责处理支付结果的通知
        if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType)){
            // 3.1 学习中心只负责购买课程类订单的结果
            if ("60201".equals(orderType)){
                // 3.2 保存选课记录
                boolean flag = tablesService.saveChooseCourseStatus(chooseCourseId);
                if (!flag){
                    XueChengPlusException.cast("保存选课记录失败");
                }
            }
        }
    }
}

MyCourseTablesService接口中定义方法更新选课记录的选课状态,同时向我的课程表添加选课记录(之前添加免费课程的时候已经实现过了)

public interface MyCourseTablesService {
	/**
     * 保存选课成功状态
     * @param chooseCourseId
     * @return
     */
    public boolean saveChooseCourseSuccess(String chooseCourseId);
}

@Slf4j
@Service
public class MyCourseTablesServiceImpl implements MyCourseTablesService {
    @Override
    @Transactional
    public boolean saveChooseCourseStatus(String chooseCourseId) {
        // 1. 根据选课id,查询对应的选课记录
        XcChooseCourse chooseCourse = chooseCourseMapper.selectById(chooseCourseId);
        if (chooseCourse == null) {
            log.error("接收到购买课程的消息,根据选课id未查询到课程,选课id:{}", chooseCourseId);
            return false;
        }
        // 2. 选课状态为未支付时,更新选课状态为选课成功
        if ("701002".equals(chooseCourse.getStatus())) {
            chooseCourse.setStatus("701001");
            int update = chooseCourseMapper.updateById(chooseCourse);
            if (update <= 0) {
                log.error("更新选课记录失败:{}", chooseCourse);
            }
        }
        // 3. 向我的课程表添加记录
        addCourseTables(chooseCourse);
        return true;
    }
}
public XcCourseTables addCourseTabls(XcChooseCourse xcChooseCourse){
    //选课成功了才可以向我的课程表添加
    String status = xcChooseCourse.getStatus();
    if(!"701001".equals(status)){
        XueChengPlusException.cast("选课没有成功无法添加到课程表");
    }
    XcCourseTables xcCourseTables = getXcCourseTables(xcChooseCourse.getUserId(), xcChooseCourse.getCourseId());
    if(xcCourseTables!=null){
        return xcCourseTables;
    }

    xcCourseTables = new XcCourseTables();
    BeanUtils.copyProperties(xcChooseCourse,xcCourseTables);
    xcCourseTables.setChooseCourseId(xcChooseCourse.getId());//记录选课表的逐渐
    xcCourseTables.setCourseType(xcChooseCourse.getOrderType());//选课类型
    xcCourseTables.setUpdateDate(LocalDateTime.now());
    int insert = courseTablesMapper.insert(xcCourseTables);
    if(insert<=0){
        XueChengPlusException.cast("添加我的课程表失败");
    }

    return xcCourseTables;
}

通知支付结果测试

选择一门已发布的收费课程,如果在我的课程表存储则删除记录及其相关的选课记录及订单记录信息

  • 进入课程详细页面,点击马上学习生成二维码进行支付
  • 支付完成点击“支付完成”,观察订单服务控制台是否发送消息(使用内网穿透工具)
  • 观察学习中心服务控制台是否接收到消息
  • 观察数据库中的消息表的相应记录是否已删除,我的选课表中是否有对应的选课记录

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1513494.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

git commit --amend

git commit --amend 1. 修改已经输入的commit 1. 修改已经输入的commit 我已经输入了commit fix: 删除无用代码 然后现在表示不准确&#xff0c;然后我通过命令git commit --amend修改commit

Python语言基础与应用-北京大学-陈斌-P40-39-基本扩展模块/上机练习:计时和文件处理-给算法计时-上机代码

Python语言基础与应用-北京大学-陈斌-P40-39-基本扩展模块/上机练习&#xff1a;计时和文件处理-给算法计时-上机代码 # 基本扩展模块训练 给算法计时 def factorial(number): # 自定义一个计算阶乘的函数i 1result 1while i < number:result result * ii 1return(resul…

推动新质生产力,机器人技术的黄金时代——张驰咨询

在这个不断进步和变化的时代中&#xff0c;张驰咨询与各个行业的领先企业紧密合作&#xff0c;致力于构建新一代生产力的未来蓝图。张驰咨询深刻理解各个行业的发展态势与独特性&#xff0c;通过深入分析企业遇到的挑战&#xff0c;张驰咨询提供定制化的解决方案&#xff0c;旨…

【代码随想录 | 数组 01】二分查找

文章目录 1.二分查找1.1题目1.2思路&#xff08;核心&#xff1a;区间的定义&#xff09;1.3左闭右闭1.4左闭右开1.5总结 1.二分查找 1.1题目 704.二分查找—力扣题目链接 题目&#xff1a;给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 …

Java毕业设计-基于spring boot开发的实习管理系统-毕业论文+答辩ppt(附源代码+演示视频)

文章目录 前言一、毕设成果演示&#xff08;源代码在文末&#xff09;二、毕设摘要展示1.开发说明2.需求分析3、系统功能结构 三、系统实现展示1、前台功能模块2、后台功能模块2.1 管理员功能2.2 教师功能2.3 学生功能2.4 实习单位功能 四、毕设内容和源代码获取总结 Java毕业设…

外贸产品再好,推广不出去也白搭!

外贸产品虽然质量和价格都很好&#xff0c;但是如果没有推广的支持&#xff0c;就很难在市场上打开局面。好的产品需要好的营销&#xff0c;推广不仅仅是宣传产品的重要手段&#xff0c;更是提高产品知名度和市场占有率的关键。只有通过有效的推广措施&#xff0c;才能让产品赢…

(Linux学习九)管道、重定向介绍

FD:文件描述符。 0,1,2,3&#xff0c;&#xff0c;&#xff0c;。进程打开文件所用。 0标准输入 1 标准输出 2 标准错误输出 3普通文件 一、管道 | 命令 | tee | xargs | 命令1的输出&#xff0c;作为命令2的输入&#xff0c;命令2的输出作为命令3的输入 | tee 三通&#xff…

【Python】random库

专栏文章索引&#xff1a;Python 原文章&#xff1a;Python中random函数用法整理_python random-CSDN博客 目录 1.random.random() 2.random.uniform(a, b) 3.random.randint(a, b) 4.random.randrange([start], stop[, step]) 5. random.choice() 6. random.shuffle(x[,…

工业园区智慧水电设备管控系统

在现代工业园区中&#xff0c;水电设备的管控系统起着至关重要的作用。这些系统不仅仅是简单的机械装置&#xff0c;它们更是一种智慧的结合&#xff0c;为工业生产提供了可靠的保障和高效的管理。让我们一起来探索工业园区智慧水电设备管控系统的奥秘。 我们来看看水电设备的…

电脑内存条

目录 一&#xff0c;电脑内存条是什么1&#xff0c;定义2&#xff0c;作用 二&#xff0c;怎样查看自己电脑内存条1&#xff0c;window运行管理器2&#xff0c;软件3&#xff0c;intel官网4&#xff0c;计算机命令行模式 三&#xff0c;选择电脑内存条1&#xff0c;选择ddr2&am…

如何在WordPress网站上设置多语言展示

在今天的全球化世界中&#xff0c;拥有多语言网站对于吸引更广泛的受众至关重要。前不就我们遇到Hostease的客户咨询我们的在线客服&#xff0c;他想要对他的wordpress网站支持多语言。我们提供给客户可以尝试以下的插件来支持多语言。 在本教程中&#xff0c;我们将逐步介绍如…

蓝桥杯-Python组(一)

1. 冒泡排序 算法步骤&#xff1a; 比较相邻元素&#xff0c;如果第一个大于第二个则交换从左往右遍历一遍&#xff0c;重复第一步&#xff0c;可以保证最大的元素在最后面重复上述操作&#xff0c;可以得到第二大、第三大、… n int(input()) a list(map(int, input()…

a-table:实现跨域多选功能——基础积累

table组件跨页多选功能&#xff1a; html部分的代码&#xff1a; <a-tablesize"small"style"margin-top: 10px"rowKey"id":columns"columns":dataSource"dataSource":pagination"pagination":loading"l…

从菜鸟到大师!年薪20W的c++ QT开发工程师需要懂哪些技术?

如今Qt的知识也变得非常广泛和复杂&#xff0c;学习起来同样具有一定的挑战。对于Qt从业者来说&#xff0c;有两个主要层面&#xff1a;一个是深入理解Qt框架和基础知识&#xff0c;另一个是具备丰富的工程经验。 还不熟悉的朋友&#xff0c;这里可以先领取一份Qt开发必备技术…

亏本买卖愿意做?济公活佛教你活!向前一步是正轨!——早读(逆天打工人爬取热门微信文章解读)

你愿意亏本卖你的产品吗&#xff1f; 引言Python 代码第一篇 人民日报 【夜读】亲爱的娃娃们&#xff0c;这篇演讲献给春天&#xff0c;献给你第二篇 人民日报 来啦 新闻早班车要闻社会政策 结尾 人生不止眼前的山顶&#xff0c;更在沿途攀登的风景 不妨怀揣“上一山&#xff0…

fs模块 文件写入 之 追加写入

文件的同步、异步追加写入&#xff1a; 一、异步追加 &#xff08;1&#xff09;语法&#xff1a;fs.appendFile(path,data,[options],callback(data,err)) &#xff08;2&#xff09;操作 1》引入fs模块 const fsrequire(fs); 2》调用appendFile fs.appendFile(./我可以…

18、设计模式之解释器模式(Interpreter)

一、什么是解释器模式 解释器模式是一种行为型设计模式。解释器模式就像是一种自定义语言&#xff0c;我们可以定义该语言的语法规则&#xff0c;然后从中解析出具体的命令或表达式&#xff0c;最终执行相应的操作。 eg&#xff1a;这种模式比较冷门&#xff0c;不怎么使用。 …

【安卓】Android开发入门 你的第一个apk应用

本文介绍如何写一个入门的安卓apk应用, 以嵌套一个网页为例。 开发ide&#xff1a;Android studio 语言&#xff1a;Kotlin tips: 最好别下载新版本的 Android studio &#xff0c;因为新版的界面有所改动 遇到问题去网上搜 新手刚入门可能界面都找不到在哪里&#xff1b;其次…

UI设计中的图标的分类,功能性图标

图标的分类 既然知道了图标的作用和重要性&#xff0c;那么接下来&#xff0c;就要进一步了解在工作中我们要设计哪些图标。图标可以划分成三种大类:功能性图标、装饰性图标、启动图标。 功能性图标 功能图标是具有指代意义且具有功能标识的图形&#xff0c;它不仅是一种图形&a…

vue.js 页面中设置多个swiper

效果&#xff1a; 设置主要设置了 动态的 包含类、 左右按钮的类 <template><div class"swiper-container_other"><!-- 右侧按钮 --><div :class"[(id)?swiper-button-nextid:swiper-button-next, swiper-button-next]"></div…