RocketMQ --- 实战篇

news2024/9/27 12:15:57

一、案例介绍

1.1、业务分析

模拟电商网站购物场景中的【下单】和【支付】业务

1.1.1、下单

在这里插入图片描述
流程

  1. 用户请求订单系统下单

  2. 订单系统通过RPC调用订单服务下单

  3. 订单服务调用优惠券服务,扣减优惠券

  4. 订单服务调用调用库存服务,校验并扣减库存

  5. 订单服务调用用户服务,扣减用户余额

  6. 订单服务完成确认订单

1.1.2、支付

在这里插入图片描述
流程

  1. 用户请求支付系统
  2. 支付系统调用第三方支付平台API进行发起支付流程
  3. 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统
  4. 支付系统调用订单服务修改订单状态
  5. 支付系统调用积分服务添加积分
  6. 支付系统调用日志服务记录日志

1.2、问题分析

1.2.1、问题1

用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。

如何保证数据的完整性?

在这里插入图片描述

使用MQ保证在下单失败后系统数据的完整性

在这里插入图片描述

1.2.2、问题2

用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。

商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?

在这里插入图片描述

通过MQ进行数据分发,提高系统处理性能

在这里插入图片描述

二.、技术分析

2.1、技术选型

  • SpringBoot
  • Dubbo
  • Zookeeper
  • RocketMQ
  • Mysql

在这里插入图片描述

2.2、SpringBoot整合RocketMQ

下载rocketmq-spring项目

将rocketmq-spring安装到本地仓库

mvn install -Dmaven.skip.test=true

2.2.1、消息生产者

2.2.1.1、添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-producer</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring-boot-version>2.3.2.RELEASE</spring-boot-version>
    </properties>

    <dependencies>
        <!-- 配置web启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!-- rocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- lombok插件 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot-version}</version>
            </plugin>
        </plugins>
    </build>
</project>

2.2.1.2、配置文件

server:
  port: 8094
spring:
  application:
    name: rocketmq-producer

rocketmq:
  name-server: 127.0.0.1:9876          # rocketMQ的名称服务器,格式为:' host:port;host:port '。
  # 生产端配置
  producer:
    group: ${spring.application.name}  # 生产着组名
    #access-key: access-key            # rocketMQ服务端配置acl授权信息,没有则不需要
    #secret-key: secret-key            # rocketMQ服务端配置acl授权信息,没有则不需要
  # 消费端配置
#  consumer:
#    access-key: access-key            #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!
#    secret-key: secret-key            #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!

2.2.1.3、消息发送

/**
 * <p>
 * RocektMQ 事务消息监听器
 * </p>
 *
 **/
@Slf4j
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {


    /**
     * 检测半消息,在该方法中,执行本地事务
     *
     * @param msg 发送消息
     * @param arg 外部参数
     * @return commit:提交事务,它允许消费者消费此消息。bollback:回滚事务,它代表该消息将被删除,不允许被消费。 unknown:中间状态,它代表需要检查消息队列来确定状态(checkLocalTransaction方法)。
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> MQ事务执行器,执行本地事务 message={},args={} <<<<", msg, arg);

        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderPaidEvent payload = JSON.parseObject(jsonString, OrderPaidEvent.class);

            //模拟业务操作,当paidMoney >5 则提交,否则等事务会查
            if (payload.getPaidMoney().compareTo(new BigDecimal("5")) > 0) {
                //提交事务
                log.info("MQ提交事务啦!payload ={} ", payload);
                return RocketMQLocalTransactionState.COMMIT;
            }

            //不知道状态,转 checkLocalTransaction 回查执行
            log.info("MQ无法确定,等回查!payload ={} ", payload);
            return RocketMQLocalTransactionState.UNKNOWN;
        } catch (Exception e) {
            log.error("事务消息出错啦~ e:{}", e.getMessage(), e);
            //回滚
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    /**
     * 该方法时MQ进行消息事务状态回查、
     * <p>
     *
     * @param msg
     * @return bollback, commit or unknown
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info(">>>> MQ事务执行器,事务状态回查 message={} <<<<", msg);
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderPaidEvent payload = JSON.parseObject(jsonString, OrderPaidEvent.class);

            log.info("事务回查:checkLocalTransaction提交事务啦!payload ={} ", payload);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("回调的事务出错啦~ e:{}", e.getMessage(), e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}
/**
 * <p>
 * RocektMQ生产者常用发送消息方法
 * 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
 * </p>
 *
 **/
public interface IRocketMQService {

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     * <p>
     * (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。
     * 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
     * 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
     * )
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String destination, Object msg);

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     *
     * @param topicName 主题名 topicName
     * @param tags      标签 tags
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String topicName, String tags, Object msg);

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     *
     * @param topicName 主题名 topicName
     * @param tags      标签 tags
     * @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String topicName, String tags, String key, Object msg);

    /**
     * 发送同步消息-SQL92模式
     * 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true
     * 在console控制台查看集群状态  enablePropertyFilter=true 才正常
     *
     * @param topicName 主题名 topicName
     * @param map       自定义属性
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg);


    /**
     * 发送同步消息-SQL92模式
     * 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true
     * 在console控制台查看集群状态  enablePropertyFilter=true 才正常
     *
     * @param topicName 主题名 topicName
     * @param map       自定义属性
     * @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg);


    /**
     * 发生异步消息(异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。)
     *
     * @param destination  主题名:标签 topicName:tags
     * @param msg          发送对象
     * @param sendCallback 异步回调函数
     */
    void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback);

    /**
     * 发送单向消息(这种方式主要用在不特别关心发送结果的场景,例如日志发送。)
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     */
    void sendOneway(String destination, Object msg);

    /**
     * 发送批量消息(发送超过1MB,做了自动分割,超时时间设置30s(默认3s)),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
     *
     * @param destination 主题名:标签 topicName:tags
     * @param list        批量消息
     */
    void sendBatchMessage(String destination, List<?> list);


    /**
     * 发送批量消息(发送超过1MB,做了自动分割。),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
     *
     * @param topicName 主题名 topicName
     * @param tags      标签 tags
     * @param timeout   超时时间,空则默认设为30s
     * @param list      批量消息
     */
    void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list);

    /**
     * 发送延时消息(超时时间,设置30s(默认3s))
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
     *
     * @param destination    主题名:标签 topicName:tags
     * @param msg            发送对象
     * @param delayTimeLevel 延时等级(从1开始)
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel);

    /**
     * 发送延时消息
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
     *
     * @param destination    主题名:标签 topicName:tags
     * @param msg            发送对象
     * @param timeout        超时时间(单位毫秒)
     * @param delayTimeLevel 延时等级(从1开始)
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel);


    /**
     * 发送顺序消息(分区有序,多个queue参与,即相对每个queue,消息都是有序的。)
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @param hashKey     根据其哈希值取模后确定发送到哪一个queue队列
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendInOrder(String destination, Object msg, String hashKey);


    /**
     * 发送事务消息
     * 事务消息使用上的限制
     * 1:事务消息不支持延时消息和批量消息。
     * 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
     * 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
     * 4:事务性消息可能不止一次被检查或消费。
     * 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
     * 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @param arg         arg
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessageInTransaction(String destination, Object msg, Object arg);
}
/**
 * <p>
 * RocektMQ生产者常用发送消息方法
 * 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
 * </p>
 *
 */
@Slf4j
@Service
public class RocketMQServiceImpl implements IRocketMQService {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * 原生的producer
     */
    @Autowired
    private DefaultMQProducer producer;

    @Override
    public SendResult sendMessage(String destination, Object msg) {
        String[] split = destination.split(":");
        if (split.length == 2) {
            return this.sendMessage(split[0], split[1], msg);
        }
        return this.sendMessage(destination, null, msg);
    }

    @Override
    public SendResult sendMessage(String topicName, String tags, Object msg) {
        return this.sendMessage(topicName, tags, null, msg);
    }

