RocketMQ学习(二)

news2024/11/24 20:30:29

文章目录

  • 1. 案例介绍
    • 1.1 业务分析
      • 1)下单
      • 2)支付
    • 1.2 问题分析
      • 问题1
        • 如何保证数据的完整性?
        • 使用MQ保证在下单失败后系统数据的完整性
      • 问题2
        • 如何处理第三方支付平台的异步通知
        • 通过MQ进行数据分发,提高系统处理性能
  • 2. 技术分析
    • 2.1 技术选型
    • 2.2 SpringBoot整合RocketMQ
      • 2.2.1 消息生产者
        • 1)添加依赖
        • 2)配置文件
        • 3)启动类
        • 4)测试类
      • 2.2.2 消息消费者
        • 1)添加依赖
        • 2)配置文件
        • 3)启动类
        • 4)消息监听器
    • 2.3 SpringBoot整合Dubbo
      • 2.3.1 搭建Zookeeper集群
        • 1)准备工作
        • 2)配置集群
        • 3)启动集群
        • zookeeper安装启动配置示例
      • 2.3.2 RPC服务接口
      • 2.3.3 服务提供者
        • 1)添加依赖
        • 2)配置文件
        • 3)启动类
        • 4)服务实现
      • dubbo-admin管理平台搭建
        • 示例
      • 2.3.4 服务消费者
        • 1)添加依赖
        • 2)配置文件
        • 3)启动类
        • 4)Controller
  • 3. 环境搭建
    • 3.1 数据库
      • 1)优惠券表
      • 2)商品表
      • 3)订单表
      • 4)订单商品日志表
      • 5)用户表
      • 6)用户余额日志表
      • 7)订单支付表
      • 8)MQ消息生产表
      • shop.sql
    • 3.2 项目初始化
      • 3.1.1 工程浏览
      • 3.1.2 工程关系
    • 3.3 Mybatis逆向工程使用
      • 1)代码生成
      • 2)代码导入
    • 3.4 公共类介绍
  • 4. 下单业务
    • ==流程简介*==
    • ==下单业务流程图==
    • 4.1 下单基本流程
      • 1)接口定义
      • 2)业务类实现
      • 3)校验订单
      • 4)生成预订单
      • 9)小结
    • 4.2 失败补偿机制
      • 4.2.1 ==消息发送方==
      • 4.2.2 消费接收方
        • 1)==回退库存*==
        • 2)回退优惠券
        • 3)==回退余额==
        • 4)取消订单
    • 4.3 测试
      • 1)准备测试环境
      • 2)准备测试数据
      • 3)测试下单成功流程
      • 4)测试下单失败流程
  • 5. 支付业务
    • ==流程简介*==
    • 5.1 创建支付订单
    • 5.2 支付回调
      • 5.2.1 流程分析
      • 5.2.2 代码实现
        • 线程池优化消息发送逻辑
      • 5.2.3 处理消息
        • 1)配置RocketMQ属性值
        • 2)消费消息
  • 6. 整体联调
    • 6.1 准备工作
      • 1)配置RestTemplate类
      • 2)配置请求地址
    • 6.2 下单测试
    • 6.3 支付测试

1. 案例介绍

1.1 业务分析

模拟电商网站购物场景中的【下单】和【支付】业务

1)下单

在这里插入图片描述

  1. 用户请求订单系统下单
  2. 订单系统通过RPC调用订单服务下单
  3. 订单服务调用优惠券服务,扣减优惠券
  4. 订单服务调用调用库存服务,校验并扣减库存
  5. 订单服务调用用户服务,扣减用户余额
  6. 订单服务完成确认订单

2)支付

在这里插入图片描述

  1. 用户请求支付系统
  2. 支付系统调用第三方支付平台API进行发起支付流程
  3. 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统
  4. 支付系统调用订单服务修改订单状态
  5. 支付系统调用积分服务添加积分
  6. 支付系统调用日志服务记录日志

1.2 问题分析

问题1

用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、优惠券、余额进行回退。

如何保证数据的完整性?

在这里插入图片描述

使用MQ保证在下单失败后系统数据的完整性

在这里插入图片描述

问题2

用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。

如何处理第三方支付平台的异步通知

商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?

在这里插入图片描述

通过MQ进行数据分发,提高系统处理性能

在这里插入图片描述

2. 技术分析

2.1 技术选型

  • SpringBoot
  • Dubbo
  • Zookeeper
  • RocketMQ
  • Mysql

在这里插入图片描述

2.2 SpringBoot整合RocketMQ

下载rocketmq-spring项目

将rocketmq-spring安装到本地仓库(不用安装,mavenrepository中央仓库中有)

mvn install -Dmaven.skip.test=true

2.2.1 消息生产者

1)添加依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<properties>
    <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>
2)配置文件
# application.properties
rocketmq.name-server=192.168.134.3:9876;192.168.134.4:9876
rocketmq.producer.group=my-group
3)启动类
@SpringBootApplication
public class MQProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQSpringBootApplication.class);
    }
}
4)测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQSpringBootApplication.class})
public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void test1(){
        rocketMQTemplate.convertAndSend("zzhua-springboot-rocketmq",
                                        "hello springboot rocketmq");
    }
}

编写好生产者后,运行测试方法即可。

2.2.2 消息消费者

1)添加依赖

同消息生产者

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<properties>
    <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>

<dependencies>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>
2)配置文件

同消息生产者

# application.properties
rocketmq.name-server=192.168.134.3:9876;192.168.134.4:9876
rocketmq.consumer.group=zzhua-group
3)启动类
@SpringBootApplication
public class MQConsumerApplication {
    
    public static void main(String[] args) {
        
        SpringApplication.run(MQSpringBootApplication.class);
    }
    
}
4)消息监听器
@Slf4j
@Component
@RocketMQMessageListener(topic = "zzhua-springboot-rocketmq",
                         consumeMode = ConsumeMode.CONCURRENTLY,
                         consumerGroup = "${rocketmq.consumer.group}")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:" + message);
    }
}

启动消费者应用,接收到了生产者发送的消息。

2.3 SpringBoot整合Dubbo

下载dubbo-spring-boot-starter依赖包

dubbo-spring-boot-starter安装到本地仓库

mvn install -Dmaven.skip.test=true

在这里插入图片描述

2.3.1 搭建Zookeeper集群

1)准备工作
  1. 安装JDK
  2. 将Zookeeper上传到服务器
  3. 解压Zookeeper,并创建data目录,将conf下的zoo_sample.cfg文件改名为zoo.cfg
  4. 建立/user/local/zookeeper-cluster,将解压后的Zookeeper复制到以下三个目录
/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3
  1. 配置每一个 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183

    修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

​ 修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

​ 修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
2)配置集群
  1. 在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是 1、2、3 。这个文件就是记录每个服务器的 ID

  2. 在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表。

    集群服务器 IP 列表如下

server.1=192.168.25.140:2881:3881
server.2=192.168.25.140:2882:3882
server.3=192.168.25.140:2883:3883

解释:server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口

3)启动集群

启动集群就是分别启动每个实例。

在这里插入图片描述

检查zookeeper启动状态

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status

在这里插入图片描述

zookeeper安装启动配置示例

上传zookeeper压缩包到服务器,并解压到/usr/local/zookeeper-cluster

在这里插入图片描述

在/usr/local/zookeeper-cluster/zookeeper-3.4.6创建data文件夹,在文件夹中创建myid文件,并且使用vim写入1

在这里插入图片描述

在/usr/local/zookeeper-cluster/zookeeper-3.4.6/conf/ 下的zoo_sample.cfg拷贝一份,名为zoo.cfg文件

在这里插入图片描述

修改zoo.cfg配置,注意dataDir下的目录中的zookeeper-1与clientPort的2181

在这里插入图片描述

将修改好的zookeeper-3.4.6复制3份,名为zookeeper-1,zookeeper-2,zookeeper-3,需要只需要修改zookeeper-2与zookeeper-3这2个文件夹下的data/myid文件和conf/zoo.cfg配置文件中的端口和数据目录位置

在这里插入图片描述

