RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

news2025/1/10 23:52:45

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • 业务设计流程
  • 业务设计源码
    • 基础 SQL 脚本
    • 基础依赖
    • 基础配置
    • 基础依赖代码库
    • 模块代码
      • 订单服务
      • 库存服务
      • 生成订单
  • RocketMQ 事务消息
    • 订单服务生产者
      • 事务生产者实例
      • 事务生产者监听器
      • 事务消息实体
      • 事务消息投递
    • 库存服务消费者
    • Tips
  • 总结

前言

在上一篇文章:保护数据完整性:探索 RocketMQ 分布式事务消息的力量 详细分析了「事务消息设计方面及源码相关层面」讲解,事务半消息的发送及提交、事务消息的补偿过程

在 RocketMQ 中由于网络故障原因或业务应用程序异常宕机导致事务消息未及时的完成处理,提供了事务消息补偿机制>检查本地事务执行状态的方法,为整个流程二阶段提交完成了不可忽视的异常消息补偿机制。

接下来,会通过以下两个链路中的第一条链路进行实战演练,确保在 RocketMQ 事务消息处理过程中,这两者的事务状态能够确保一致完成.

1、创建订单完成、预扣减库存
2、订单支付完成、实扣减库存

业务设计流程

在这里插入图片描述

1、事务生产者发送半消息到 Broker 服务端的 Half Topic 中,实际发送半消息的 Topic 是真实的 Topic,在这里会被替换为「RMQ_SYS_TRANS_HALF_TOPIC」存储到日志文件中

2、在 Broker 服务端将半消息存储到日志文件以后,若发送半消息的结果是成功的,那么就会执行「订单服务客户端」本地事务方法「executeLocalTransaction」

3、会同步等待本地事务方法执行的结果,再根据执行结果、消息体投递消息类型「EndTransactionRequestHeader」给到 Broker 服务端进行处理,该请求由 Broker 服务端「EndTransactionProcessor」进行处理.

4、在 EndTransactionProcessor 中会根据本地事务处理的结果,进行判别

1、若本地事务执行成功,在 Broker 服务端会将半消息对应的 Topic 调整为真实的 Topic 消息进行存储到日志文件中,随即在库存服务的消费者才能消费到这条消息,从而再对库存进行扣减,同时标记好半消息,确保在定时检查事务消息时不会再次被扫描到进行处理
2、若本地事务执行失败,在 Broker 服务端会将半消息标记为「已处理」不会让定时触发的事务消息检查机制进行扫描到

当然,定时任务的数据处理,不能确保它有时间的误差性,所以说执行成功或执行失败的事务消息,会在补偿机制进行再一次的校验

业务设计源码

基础 SQL 脚本