    @Override
    public SendResult sendMessage(String topicName, String tags, String key, Object msg) {
        MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
        //设置key,唯一标识码要设置到keys字段,方便将来定位消息丢失问题
        if (StringUtils.isNotBlank(key)) {
            messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);
        }
        Message<?> message = messageBuilder.build();
        SendResult sendResult = this.rocketMqTemplate.syncSend(StringUtils.isBlank(tags) ? topicName : (topicName + ":" + tags), message);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("MQ发送同步消息成功,topicName={},tags={},msg={},sendResult={}", topicName, tags, msg, sendResult);
        } else {
            log.warn("MQ发送同步消息不一定成功,topicName={},tags={},msg={},sendResult={}", topicName, tags, msg, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg) {
        return this.sendMessageBySql(topicName, map, null, msg);
    }

    @Override
    public SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg) {
        MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
        //设置key,唯一标识码要设置到keys字段,方便将来定位消息丢失问题
        if (StringUtils.isNotBlank(key)) {
            messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);
        }
        //设置自定义属性
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                messageBuilder.setHeader(entry.getKey(), entry.getValue());
            }
        }
        Message<?> message = messageBuilder.build();
        SendResult sendResult = this.rocketMqTemplate.syncSend(topicName, message);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("发送同步消息-SQL92模式成功,topicName={},map={},msg={},sendResult={}", topicName, map, msg, sendResult);
        } else {
            log.warn("发送同步消息-SQL92模式不一定成功,topicName={},map={},msg={},sendResult={}", topicName, map, msg, sendResult);
        }
        return sendResult;
    }

    @Override
    public void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback) {
        this.rocketMqTemplate.asyncSend(destination, msg, sendCallback);
        log.info("MQ发送异步消息,destination={} msg={}", destination, msg);
    }

    @Override
    public void sendOneway(String destination, Object msg) {
        this.rocketMqTemplate.sendOneWay(destination, msg);
        log.info("MQ发送单向消息,destination={} msg={}", destination, msg);
    }

    @Override
    public void sendBatchMessage(String destination, List<?> list) {
        String topicName = destination;
        String tags = "";

        String[] split = destination.split(":");
        if (split.length == 2) {
            topicName = split[0];
            tags = split[1];
        }
        this.sendBatchMessage(topicName, tags, 30000L, list);
    }

    @Override
    public void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list) {
        //转为message
        List<org.apache.rocketmq.common.message.Message> messages = list.stream().map(x ->
                new org.apache.rocketmq.common.message.Message(topicName, tags,
                        //String类型不需要转JSON,其它类型都要转为JSON模式
                        x instanceof String ? ((String) x).getBytes(StandardCharsets.UTF_8) : JSON.toJSONBytes(x))
        ).collect(Collectors.toList());

        //自动分割发送,把大的消息分裂成若干个小的消息(每次发送最大只能4MB)
        ListSplitter splitter = new ListSplitter(messages);

        while (splitter.hasNext()) {
            try {
                List<org.apache.rocketmq.common.message.Message> listItem = splitter.next();
                SendResult sendResult = producer.send(listItem, timeout == null ? 30000L : timeout);
                if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                    log.info("MQ发送批量消息成功,topicName={}  tags={}, size={},sendResult={}", topicName, tags, listItem.size(), sendResult);
                } else {
                    log.warn("MQ发送批量消息不一定成功,topicName={}  tags={}, size={},sendResult={}", topicName, tags, listItem.size(), sendResult);
                }
            } catch (Exception e) {
                //处理error
                log.error("MQ发送批量消息失败,topicName={}  tags={},,errorMessage={}", topicName, tags, e.getMessage(), e);
                throw new RuntimeException("MQ发送批量消息失败,原因:" + e.getMessage());
            }
        }
    }

    @Override
    public SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel) {
        return this.sendDelayLevel(destination, msg, 30000, delayTimeLevel);
    }

    @Override
    public SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel) {
        Message<?> message = MessageBuilder
                .withPayload(msg)
                .build();
        SendResult sendResult = this.rocketMqTemplate.syncSend(destination, message, timeout, delayTimeLevel);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("MQ发送延时消息成功,destination={} msg={} sendResult={}", destination, message, sendResult);
        } else {
            log.warn("MQ发送延时消息不一定成功,destination={} msg={} sendResult={}", destination, message, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendInOrder(String destination, Object msg, String hashKey) {
        Message<?> message = MessageBuilder
                .withPayload(msg)
                .build();
        //hashKey:  根据其哈希值取模后确定发送到哪一个队列
        SendResult sendResult = this.rocketMqTemplate.syncSendOrderly(destination, message, hashKey);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("MQ发送顺序消息成功,destination={} msg={} sendResult={}", destination, message, sendResult);
        } else {
            log.warn("MQ发送顺序消息不一定成功,destination={} msg={} sendResult={}", destination, message, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendMessageInTransaction(String destination, Object msg, Object arg) {
        Message<?> message = MessageBuilder
                //转为JSON格式
                .withPayload(msg instanceof String ? msg : JSON.toJSONString(msg))
                .build();

        TransactionSendResult sendResult = rocketMqTemplate.sendMessageInTransaction(destination, message, arg);

        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("MQ发送事务消息成功,destination={} msg={} sendResult={}", destination, message, sendResult);
        } else {
            log.warn("MQ发送事务消息不一定成功,destination={} msg={} sendResult={}", destination, message, sendResult);
        }
        return sendResult;
    }
}
/**
 * <p>
 * 消息列表分割,复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
 * </p>
 *
 **/
public class ListSplitter implements Iterator<List<Message>> {
    /**
     * 最大4MB,这里每次只发送1MB。(为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB)
     */
    private final int SIZE_LIMIT = 1024 * 1024 * 1;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    private int getStartIndex() {
        Message currMessage = messages.get(currIndex);
        int tmpSize = calcMessageSize(currMessage);
        while (tmpSize > SIZE_LIMIT) {
            currIndex += 1;
            Message message = messages.get(currIndex);
            tmpSize = calcMessageSize(message);
        }
        return currIndex;
    }

    /**
     * 计算消息字节长度
     */
    private int calcMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
        return tmpSize;
    }
}
/**
 * <p>
 * 批量消息实体
 * </p>
 *
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchDto implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer id;

    private String message;
}
/**
 * <p>
 * 订单支付事件
 * </p>
 *
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderPaidEvent implements Serializable {

    private static final long serialVersionUID = 1L;


    private String orderId;

    private BigDecimal paidMoney;

    private String msg;
}
/**
 * <p>
 * 顺序的步骤
 * </p>
 *
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderStep {
    private long orderId;
    private String desc;
}

2.2.1.4、测试类

/**
 * <p>
 * 测试类
 * </p>
 *
 **/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketMQProducerApplication.class)
public class RocketMQProducerTest {

    @Autowired
    private IRocketMQService rocketMQService;


    /**
     * 发送同步消息
     */
    @Test
    public void sendMessage() {
        for (int i = 0; i < 10; i++) {
            //广播消费端下的同步消息
            rocketMQService.sendMessage("Consumer_Broadcast", "广播同步消息" + i);
            //集群消费端下的同步消息
            rocketMQService.sendMessage("Consumer_Cluster", "集群同步消息" + i);
//            rocketMQService.sendMessage("Consumer_Cluster", null, i + "", "集群同步消息" + i);
        }
    }

    /**
     * 发送同步消息-Tag过滤模式
     */
    @Test
    public void sendMessageByTag() {
        rocketMQService.sendMessage("Consumer_Tag", "tag1", "过滤同步消息tag1");
        rocketMQService.sendMessage("Consumer_Tag", "tag2", "过滤同步消息tag2");
        rocketMQService.sendMessage("Consumer_Tag", "tag3", "过滤同步消息tag3");
    }

    /**
     * 发送同步消息-发送同步消息-SQL92模式
     */
    @Test
    public void sendMessageBySql() {
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            map.put("a", i);
            map.put("b", i % 2 == 0 ? "sql" : "notSql");
            rocketMQService.sendMessageBySql("Consumer_SQL", map, "SQL92模式消息  map=" + map);
        }
    }


    /**
     * 发送异步消息
     */
    @SneakyThrows
    @Test
    public void sendAsyncMessage() {
        for (int i = 0; i < 10; i++) {
            rocketMQService.sendAsyncMessage("Consumer_Cluster", "集群异步消息" + i, new SendCallback() {
                //发送成功
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("集群异步消息发送成功啦!sendResult={}", sendResult);
                }

                //发送失败
                @Override
                public void onException(Throwable e) {
                    log.error("集群异步消息发送失败啦!原因={}", e.getMessage(), e);
                }
            });
        }

        //先睡20秒,避免还没发送完毕就关闭了
        TimeUnit.SECONDS.sleep(20L);
    }


    /**
     * 发送单向消息
     */
    @Test
    public void sendOneway() {
        for (int i = 0; i < 10; i++) {
            rocketMQService.sendOneway("Consumer_Cluster", "集群单向消息" + i);
        }
    }


    /**
     * 发送批量消息(小批量)
     */
    @Test
    public void sendBatchMessage() {
        //演示发送实体类型
        List<BatchDto> batchDtoList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            batchDtoList.add(
                    BatchDto.builder()
                            .id(i)
                            .message("发送批量消息Dto类型" + i)
                            .build());
        }
        rocketMQService.sendBatchMessage("Consumer_Batch", batchDtoList);
        log.info("=================发送Consumer_Batch主题完毕=================");

        //演示发送String类型
        List<String> stringList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            stringList.add("发送批量消息String类型" + i);
        }