修改zookeeper-2下的data/myid文件和conf/zoo.cfg配置文件中的端口和数据目录位置,同理修改zookeeper-3文件夹下的data/myid文件和conf/zoo.cfg配置文件

在这里插入图片描述

在这里插入图片描述

在bin所在目录启动zk,会默认在运行目录下产生zookeeper.out日志文件。在全部启动之后,查看zk的状态

在这里插入图片描述

2.3.2 RPC服务接口

public interface IUserService {
    public String sayHello(String name);
}

2.3.3 服务提供者

1)添加依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<dependencies>
    
    <!--dubbo-->
    <dependency>
        <groupId>com.alibaba.spring.boot</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
    
	<!--spring-boot-stater-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-to-slf4j</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    
	<!--zookeeper-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    
	<!--API-->
    <dependency>
        <groupId>com.itheima.demo</groupId>
        <artifactId>dubbo-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

</dependencies>
2)配置文件
# application.properties
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
 # 服务调用端口
spring.dubbo.protocol.port=20880
3)启动类
@EnableDubboConfiguration
@SpringBootApplication
public class ProviderBootstrap {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(ProviderBootstrap.class,args);
    }

}
4)服务实现
@Component
@Service(interfaceClass = IUserService.class) // dubbo的@Service注解
public class UserServiceImpl implements IUserService{
    @Override
    public String sayHello(String name) {
        return "hello:"+name;
    }
}

dubbo-admin管理平台搭建

将dubbo-admin.war上传到与zookeeper同一服务器,复制粘贴到tomcat的webapps下,启动tomcat,然后访问:服务器ip:8080/dubbo-admin

示例

将dubbo-admin.war复制到tomcat的webapps目录下,如果zookeeper与dubbo-admin不在同一服务器下,需要修改dubbo-admin.war中的WEB-INF/dubbo.properties文件的zookeeper的ip地址,然后启动tomcat即可如下访问dubbo-admin,可以看到服务已经注册上来了。

在这里插入图片描述

在这里插入图片描述

2.3.4 服务消费者

1)添加依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--dubbo-->
    <dependency>
        <groupId>com.alibaba.spring.boot</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-to-slf4j</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!--zookeeper-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!--API-->
    <dependency>
        <groupId>com.itheima.demo</groupId>
        <artifactId>dubbo-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

</dependencies>
2)配置文件
# application.properties
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
    spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
3)启动类
@EnableDubboConfiguration
@SpringBootApplication
public class ConsumerBootstrap {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerBootstrap.class);
    }
}
4)Controller
@RestController
@RequestMapping("/user")
public class UserController {

    @Reference // 使用dubbo的@Reference注解
    private IUserService userService;

    @RequestMapping("/sayHello")
    public String sayHello(String name){
        return userService.sayHello(name);
    }

}

可以看到消费者也注册上来了

在这里插入图片描述

3. 环境搭建

3.1 数据库

1)优惠券表

FieldTypeComment
coupon_idbigint(50) NOT NULL优惠券ID
coupon_pricedecimal(10,2) NULL优惠券金额
user_idbigint(50) NULL用户ID
order_idbigint(32) NULL订单ID
is_usedint(1) NULL是否使用 0未使用 1已使用
used_timetimestamp NULL使用时间

2)商品表

FieldTypeComment
goods_idbigint(50) NOT NULL主键
goods_namevarchar(255) NULL商品名称
goods_numberint(11) NULL商品库存
goods_pricedecimal(10,2) NULL商品价格
goods_descvarchar(255) NULL商品描述
add_timetimestamp NULL添加时间

3)订单表

FieldTypeComment
order_idbigint(50) NOT NULL订单ID
user_idbigint(50) NULL用户ID
order_statusint(1) NULL订单状态 0未确认 1已确认 2已取消 3无效 4退款
pay_statusint(1) NULL支付状态 0未支付 1支付中 2已支付
shipping_statusint(1) NULL发货状态 0未发货 1已发货 2已退货
addressvarchar(255) NULL收货地址
consigneevarchar(255) NULL收货人
goods_idbigint(50) NULL商品ID
goods_numberint(11) NULL商品数量
goods_pricedecimal(10,2) NULL商品价格
goods_amountdecimal(10,0) NULL商品总价
shipping_feedecimal(10,2) NULL运费
order_amountdecimal(10,2) NULL订单价格
coupon_idbigint(50) NULL优惠券ID
coupon_paiddecimal(10,2) NULL优惠券
money_paiddecimal(10,2) NULL已付金额
pay_amountdecimal(10,2) NULL支付金额
add_timetimestamp NULL创建时间
confirm_timetimestamp NULL订单确认时间
pay_timetimestamp NULL支付时间

4)订单商品日志表

FieldTypeComment
goods_idint(11) NOT NULL商品ID
order_idvarchar(32) NOT NULL订单ID
goods_numberint(11) NULL库存数量
log_timedatetime NULL记录时间

5)用户表

FieldTypeComment
user_idbigint(50) NOT NULL用户ID
user_namevarchar(255) NULL用户姓名
user_passwordvarchar(255) NULL用户密码
user_mobilevarchar(255) NULL手机号
user_scoreint(11) NULL积分
user_reg_timetimestamp NULL注册时间
user_moneydecimal(10,0) NULL用户余额

6)用户余额日志表

FieldTypeComment
user_idbigint(50) NOT NULL用户ID
order_idbigint(50) NOT NULL订单ID
money_log_typeint(1) NOT NULL日志类型 1订单付款 2 订单退款
use_moneydecimal(10,2) NULL操作金额
create_timetimestamp NULL日志时间

7)订单支付表

FieldTypeComment
pay_idbigint(50) NOT NULL支付编号
order_idbigint(50) NULL订单编号
pay_amountdecimal(10,2) NULL支付金额
is_paidint(1) NULL是否已支付 1否 2是

8)MQ消息生产表

FieldTypeComment
idvarchar(100) NOT NULL主键
group_namevarchar(100) NULL生产者组名
msg_topicvarchar(100) NULL消息主题
msg_tagvarchar(100) NULLTag
msg_keyvarchar(100) NULLKey
msg_bodyvarchar(500) NULL消息内容
msg_statusint(1) NULL0:未处理;1:已经处理
create_timetimestamp NOT NULL记录时间

###9)MQ消息消费表

FieldTypeComment
msg_idvarchar(50) NULL消息ID
group_namevarchar(100) NOT NULL消费者组名
msg_tagvarchar(100) NOT NULLTag
msg_keyvarchar(100) NOT NULLKey
msg_bodyvarchar(500) NULL消息体
consumer_statusint(1) NULL0:正在处理;1:处理成功;2:处理失败
consumer_timesint(1) NULL消费次数
consumer_timestamptimestamp NULL消费时间
remarkvarchar(500) NULL备注

shop.sql

/*
SQLyog Ultimate v8.32 
MySQL - 5.5.49 : Database - trade
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`trade` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `trade`;

/*Table structure for table `trade_coupon` */

DROP TABLE IF EXISTS `trade_coupon`;