CREATE TABLE `order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_no` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '订单编号',
  `amount` bigint DEFAULT NULL COMMENT '订单金额',
  `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

CREATE TABLE `stock` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
  `lock_stock` int DEFAULT NULL COMMENT '锁定库存',
  `stock` int DEFAULT NULL COMMENT '真实库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

基础依赖

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spring-boot.version>2.6.7</spring-boot.version>
    <jackson.version>2.11.0</jackson.version>
    <mysql.version>8.0.17</mysql.version>
    <alibaba-druid.version>1.2.8</alibaba-druid.version>
    <mybatis-plus.version>3.4.2</mybatis-plus.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.vnjohn</groupId>
        <artifactId>blog-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

基础配置

订单服务、库存服务在实际的生产过程中,会各自有各自的库,在本地环境中演练中采用一个库进行模拟.

订单服务,作为事务消息生产者

server:
  port: 8088

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: 12345678
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      filters: stat
      maxActive: 30
      initialSize: 1
      maxWait: 10000
      # 保持连接活跃
      keep-alive: true
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
  type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径

rocketmq:
  transaction:
    producer: order_transaction
  bootstraps:
  namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876

库存服务,作为事务消息消费者

server:
  port: 8085

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: 12345678
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      filters: stat
      maxActive: 30
      initialSize: 1
      maxWait: 10000
      # 保持连接活跃
      keep-alive: true
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
  type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径

rocketmq:
  bootstraps:
  namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876
  consumer-group: order_transaction_group
  order:
    create:
      topic: order_transaction
      tag: withholding_stock

基础依赖代码库

/**
 * Spring Context 工具类
 *
 * @author vnjohn
 */
@Component
public class SpringContextUtils implements ApplicationContextAware {
    public final static String SPRING_CONTEXT_UTILS_COMPONENT = "springContextUtils";
    public static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    /**
     * 获取HttpServletRequest
     */
    public static HttpServletRequest getHttpServletRequest() {
        return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    }

    public static HttpServletResponse getHttpServletResponse() {
        return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> c) {
        return applicationContext.getBean(c);
    }

    public static <T> Map<String, T> getBeanOfType(Class<T> c) {
        return applicationContext.getBeansOfType(c);
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
        return applicationContext.getBean(name, requiredType);
    }

    public static boolean containsBean(String name) {
        return applicationContext.containsBean(name);
    }

    public static boolean isSingleton(String name) {
        return applicationContext.isSingleton(name);
    }

    public static Class<? extends Object> getType(String name) {
        return applicationContext.getType(name);
    }

    /**
     * 获取当前环境
     */
    public static String getActiveProfile() {
        return applicationContext.getEnvironment().getActiveProfiles()[0];
    }
}

模块代码

订单服务

订单 DO 实体

@Data
@TableName("`order`")
@EqualsAndHashCode(callSuper = false)
public class OrderDO {
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

订单数据库映射层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
public interface OrderMapper extends BaseMapper<OrderDO> {
}

订单仓储层

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Component
public class OrderRepository {
    @Resource
    private OrderMapper orderMapper;

    public Order queryByOrderNo(String orderNo) {
        OrderDO orderDO = orderMapper.selectOne(new QueryWrapper<OrderDO>()
                .lambda().eq(OrderDO::getOrderNo, orderNo));
        return BeanUtils.copy(orderDO, Order.class);
    }
}

库存服务

库存 DO 实体

/**
 * @author vnjohn
 * @since 2023/11/15
 */
@Data
@TableName("`stock`")
@EqualsAndHashCode(callSuper = false)
public class StockDO {
    @TableId(type = IdType.AUTO)
    private Long id;

    private Long skuId;

    private Integer lockStock;

    private Integer stock;

}

库存数据库映射层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
public interface StockMapper extends BaseMapper<StockDO> {
}

库存仓储层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
@Slf4j
@Component
public class StockRepository {
    @Resource
    private StockMapper stockMapper;

    public void preDecreaseStock(String orderNo, Long skuId) {
        // 订单号与 SkuId 可用于做日志记录,这里默认给它数量记为 1
        log.info("订单号:{}", orderNo);
        StockDO stockDO = stockMapper.selectOne(new QueryWrapper<StockDO>()
                .lambda().eq(StockDO::getSkuId, skuId));
        if (null == stockDO) {
            return;
        }
        int currentLockStock = stockDO.getLockStock() + 1;
        stockDO.setLockStock(currentLockStock);
        // 此处最好采用乐观锁+ CAS 方式进行更新
        stockMapper.updateById(stockDO);
    }
}

生成订单

订单领域实体

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class Order {
    /**
     * id
     */
    private Long id;

    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

创建订单实体

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class CreateOrder {
    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

创建订单领域执行器

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Component
public class OrderCreateHandler {
    @Resource
    private OrderMapper orderMapper;

    public Boolean handle(CreateOrder order) {
        // 在这里模拟订单异常或创建系统
        // 1、订单创建逻辑,涉及到表结构及数据会比较多,这里不做多阐述
        String orderNo = order.getOrderNo();
        return orderMapper.insert(BeanUtils.copy(order, OrderDO.class)) > 0;
    }

}

RocketMQ 事务消息

订单服务生产者

首先要在订单服务能够投递事务消息,应该先实例化一个事务生产者

事务生产者实例

/**
 * 订单服务专门用来提供事务消息的生产者
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class OrderTransactionProducer {
    @Value("${rocketmq.transaction.producer}")
    private String transactionProducerName;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    private static TransactionMQProducer PRODUCER;

    /**
     * 获取事务生产者实例对象
     *
     * @return 事务生产者实例
     */
    public static TransactionMQProducer getInstance() {
        return PRODUCER;
    }

    /**
     * 若未定义线程,检测事务半消息的线程默认只有一个,当同时出现多条事务半消息需要检测时,就退化为队列的方式进行入队,要进行排队处理,从而降低了并发、并行数
     *
     * <p>
     *  public TransactionMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
     *          super(namespace, producerGroup, rpcHook);
     *          this.checkThreadPoolMinSize = 1;
     *          this.checkThreadPoolMaxSize = 1;
     *          this.checkRequestHoldMax = 2000;
     *  }
     * </p>
     */
    @PostConstruct
    public void initTransactionProducer() {
        try {
            PRODUCER = new TransactionMQProducer(transactionProducerName);
            PRODUCER.setNamesrvAddr(namesrvAddr);
            PRODUCER.setTransactionListener(new OrderTransactionListener());
            // 自定义线程池处理,用于追踪消息投递时的日志,能够追踪到具体的投放线程,所必要参数,不设置采用的是默认的线程池
//          producer.setExecutorService();
            PRODUCER.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            log.error("创建事务生产者异常,", e);
        }
    }
}

事务生产者监听器

与事务生产者必须绑定好的一个关键>监听器,用这个监听器来判别如何做事务消息的后续处理

/**
 * 订单服务「本地事务消息-半消息」处理
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
public class OrderTransactionListener implements TransactionListener {
    /**
     * unknow 消息最大重试次数设置为 3,当超过 3 次以后进行默认成功且记录到本地日志表中
     */
    private static Integer MAX_RETRY_TIME = 3;

    /**
     * 用于存储本地事务执行的结果:事务id->订单id
     */
    private ConcurrentHashMap<String, String> TRANSACTION_ORDER_MAP = new ConcurrentHashMap<>();

    /**
     * 用于存储本地事务检查次数的结果:事务id->check 次数
     * 源码中会默认检查为 15 次,一次的时间间隔为 6s,由于那个属于全局的配置
     * 在这里可自定义适配次数,时间还是按照默认的配置来进行处理
     */
    private ConcurrentHashMap<String, Integer> UNKNOW_TRANSACTION_CHECK_MAP = new ConcurrentHashMap<>();


    /**
     * 执行本地事务的处理逻辑
     *
     * @param message 待发送的消息内容
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 当前执行的事务 id
        String transactionId = message.getTransactionId();
        CreateOrder createOrder = (CreateOrder) o;
        TRANSACTION_ORDER_MAP.put(transactionId, createOrder.getOrderNo());
        // TODO 捕获创建订单后的执行结果
        try {
            // 伪代码创建订单
            OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
            Order order = orderRepository.queryByOrderNo(createOrder.getOrderNo());
            // 订单已存在,不再做处理
            if (Objects.nonNull(order)) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            OrderCreateHandler orderCreateHandler = SpringContextUtils.getBean(OrderCreateHandler.class);
            Boolean handleResult = orderCreateHandler.handle(createOrder);
            if (handleResult) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception orderException) {
            // 消息进行回滚,在消费者那一侧是无法观察此消息的
            log.error("创建订单出现异常,", orderException);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 由于网络问题,导致非业务异常,可能是订单已经写入数据库成功了没有及时地去处理事务消息状态
        // 在这里需要去进行 check 检查本地事务是否执行有误
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 事务 producer 会从 broker 获取到未处理的事务消息列表,进行依次处理
     * 检查本地事务状态,当出现「事务消息生产者」宕机时,该方法仍然会对未处理的事务消息进行检测
     * 对于 unknow 消息,会 1s 进行一次定期处理,该参数可调整
     *
     * @param messageExt 消息扩展类信息
     * @return 本地事务执行状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 检查本地事务表是否存在订单号
        String transactionId = messageExt.getTransactionId();
        log.info("checkLocalTransaction transactionId:{}", transactionId);
        String orderNo = TRANSACTION_ORDER_MAP.get(transactionId);
        // unknow 消息进行重试三次,超出后不再做处理,不然 unknow 消息会一直在控制台打印进行处理,视为无效的工作
        // 增加补偿机制,用于处理 unknow 重试的消息,当重试的消息是提交或回滚状态,则调用相关的方法进行处理
        Integer checkCount = UNKNOW_TRANSACTION_CHECK_MAP.get(transactionId);
        if (Objects.isNull(checkCount) || checkCount < MAX_RETRY_TIME) {
            checkCount = Objects.isNull(checkCount) ? 1 : ++checkCount;
            log.info("transactionId-{},check 检查次数:{}", transactionId, checkCount);
            UNKNOW_TRANSACTION_CHECK_MAP.put(transactionId, checkCount);
            return LocalTransactionState.UNKNOW;
        }
        // 检查次数超出预定阈值,可记录到日志
        if (checkCount.equals(MAX_RETRY_TIME)) {
            log.info("transactionId-{},check 检查次数超出", transactionId);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 反查订单数据当前订单是否已经创建
        OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
        Order order = orderRepository.queryByOrderNo(orderNo);
        if (Objects.nonNull(order)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

}

事务消息实体

/**
 * 订单库存消息体
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class OrderWithStockMessage {
    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 数量
     */
    private Integer quantity;

}

事务消息投递

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class OrderUnifiedCommandHandler {

    public void handler(String orderNo) {
        // 1、订单数据幂等校验成功
        // 2、调用生产者发送事务半消息
        // 假设接收到订单支付已完成的标识
        OrderWithStockMessage stockMessageBody = new OrderWithStockMessage();
        stockMessageBody.setSkuId(1001L);
        stockMessageBody.setQuantity(2);
        stockMessageBody.setOrderNo(orderNo);
        Message stockMessage = new Message(
                "order_transaction",
                "withholding_stock",
                UUID.randomUUID().toString(),
                JsonUtils.objToJsonStr(stockMessageBody).getBytes()
        );
        CreateOrder order = new CreateOrder();
        order.setSkuId(1001L);
        order.setAmount(100L);
        order.setUserId(888L);
        order.setOrderNo(orderNo);

        // putUserProperty 该方法,通过网络传递给 consumer 一些用户自定义参数,可以用来校验做其他的业务逻辑处理
        // stockMessage.putUserProperty("action", );

        // 发送的是半消息
        // Message msg, Object arg
        // 第一个参数:本地事务处理成功以后,需要进行发送的消息体内容
        // 第二个参数:作为本地事务用于检测或执行本地事务时的对象体
        try {
            TransactionSendResult transactionSendResult = OrderTransactionProducer.getInstance().sendMessageInTransaction(stockMessage, order);
            log.info("事务执行结果:{}", JsonUtils.objToJsonStr(transactionSendResult.getLocalTransactionState()));
        } catch (MQClientException e) {
            log.error("订单预生成,事务半消息 send fail,", e);
        }
    }
}

库存服务消费者

/**
 * 订单消费者,消费来自订单服务投递的消息
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class CreateOrderPreStockConsumer {
    @Value("${rocketmq.consumer-group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    @Value("${rocketmq.order.create.topic}")
    private String orderCreateTopic;

    @Value("${rocketmq.order.create.tag}")
    private String orderCreateTag;

    @PostConstruct
    public void initCreateOrderConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe(orderCreateTopic, orderCreateTag);
            consumer.registerMessageListener(new CreateOrderStockMessageListener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    public static class CreateOrderStockMessageListener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            String msgId = messageExt.getMsgId();
            byte[] body = messageExt.getBody();
            String bodyValue = new String(body);
            log.info("当前订单创建成功,预扣减库存信息:{}", bodyValue);
            String orderNo = JSON.parseObject(bodyValue).getString("orderNo");
            Long skuId = JSON.parseObject(bodyValue).getLong("skuId");
            StockRepository stockRepository = SpringContextUtils.getBean(StockRepository.class);
            stockRepository.preDecreaseStock(orderNo, skuId);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

Tips

采用两阶段提交的事务消息,先提交一个半消息,然后执行本地事务,再发送一个 commit 的半消息;若这个 commit 半消息失败了,MQ 是基于第一个半消息不断的反查本地事务执行状态来进行后续流程的推进的,这样只有当本地事务提交成功,最终 MQ 消息也会发送成功,若本地事务 rollback,那么 MQ 消息不会再进行发送,会标记这条半消息的状态为「已处理」从而保证了两者之间的一致性

执行本地事务方法+本地事务定时检查,结合起来来保证事务消息执行的一致性

总结

该篇博文主要是通过实际的业务代码来进行 RocketMQ 事务消息实战,上一篇博文从 RocketMQ 事务消息的整体设计以及相关的源码的讲解,这篇通过订单生成、库存预扣减的简单例子来对事务消息的这块流程进行细粒化的业务设计,事务消息生产者的本地事务消息与补偿事务消息结合起来保证订单创建成功以后,库存才进行预扣减,希望这篇简单的 RocketMQ 事务消息实战博文能够帮助到您理解事务消息的实际应用,期待三连支持!

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 微信体系 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1217318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】树与二叉树(十七):二叉树的基础操作:删除指定结点及其左右子树(算法DST)

文章目录 5.2.1 二叉树二叉树性质引理5.1&#xff1a;二叉树中层数为i的结点至多有 2 i 2^i 2i个&#xff0c;其中 i ≥ 0 i \geq 0 i≥0。引理5.2&#xff1a;高度为k的二叉树中至多有 2 k 1 − 1 2^{k1}-1 2k1−1个结点&#xff0c;其中 k ≥ 0 k \geq 0 k≥0。引理5.3&…

【软考篇】中级软件设计师 第四部分(三)

中级软件设计师 第四部分&#xff08;三&#xff09; 三十四. 结构化开发方法34.1 内聚34.2 耦合 三十五. 测试基础知识三十六. 面向对象36.1 UML图36.2 设计模式36.3 数据流图 读前须知&#xff1a; 【软考篇】中级软件设计师 学前须知 上一章节&#xff1a; 【软考篇】中级软…

在rt-thread中使用iperf触发断言卡死

问题触发 最近在适配sdio device驱动&#xff0c;CP芯片与AP芯片对接&#xff08;RK3399&#xff09;&#xff0c;准备使用iperf测试下能否AP与CP能否正常通信。CP芯片跑的是rt-thread系统&#xff0c;在使用sdio_eth_dev_init命令初始化后&#xff0c;使用iperf -c 192.168.1…

Uniapp连接iBeacon设备——实现无线定位与互动体验(理论篇)

目录 前言&#xff1a; 一、什么是iBeacon技术 二、Uniapp连接iBeacon设备的准备工作 硬件设备&#xff1a; 三、Uniapp连接iBeacon设备的实现步骤 创建Uniapp项目&#xff1a; 四、Uniapp连接iBeacon设备的应用场景 室内导航&#xff1a; 五、Uniapp连接iBeacon设备的未来…

MQTT协议详解及在Android上的应用

MQTT协议详解及在Android上的应用 一、MQTT协议简介二、MQTT工作原理三、MQTT协议特点四、MQTT在Android上的应用4.1 准备工作4.2 示例代码 五、结论 本博客将全面介绍MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;协议的基本概念、工作原理、特点以及在An…

网工内推 | 国企、上市公司售前,CISP/CISSP认证,最高18K*14薪

01 中电福富信息科技有限公司 招聘岗位&#xff1a;售前工程师&#xff08;安全&#xff09; 职责描述&#xff1a; 1、对行业、用户需求、竞争对手等方面提出分析报告&#xff0c;为公司市场方向、产品研发和软件开发提供建议&#xff1b; 2、负责项目售前跟踪、技术支持、需…

01Urllib

1.什么是互联网爬虫&#xff1f; 如果我们把互联网比作一张大的蜘蛛网&#xff0c;那一台计算机上的数据便是蜘蛛网上的一个猎物&#xff0c;而爬虫程序就是一只小蜘蛛&#xff0c;沿着蜘蛛网抓取自己想要的数据 解释1&#xff1a;通过一个程序&#xff0c;根据Url(http://www.…

基于边缘智能网关的冬季管网智能监测应用

随着我国北方全面进入到冬季&#xff0c;多日以来严寒、降雪天气频发&#xff0c;民生基础设施也迎来冬季考验。尤其是民众生活仰赖的水、电、气管网&#xff0c;面临极端冰雪天气时易存在各种风险&#xff0c;包括管道水/气泄露损耗、低温冻裂、积雪压塌压损、冻结受阻等。 针…

广州华锐互动VRAR:VR教学楼地震模拟体验增强学生防震减灾意识

在当今社会&#xff0c;地震作为一种自然灾害&#xff0c;给人们的生活带来了巨大的威胁。特别是在学校这样的集体场所&#xff0c;一旦发生地震&#xff0c;后果将不堪设想。因此&#xff0c;加强校园安全教育&#xff0c;提高师生的防震减灾意识和能力&#xff0c;已经成为了…

Ubuntu22.04离线安装uwsgi问题记录

GCC4.8安装 1、报错信息1&#xff1a; 由于缺少gcc4.8环境导致的无法安装uwsgi 解决方案&#xff1a; 离线安装GCC4.8环境, GCC4.8.5离线安装步骤如下&#xff1a; 1、下载gcc的离线安装包及其依赖包&#xff0c;链接如下&#xff1a; https://download.csdn.net/download/…

vue源码分析(八)—— update分析(首次渲染)

文章目录 前言一、update首次渲染的核心方法__path__二、__path__方法详解1. 文件路径2. inBrowser的解析&#xff08;1&#xff09;noop 的空函数定义&#xff1a;&#xff08;2&#xff09;patch 的含义 3. createPatchFunction 的解析4. path 方法解析&#xff08;1&#xf…

医美三季报内卷,华熙生物、爱美客、昊海生科混战双11

双十一落幕&#xff0c;据天猫大美妆数据统计&#xff0c;被称为“医美三剑客”的华熙生物(688363.SH&#xff09;、爱美客(300896.SZ)、昊海生科(688366.SH)的医美产品均未进入天猫双11美容护肤类目TOP10榜单。 与此同时&#xff0c;其业绩承压困局也写在最新的三季报里。 「…

M系列 Mac安装配置Homebrew

目录 首先&#xff0c;验证电脑是否安装了Homebrew 1、打开终端输入以下指令&#xff1a; 2、如图所示&#xff0c;该电脑没有安装Homebrew &#xff0c;下面我们安装Homebrew 一、官网下载 &#xff08;不建议&#xff09; 1、我们打开官网&#xff1a;https://brew.sh/ …

freeRTOS--任务通知

一、什么是任务通知 使用任务通知可以替换二值信号量、计数信号量、事件标志组&#xff0c;可以替代长度为1的队列&#xff0c;任务通知速度更快、使用的RAM更少。 任务通知值的更新方式&#xff1a; 发消息给任务&#xff0c;如果有通知未读&#xff0c;不覆盖通知值。发消…

成功解决:com.alibaba.druid.support.logging.JakartaCommonsLoggingImpl.

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 前言 使用Spring 整合 mybatis的时候 报错…

Linux Traefik工具Dashboard结合内网穿透实现远程访问

文章目录 前言1. Docker 部署 Trfɪk2. 本地访问traefik测试3. Linux 安装cpolar4. 配置Traefik公网访问地址5. 公网远程访问Traefik6. 固定Traefik公网地址 前言 Trfɪk 是一个云原生的新型的 HTTP 反向代理、负载均衡软件&#xff0c;能轻易的部署微服务。它支持多种后端 (D…

【Mac开发环境搭建】Docker安装Redis、Nacos

文章目录 Dokcer安装Redis拉取镜像创建配置文件创建容器连接测试Redis连接工具[Quick Redis]设置Redis自启动 Docker安装Nacos Dokcer安装Redis 拉取镜像 docker pull redis创建配置文件 # bind 127.0.0.1 -::1 bind 0.0.0.0 # 是否启用保护模式 protected-mode no# redis端口…

服务器集群配置LDAP统一认证高可用集群(配置tsl安全链接)-centos9stream-openldap2.6.2

写在前面 因之前集群为centos6&#xff0c;已经很久没升级了&#xff0c;所以这次配置统一用户认证也是伴随系统升级到centos9时一起做的配套升级。新版的openldap配置大致与老版本比较相似&#xff0c;但有些地方配置还是有变化&#xff0c;另外&#xff0c;铺天盖地的帮助文…

【数据结构】【版本1.2】【线性时代】——链表之王(双向带头循环)

目录 引言 链表的分类 双向链表的结构 双向链表的实现 定义 创建新节点 初始化 打印 尾插 头插 判断链表是否为空 尾删 头删 查找与修改 指定插入 指定删除 销毁 顺序表和双向链表的优缺点分析 双向链表oj题 源代码 dlist.h dlist.c test.…

Python实现求解上个工作日逻辑

目录 一、需求描述二、代码实现三、测试结果 一、需求描述 因工作需要&#xff0c;现需获取任意一个日期的上个工作日&#xff0c;要求考虑法定假日及周末。 例如&#xff1a;2024年2月10日&#xff08;春节&#xff09;的上一个工作日为2024年2月9日&#xff0c;2024年2月17…