//        rocketMQService.sendBatchMessage("Consumer_Cluster","A",null,stringList);
        rocketMQService.sendBatchMessage("Consumer_Cluster", stringList);
        log.info("=================发送Consumer_Cluster完毕=================");
    }


    /**
     * 发送批量消息(大批量,超过4MB)
     */
    @Test
    @SneakyThrows
    public void sendBatchMessage2() {
        //演示发送实体类型
        List<BatchDto> batchDtoList = new ArrayList<>(1000000);
        for (int i = 0; i < 1000000; i++) {
            batchDtoList.add(
                    BatchDto.builder()
                            .id(i)
                            .message("发送批量消息Dto类型" + i)
                            .build());
        }
        rocketMQService.sendBatchMessage("Consumer_Batch", batchDtoList);
        log.info("=================发送Consumer_Batch主题完毕=================");

        Thread.sleep(200000);
    }


    /**
     * 发送延时消息
     */
    @Test
    public void sendDelayLevel() {
        this.rocketMQService.sendDelayLevel("Consumer_Cluster", "集群延时消息", 4);
    }

    /**
     * 发送顺序消息-分区有序
     */
    @Test
    public void sendInOrder() {
        // 订单列表
        List<OrderStep> orderList = this.buildOrders();

        //循环一下,增加测试样本
//        for (int i = 0; i < 10; i++) {
        for (OrderStep orderStep : orderList) {
            this.rocketMQService.sendInOrder("Consumer_InOrder", orderStep, orderStep.getOrderId() + "");
        }
//        }
    }

    /**
     * 发送事务消息(一定要做幂等性处理(其实所有消息都要做幂等性。。))
     */
    @Test
    @SneakyThrows
    public void sendMessageInTransaction() {
        OrderPaidEvent build = OrderPaidEvent.builder()
                .orderId("123")
                .msg("事务消息-开始支付")
                .paidMoney(new BigDecimal(2))
                .build();
        this.rocketMQService.sendMessageInTransaction("Consumer_Transaction", build, "test");

        //先睡200秒,避免还没发送完毕就关闭了
        TimeUnit.SECONDS.sleep(200L);
    }


    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

2.2.2、消息消费者

2.2.2.1、添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-consumer</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring-boot-version>2.3.2.RELEASE</spring-boot-version>
    </properties>

    <dependencies>
        <!-- 配置web启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!-- rocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- lombok插件 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot-version}</version>
            </plugin>
        </plugins>
    </build>
</project>

2.2.2.2、配置文件

server:
  port: 8096
spring:
  application:
    name: rocketmq-consumer

rocketmq:
  name-server: 127.0.0.1:9876          # rocketMQ的名称服务器,格式为:' host:port;host:port '。
  # 生产端配置
  producer:
    group: ${spring.application.name}  # 生产着组名
    #access-key: access-key            # rocketMQ服务端配置acl授权信息,没有则不需要
    #secret-key: secret-key            # rocketMQ服务端配置acl授权信息,没有则不需要
  # 消费端配置
#  consumer:
#    access-key: access-key            #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!
#    secret-key: secret-key            #如果开启了acl,一定要配置。否则集群模式下会正常,广播模式消费端会失效!

2.2.2.3、消息监听器

2.2.2.3.1、批量消息消费
/**
 * <p>
 * 批量消息消费
 * </p>
 *
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Batch",//主题
        consumerGroup = "Consumer_Batch_group"//消费组  唯一
)
public class ConsumerBatch implements RocketMQListener<BatchDto>, RocketMQPushConsumerLifecycleListener {

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(BatchDto message) {
        log.info("ConsumerCluster 批量消息消费    message: {}  ", message);
    }

    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.2、广播模式消费
/**
 * <p>
 * 广播模式消费,每个消费者消费的消息都是相同的
 * </p>
 *
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Broadcast",//主题
        consumerGroup = "Consumer_Broadcast_group",//消费组  唯一
        messageModel = MessageModel.BROADCASTING //消费模式 默认CLUSTERING集群  BROADCASTING:广播(接收所有信息)
)
public class ConsumerBroadcast implements RocketMQListener<String> {

    //todo 广播消费模式宽带消费仍然确保消息至少被消费一次,但是没有提供重发选项。

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(String message) {
        try {
            //模拟业务逻辑处理中...
            log.info("ConsumerBroadcast  广播模式消费 message: {}  ", message);
            TimeUnit.SECONDS.sleep(10);
            //模拟出错,触发重试
//            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }
}
2.2.2.3.3、集群模式消费
/**
 * <p>
 * 集群模式消费,负载均衡模式消费(最常用!!!)
 * </p>
 *
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Cluster",//主题
        consumerGroup = "Consumer_Cluster_group",//消费组  唯一
        messageModel = MessageModel.CLUSTERING //消费模式 默认CLUSTERING集群  BROADCASTING:广播(接收所有信息)
)
public class ConsumerCluster implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(String message) {
        try {
            //模拟业务逻辑处理中...
            log.info("ConsumerCluster 集群模式消费 message: {}  ", message);
            TimeUnit.SECONDS.sleep(10);
            //模拟出错,触发重试
//            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.4、顺序消息消费
/**
 * <p>
 * 顺序消息消费-分区有序
 * </p>
 *
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_InOrder",//主题
        consumerGroup = "Consumer_InOrder_group",//消费组
        consumeMode = ConsumeMode.ORDERLY//消费类型  ORDERLY 一个队列一个线程,即分区有序
)
public class ConsumerInOrder implements RocketMQListener<OrderStep>, RocketMQPushConsumerLifecycleListener {

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param msg
     */
    @Override
    public void onMessage(OrderStep msg) {
        log.info("Receive message: {}  ThreadName: {}", msg, Thread.currentThread().getName());
        //模拟出错,触发重试
//        int i = 1 / 0;
    }


    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //程序报错,顺序消息这里不会等待重试,会立即执行。不设最大重试次数,会一直不断重试执行。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.5、过滤消息消费