CREATE TABLE `trade_coupon` (
  `coupon_id` bigint(50) NOT NULL COMMENT '优惠券ID',
  `coupon_price` decimal(10,2) DEFAULT NULL COMMENT '优惠券金额',
  `user_id` bigint(50) DEFAULT NULL COMMENT '用户ID',
  `order_id` bigint(32) DEFAULT NULL COMMENT '订单ID',
  `is_used` int(1) DEFAULT NULL COMMENT '是否使用 0未使用 1已使用',
  `used_time` timestamp NULL DEFAULT NULL COMMENT '使用时间',
  PRIMARY KEY (`coupon_id`),
  KEY `FK_trade_coupon` (`user_id`),
  KEY `FK_trade_coupon2` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_coupon` */

/*Table structure for table `trade_goods` */

DROP TABLE IF EXISTS `trade_goods`;

CREATE TABLE `trade_goods` (
  `goods_id` bigint(50) NOT NULL AUTO_INCREMENT,
  `goods_name` varchar(255) DEFAULT NULL COMMENT '商品名称',
  `goods_number` int(11) DEFAULT NULL COMMENT '商品库存',
  `goods_price` decimal(10,2) DEFAULT NULL COMMENT '商品价格',
  `goods_desc` varchar(255) DEFAULT NULL COMMENT '商品描述',
  `add_time` timestamp NULL DEFAULT NULL COMMENT '添加时间',
  PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB AUTO_INCREMENT=345959443973935105 DEFAULT CHARSET=utf8;

/*Data for the table `trade_goods` */

insert  into `trade_goods`(`goods_id`,`goods_name`,`goods_number`,`goods_price`,`goods_desc`,`add_time`) values (345959443973935104,'华为P30',999,'5000.00','夜间拍照更美','2019-07-09 20:38:00');

/*Table structure for table `trade_goods_number_log` */

DROP TABLE IF EXISTS `trade_goods_number_log`;

CREATE TABLE `trade_goods_number_log` (
  `goods_id` bigint(50) NOT NULL COMMENT '商品ID',
  `order_id` bigint(50) NOT NULL COMMENT '订单ID',
  `goods_number` int(11) DEFAULT NULL COMMENT '库存数量',
  `log_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`goods_id`,`order_id`),
  KEY `FK_trade_goods_number_log2` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_goods_number_log` */

/*Table structure for table `trade_mq_consumer_log` */

DROP TABLE IF EXISTS `trade_mq_consumer_log`;

CREATE TABLE `trade_mq_consumer_log` (
  `msg_id` varchar(50) DEFAULT NULL,
  `group_name` varchar(100) NOT NULL,
  `msg_tag` varchar(100) NOT NULL,
  `msg_key` varchar(100) NOT NULL,
  `msg_body` varchar(500) DEFAULT NULL,
  `consumer_status` int(1) DEFAULT NULL COMMENT '0:正在处理;1:处理成功;2:处理失败',
  `consumer_times` int(1) DEFAULT NULL,
  `consumer_timestamp` timestamp NULL DEFAULT NULL,
  `remark` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`group_name`,`msg_tag`,`msg_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_mq_consumer_log` */

/*Table structure for table `trade_mq_producer_temp` */

DROP TABLE IF EXISTS `trade_mq_producer_temp`;

CREATE TABLE `trade_mq_producer_temp` (
  `id` varchar(100) NOT NULL,
  `group_name` varchar(100) DEFAULT NULL,
  `msg_topic` varchar(100) DEFAULT NULL,
  `msg_tag` varchar(100) DEFAULT NULL,
  `msg_key` varchar(100) DEFAULT NULL,
  `msg_body` varchar(500) DEFAULT NULL,
  `msg_status` int(1) DEFAULT NULL COMMENT '0:未处理;1:已经处理',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_mq_producer_temp` */

/*Table structure for table `trade_order` */

DROP TABLE IF EXISTS `trade_order`;