/**
 * <p>
 * 过滤消息消费,SQL92模式
 * 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true
 * 在console控制台查看集群状态  enablePropertyFilter=true 才正常
 * </p>
 * <p>
 * <p>
 * 数值比较,比如:>,>=,<,<=,BETWEEN,=;
 * 字符比较,比如:=,<>,IN;
 * IS NULL 或者 IS NOT NULL;
 * 逻辑符号 AND,OR,NOT;
 * <p>
 * 常量支持类型为:
 * <p>
 * 数值,比如:123,3.1415;
 * 字符,比如:'abc',必须用单引号包裹起来;
 * NULL,特殊的常量
 * 布尔值,TRUE 或 FALSE
 *
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_SQL",//主题
        consumerGroup = "Consumer_SQL_group",//消费组  唯一
        /* 下面都有默认值,可选  */
        selectorType = SelectorType.SQL92,//过滤选项类型 默认TAG 还有 SQL92,Brro
        selectorExpression = "a between 0 and 3 or b='sql'"
)
public class ConsumerSQL implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {


    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(String message) {
        try {
            //模拟业务逻辑处理中...
            log.info("ConsumerSQL 消费 message: {}  ", message);
            TimeUnit.SECONDS.sleep(10);
            //模拟出错,触发重试
//            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.6、过滤消息消费,Tag模式
/**
 * <p>
 * 过滤消息消费,Tag模式
 * </p>
 *
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Tag",//主题
        consumerGroup = "Consumer_Tag1_group",//消费组  唯一
        selectorType = SelectorType.TAG,//过滤选项类型 默认TAG
        selectorExpression = "tag1"//过滤选项 默认*    Tag多个时,"tag1 || tag2 || tag3"
)
public class ConsumerTag1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(String message) {
        try {
            //模拟业务逻辑处理中...
            log.info("ConsumerTag1 消费 message: {}  ", message);
            TimeUnit.SECONDS.sleep(10);
            //模拟出错,触发重试
//            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.7、过滤消息消费,Tag模式
/**
 * <p>
 * 过滤消息消费,Tag模式
 * </p>
 *
 * @author MrWen
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Tag",//主题
        consumerGroup = "Consumer_Tag2_group",//消费组  唯一
        selectorType = SelectorType.TAG,//过滤选项类型 默认TAG
        selectorExpression = "tag1||tag2"//过滤选项 默认*    Tag多个时,"tag1 || tag2 || tag3"
)
public class ConsumerTag2 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(String message) {
        try {
            //模拟业务逻辑处理中...
            log.info("ConsumerTag2 消费 message: {}  ", message);
            TimeUnit.SECONDS.sleep(10);
            //模拟出错,触发重试
//            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.8、事务消息演示
/**
 * <p>
 * 事务消息演示
 * <p>
 * 事务消息使用上的限制
 * 1:事务消息不支持延时消息和批量消息。
 * 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
 * 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
 * 4:事务性消息可能不止一次被检查或消费。
 * 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
 * 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
 * </p>
 *
 * @author MrWen
 **/
@Slf4j
@Component
@RocketMQMessageListener(topic = "Consumer_Transaction",//主题
        consumerGroup = "Consumer_Transaction_group"//消费组  唯一
)
public class ConsumerTransaction implements RocketMQListener<OrderPaidEvent>, RocketMQPushConsumerLifecycleListener {

    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param orderPaidEvent 接收的消息,订单支付事件
     */
    @Override
    public void onMessage(OrderPaidEvent orderPaidEvent) {
        try {
            //模拟业务逻辑处理中...
            log.info("ConsumerTransaction 事务消息消费 message: {}  ", orderPaidEvent);
            TimeUnit.SECONDS.sleep(10);
            //模拟出错,触发重试
//            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
2.2.2.3.9、测试
/**
 * <p>
 * 测试
 * </p>
 *
 */
@Slf4j
//@Component
@RocketMQMessageListener(topic = "synchronously",//主题
        consumerGroup = "synchronously_group",//消费组  唯一
        /* 下面都有默认值,可选  */
        selectorType = SelectorType.TAG,//过滤选项类型 默认TAG 还有 SQL92
        selectorExpression = "*",//过滤选项 默认*    Tag多个时,"tag1 || tag2 || tag3"
        consumeMode = ConsumeMode.CONCURRENTLY,//消费类型  默认CONCURRENTLY同步  还有有序 ORDERLY 一个队列一个线程
        messageModel = MessageModel.CLUSTERING, //消费模式 默认CLUSTERING集群  还有 广播CLUSTERING(接收所有信息)
        consumeThreadMax = 64,//最大线程数,默认64
        consumeTimeout = 15L//超时时间,以分钟为单位,默认15L
)
public class Consumer implements RocketMQListener<OrderPaidEvent>, RocketMQPushConsumerLifecycleListener {


    /**
     * 消费者
     * 程序报错则进行重试
     *
     * @param message 接收的消息
     */
    @Override
    public void onMessage(OrderPaidEvent message) {
        log.info("Receive message: {}  ThreadName: {}", message, Thread.currentThread().getName());
    }


    /**
     * consumer配置都是通过这个
     *
     * @param consumer consumer配置
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //设最大重试次数,默认16次
        //距离上一次重试间隔
        //第1次:10s    第2次:30s     第3次:1min    第4次:2min     第5次:3min     第6次:4min     第7次:5min    第8次:6min
        //第9次:7min   第10次:8min   第11次:9min   第12次:10min   第13次:20min   第14次:30min   第15次:1h     第16次:2h   16次以后:都是2h
        //某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
        consumer.setMaxReconsumeTimes(3);

        //关于消费位点,默认CONSUME_FROM_LAST_OFFSET(从上一个偏移量消费)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //关于消费位点,从第一个偏移量消费(即全量消费,正常消息相同存储均为 3 天,3 天后会被自动删除)
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //关于消费位点,以秒精度回溯消费时间,按指定回溯时间开始消费,默认回溯半小时前的消费时间。
        //时间格式为20131223171201<br>暗示2013年12月23日17点12分01秒<br>
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }
}
/**
 * <p>
 * 批量消息实体
 * </p>
 *
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchDto implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer id;

    private String message;
}
/**
 * <p>
 * 订单支付事件
 * </p>
 *
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderPaidEvent implements Serializable {

    private static final long serialVersionUID = 1L;

    private String orderId;

    private BigDecimal paidMoney;

    private String msg;
}
/**
 * <p>
 * 顺序的步骤
 * </p>
 *
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderStep {
    private long orderId;
    private String desc;
}

2.3、SpringBoot整合Dubbo

下载dubbo-spring-boot-starter依赖包

dubbo-spring-boot-starter安装到本地仓库

mvn install -Dmaven.skip.test=true

在这里插入图片描述

2.3.1、搭建Zookeeper集群

2.3.1.1、准备工作

  1. 安装JDK
  2. 将Zookeeper上传到服务器
  3. 解压Zookeeper,并创建data目录,将conf下的zoo_sample.cfg文件改名为zoo.cfg
  4. 建立/user/local/zookeeper-cluster,将解压后的Zookeeper复制到以下三个目录
/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3
  1. 配置每一个 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183

    修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

​ 修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

​ 修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data

2.3.1.2、配置集群

  1. 在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是 1、2、3 。这个文件就是记录每个服务器的 ID

  2. 在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表。

    集群服务器 IP 列表如下

server.1=192.168.25.140:2881:3881
server.2=192.168.25.140:2882:3882
server.3=192.168.25.140:2883:3883

解释:server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口

2.3.1.3、启动集群

启动集群就是分别启动每个实例。
在这里插入图片描述

2.3.2、RPC服务接口

public interface IUserService {
    public String sayHello(String name);
}

2.3.3、服务提供者

2.3.3.1、添加依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<dependencies>
    <!--dubbo-->
    <dependency>
        <groupId>com.alibaba.spring.boot</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
	<!--spring-boot-stater-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-to-slf4j</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
	<!--zookeeper-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
	<!--API-->
    <dependency>
        <groupId>com.angyan.demo</groupId>
        <artifactId>dubbo-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

</dependencies>

2.3.3.2、配置文件

# application.properties
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880

2.3.3.3、启动类

@EnableDubboConfiguration
@SpringBootApplication
public class ProviderBootstrap {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(ProviderBootstrap.class,args);
    }

}

2.3.3.4、服务实现

@Component
@Service(interfaceClass = IUserService.class)
public class UserServiceImpl implements IUserService{
    @Override
    public String sayHello(String name) {
        return "hello:"+name;
    }
}

2.3.4、服务消费者

2.3.4.1、添加依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>

<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--dubbo-->
    <dependency>
        <groupId>com.alibaba.spring.boot</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-to-slf4j</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!--zookeeper-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!--API-->
    <dependency>
        <groupId>com.angyan.demo</groupId>
        <artifactId>dubbo-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

</dependencies>

2.3.4.2、配置文件

# application.properties
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
    spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183

2.3.4.3、启动类

@EnableDubboConfiguration
@SpringBootApplication
public class ConsumerBootstrap {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerBootstrap.class);
    }
}

2.3.4.4、Controller

@RestController
@RequestMapping("/user")
public class UserController {

    @Reference
    private IUserService userService;

    @RequestMapping("/sayHello")
    public String sayHello(String name){
        return userService.sayHello(name);
    }
}

三. 环境搭建

3.1、数据库

3.1.1、优惠券表

FieldTypeComment
coupon_idbigint(50) NOT NULL优惠券ID
coupon_pricedecimal(10,2) NULL优惠券金额
user_idbigint(50) NULL用户ID
order_idbigint(32) NULL订单ID
is_usedint(1) NULL是否使用 0未使用 1已使用
used_timetimestamp NULL使用时间

3.1.2、商品表

FieldTypeComment
goods_idbigint(50) NOT NULL主键
goods_namevarchar(255) NULL商品名称
goods_numberint(11) NULL商品库存
goods_pricedecimal(10,2) NULL商品价格
goods_descvarchar(255) NULL商品描述
add_timetimestamp NULL添加时间

3.1.3、订单表

FieldTypeComment
order_idbigint(50) NOT NULL订单ID
user_idbigint(50) NULL用户ID
order_statusint(1) NULL订单状态 0未确认 1已确认 2已取消 3无效 4退款
pay_statusint(1) NULL支付状态 0未支付 1支付中 2已支付
shipping_statusint(1) NULL发货状态 0未发货 1已发货 2已退货
addressvarchar(255) NULL收货地址
consigneevarchar(255) NULL收货人
goods_idbigint(50) NULL商品ID
goods_numberint(11) NULL商品数量
goods_pricedecimal(10,2) NULL商品价格
goods_amountdecimal(10,0) NULL商品总价
shipping_feedecimal(10,2) NULL运费
order_amountdecimal(10,2) NULL订单价格
coupon_idbigint(50) NULL优惠券ID
coupon_paiddecimal(10,2) NULL优惠券
money_paiddecimal(10,2) NULL已付金额
pay_amountdecimal(10,2) NULL支付金额
add_timetimestamp NULL创建时间
confirm_timetimestamp NULL订单确认时间
pay_timetimestamp NULL支付时间

3.1.4、订单商品日志表

FieldTypeComment
goods_idint(11) NOT NULL商品ID
order_idvarchar(32) NOT NULL订单ID
goods_numberint(11) NULL库存数量
log_timedatetime NULL记录时间

3.1.5、用户表

FieldTypeComment
user_idbigint(50) NOT NULL用户ID
user_namevarchar(255) NULL用户姓名
user_passwordvarchar(255) NULL用户密码
user_mobilevarchar(255) NULL手机号
user_scoreint(11) NULL积分
user_reg_timetimestamp NULL注册时间
user_moneydecimal(10,0) NULL用户余额

3.1.6、用户余额日志表

FieldTypeComment
user_idbigint(50) NOT NULL用户ID
order_idbigint(50) NOT NULL订单ID
money_log_typeint(1) NOT NULL日志类型 1订单付款 2 订单退款
use_moneydecimal(10,2) NULL操作金额
create_timetimestamp NULL日志时间

3.1.7、订单支付表

FieldTypeComment
pay_idbigint(50) NOT NULL支付编号
order_idbigint(50) NULL订单编号
pay_amountdecimal(10,2) NULL支付金额
is_paidint(1) NULL是否已支付 1否 2是

3.1.8、MQ消息生产表

FieldTypeComment
idvarchar(100) NOT NULL主键
group_namevarchar(100) NULL生产者组名
msg_topicvarchar(100) NULL消息主题
msg_tagvarchar(100) NULLTag
msg_keyvarchar(100) NULLKey
msg_bodyvarchar(500) NULL消息内容
msg_statusint(1) NULL0:未处理;1:已经处理
create_timetimestamp NOT NULL记录时间

3.1.9、MQ消息消费表

FieldTypeComment
msg_idvarchar(50) NULL消息ID
group_namevarchar(100) NOT NULL消费者组名
msg_tagvarchar(100) NOT NULLTag
msg_keyvarchar(100) NOT NULLKey
msg_bodyvarchar(500) NULL消息体
consumer_statusint(1) NULL0:正在处理;1:处理成功;2:处理失败
consumer_timesint(1) NULL消费次数
consumer_timestamptimestamp NULL消费时间
remarkvarchar(500) NULL备注

3.2、项目初始化

shop系统基于Maven进行项目管理

3.2.1、工程浏览

  • 父工程:shop-parent
  • 订单系统:shop-order-web
  • 支付系统:shop-pay-web
  • 优惠券服务:shop-coupon-service
  • 订单服务:shop-order-service
  • 支付服务:shop-pay-service
  • 商品服务:shop-goods-service
  • 用户服务:shop-user-service
  • 实体类:shop-pojo
  • 持久层:shop-dao
  • 接口层:shop-api
  • 工具工程:shop-common

共12个系统

3.2.2、工程关系

在这里插入图片描述

3.3、Mybatis逆向工程使用

3.3.1、代码生成

使用Mybatis逆向工程针对数据表生成CURD持久层代码

3.3.2、代码导入

  • 将实体类导入到shop-pojo工程
  • 在服务层工程中导入对应的Mapper类和对应配置文件

3.4 公共类介绍

  • ID生成器

    IDWorker:Twitter雪花算法

  • 异常处理类

    CustomerException:自定义异常类

    CastException:异常抛出类

  • 常量类

    ShopCode:系统状态类

  • 响应实体类

    Result:封装响应状态和响应信息

四. 下单业务

在这里插入图片描述

4.1、下单基本流程

4.1.1、接口定义

  • IOrderService
public interface IOrderService {
    /**
     * 确认订单
     * @param order
     * @return Result
     */
    Result confirmOrder(TradeOrder order);
}

4.1.2、业务类实现

@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {

    @Override
    public Result confirmOrder(TradeOrder order) {
        //1.校验订单
       
        //2.生成预订单
       
        try {
            //3.扣减库存
            
            //4.扣减优惠券
           
            //5.使用余额
           
            //6.确认订单
            
            //7.返回成功状态
           
        } catch (Exception e) {
            //1.确认订单失败,发送消息
            
            //2.返回失败状态
        }

    }
}

4.1.3、校验订单

在这里插入图片描述

private void checkOrder(TradeOrder order) {
        //1.校验订单是否存在
        if(order==null){
            CastException.cast(ShopCode.SHOP_ORDER_INVALID);
        }
        //2.校验订单中的商品是否存在
        TradeGoods goods = goodsService.findOne(order.getGoodsId());
        if(goods==null){
            CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
        }
        //3.校验下单用户是否存在
        TradeUser user = userService.findOne(order.getUserId());
        if(user==null){
            CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
        }
        //4.校验商品单价是否合法
        if(order.getGoodsPrice().compareTo(goods.getGoodsPrice())!=0){
            CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
        }
        //5.校验订单商品数量是否合法
        if(order.getGoodsNumber()>=goods.getGoodsNumber()){
            CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
        }

        log.info("校验订单通过");
}

4.1.4、生成预订单

在这里插入图片描述

private Long savePreOrder(TradeOrder order) {
        //1.设置订单状态为不可见
        order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
        //2.订单ID
        order.setOrderId(idWorker.nextId());
        //核算运费是否正确
        BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
        if (order.getShippingFee().compareTo(shippingFee) != 0) {
            CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
        }
        //3.计算订单总价格是否正确
        BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
        orderAmount.add(shippingFee);
        if (orderAmount.compareTo(order.getOrderAmount()) != 0) {
            CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
        }

        //4.判断优惠券信息是否合法
        Long couponId = order.getCouponId();
        if (couponId != null) {
            TradeCoupon coupon = couponService.findOne(couponId);
            //优惠券不存在
            if (coupon == null) {
                CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
            }
            //优惠券已经使用
            if ((ShopCode.SHOP_COUPON_ISUSED.getCode().toString())
                .equals(coupon.getIsUsed().toString())) {
                CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
            }
            order.setCouponPaid(coupon.getCouponPrice());
        } else {
            order.setCouponPaid(BigDecimal.ZERO);
        }

        //5.判断余额是否正确
        BigDecimal moneyPaid = order.getMoneyPaid();
        if (moneyPaid != null) {
            //比较余额是否大于0
            int r = order.getMoneyPaid().compareTo(BigDecimal.ZERO);
            //余额小于0
            if (r == -1) {
                CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);
            }
            //余额大于0
            if (r == 1) {
                //查询用户信息
                TradeUser user = userService.findOne(order.getUserId());
                if (user == null) {
                    CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
                }
            //比较余额是否大于用户账户余额
            if (user.getUserMoney().compareTo(order.getMoneyPaid().longValue()) == -1) {
                CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
            }
            order.setMoneyPaid(order.getMoneyPaid());
        }
    } else {
        order.setMoneyPaid(BigDecimal.ZERO);
    }
    //计算订单支付总价
    order.setPayAmount(orderAmount.subtract(order.getCouponPaid())
                       .subtract(order.getMoneyPaid()));
    //设置订单添加时间
    order.setAddTime(new Date());

    //保存预订单
    int r = orderMapper.insert(order);
    if (ShopCode.SHOP_SUCCESS.getCode() != r) {
        CastException.cast(ShopCode.SHOP_ORDER_SAVE_ERROR);
    }
    log.info("订单:["+order.getOrderId()+"]预订单生成成功");
    return order.getOrderId();
}

4.1.5、扣减库存

  • 通过dubbo调用商品服务完成扣减库存
private void reduceGoodsNum(TradeOrder order) {
        TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
        goodsNumberLog.setGoodsId(order.getGoodsId());
        goodsNumberLog.setOrderId(order.getOrderId());
        goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
        Result result = goodsService.reduceGoodsNum(goodsNumberLog);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
        }
        log.info("订单:["+order.getOrderId()+"]扣减库存["+order.getGoodsNumber()+"个]成功");
    }
  • 商品服务GoodsService扣减库存
@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
    if (goodsNumberLog == null ||
            goodsNumberLog.getGoodsNumber() == null ||
            goodsNumberLog.getOrderId() == null ||
            goodsNumberLog.getGoodsNumber() == null ||
            goodsNumberLog.getGoodsNumber().intValue() <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
    if(goods.getGoodsNumber()<goodsNumberLog.getGoodsNumber()){
        //库存不足
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    //减库存
    goods.setGoodsNumber(goods.getGoodsNumber()-goodsNumberLog.getGoodsNumber());
    goodsMapper.updateByPrimaryKey(goods);


    //记录库存操作日志
    goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));
    goodsNumberLog.setLogTime(new Date());
    goodsNumberLogMapper.insert(goodsNumberLog);

    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}

4.1.6、扣减优惠券

  • 通过dubbo完成扣减优惠券
private void changeCoponStatus(TradeOrder order) {
    //判断用户是否使用优惠券
    if (!StringUtils.isEmpty(order.getCouponId())) {
        //封装优惠券对象
        TradeCoupon coupon = couponService.findOne(order.getCouponId());
        coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
        coupon.setUsedTime(new Date());
        coupon.setOrderId(order.getOrderId());
        Result result = couponService.changeCouponStatus(coupon);
        //判断执行结果
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            //优惠券使用失败
            CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
        }
        log.info("订单:["+order.getOrderId()+"]使用扣减优惠券["+coupon.getCouponPrice()+"元]成功");
    }

}
  • 优惠券服务CouponService更改优惠券状态
@Override
public Result changeCouponStatus(TradeCoupon coupon) {
    try {
        //判断请求参数是否合法
        if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
		//更新优惠券状态为已使用
        couponMapper.updateByPrimaryKey(coupon);
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

4.1.7、扣减用户余额

  • 通过用户服务完成扣减余额
private void reduceMoneyPaid(TradeOrder order) {
    //判断订单中使用的余额是否合法
    if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
        TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
        userMoneyLog.setOrderId(order.getOrderId());
        userMoneyLog.setUserId(order.getUserId());
        userMoneyLog.setUseMoney(order.getMoneyPaid());
        userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
        //扣减余额
        Result result = userService.changeUserMoney(userMoneyLog);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
        }
        log.info("订单:["+order.getOrderId()+"扣减余额["+order.getMoneyPaid()+"元]成功]");
    }
}
  • 用户服务UserService,更新余额

在这里插入图片描述

@Override
public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
    //判断请求参数是否合法
    if (userMoneyLog == null
            || userMoneyLog.getUserId() == null
            || userMoneyLog.getUseMoney() == null
            || userMoneyLog.getOrderId() == null
            || userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }

    //查询该订单是否存在付款记录
    TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
    userMoneyLogExample.createCriteria()
            .andUserIdEqualTo(userMoneyLog.getUserId())
            .andOrderIdEqualTo(userMoneyLog.getOrderId());
   int count = userMoneyLogMapper.countByExample(userMoneyLogExample);
   TradeUser tradeUser = new TradeUser();
   tradeUser.setUserId(userMoneyLog.getUserId());
   tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());
   //判断余额操作行为
   //【付款操作】
   if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {
           //订单已经付款,则抛异常
           if (count > 0) {
                CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
            }
       	   //用户账户扣减余额
           userMapper.reduceUserMoney(tradeUser);
       }
    //【退款操作】
    if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {
         //如果订单未付款,则不能退款,抛异常
         if (count == 0) {
         CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
     }
     //防止多次退款
     userMoneyLogExample = new TradeUserMoneyLogExample();
     userMoneyLogExample.createCriteria()
             .andUserIdEqualTo(userMoneyLog.getUserId())
                .andOrderIdEqualTo(userMoneyLog.getOrderId())
                .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
     count = userMoneyLogMapper.countByExample(userMoneyLogExample);
     if (count > 0) {
         CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
     }
     	//用户账户添加余额
        userMapper.addUserMoney(tradeUser);
    }


    //记录用户使用余额日志
    userMoneyLog.setCreateTime(new Date());
    userMoneyLogMapper.insert(userMoneyLog);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}

4.1.8、确认订单

private void updateOrderStatus(TradeOrder order) {
    order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
    order.setConfirmTime(new Date());
    int r = orderMapper.updateByPrimaryKey(order);
    if (r <= 0) {
        CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
    }
    log.info("订单:["+order.getOrderId()+"]状态修改成功");
}

4.1.9、小结

@Override
public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    checkOrder(order);
    //2.生成预订单
    Long orderId = savePreOrder(order);
    order.setOrderId(orderId);
    try {
        //3.扣减库存
        reduceGoodsNum(order);
        //4.扣减优惠券
        changeCoponStatus(order);
        //5.使用余额
        reduceMoneyPaid(order);
        //6.确认订单
        updateOrderStatus(order);
        log.info("订单:["+orderId+"]确认成功");
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        //确认订单失败,发送消息
        ...
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

4.2、失败补偿机制

4.2.1、消息发送方

  • 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup

mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
  • 注入模板类和属性值信息
 @Autowired
 private RocketMQTemplate rocketMQTemplate;

 @Value("${mq.order.topic}")
 private String topic;

 @Value("${mq.order.tag.cancel}")
 private String cancelTag;
  • 发送下单失败消息
@Override
public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    //2.生成预订
    try {
        //3.扣减库存
        //4.扣减优惠券
        //5.使用余额
        //6.确认订单
    } catch (Exception e) {
        //确认订单失败,发送消息
        CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
        cancelOrderMQ.setOrderId(order.getOrderId());
        cancelOrderMQ.setCouponId(order.getCouponId());
        cancelOrderMQ.setGoodsId(order.getGoodsId());
        cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
        cancelOrderMQ.setUserId(order.getUserId());
        cancelOrderMQ.setUserMoney(order.getMoneyPaid());
        try {
            sendMessage(topic, 
                        cancelTag, 
                        cancelOrderMQ.getOrderId().toString(), 
                    JSON.toJSONString(cancelOrderMQ));
    } catch (Exception e1) {
        e1.printStackTrace();
            CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
        }
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}
private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
    //判断Topic是否为空
    if (StringUtils.isEmpty(topic)) {
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    //判断消息内容是否为空
    if (StringUtils.isEmpty(body)) {
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    //消息体
    Message message = new Message(topic, tags, keys, body.getBytes());
    //发送消息
    rocketMQTemplate.getProducer().send(message);
}

4.2.2、消费接收方

  • 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
  • 创建监听类,消费消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", 
                         consumerGroup = "${mq.order.consumer.group.name}",
                         messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{

    @Override
    public void onMessage(MessageExt messageExt) {
        ...
    }
}

4.2.2.1、回退库存

  • 流程分析
    在这里插入图片描述
  • 消息消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId=null;
        String tags=null;
        String keys=null;
        String body=null;
        try {
            //1. 解析消息内容
            msgId = messageExt.getMsgId();
            tags= messageExt.getTags();
            keys= messageExt.getKeys();
            body= new String(messageExt.getBody(),"UTF-8");

            log.info("接受消息成功");

            //2. 查询消息消费记录
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

            if(mqConsumerLog!=null){
                //3. 判断如果消费过...
                //3.1 获得消息处理状态
                Integer status = mqConsumerLog.getConsumerStatus();
                //处理过...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",已经处理过");
                    return;
                }

                //正在处理...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",正在处理");
                    return;
                }

                //处理失败
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){
                    //获得消息处理次数
                    Integer times = mqConsumerLog.getConsumerTimes();
                    if(times>3){
                        log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
                        return;
                    }
                    mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());

                    //使用数据库乐观锁更新
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
                    if(r<=0){
                        //未修改成功,其他线程并发修改
                        log.info("并发修改,稍后处理");
                    }
                }

            }else{
                //4. 判断如果没有消费过...
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(0);

                //将消息处理信息添加到数据库
                mqConsumerLogMapper.insert(mqConsumerLog);
            }
            //5. 回退库存
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);

            //记录库存操作日志
            TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
            goodsNumberLog.setOrderId(mqEntity.getOrderId());
            goodsNumberLog.setGoodsId(goodsId);
            goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
            goodsNumberLog.setLogTime(new Date());
            goodsNumberLogMapper.insert(goodsNumberLog);

            //6. 将消息的处理状态改为成功
            mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            log.info("回退库存成功");
        } catch (Exception e) {
            e.printStackTrace();
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            if(mqConsumerLog==null){
                //数据库未有记录
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(1);
                mqConsumerLogMapper.insert(mqConsumerLog);
            }else{
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }

    }
}