CREATE TABLE `trade_order` (
  `order_id` bigint(50) NOT NULL COMMENT '订单ID',
  `user_id` bigint(50) DEFAULT NULL COMMENT '用户ID',
  `order_status` int(1) DEFAULT NULL COMMENT '订单状态 0未确认 1已确认 2已取消 3无效 4退款',
  `pay_status` int(1) DEFAULT NULL COMMENT '支付状态 0未支付 1支付中 2已支付',
  `shipping_status` int(1) DEFAULT NULL COMMENT '发货状态 0未发货 1已发货 2已收货',
  `address` varchar(255) DEFAULT NULL COMMENT '收货地址',
  `consignee` varchar(255) DEFAULT NULL COMMENT '收货人',
  `goods_id` bigint(50) DEFAULT NULL COMMENT '商品ID',
  `goods_number` int(11) DEFAULT NULL COMMENT '商品数量',
  `goods_price` decimal(10,2) DEFAULT NULL COMMENT '商品价格',
  `goods_amount` decimal(10,0) DEFAULT NULL COMMENT '商品总价',
  `shipping_fee` decimal(10,2) DEFAULT NULL COMMENT '运费',
  `order_amount` decimal(10,2) DEFAULT NULL COMMENT '订单价格',
  `coupon_id` bigint(50) DEFAULT NULL COMMENT '优惠券ID',
  `coupon_paid` decimal(10,2) DEFAULT NULL COMMENT '优惠券',
  `money_paid` decimal(10,2) DEFAULT NULL COMMENT '已付金额',
  `pay_amount` decimal(10,2) DEFAULT NULL COMMENT '支付金额',
  `add_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `confirm_time` timestamp NULL DEFAULT NULL COMMENT '订单确认时间',
  `pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
  PRIMARY KEY (`order_id`),
  KEY `FK_trade_order` (`user_id`),
  KEY `FK_trade_order2` (`goods_id`),
  KEY `FK_trade_order3` (`coupon_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_order` */

/*Table structure for table `trade_pay` */

DROP TABLE IF EXISTS `trade_pay`;

CREATE TABLE `trade_pay` (
  `pay_id` bigint(50) NOT NULL COMMENT '支付编号',
  `order_id` bigint(50) DEFAULT NULL COMMENT '订单编号',
  `pay_amount` decimal(10,2) DEFAULT NULL COMMENT '支付金额',
  `is_paid` int(1) DEFAULT NULL COMMENT '是否已支付 1否 2是',
  PRIMARY KEY (`pay_id`),
  KEY `FK_trade_pay` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_pay` */

/*Table structure for table `trade_user` */

DROP TABLE IF EXISTS `trade_user`;

CREATE TABLE `trade_user` (
  `user_id` bigint(50) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `user_name` varchar(255) DEFAULT NULL COMMENT '用户姓名',
  `user_password` varchar(255) DEFAULT NULL COMMENT '用户密码',
  `user_mobile` varchar(255) DEFAULT NULL COMMENT '手机号',
  `user_score` int(11) DEFAULT NULL COMMENT '积分',
  `user_reg_time` timestamp NULL DEFAULT NULL COMMENT '注册时间',
  `user_money` decimal(10,0) DEFAULT NULL COMMENT '用户余额',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=345963634385633281 DEFAULT CHARSET=utf8;

/*Data for the table `trade_user` */

insert  into `trade_user`(`user_id`,`user_name`,`user_password`,`user_mobile`,`user_score`,`user_reg_time`,`user_money`) values (345963634385633280,'刘备','123L','18888888888L',100,'2019-07-09 13:37:03','900');

/*Table structure for table `trade_user_money_log` */

DROP TABLE IF EXISTS `trade_user_money_log`;

CREATE TABLE `trade_user_money_log` (
  `user_id` bigint(50) NOT NULL COMMENT '用户ID',
  `order_id` bigint(50) NOT NULL COMMENT '订单ID',
  `money_log_type` int(1) NOT NULL COMMENT '日志类型 1订单付款 2 订单退款',
  `use_money` decimal(10,2) DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL COMMENT '日志时间',
  PRIMARY KEY (`user_id`,`order_id`,`money_log_type`),
  KEY `FK_trade_user_money_log2` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `trade_user_money_log` */

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

3.2 项目初始化

shop系统基于Maven进行项目管理

3.1.1 工程浏览

在这里插入图片描述

  • 父工程:shop-parent
  • 订单系统:shop-order-web
  • 支付系统:shop-pay-web
  • 优惠券服务:shop-coupon-service
  • 订单服务:shop-order-service
  • 支付服务:shop-pay-service
  • 商品服务:shop-goods-service
  • 用户服务:shop-user-service
  • 实体类:shop-pojo
  • 持久层:shop-dao
  • 接口层:shop-api
  • 工具工程:shop-common

共12个系统

3.1.2 工程关系

在这里插入图片描述

3.3 Mybatis逆向工程使用

1)代码生成

使用Mybatis逆向工程针对数据表生成CURD持久层代码

2)代码导入

  • 将实体类导入到shop-pojo工程
  • 在服务层工程中导入对应的Mapper类和对应配置文件

3.4 公共类介绍

  • ID生成器

    IDWorker:Twitter雪花算法

  • 异常处理类

    CustomerException:自定义异常类

    CastException:异常抛出类

  • 常量类

    ShopCode:系统状态类

  • 响应实体类

    Result:封装响应状态和响应信息

4. 下单业务

流程简介*

用户发起下单操作,后台先保存预订单,然后通过RPC调用库存服务,优惠券服务,用户服务完成业务,如果任何1个服务调用失败,那么在处理异常的时候,发送1条消息到RocketMQ,其它服务则监听RocketMQ中的消息,做确认订单失败的业务,回退该预订单的数据。而消息的消费方需要做幂等性处理

下单业务流程图

在这里插入图片描述

4.1 下单基本流程

1)接口定义

  • IOrderService
public interface IOrderService {
    /**
     * 确认订单
     * @param order
     * @return Result
     */
    Result confirmOrder(TradeOrder order);
}

2)业务类实现

@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {

    @Override
    public Result confirmOrder(TradeOrder order) {
        //1.校验订单
       
        //2.生成预订单
       
        try {
            //3.扣减库存
            
            //4.扣减优惠券
           
            //5.使用余额
           
            //6.确认订单
            
            //7.返回成功状态
           
        } catch (Exception e) {
            //1.确认订单失败,发送消息
            
            //2.返回失败状态
        }

    }
}

3)校验订单

在这里插入图片描述

private void checkOrder(TradeOrder order) {
        //1.校验订单是否存在
        if(order==null){
            CastException.cast(ShopCode.SHOP_ORDER_INVALID);
        }
        //2.校验订单中的商品是否存在
        TradeGoods goods = goodsService.findOne(order.getGoodsId());
        if(goods==null){
            CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
        }
        //3.校验下单用户是否存在
        TradeUser user = userService.findOne(order.getUserId());
        if(user==null){
            CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
        }
        //4.校验商品单价是否合法
        if(order.getGoodsPrice().compareTo(goods.getGoodsPrice())!=0){
            CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
        }
        //5.校验订单商品数量是否合法
        if(order.getGoodsNumber()>=goods.getGoodsNumber()){
            CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
        }

        log.info("校验订单通过");
}

4)生成预订单

在这里插入图片描述

private Long savePreOrder(TradeOrder order) {

    //1.设置订单状态为不可见
    order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());

    //2.订单ID
    order.setOrderId(idWorker.nextId());

    //核算运费是否正确
    BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
    if (order.getShippingFee().compareTo(shippingFee) != 0) {
        CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
    }

    //3.计算订单总价格是否正确
    BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
    orderAmount.add(shippingFee);
    if (orderAmount.compareTo(order.getOrderAmount()) != 0) {
        CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
    }

    //4.判断优惠券信息是否合法
    Long couponId = order.getCouponId();
    if (couponId != null) {
        TradeCoupon coupon = couponService.findOne(couponId);
        //优惠券不存在
        if (coupon == null) {
            CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
        }
        //优惠券已经使用
        if ((ShopCode.SHOP_COUPON_ISUSED.getCode().toString())
            .equals(coupon.getIsUsed().toString())) {
            CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
        }
        order.setCouponPaid(coupon.getCouponPrice());
    } else {
        order.setCouponPaid(BigDecimal.ZERO);
    }

    //5.判断余额是否正确
    BigDecimal moneyPaid = order.getMoneyPaid();
    if (moneyPaid != null) {
        //比较余额是否大于0
        int r = order.getMoneyPaid().compareTo(BigDecimal.ZERO);
        //余额小于0
        if (r == -1) {
            CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);
        }
        //余额大于0
        if (r == 1) {
            //查询用户信息
            TradeUser user = userService.findOne(order.getUserId());
            if (user == null) {
                CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
            }
            //比较余额是否大于用户账户余额
            if (user.getUserMoney().compareTo(order.getMoneyPaid().longValue()) == -1) {
                CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
            }
            order.setMoneyPaid(order.getMoneyPaid());
        }
    } else {
        order.setMoneyPaid(BigDecimal.ZERO);
    }
    //计算订单支付总价
    order.setPayAmount(orderAmount.subtract(order.getCouponPaid())
                       .subtract(order.getMoneyPaid()));
    //设置订单添加时间
    order.setAddTime(new Date());

    //保存预订单
    int r = orderMapper.insert(order);

    if (ShopCode.SHOP_SUCCESS.getCode() != r) {
        CastException.cast(ShopCode.SHOP_ORDER_SAVE_ERROR);
    }

    log.info("订单:["+order.getOrderId()+"]预订单生成成功");

    return order.getOrderId();
}

###5)扣减库存

  • 通过dubbo调用商品服务完成扣减库存
private void reduceGoodsNum(TradeOrder order) {
    
    TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
    
    goodsNumberLog.setGoodsId(order.getGoodsId()); // 商品id
    goodsNumberLog.setOrderId(order.getOrderId()); // 订单id
    goodsNumberLog.setGoodsNumber(order.getGoodsNumber()); // 购买的商品数量
    
    // 通过dubbo的rpc远程调用商品服务实现 扣减库存
    Result result = goodsService.reduceGoodsNum(goodsNumberLog);
    
    if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
        CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
    }
    
    log.info("订单:["+order.getOrderId()+"]扣减库存["+order.getGoodsNumber()+"个]成功");
}
  • 商品服务GoodsService扣减库存
@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
    
    if (goodsNumberLog == null ||
        goodsNumberLog.getGoodsNumber() == null ||
        goodsNumberLog.getOrderId() == null ||
        goodsNumberLog.getGoodsNumber() == null ||
        goodsNumberLog.getGoodsNumber().intValue() <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    
    TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
    
    if(goods.getGoodsNumber()<goodsNumberLog.getGoodsNumber()){
        //库存不足
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    
    // 减库存: 商品数量 = 原库存数量 - 购买的商品数量
    goods.setGoodsNumber(goods.getGoodsNumber()-goodsNumberLog.getGoodsNumber());
    goodsMapper.updateByPrimaryKey(goods);

    //记录库存操作日志	
    goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));
    goodsNumberLog.setLogTime(new Date());
    goodsNumberLogMapper.insert(goodsNumberLog);

    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),
                      ShopCode.SHOP_SUCCESS.getMessage());
}

###6)扣减优惠券

  • 通过dubbo完成扣减优惠券
private void changeCoponStatus(TradeOrder order) {
    //判断用户是否使用优惠券
    if (!StringUtils.isEmpty(order.getCouponId())) {
        //封装优惠券对象
        TradeCoupon coupon = couponService.findOne(order.getCouponId());
        coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
        coupon.setUsedTime(new Date());
        coupon.setOrderId(order.getOrderId());
        Result result = couponService.changeCouponStatus(coupon);
        //判断执行结果
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            //优惠券使用失败
            CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
        }
        log.info("订单:["+order.getOrderId()+"]使用扣减优惠券["+coupon.getCouponPrice()+"元]成功");
    }

}
  • 优惠券服务CouponService更改优惠券状态
@Override
public Result changeCouponStatus(TradeCoupon coupon) {
    try {
        //判断请求参数是否合法
        if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
		//更新优惠券状态为已使用
        couponMapper.updateByPrimaryKey(coupon);
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

###7)扣减用户余额

  • 通过用户服务完成扣减余额
private void reduceMoneyPaid(TradeOrder order) {
    //判断订单中使用的余额是否合法
    if (order.getMoneyPaid() != null 
        && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
        
        TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
        
        userMoneyLog.setOrderId(order.getOrderId());
        userMoneyLog.setUserId(order.getUserId());
        userMoneyLog.setUseMoney(order.getMoneyPaid());
        userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
        
        //扣减余额
        Result result = userService.changeUserMoney(userMoneyLog);
        
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
        }
        log.info("订单:["+order.getOrderId()+"扣减余额["+order.getMoneyPaid()+"元]成功]");
    }
}
  • 用户服务UserService,更新余额

在这里插入图片描述

@Override
public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
    
    //判断请求参数是否合法
    if (userMoneyLog == null
        || userMoneyLog.getUserId() == null
        || userMoneyLog.getUseMoney() == null
        || userMoneyLog.getOrderId() == null
        || userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }

    //查询该订单是否存在付款记录
    TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
    userMoneyLogExample.createCriteria()
        .andUserIdEqualTo(userMoneyLog.getUserId())    // 用户id
        .andOrderIdEqualTo(userMoneyLog.getOrderId()); // 订单id
    
    int count = userMoneyLogMapper.countByExample(userMoneyLogExample);
    
    TradeUser tradeUser = new TradeUser();
    tradeUser.setUserId(userMoneyLog.getUserId());
    tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());

    //判断余额操作行为
    //【付款操作】
    if (userMoneyLog.getMoneyLogType()
                    .equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {
        
        //订单已经付款,则抛异常
        if (count > 0) {
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }
        
        //用户账户扣减余额
        userMapper.reduceUserMoney(tradeUser);
    }
    
    //【退款操作】
    else if(userMoneyLog.getMoneyLogType()
                        .equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {
        
        //如果订单未付款,则不能退款,抛异常
        if (count == 0) {
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
        }
        
        //防止多次退款
        userMoneyLogExample = new TradeUserMoneyLogExample();
        userMoneyLogExample.createCriteria()
            .andUserIdEqualTo(userMoneyLog.getUserId())
            .andOrderIdEqualTo(userMoneyLog.getOrderId())
            .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
        count = userMoneyLogMapper.countByExample(userMoneyLogExample);
        
        if (count > 0) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
        }
        
        //用户账户添加余额
        userMapper.addUserMoney(tradeUser);
    }


    //记录用户使用余额日志
    userMoneyLog.setCreateTime(new Date());
    
    userMoneyLogMapper.insert(userMoneyLog);
    
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),
                      ShopCode.SHOP_SUCCESS.getMessage());
}

###8)确认订单

private void updateOrderStatus(TradeOrder order) {
    
    order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
    
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
    
    order.setConfirmTime(new Date());
    
    int r = orderMapper.updateByPrimaryKey(order);
    
    if (r <= 0) {
        CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
    }
    
    log.info("订单:["+order.getOrderId()+"]状态修改成功");
}

9)小结

@Override
public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    checkOrder(order);
    //2.生成预订单
    Long orderId = savePreOrder(order);
    order.setOrderId(orderId);
    try {
        //3.扣减库存
        reduceGoodsNum(order);
        //4.扣减优惠券
        changeCoponStatus(order);
        //5.使用余额
        reduceMoneyPaid(order);
        //6.确认订单
        updateOrderStatus(order);
        log.info("订单:["+orderId+"]确认成功");
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),
                          ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        //确认订单失败,发送消息
        ...
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), 
                          ShopCode.SHOP_FAIL.getMessage());
    }
}

4.2 失败补偿机制

4.2.1 消息发送方

  • 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup

mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
  • 注入模板类和属性值信息
 @Autowired
 private RocketMQTemplate rocketMQTemplate;

 @Value("${mq.order.topic}")
 private String topic;

 @Value("${mq.order.tag.cancel}")
 private String cancelTag;
  • 发送下单失败消息
@Override
public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    //2.生成预订
    try {
        //3.扣减库存
        //4.扣减优惠券
        //5.使用余额
        //6.确认订单
    } catch (Exception e) {

        //确认订单失败,发送消息
        CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
        cancelOrderMQ.setOrderId(order.getOrderId());
        cancelOrderMQ.setCouponId(order.getCouponId());
        cancelOrderMQ.setGoodsId(order.getGoodsId());
        cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
        cancelOrderMQ.setUserId(order.getUserId());
        cancelOrderMQ.setUserMoney(order.getMoneyPaid());

        try {
            sendMessage(topic,                                   // topic
                        cancelTag,                               // tag
                        cancelOrderMQ.getOrderId().toString(),   // keys
                        JSON.toJSONString(cancelOrderMQ)         // body
                       );
        } catch (Exception e1) {
            e1.printStackTrace();
            CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
        }
        
        return new Result(ShopCode.SHOP_FAIL.getSuccess(),
                          ShopCode.SHOP_FAIL.getMessage());
    }
}
private void sendMessage(String topic, 
                         String tags, 
                         String keys, 
                         String body) {
    
    //判断Topic是否为空
    if (StringUtils.isEmpty(topic)) {
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    
    //判断消息内容是否为空
    if (StringUtils.isEmpty(body)) {
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    
    //消息体
    Message message = new Message(topic, tags, keys, body.getBytes());
    
    //发送消息		
    rocketMQTemplate.getProducer().send(message);
}

4.2.2 消费接收方

  • 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
  • 创建监听类,消费消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", 
                         consumerGroup = "${mq.order.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING // 使用广播类型
                        )
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{

    @Override
    public void onMessage(MessageExt messageExt) { // 泛型使用MessageExt, 以获取RocketMQ为消息生成的msgId
        ...
    }
}
1)回退库存*
  • 流程分析
    在这里插入图片描述

  • 消息消费者

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
                         consumerGroup = "${mq.order.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        
        String msgId=null;
        String tags=null;
        String keys=null;
        String body=null;
        
        try {
            
            //1. 解析消息内容
            msgId = messageExt.getMsgId();
            tags= messageExt.getTags();
            keys= messageExt.getKeys();
            body= new String(messageExt.getBody(),"UTF-8");

            log.info("接受消息成功");

            //2. 查询消息消费记录(这三个字段是联合主键)
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);        // tag
            primaryKey.setMsgKey(keys);        // keys
            primaryKey.setGroupName(groupName);// groupName
            
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper
                .selectByPrimaryKey(primaryKey);

            //3. 判断如果消费过...
            if(mqConsumerLog != null){
                
                //3.1 获得消息处理状态
                Integer status = mqConsumerLog.getConsumerStatus();
                
                //处理成功...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode()
                   .intValue()==status.intValue()){
                    log.info("消息:"+msgId+",已经处理过");
                    return;
                }

                //正在处理...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().
                   intValue()==status.intValue()){
                    log.info("消息:"+msgId+",正在处理");
                    return;
                }

                //处理失败
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode()
                   .intValue()==status.intValue()){
                    
                    //获得消息处理次数
                    Integer times = mqConsumerLog.getConsumerTimes();
                    
                    if(times>3){
                        log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
                        return;
                    }
                    
                    // 消费次数未大于3
                    mqConsumerLog.setConsumerStatus(
                        ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode()
                    );

                    //使用数据库乐观锁更新
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.
                        Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    // 添加条件: 消费次数
                    criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog,
                                                                         example);
                    
                    if(r<=0){
                        //未修改成功,其他线程并发修改
                        log.info("并发修改,稍后处理");
                        return;
                    }
                    
                    // 未超过最大允许失败的次数,将接着下面的处理
                }

            } 
            else {
                
                //4. 判断如果没有消费过...
                
                mqConsumerLog = new TradeMqConsumerLog();
                
                // 联合主键
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                
                // 处理中
                mqConsumerLog.setConsumerStatus(
                    ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode()
                );
                
				// 消费次数
                mqConsumerLog.setConsumerTimes(1);

                //将消息处理信息添加到数据库
                mqConsumerLogMapper.insert(mqConsumerLog);
            }
            
            
            //5. 回退库存
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            goods.setGoodsNumber(goods.getGoodsNumber() + mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);

            //记录库存操作日志
            TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
            goodsNumberLog.setOrderId(mqEntity.getOrderId());
            goodsNumberLog.setGoodsId(goodsId);
            goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
            goodsNumberLog.setLogTime(new Date());
            goodsNumberLogMapper.insert(goodsNumberLog);

            //6. 将消息的处理状态改为成功
            mqConsumerLog.setConsumerStatus(
                ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode()
            );
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            
            log.info("回退库存成功");
            
        } catch (Exception e) {
            
            e.printStackTrace();
            
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            
            TradeMqConsumerLog 
                mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            
            if(mqConsumerLog==null){
                
                //数据库未有记录
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                
                // 消费状态: 消费失败
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL
                                                .getCode());
                
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(1);
                
                mqConsumerLogMapper.insert(mqConsumerLog);
                
            }
            else{
                // 消费次数 + 1
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }

    }
}
2)回退优惠券
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
                         consumerGroup = "${mq.order.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING )

public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Autowired
    private TradeCouponMapper couponMapper;

    @Override
    public void onMessage(MessageExt message) {

        try {
            
            //1. 解析消息内容
            String body = new String(message.getBody(), "UTF-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            
            log.info("接收到消息");
            
            //2. 查询优惠券信息
            TradeCoupon coupon = couponMapper.selectByPrimaryKey(
                mqEntity.getCouponId()
            );
            
            //3.更改优惠券状态
            coupon.setUsedTime(null);
            coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode()); // 未使用状态
            coupon.setOrderId(null);
            
            couponMapper.updateByPrimaryKey(coupon);
            
            log.info("回退优惠券成功");
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("回退优惠券失败");
        }

    }
}
3)回退余额
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
                         consumerGroup = "${mq.order.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Autowired
    private IUserService userService;

    @Override
    public void onMessage(MessageExt messageExt) {

        try {
            
            //1.解析消息
            String body = new String(messageExt.getBody(), "UTF-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            
            log.info("接收到消息");
            
            if(mqEntity.getUserMoney()!=null 
               && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){
                
                //2.调用业务层,进行余额修改
                TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
                userMoneyLog.setUseMoney(mqEntity.getUserMoney());
                
                // 设置为退款类型
                userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
                
                userMoneyLog.setUserId(mqEntity.getUserId());
                userMoneyLog.setOrderId(mqEntity.getOrderId());
                
                userService.updateMoneyPaid(userMoneyLog);
                
                log.info("余额回退成功");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("余额回退失败");
        }

    }
}

updateMoneyPaid如下(但是里面会存在问题,防止多次退款那里没有加锁。假设MQ并发发了2条消息过来,就会给用户加多次记录。可以加上分布式锁,或者使用乐观锁)

@Override
public Result updateMoneyPaid(TradeUserMoneyLog userMoneyLog) {

    //1.校验参数是否合法
    if(userMoneyLog==null || userMoneyLog.getUserId()==null ||
       userMoneyLog.getOrderId()==null || userMoneyLog.getUseMoney()==null||
       userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO)<=0)
    {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }

    //2.查询订单余额使用日志
    TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
    TradeUserMoneyLogExample.Criteria criteria = userMoneyLogExample.createCriteria();
    criteria.andOrderIdEqualTo(userMoneyLog.getOrderId());
    criteria.andUserIdEqualTo(userMoneyLog.getUserId());
    int r = userMoneyLogMapper.countByExample(userMoneyLogExample);

    TradeUser tradeUser = userMapper.selectByPrimaryKey(userMoneyLog.getUserId());

    //3.扣减余额...
    if(userMoneyLog.getMoneyLogType().intValue()
       ==ShopCode.SHOP_USER_MONEY_PAID.getCode().intValue()){

        if(r>0){
            //已经付款
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }

        //减余额
        tradeUser.setUserMoney(new BigDecimal(tradeUser.getUserMoney())
                               .subtract(userMoneyLog.getUseMoney()).longValue()
                              );
        userMapper.updateByPrimaryKey(tradeUser);
    }

    //4.回退余额...
    if(userMoneyLog.getMoneyLogType().intValue()
       ==ShopCode.SHOP_USER_MONEY_REFUND.getCode().intValue()){
        
        if(r<=0){
            //如果没有支付,则不能回退余额
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
        }
        
        //防止多次退款
        TradeUserMoneyLogExample userMoneyLogExample2 = new TradeUserMoneyLogExample();
        TradeUserMoneyLogExample.Criteria criteria1 
                                               = userMoneyLogExample2.createCriteria();
        criteria1.andOrderIdEqualTo(userMoneyLog.getOrderId());
        criteria1.andUserIdEqualTo(userMoneyLog.getUserId());
        criteria1.andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
        
        int r2 = userMoneyLogMapper.countByExample(userMoneyLogExample2);
        
        if(r2>0){
            CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
        }
        
        //退款
        tradeUser.setUserMoney(new BigDecimal(tradeUser.getUserMoney())
                               .add(userMoneyLog.getUseMoney()).longValue());
        userMapper.updateByPrimaryKey(tradeUser);
    }
    
    //5.记录订单余额使用日志
    userMoneyLog.setCreateTime(new Date());
    
    userMoneyLogMapper.insert(userMoneyLog);
    
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),
                      ShopCode.SHOP_SUCCESS.getMessage());
}

4)取消订单
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
                         consumerGroup = "${mq.order.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{

    @Autowired
    private TradeOrderMapper orderMapper;

    @Override
    public void onMessage(MessageExt messageExt) {

        try {
            
            //1. 解析消息内容
            String body = new String(messageExt.getBody(),"UTF-8");
            
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            
            log.info("接受消息成功");
            
            //2. 查询订单
            TradeOrder order = orderMapper.selectByPrimaryKey(mqEntity.getOrderId());
            
            //3.更新订单状态为取消
            order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
            
            orderMapper.updateByPrimaryKey(order);
            
            log.info("订单状态设置为取消");
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.info("订单取消失败");
        }
    }
}

4.3 测试

1)准备测试环境

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ShopOrderServiceApplication.class)
public class OrderTest {

    @Autowired
    private IOrderService orderService;
}

2)准备测试数据

  • 用户数据
  • 商品数据
  • 优惠券数据

3)测试下单成功流程

@Test    
public void add(){
    Long goodsId=XXXL;
    Long userId=XXXL;
    Long couponId=XXXL;

    TradeOrder order = new TradeOrder();
    order.setGoodsId(goodsId);
    order.setUserId(userId);
    order.setGoodsNumber(1);
    order.setAddress("北京");
    order.setGoodsPrice(new BigDecimal("5000"));
    order.setOrderAmount(new BigDecimal("5000"));
    order.setMoneyPaid(new BigDecimal("100"));
    order.setCouponId(couponId);
    order.setShippingFee(new BigDecimal(0));
    orderService.confirmOrder(order);
}

执行完毕后,查看数据库中用户的余额、优惠券数据,及订单的状态数据

4)测试下单失败流程

代码同上。

执行完毕后,查看用户的余额、优惠券数据是否发生更改,订单的状态是否为取消。

5. 支付业务

流程简介*

用户对订单发起支付,创建支付订单,用户重定向到第三方支付平台,当用户在第三方平台支付成功后,第三方支付平台发起支付成功的回调,后台需要及时给第三方支付平台响应,因此在收到回调后,先保存支付成功的消息到数据库以保证消息的可靠性,然后通过线程池将消息异步发送到RocketMQ,其它服务监听RocketMQ做支付成功的逻辑。

5.1 创建支付订单

在这里插入图片描述

// 已知 订单id, 需支付金额, 未支付状态, 来保存 支付订单
public Result createPayment(TradePay tradePay) {
    
    // 必须携带orderId
    if(tradePay==null || tradePay.getOrderId()==null){
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    
    try {
        
        //查询订单支付状态
        TradePayExample payExample = new TradePayExample();
        TradePayExample.Criteria criteria = payExample.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        // 已支付状态作为查询条件
        criteria.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        
        int count = tradePayMapper.countByExample(payExample);
        
        if (count > 0) {
            // 如果已支付, 则抛出异常
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }

        long payId = idWorker.nextId();
        // 设置支付id
        tradePay.setPayId(payId);
        // 设置未支付状态
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        
        tradePayMapper.insert(tradePay);
        
        log.info("创建支付订单成功:" + payId);
        
    } catch (Exception e) {
        
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), 
                          ShopCode.SHOP_FAIL.getMessage());
    }
    
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), 
                      ShopCode.SHOP_SUCCESS.getMessage());
}

5.2 支付回调

5.2.1 流程分析

在这里插入图片描述

5.2.2 代码实现

// 用户通过第三方支付平台, 支付成功后, 由支付平台回调此接口

// TradePay中定义的属性: payId、orderId、payAmount、isPaid、
public Result callbackPayment(TradePay tradePay) {

    // 1. 判断用户支付状态
    // 如果已经支付了
    if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {

        // 2. 更新支付订单状态为已支付

        // 判断支付订单是否存在, 如果不存在, 则抛出异常
        tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());

        if (tradePay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }

        // 设置支付订单已支付状态
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());

        // 更新支付订单状态为已支付
        int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);

        //更新成功代表支付成功
        if (i == 1) {

            // 3. 创建支付成功的消息
            TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();
            mqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            mqProducerTemp.setGroupName("payProducerGroup");
            mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
            mqProducerTemp.setMsgTag(topic);
            mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
            mqProducerTemp.setCreateTime(new Date());

            // 4. 将支付成功的消息持久化数据库
            mqProducerTempMapper.insert(mqProducerTemp);

            TradePay finalTradePay = tradePay;

            // 在线程池中进行处理
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    
                    try {
                        
                        // 5. 发送消息到rocketmq
                        SendResult sendResult = sendMessage( 
                                                       topic, // payTopic
                                                       tag,   // paid
                                                       finalTradePay.getPayId(),
                                                       JSON.toJSONString( finalTradePay)
                                                );
                        
                        log.info(JSON.toJSONString(sendResult));
                        
                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { 
                            
                            // 6. 等待发送结果,如果MQ接受到消息,删除发送成功的消息
                            mqProducerTempMapper.deleteByPrimaryKey(
                                                         mqProducerTemp.getId()
                                                    );
                            
                            System.out.println("删除消息表成功");
                         }
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            
        } 
        else {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
    }
    
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), 
                      ShopCode.SHOP_SUCCESS.getMessage());
}

/**
  * 发送支付成功消息
  * @param topic
  * @param tag
  * @param key
  * @param body
  */
private SendResult sendMessage(String topic, 
                               String tag, 
                               String key, 
                               String body) throws Exception{
    
    if(StringUtils.isEmpty(topic)){
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    
    if(StringUtils.isEmpty(body)){
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    
    Message message = new Message(topic,tag,key,body.getBytes());
    
    SendResult sendResult = rocketMQTemplate.getProducer().send(message);
    
    return sendResult;
}
线程池优化消息发送逻辑
  • 创建线程池对象
@Bean
public ThreadPoolTaskExecutor getThreadPool() {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(4);

    executor.setMaxPoolSize(8);

    executor.setQueueCapacity(100);

    executor.setKeepAliveSeconds(60);

    executor.setThreadNamePrefix("Pool-A");

    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    executor.initialize();

    return executor;

}
  • 使用线程池
@Autowired
private ThreadPoolTaskExecutor executorService;

executorService.submit(new Runnable() {
    @Override
    public void run() {

        try {

            SendResult sendResult = sendMessage(topic, 
                                                tag, 
                                                finalTradePay.getPayId(), 
                                                JSON.toJSONString(finalTradePay)
                                               );

            log.info(JSON.toJSONString(sendResult));

            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                System.out.println("删除消息表成功");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});

5.2.3 处理消息

支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理

  1. 订单服务修改订单状态为已支付
  2. 日志服务记录支付日志
  3. 用户服务负责给用户增加积分

以下用订单服务为例说明消息的处理情况

1)配置RocketMQ属性值
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
2)消费消息
  • 在订单服务中,配置公共的消息处理类
public class BaseConsumer {

    public TradeOrder handleMessage(IOrderService 
                                    orderService, 
                                    MessageExt messageExt,Integer code) throws Exception {
        //解析消息内容
        String body = new String(messageExt.getBody(), "UTF-8");
        String msgId = messageExt.getMsgId();
        String tags = messageExt.getTags();
        String keys = messageExt.getKeys();
        OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
        
        //查询
        TradeOrder order = orderService.findOne(orderMq.getOrderId());

        if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){
            order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
        }

        if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){
            order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        }
        orderService.changeOrderStatus(order);
        return order;
    }

}
  • 接受订单支付成功消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",
                         consumerGroup = "${mq.pay.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING)
public class PaymentListener implements RocketMQListener<MessageExt>{

    @Autowired
    private TradeOrderMapper orderMapper;

    @Override
    public void onMessage(MessageExt messageExt) {

        log.info("接收到支付成功消息");

        try {
            
            //1.解析消息内容
            String body = new String(messageExt.getBody(),"UTF-8");
            TradePay tradePay = JSON.parseObject(body,TradePay.class);
            
            //2.根据订单ID查询订单对象
            TradeOrder tradeOrder =                  
                          orderMapper.selectByPrimaryKey(tradePay.getOrderId());
            
            //3.更改订单支付状态为已支付
            tradeOrder.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
            
            //4.更新订单数据到数据库
            orderMapper.updateByPrimaryKey(tradeOrder);
            
            log.info("更改订单支付状态为已支付");
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }
}

6. 整体联调

通过Rest客户端请求shop-order-web和shop-pay-web完成下单和支付操作

6.1 准备工作

1)配置RestTemplate类

@Configuration
public class RestTemplateConfig {

    @Bean
    @ConditionalOnMissingBean({ RestOperations.class, RestTemplate.class })
    public RestTemplate restTemplate(ClientHttpRequestFactory factory) {

        RestTemplate restTemplate = new RestTemplate(factory);
        

        // 使用 utf-8 编码集的 conver 替换默认的 conver
        // (默认的 string conver 的编码集为"ISO-8859-1")
        List<HttpMessageConverter<?>> 
            messageConverters = restTemplate.getMessageConverters();
        
        Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();
        
        while (iterator.hasNext()) {
            HttpMessageConverter<?> converter = iterator.next();
            if (converter instanceof StringHttpMessageConverter) {
                iterator.remove();
            }
        }
        
        messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));

        return restTemplate;
    }

    @Bean
    @ConditionalOnMissingBean({ClientHttpRequestFactory.class})
    public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // ms
        factory.setReadTimeout(15000);
        // ms
        factory.setConnectTimeout(15000);
        return factory;
    }
}

2)配置请求地址

  • 订单系统
server.host=http://localhost
server.servlet.path=/order-web
server.port=8080
shop.order.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.order.confirm=/order/confirm
  • 支付系统
server.host=http://localhost
server.servlet.path=/pay-web
server.port=9090
shop.pay.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.pay.createPayment=/pay/createPayment
shop.pay.callbackPayment=/pay/callbackPayment

6.2 下单测试

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopOrderWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class OrderTest {

   @Autowired
   private RestTemplate restTemplate;

   @Value("${shop.order.baseURI}")
   private String baseURI;

   @Value("${shop.order.confirm}")
   private String confirmOrderPath;

   @Autowired
   private IDWorker idWorker;
  
  /**
    * 下单
    */
   @Test
   public void confirmOrder(){
       Long goodsId=XXXL;
       Long userId=XXXL;
       Long couponId=XXXL;

       TradeOrder order = new TradeOrder();
       order.setGoodsId(goodsId);
       order.setUserId(userId);
       order.setGoodsNumber(1);
       order.setAddress("北京");
       order.setGoodsPrice(new BigDecimal("5000"));
       order.setOrderAmount(new BigDecimal("5000"));
       order.setMoneyPaid(new BigDecimal("100"));
       order.setCouponId(couponId);
       order.setShippingFee(new BigDecimal(0));

       Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, 
                                                  order, Result.class).getBody();
       System.out.println(result);
   }

}

6.3 支付测试

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopPayWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class PayTest {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${shop.pay.baseURI}")
    private String baseURI;

    @Value("${shop.pay.createPayment}")
    private String createPaymentPath;

    @Value("${shop.pay.callbackPayment}")
    private String callbackPaymentPath;

    @Autowired
    private IDWorker idWorker;

   /**
     * 已知订单id和需支付的金额, 来创建支付订单
     */
    @Test
    public void createPayment(){

        Long orderId = 346321587315814400L;
        
        TradePay pay = new TradePay();
        pay.setOrderId(orderId);
        pay.setPayAmount(new BigDecimal(4800));

        Result result = restTemplate.postForEntity(baseURI + createPaymentPath,
                                                   pay, Result.class
                                                  )
                                    .getBody();
        
        System.out.println(result);
    }
   
    /**
     * 用户通过第三方支付平台支付成功后, 第三方支付平台回调此接口
     * 支付回调
     */
    @Test
    public void callbackPayment(){
        
        Long payId = 346321891507720192L;
        
        TradePay pay = new TradePay();
        pay.setPayId(payId);
        pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        
        Result result = restTemplate.postForEntity(baseURI + callbackPaymentPath, 
                                                   pay, Result.class).getBody();
        
        System.out.println(result);

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2091303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Visual Studio 快速跳转至特定程序行的快捷键

Visual Studio 快速跳转至特定程序行的快捷键 linuxWindows在Visual Studio中在Visual Code中在Notepad中Win11中的普通记事本 总结 linux :numWindows Ctrl G在Visual Studio中 在Visual Code中 Ctrl G也是可以的 在Notepad中 Ctrl G也是可以的 Win11中的普通记事本…

模型 生产微笑曲线

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。产业链中&#xff0c;研发设计和品牌营销环节附加值高&#xff0c;制造环节附加值低。 1 生产微笑曲线的应用 1.1 大杨集团的“微笑曲线”到“武藏曲线”转型 武藏曲线简介说明&#xff1a;在制造业…

JavaScript的对象详解

作为程序员&#xff0c;我们常常会听见一种说法&#xff0c;那就是面向对象编程。那到底什么是对象呢&#xff1f;有改如何面向对象编程呢&#xff1f;今天我们就来详细讲讲 什么是对象呢&#xff1f; 对象是JavaScript中一个非常重要的概念&#xff0c;这是因为对象可以将多个…

Java项目怎么从零部署到Linux服务器上?

目录 一.Java环境&#xff08;JDK&#xff09;安装 二.数据库&#xff08;MySQL&#xff09;安装 三.部署上线 ▐ 部署Jar包 ▐ 运行程序 ▐ 开放端口 一个Java项目首先需要一个支持它编译的Java环境&#xff0c;因此首先要保证服务器上安装的有相应的JDK 一.Java环境&a…

为什么使用雪花算法,有什么优缺点,如何解决?为什么不使用UUID的方法,如何解决系统回拨的问题?

为什么使用雪花算法&#xff0c;有什么优缺点&#xff0c;如何解决&#xff1f;为什么不使用UUID的方法&#xff0c;如何解决系统回拨的问题&#xff1f; 生成的id应该满足下面的条件&#xff1a; 首先是全局唯一&#xff0c;不能出现重复的ID之后是总体应该是递增的&#xf…

8个平面设计必备素材网站,免费下载。

平面设计师应该去哪里找免费可商用素材网站&#xff1f;我推荐这8个&#xff0c;赶紧收藏好。 1、菜鸟图库 菜鸟图库-免费设计素材下载 菜鸟图库是一个非常大的素材库&#xff0c;站内包含设计、办公、自媒体、图片、电商等各行业素材。网站还为新手设计师提供免费的素材&…

基于django的失物招领系统的设计与实现/ 基于Python的失物招领系统的设计与实现/失物招领管理系统

失物招领系统的设计与实现 摘要:伴随着我国全面推动信息化的趋势&#xff0c;我国的很多行业都在朝着互联网的方向进发。结合计算机技术的失物招领系统能够很好地解决传统失物招领存在的问题&#xff0c;能够提高管理员管理的效率&#xff0c;改善服务质量。优秀的失物招领系统…

感染了后缀为.Wormhole勒索病毒如何应对?数据能够恢复吗?

引言&#xff1a; 在当今日益复杂的网络安全环境中&#xff0c;勒索病毒成为了企业和个人面临的一大威胁。其中&#xff0c;.Wormhole勒索病毒以其独特的传播机制和强大的加密能力&#xff0c;尤为引人注目。本文将深入探讨.Wormhole勒索病毒的特点、感染途径、危害以及相应的…

XSS LABS 靶场初识

关注这个靶场的其他相关笔记&#xff1a;XSS - LABS —— 靶场笔记合集-CSDN博客 0x01&#xff1a;XSS LABS 靶场简介 XSS LABS 靶场是一个专注于跨站脚本攻击&#xff08;Cross-Site Scripting&#xff0c;XSS&#xff09;教育和训练的平台。平台中有一系列精心设计的关于 XS…

若依框架 MyBatis 改为 MyBatis-Plus 的实现步骤

本文只做了简单的实现&#xff0c;具体的细节需根据自己的需求进一步实现。如果实现中遇到问题欢迎留言讨论。 引入 MyBatis-Plus 引入相关依赖&#xff08;pom.xml&#xff09; 推荐先直接在顶级 pom.xml 中直接依赖&#xff0c;等调试通过之后&#xff0c;在去按需依赖&…

【hot100篇-python刷题记录】【三数之和】

R7-双指针篇 思路&#xff1a; 三个元素&#xff0c;代表需要3个指针&#xff0c;利用双指针收缩的思想&#xff0c;我们可以设置k,i,j三个元素指针。 k代表最外层的循环&#xff0c;循环一遍数组。&#xff08;为了降低时间复杂度以及搜索难度&#xff0c;我们先将nums sort…

移动硬盘文件夹变成白色无法正常访问,怎么恢复?

在使用移动硬盘时&#xff0c;有时会遇到文件夹变白的情况。这通常意味着文件夹已经损坏或无法正常访问。本文将分析移动硬盘文件夹变白的原因&#xff0c;并提供相应的恢复方法。 一、原因分析 文件系统损坏&#xff1a;移动硬盘的文件系统可能因多种原因而损坏&#xff0c;如…

001集——CAD—C#二次开发入门——开发环境基本设置

CAD C#二次开发首先需要搭建一个舒服的开发环境&#xff0c;软件安装后&#xff0c;需要修改相关设置。本文为保姆级入门搭建开发环境教程&#xff0c;默认已成功安装vs和cad 。 第一步&#xff1a;创建类库 第二步&#xff1a;进行相关设置&#xff0c;如图&#xff1a; 下一…

milvus资源限制 benchmarker压测 qps优化

根据milvus 资源限制的官网&#xff0c;我们得出百万数据资源限制。 1.dev 环境 对接不同的配置最大的qps 如下(dev的机器内存很小) 2.于是认为当前的性能是匹配的&#xff0c;然后加上资源限制&#xff0c;配置 压测结果如下 {"run_id": "13292982fee74f64…

基于springboot+vue的民族文化推广系统设计与实现---附源码92323

摘 要 在全球化和信息化日益加深的当下&#xff0c;保护和推广民族文化显得尤为重要。民族文化不仅是一个国家或地区的独特标识&#xff0c;更是其历史、传统和智慧的结晶。然而&#xff0c;随着现代社会的快速发展&#xff0c;许多传统文化和习俗面临着被遗忘和消失的风险。因…

ssh---配置密钥对验证

1&#xff0e;在客户端创建密钥对 ssh-keygen -t ecdsa秘钥存放位置&#xff08;生成密钥时的用户的工作目录下&#xff09; 2&#xff0e;将公钥文件上传至服务器 3&#xff0e;在服务器中导入公钥文本 4.在客户机设置ssh代理功能&#xff0c;实现免交互登录 5.测试

Python自适应光学模态星形小波分析和像差算法

&#x1f3af;要点 &#x1f3af;星形小波分析像差测量 | &#x1f3af;对比傅里叶和小波分析 | &#x1f3af;定义多尺度图像质量度量&#xff0c;矩阵数据 | &#x1f3af;像差校正算法 | &#x1f3af;受激发射损耗显微镜布局 | &#x1f3af;干涉仪分支校准&#xff0c;求…

Java 虚方法表(虚函数)

虚方法表 Java 中的虚方法表&#xff08;Virtual Method Table, VMT&#xff09;是实现动态方法分派和多态的重要机制。它帮助 Java 运行时系统&#xff08;JVM&#xff09;决定在继承体系中调用哪一个方法的具体实现。 什么是虚方法表&#xff1f; 虚方法表是一个类的内部数…

Linux学习笔记(4)----通过网口灯判断网速是千兆还是百兆

网卡PHY 移植注意事项 注意RTL8211F的LED0&#xff0c;LED1&#xff0c;LED2&#xff0c;软件是可以自定义的&#xff0c;比如百兆&#xff0c;千兆&#xff0c;是亮哪个灯&#xff0c;黄灯或者绿灯&#xff0c;还有传输时是闪烁哪个灯&#xff0c;要注意硬件上是怎么驱动灯的…

获取当前计算机的处理器架构platform.machine()

【小白从小学Python、C、Java】 【考研初试复试毕业设计】 【Python基础AI数据分析】 获取当前计算机的处理器架构 platform.machine() 选择题 关于以下代码的输出结果说法正确的是&#xff1f; import platform print("【执行】print(platform.machine())") prin…