4.2.2.2、回退优惠券

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Autowired
    private TradeCouponMapper couponMapper;

    @Override
    public void onMessage(MessageExt message) {

        try {
            //1. 解析消息内容
            String body = new String(message.getBody(), "UTF-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            log.info("接收到消息");
            //2. 查询优惠券信息
            TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());
            //3.更改优惠券状态
            coupon.setUsedTime(null);
            coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());
            coupon.setOrderId(null);
            couponMapper.updateByPrimaryKey(coupon);
            log.info("回退优惠券成功");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("回退优惠券失败");
        }

    }
}

4.2.2.3、回退余额

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Autowired
    private IUserService userService;

    @Override
    public void onMessage(MessageExt messageExt) {

        try {
            //1.解析消息
            String body = new String(messageExt.getBody(), "UTF-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            log.info("接收到消息");
            if(mqEntity.getUserMoney()!=null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){
                //2.调用业务层,进行余额修改
                TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
                userMoneyLog.setUseMoney(mqEntity.getUserMoney());
                userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
                userMoneyLog.setUserId(mqEntity.getUserId());
                userMoneyLog.setOrderId(mqEntity.getOrderId());
                userService.updateMoneyPaid(userMoneyLog);
                log.info("余额回退成功");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("余额回退失败");
        }

    }
}

4.2.2.4、取消订单

@Override
public void onMessage(MessageExt messageExt) {
    String body = new String(messageExt.getBody(), "UTF-8");
    String msgId = messageExt.getMsgId();
    String tags = messageExt.getTags();
    String keys = messageExt.getKeys();
    log.info("CancelOrderProcessor receive message:"+messageExt);
    CancelOrderMQ cancelOrderMQ = JSON.parseObject(body, CancelOrderMQ.class);
    TradeOrder order = orderService.findOne(cancelOrderMQ.getOrderId());
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
    orderService.changeOrderStatus(order);
    log.info("订单:["+order.getOrderId()+"]状态设置为取消");
    return order;
}

4.3、测试

4.3.1、准备测试环境

  • 用户数据
  • 商品数据
  • 优惠券数据
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ShopOrderServiceApplication.class)
public class OrderTest {

    @Autowired
    private IOrderService orderService;
}

4.3.2、测试下单成功流程

@Test    
public void add(){
    Long goodsId=XXXL;
    Long userId=XXXL;
    Long couponId=XXXL;

    TradeOrder order = new TradeOrder();
    order.setGoodsId(goodsId);
    order.setUserId(userId);
    order.setGoodsNumber(1);
    order.setAddress("北京");
    order.setGoodsPrice(new BigDecimal("5000"));
    order.setOrderAmount(new BigDecimal("5000"));
    order.setMoneyPaid(new BigDecimal("100"));
    order.setCouponId(couponId);
    order.setShippingFee(new BigDecimal(0));
    orderService.confirmOrder(order);
}

执行完毕后,查看数据库中用户的余额、优惠券数据,及订单的状态数据

4.3.3、测试下单失败流程

代码同上。

执行完毕后,查看用户的余额、优惠券数据是否发生更改,订单的状态是否为取消。

五. 支付业务

5.1、创建支付订单

在这里插入图片描述

public Result createPayment(TradePay tradePay) {
    //查询订单支付状态
    try {
        TradePayExample payExample = new TradePayExample();
        TradePayExample.Criteria criteria = payExample.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int count = tradePayMapper.countByExample(payExample);
        if (count > 0) {
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }

        long payId = idWorker.nextId();
        tradePay.setPayId(payId);
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        tradePayMapper.insert(tradePay);
        log.info("创建支付订单成功:" + payId);
    } catch (Exception e) {
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

5.2、支付回调

5.2.1、流程分析

在这里插入图片描述

5.2.2、代码实现

  • 创建线程池对象
@Bean
public ThreadPoolTaskExecutor getThreadPool() {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(4);

    executor.setMaxPoolSize(8);

    executor.setQueueCapacity(100);

    executor.setKeepAliveSeconds(60);

    executor.setThreadNamePrefix("Pool-A");

    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    executor.initialize();

    return executor;

}
@Autowired
private ThreadPoolTaskExecutor executorService;

@Override
public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
     log.info("支付回调");
     //1. 判断用户支付状态
     if(tradePay.getIsPaid().intValue()==ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().intValue()){
         //2. 更新支付订单状态为已支付
         Long payId = tradePay.getPayId();
         TradePay pay = tradePayMapper.selectByPrimaryKey(payId);
         //判断支付订单是否存在
         if(pay==null){
             CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
         }
         pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
         int r = tradePayMapper.updateByPrimaryKeySelective(pay);
         log.info("支付订单状态改为已支付");
         if(r==1){
             //3. 创建支付成功的消息
             TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
             tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
             tradeMqProducerTemp.setGroupName(groupName);
             tradeMqProducerTemp.setMsgTopic(topic);
             tradeMqProducerTemp.setMsgTag(tag);
             tradeMqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
             tradeMqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
             tradeMqProducerTemp.setCreateTime(new Date());
             //4. 将消息持久化数据库
             mqProducerTempMapper.insert(tradeMqProducerTemp);
             log.info("将支付成功消息持久化到数据库");

             //在线程池中进行处理
             threadPoolTaskExecutor.submit(new Runnable() {
                 @Override
                 public void run() {
                     //5. 发送消息到MQ
                     SendResult result = null;
                     try {
                         result = sendMessage(topic, tag, String.valueOf(tradePay.getPayId()), JSON.toJSONString(tradePay));
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                     if(result.getSendStatus().equals(SendStatus.SEND_OK)){
                         log.info("消息发送成功");
                         //6. 等待发送结果,如果MQ接受到消息,删除发送成功的消息
                         mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
                         log.info("持久化到数据库的消息删除");
                     }
                 }
             });

         }
         return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
     }else{
         CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
         return new Result(ShopCode.SHOP_FAIL.getSuccess(),ShopCode.SHOP_FAIL.getMessage());
     }

 }

5.2.3、处理消息

支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理

  1. 订单服务修改订单状态为已支付
  2. 日志服务记录支付日志
  3. 用户服务负责给用户增加积分

以下用订单服务为例说明消息的处理情况

5.2.3.1、配置RocketMQ属性值

mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group

5.2.3.2、消费消息

  • 在订单服务中,配置公共的消息处理类
public class BaseConsumer {

    public TradeOrder handleMessage(IOrderService 
                                    orderService, 
                                    MessageExt messageExt,Integer code) throws Exception {
        //解析消息内容
        String body = new String(messageExt.getBody(), "UTF-8");
        String msgId = messageExt.getMsgId();
        String tags = messageExt.getTags();
        String keys = messageExt.getKeys();
        OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
        
        //查询
        TradeOrder order = orderService.findOne(orderMq.getOrderId());

        if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){
            order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
        }

        if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){
            order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        }
        orderService.changeOrderStatus(order);
        return order;
    }

}
  • 接受订单支付成功消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}", 
                         consumerGroup = "${mq.pay.consumer.group.name}")
public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private IOrderService orderService;

    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            log.info("CancelOrderProcessor receive message:"+messageExt);
            TradeOrder order = handleMessage(orderService, 
                                             messageExt, 
                                             ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
            log.info("订单:["+order.getOrderId()+"]支付成功");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("订单支付失败");
        }
    }
}

六、整体联调

通过Rest客户端请求shop-order-web和shop-pay-web完成下单和支付操作

6.1、准备工作

6.1.1、配置RestTemplate类

@Configuration
public class RestTemplateConfig {

    @Bean
    @ConditionalOnMissingBean({ RestOperations.class, RestTemplate.class })
    public RestTemplate restTemplate(ClientHttpRequestFactory factory) {

        RestTemplate restTemplate = new RestTemplate(factory);

        // 使用 utf-8 编码集的 conver 替换默认的 conver(默认的 string conver 的编码集为"ISO-8859-1")
        List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();
        Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();
        while (iterator.hasNext()) {
            HttpMessageConverter<?> converter = iterator.next();
            if (converter instanceof StringHttpMessageConverter) {
                iterator.remove();
            }
        }
        messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));

        return restTemplate;
    }

    @Bean
    @ConditionalOnMissingBean({ClientHttpRequestFactory.class})
    public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // ms
        factory.setReadTimeout(15000);
        // ms
        factory.setConnectTimeout(15000);
        return factory;
    }
}

6.1.2、配置请求地址

  • 订单系统
server.host=http://localhost
server.servlet.path=/order-web
server.port=8080
shop.order.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.order.confirm=/order/confirm
  • 支付系统
server.host=http://localhost
server.servlet.path=/pay-web
server.port=9090
shop.pay.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.pay.createPayment=/pay/createPayment
shop.pay.callbackPayment=/pay/callbackPayment

6.2、下单测试

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopOrderWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class OrderTest {

   @Autowired
   private RestTemplate restTemplate;

   @Value("${shop.order.baseURI}")
   private String baseURI;

   @Value("${shop.order.confirm}")
   private String confirmOrderPath;

   @Autowired
   private IDWorker idWorker;
  
  /**
    * 下单
    */
   @Test
   public void confirmOrder(){
       Long goodsId=XXXL;
       Long userId=XXXL;
       Long couponId=XXXL;

       TradeOrder order = new TradeOrder();
       order.setGoodsId(goodsId);
       order.setUserId(userId);
       order.setGoodsNumber(1);
       order.setAddress("北京");
       order.setGoodsPrice(new BigDecimal("5000"));
       order.setOrderAmount(new BigDecimal("5000"));
       order.setMoneyPaid(new BigDecimal("100"));
       order.setCouponId(couponId);
       order.setShippingFee(new BigDecimal(0));

       Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, order, Result.class).getBody();
       System.out.println(result);
   }

}

6.3、支付测试

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopPayWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class PayTest {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${shop.pay.baseURI}")
    private String baseURI;

    @Value("${shop.pay.createPayment}")
    private String createPaymentPath;

    @Value("${shop.pay.callbackPayment}")
    private String callbackPaymentPath;

    @Autowired
    private IDWorker idWorker;

   /**
     * 创建支付订单
     */
    @Test
    public void createPayment(){

        Long orderId = 346321587315814400L;
        TradePay pay = new TradePay();
        pay.setOrderId(orderId);
        pay.setPayAmount(new BigDecimal(4800));

        Result result = restTemplate.postForEntity(baseURI + createPaymentPath, pay, Result.class).getBody();
        System.out.println(result);
    }
   
    /**
     * 支付回调
     */
    @Test
    public void callbackPayment(){
        Long payId = 346321891507720192L;
        TradePay pay = new TradePay();
        pay.setPayId(payId);
        pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        Result result = restTemplate.postForEntity(baseURI + callbackPaymentPath, pay, Result.class).getBody();
        System.out.println(result);

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/688627.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MiniGPT4模型训练与部署

第二式&#xff1a;MiniGPT4模型训练与部署 1.环境搭建1.1 下载MiniGPT-4代码1.2 创建虚拟环境 2.Vicuna模型准备2.1 下载vicuna delta weights2.2 下载原始llama weights2.3 合成真正的working weights2.4 配置Vicuna模型路径 3. MiniGPT-4 checkpoint准备3.1 下载MiniGPT-4 c…

《二叉搜索树》

文章目录 一、二叉搜索树的概念二、二叉搜索树的实现2.1 插入迭代插入递归插入 2.2 查找迭代查找递归查找 2.3 删除迭代删除递归删除 2.4 中序遍历 三、二叉搜索树的应用1、K模型2、KV模型 四、二叉树的性能分析 一、二叉搜索树的概念 二叉搜索树又叫做二叉排序树。 左子树的…

diffusion model(三)—— classifier guided diffusion model

classifier guided diffusion model 背景 对于一般的DM&#xff08;如DDPM&#xff0c; DDIM&#xff09;的采样过程是直接从一个噪声分布&#xff0c;通过不断采样来生成图片。但这个方法生成的图片类别是随机的&#xff0c;如何生成特定类别的图片呢&#xff1f;这就是clas…

前沿重器[35] | 提示工程和提示构造技巧

前沿重器 栏目主要给大家分享各种大厂、顶会的论文和分享&#xff0c;从中抽取关键精华的部分和大家分享&#xff0c;和大家一起把握前沿技术。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。&#xff08;算起来&#xff0c;专项启动已经…

MySQL数据库主从复制与读写分离(图文详解!)

目录 前言 一&#xff1a;MySQL数据库主从复制与读写分离 1、什么是读写分离&#xff1f; 2、为什么要读写分离呢&#xff1f; 3、什么时候要读写分离&#xff1f; 4、主从复制与读写分离 5、mysql支持的复制类型 &#xff08;1&#xff09;STATEMENT &#xff08;2&…

SLAM面试笔记(5) — C++面试题

目录 第1章 C基础 1 C中static静态变量有什么作用&#xff0c;在什么情况下会用&#xff1f; 2 类中的this指针指向哪里&#xff1f; 3 说一下const的作用。 4 std::string类型为啥不能memset&#xff1f; 5 emplace_back( )和push_back( )有什么区别&#xff1f; 6 tra…

【状态估计】基于无味卡尔曼滤波模拟倾斜传感器研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

通过Redisson的管道批量操作来提高Redis Io效率

一、背景 当在对redis进行刷数操作时&#xff0c;大部分的redis框架对于单次执行的效率差不多&#xff0c;但我们有时需要一次性写入大量的redis key时&#xff0c;一次一次的操作速度就很慢。尤其是处于跨region的环境&#xff0c;一次的redis io就高达数十毫秒&#xff0…

Android aar包的生成与使用

前言 最近用Android Studio开发Android时&#xff0c;会经常接触到aar包&#xff08;Java Archive&#xff09;&#xff0c;aar包含所有资源&#xff0c;class以及res资源文件全部包含。 优势 Android通过aar方式把代码和资源打成一个包&#xff0c;提供给第三方使用或者是开…

什么是AOP?

目录 一、AOP简介 1、AOP简介和作用 2、AOP的概念 二、AOP的基本实现 三、AOP工作流程 1 、AOP工作流程 2、AOP核心概念 四、AOP切入点表达式 1、语法格式 2、通配符 五、AOP通知类型 1、AOP通知分类 2、AOP通知详解 &#xff08;1&#xff09;前置通知 &#xf…

Java Web JDBC(1)23.6.25

JDBC 1&#xff0c;JDBC概述 在开发中我们使用的是java语言&#xff0c;那么势必要通过java语言操作数据库中的数据。这就是接下来要学习的JDBC。 1.1 JDBC概念 JDBC 就是使用Java语言操作关系型数据库的一套API 全称&#xff1a;( Java DataBase Connectivity ) Java 数据库…

vue3-实战-13-管理后台-数据大屏解决方案-顶部组件搭建-实时游客统计

目录 1-数据大屏解决方案vw和vh 2-数据大屏解决方案scale 3-数据大屏原型需求图 4-数据大屏顶部搭建 4.1-顶部原型需求 4.2-顶部模块父组件的结构和逻辑 4.3-顶部模块子组件结构和逻辑 5-数据大屏游客统计 5.1-原型需求图分析 5.2-结构样式逻辑开发 1-数据大屏解决方…

视觉与多模态大模型前沿进展 | 2023智源大会精彩回顾

导读 6 月 9 日下午&#xff0c;智源大会「视觉与多模态大模型」专题论坛如期举行。随着 stable diffusion、midjourney、SAM 等爆火应用相继问世&#xff0c;AIGC 和计算机视觉与大模型的结合成为了新的「风口」。本次研讨会由智源研究院访问首席科学家颜水成和马尔奖获得者曹…

在UE5编辑器环境中使用Python

UE有很多Python方案&#xff0c;本文所讲述的Python为UE5官方内嵌版本方案&#xff0c;并且只能在编辑器环境下使用&#xff0c;使用该功能可以编写编辑器下的辅助工具&#xff0c;提升开发效率。 1.调用Python的几种方式 讲一讲UE5中调用Python的几种方式&#xff0c;首先是…

rust abc(5): 常量

文章目录 1. 目的2. 基本用法2.1 说明2.2 运行结果 3. 不推荐或不正确用法3.1 不推荐用小写字母作为常量名字3.2 常量名称中含有小写字母就会报warning3.3 定义常量时&#xff0c;不指定数据类型会编译报错 4. const 和 immutable 的区别4.1 const 可以在函数外声明&#xff0c…

三、决策树 四、随机森林

三、决策树1.决策树模型的原理1&#xff09;什么是决策树2&#xff09;决策树模型原理3.构建决策树的目的4&#xff09;决策树的优缺点 2.决策树的典型生成算法1&#xff09;常用的特征选择有信息增益、信息增益率、基尼系数2&#xff09;基于信息增益的ID3算法3&#xff09;基…

JAVAWEB 30-

JAVAWEB 30- 快速入门DriverManagerConnectionresultsetPreparedStatement增删改查查询所有添加 修改 MAVEN坐标MyBatis代理开发mybatis查询条件查询添加删除参数传递 快速入门 public static void main(String[] args) throws Exception { /1.注册驱动 Class.forName("co…

【TA100】Bloom算法

一、什么是Bloom算法 1、首先看一下Bloom效果长什么样 2、什么是Bloom ● Bloom&#xff0c;也称辉光&#xff0c;是一种常见的屏幕效果 ● 模拟摄像机的一种图像效果&#xff0c;让画面中较亮的区域“扩散”到周围的区域中&#xff0c;造成一种朦胧的效果 ● 可以让物体具有…

[JVM]再聊 CMS 收集器

题目之所以是再聊,是因为以前聊过: [JVM]聊聊 CMS 收集器 最近又看了下这块的知识,打算把 CMS/标记-清除/GC Roots/引用 这些知识串起来 我依旧可能写的不是很好,降低下期待 GC 算法 CMS 是基于 标记-清除 算法来做的,那我们就先从 GC 算法开始聊 GC 算法有: 标记-清除 标…

一篇博客教会你使用Docker部署Redis哨兵

文章目录 主数据库配置文件启动实例容器虚拟IP 从数据库配置文件启动实例 主从数据库查看主数据库查看从数据库 哨兵配置文件启动哨兵查看哨兵 哨兵机制哨兵选举选举日志重启主数据库 今天我们学习使用 Docker 部署 Redis 的主从复制&#xff0c;并部署 Redis 哨兵&#xff0c;…