RabbitMQ实战教程(2)

news2025/1/12 0:59:00

十、RabbitMQ实战教程

在掌握了SpringCloudAlibaba的应用后,再来玩!!

为了更好的理解RabbitMQ在项目中的作用,来一套实战操作。

10.1 RabbitMQ实战场景

首先模拟一个场景,电商中对应的处理方案。

模拟一个用户在电商平台下单:

  • 需要调用库存服务,扣除商品库存,扣除成功后,才可以继续往下走业务
  • 需要调用订单服务,创建订单(待支付)。
  • 还需要很多后续的处理
    • 下单时,会使用优惠券,预扣除当前用户使用的优惠券
    • 下单时,会使用用户积分顶金额,预扣除当前用户的积分
    • 创建成功后,需要通知商家,有用户下单。
    • ………………

image.png

10.2 RabbitMQ实战场景搭建

因为场景设计到了服务之间的调用。

这里需要大家提前掌握一些知识:Nacos,OpenFeign的应用层面。

1、构建聚合工程,作为父工程管理所有的模块

准备好pom.xml文件

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath />
</parent>

<groupId>com.mashibing</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<properties>
    <spring.cloud-version>Hoxton.SR12</spring.cloud-version>
    <spring.cloud.alibaba-version>2.2.7.RELEASE</spring.cloud.alibaba-version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring.cloud.alibaba-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
2、构建其他六个子服务

image.png

3、从下单服务开始一次完成配置以及接口的提供

下单服务:

  • 导入依赖
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
    </dependencies>
    
  • 构建启动类
    @SpringBootApplication
    @EnableDiscoveryClient
    public class PlaceOrderStarterApp {
    
        public static void main(String[] args) {
            SpringApplication.run(PlaceOrderStarterApp.class,args);
        }
    }
    
  • 编写配置文件
    server:
      port: 80
    
    spring:
      application:
        name: placeorder
      cloud:
        nacos:
          discovery:
            server-addr: 114.116.226.76:8848
    
  • 处理问题:启动后发现,无法正常的注册到Nacos上,需要将Alibaba的版本降到2.2.6.RELEASE

其他服务的基本配置,我这里直接写好,然后大家可以去Git中找到指定提交点~

4、完成整个下单的流程
  • 下单服务接口(前置操作)
    @RestController
    public class PlaceOrderController {
    
        /**
         * 模拟用户下单操作
         * @return
         */
        @GetMapping("/po")
        public String po(){
            //1、调用库存服务扣除商品库存
    
            //2、调用订单服务,创建订单
    
            //3、调用优惠券服务,预扣除使用的优惠券
    
            //4、调用用户积分服务,预扣除用户使用的积分
    
            //5、调用商家服务,通知商家用户已下单
    
            return "place order is ok!";
        }
    
    }
    
  • 库存服务接口
    @RestController
    public class ItemStockController {
    
        private static int stock = 10;
    
        @GetMapping("/decr")
        public void decr() throws InterruptedException {
            Thread.sleep(400);
            stock--;
            if(stock < 0){
                throw new RuntimeException("商品库存不足!");
            }
            System.out.println("扣减库存成功!");
        }
    }
    
  • 订单服务接口
    @RestController
    public class OrderManageController {
    
        @GetMapping("create")
        public void create() throws InterruptedException {
            Thread.sleep(400);
            System.out.println("创建订单成功!");
        }
    
    }
    
  • 优惠券服务接口
    @RestController
    public class CouponController {
    
        @GetMapping("/coupon")
        public void coupon() throws InterruptedException {
            Thread.sleep(400);
            System.out.println("优惠券预扣除成功!");
        }
    
    }
    
  • 用户积分服务接口
    @RestController
    public class UserPointsController {
    
        @GetMapping("/up")
        public void up() throws InterruptedException {
            Thread.sleep(400);
            System.out.println("扣除用户积分成功!!");
        }
    
    }
    
  • 商家服务接口
    @RestController
    public class BusinessController {
    
        @GetMapping("/notify")
        public void notifyBusiness() throws InterruptedException {
            Thread.sleep(400);
            System.out.println("通知商家成功!!");
        }
    
    }
    
5、完善下单接口服务调用
  • 先给启动类添加OpenFeign注解

    @EnableFeignClients
    
  • 给5个服务提供对应的OpenFeign接口

    image.png

  • 在下单服务的Controller中实现服务的调用

    @RestController
    public class PlaceOrderController {
    
        @Autowired
        private ItemStockClient itemStockClient;
        @Autowired
        private OrderManageClient orderManageClient;
        @Autowired
        private CouponClient couponClient;
        @Autowired
        private UserPointsClient userPointsClient;
        @Autowired
        private BusinessClient businessClient;
    
    
        /**
         * 模拟用户下单操作
         * @return
         */
        @GetMapping("/po")
        public String po(){
            long start = System.currentTimeMillis();
            //1、调用库存服务扣除商品库存
            itemStockClient.decr();
            //2、调用订单服务,创建订单
            orderManageClient.create();
            //3、调用优惠券服务,预扣除使用的优惠券
            couponClient.coupon();
            //4、调用用户积分服务,预扣除用户使用的积分
            userPointsClient.up();
            //5、调用商家服务,通知商家用户已下单
            businessClient.notifyBusiness();
    
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            return "place order is ok!";
        }
    
    }
    
10.3 完成异步调用

因为下单功能,核心就在于扣除库存成功,以及创建订单成功。只要这两个操作么得问题,直接就可以让后续的优惠券,用户积分,通知商家等等操作实现一个异步的效果。而且基于RabbitMQ做异步之后,还可以让下单服务与其他服务做到解耦。

异步:可以让整个业务的处理速度更快,从而更快的给用户一个响应,下单是成功还是失败。

解耦:优惠券,用户积分,商家服务,无论哪个服务宕机,都不影响正常的下单流程。

image.png

1、下单服务
  • 导入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 编写配置文件链接RabbitMQ
    spring:
      rabbitmq:
        host: 114.116.226.76
        port: 5672
        username: rabbitmq
        password: rabbitmq
        virtual-host: rabbitmq
    
  • 构建交换机&队列
    @Configuration
    public class RabbitMQConfig {
    
        // 下单服务的交换机
        public static final String PLACE_ORDER_EXCHANGE = "place_order_exchange";
        // 三个服务的Queue
        public static final String COUPON_QUEUE = "coupon_queue";
        public static final String USER_POINTS_QUEUE = "user_points_queue";
        public static final String BUSINESS_QUEUE = "business_queue";
    
    
        @Bean
        public Exchange placeOrderExchange(){
            return ExchangeBuilder.fanoutExchange(PLACE_ORDER_EXCHANGE).build();
        }
    
        @Bean
        public Queue couponQueue(){
            return QueueBuilder.durable(COUPON_QUEUE).build();
        }
        @Bean
        public Queue userPointsQueue(){
            return QueueBuilder.durable(USER_POINTS_QUEUE).build();
        }
        @Bean
        public Queue businessQueue(){
            return QueueBuilder.durable(BUSINESS_QUEUE).build();
        }
    
        @Bean
        public Binding couponBinding(Exchange placeOrderExchange,Queue couponQueue){
            return BindingBuilder.bind(couponQueue).to(placeOrderExchange).with("").noargs();
        }
        @Bean
        public Binding userPointsBinding(Exchange placeOrderExchange,Queue userPointsQueue){
            return BindingBuilder.bind(userPointsQueue).to(placeOrderExchange).with("").noargs();
        }
        @Bean
        public Binding businessBinding(Exchange placeOrderExchange,Queue businessQueue){
            return BindingBuilder.bind(businessQueue).to(placeOrderExchange).with("").noargs();
        }
    }
    
  • 修改下单接口Controller
    @RestController
    public class PlaceOrderController {
    
        @Autowired
        private ItemStockClient itemStockClient;
        @Autowired
        private OrderManageClient orderManageClient;
        @Autowired
        private RabbitTemplate rabbitTemplate;
        /**
         * 模拟用户下单操作
         * @return
         */
        @GetMapping("/po")
        public String po(){
            long start = System.currentTimeMillis();
            //1、调用库存服务扣除商品库存
            itemStockClient.decr();
            //2、调用订单服务,创建订单
            orderManageClient.create();
    
            String userAndOrderInfo = "用户信息&订单信息&优惠券信息等等…………";
            // 将同步方式修改为基于RabbitMQ的异步方式
            rabbitTemplate.convertAndSend(RabbitMQConfig.PLACE_ORDER_EXCHANGE,"",userAndOrderInfo);
    
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            return "place order is ok!";
        }
    
    }
    
2、优惠券服务
  • 导入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • COPY配置文件
    spring:
      rabbitmq:
        host: 114.116.226.76
        port: 5672
        username: rabbitmq
        password: rabbitmq
        virtual-host: rabbitmq
        listener:
          simple:
            acknowledge-mode: manual
    
  • COPY配置类: 复制的下单服务的RabbitMQConfig
  • 编写消费者,实现预扣除优惠券
    @Component
    public class CouponListener {
    
        @RabbitListener(queues = {RabbitMQConfig.COUPON_QUEUE})
        public void consume(String msg, Channel channel, Message message) throws Exception {
            // 预扣除优惠券
            Thread.sleep(400);
            System.out.println("优惠券预扣除成功!" + msg);
            // 手动ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    
    }
    
3、用户积分服务

类似优惠券服务操作!

4、商家服务

类似优惠券服务操作!

10.4 下单服务保证消息的可靠性

下单服务需要保证消息一定可以发送到RabbitMQ服务中,如果发送失败。

如果消息没有发送到Exchange或者是消息没有从Exchange路由到指定队列。

  • 可以将消息存储到数据库,基于定时任务的方式重新发送。
  • 可以直接在confirm中做重试。
  • 或者是记录error日志,通过日志的形式做重新发送。
  • …………

开始完成当前操作

1、修改配置文件
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
2、重新配置RabbitTemplate对象,指定confirm和return的回调处理
@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        //1、new出RabbitTemplate对象
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        //2、将connectionFactory设置到RabbitTemplate对象中
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //3、设置confirm回调
        rabbitTemplate.setConfirmCallback(confirmCallback());
        //4、设置return回调
        rabbitTemplate.setReturnCallback(returnCallback());
        //5、设置mandatory为true
        rabbitTemplate.setMandatory(true);
        //6、返回RabbitTemplate对象即可
        return rabbitTemplate;
    }

    public RabbitTemplate.ConfirmCallback confirmCallback(){
        return new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (correlationData == null) return;
                String msgId = correlationData.getId();
                if(ack){
                    System.out.println("消息发送到Exchange成功!! msgId = " + msgId);
                }else{
                    System.out.println("消息发送到Exchange失败!! msgId = " + msgId);
                }
            }
        };
    }

    public RabbitTemplate.ReturnCallback returnCallback(){
        return new RabbitTemplate.ReturnCallback(){
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("消息未路由到队列");
                System.out.println("return:消息为:" + new String(message.getBody()));
                System.out.println("return:交换机为:" + exchange);
                System.out.println("return:路由为:" + routingKey);
            }
        };
    }

}
3、重新完成Controller中消息的发送并且完善confirm和return的回调
3.1、需要在Controller中将correlationData和发送的消息信息绑定

准备全局的Cache

public class GlobalCache {

    private static Map map = new HashMap();

    public static void set(String key,Object value){
        map.put(key,value);
    }

    public static Object get(String key){
        Object value = map.get(key);
        return value;
    }

    public static void remove(String key){
        map.remove(key);
    }

}

重新编写Controller,实现标识和消息信息的绑定

@RestController
public class PlaceOrderController {

    @Autowired
    private ItemStockClient itemStockClient;
    @Autowired
    private OrderManageClient orderManageClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 模拟用户下单操作
     * @return
     */
    @GetMapping("/po")
    public String po(){
        long start = System.currentTimeMillis();
        //1、调用库存服务扣除商品库存
        itemStockClient.decr();
        //2、调用订单服务,创建订单
        orderManageClient.create();

        // 将之前的同步方式注释
        String userAndOrderInfo = "用户信息&订单信息&优惠券信息等等…………";
        // 声明当前消息的id标识
        String id = UUID.randomUUID().toString();
        // 封装消息的全部信息
        Map map = new HashMap<>();
        map.put("message",userAndOrderInfo);
        map.put("exchange",RabbitMQConfig.PLACE_ORDER_EXCHANGE);
        map.put("routingKey","");
        map.put("sendTime",new Date());
        // 将id标识和消息存储到全局缓存中
        GlobalCache.set(id,map);
        // 将同步方式修改为基于RabbitMQ的异步方式
        rabbitTemplate.convertAndSend(RabbitMQConfig.PLACE_ORDER_EXCHANGE,"",userAndOrderInfo,new CorrelationData(id));


        long end = System.currentTimeMillis();
        System.out.println(end - start);
        return "place order is ok!";
    }

}
3.2、需要在confirm的回调中完成两个操作
  • 消息发送成功,删除之前绑定的消息
    if(ack){
        log.info("消息发送到Exchange成功!!");
        GlobalCache.remove(msgId);
    }
    
  • 消息发送失败,将之前绑定的消息存储到数据库
    • 准备库表信息,存储发送失败的信息。
      CREATE TABLE `resend` (
        `id` varchar(255) NOT NULL,
        `message` varchar(255) NOT NULL,
        `exchange` varchar(255) NOT NULL,
        `routing_key` varchar(255) NOT NULL,
        `send_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        `send_count` int(11) NOT NULL DEFAULT '0' COMMENT '最多重新发送3次',
        `is_send` int(11) NOT NULL DEFAULT '0' COMMENT '0-发送失败,1-发送成功',
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
      
    • 实现数据源和MyBatis的基本配置:……
    • 改造confirm实现
      public RabbitTemplate.ConfirmCallback confirmCallback(){
          return new RabbitTemplate.ConfirmCallback(){
              @Override
              public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                  if (correlationData == null) return;
                  String msgId = correlationData.getId();
                  if(ack){
                      log.info("消息发送到Exchange成功!!");
                      GlobalCache.remove(msgId);
                  }else{
                      log.error("消息发送失败!");
                      Map value = (Map) GlobalCache.get(msgId);
                      // 推荐自己玩的时候,用service做增删改操作,控制事务~
                      resendMapper.save(value);
                  }
              }
          };
      }
      
4、测试效果image.png
10.5 消费者避免重复消费问题

采用数据库的幂等表解决消费者可能存在重复消费的问题。

再真正处理消费执行业务前做一些操作,先去查看数据库中的幂等表信息:

  • 如果消息的唯一标识已经存在了,证明当前消息已经被消费,直接告辞。
  • 如果消息的唯一标识不存在,先将当前唯一标识存储到幂等表中,然后再执行消费业务。

基于用户积分服务实现幂等性操作。

1、准备幂等表
CREATE TABLE `user_points_idempotent` (
  `id` varchar(255) NOT NULL,
  `createtime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2、给用户积分服务追加连接数据库信息:
  • 导入依赖
  • 编写配置
  • 准备Mapper接口
    public interface UserPointsIdempotentMapper {
    
        @Select("select count(1) from user_points_idempotent where id = #{id}")
        int findById(@Param("id") String id);
    
        @Insert("insert into user_points_idempotent (id) values (#{id})")
        void save(@Param("id") String id);
    
    }
    
3、准备消费方法
@Service
@Slf4j
public class UserPointsConsumeImpl implements UserPointsConsume {

    @Resource
    private UserPointsIdempotentMapper userPointsIdempotentMapper;

    private final String ID_NAME = "spring_returned_message_correlation";


    @Override
    @Transactional
    public void consume(Message message) {
        // 获取生产者提供的CorrelationId要基于header去获取。
        String id = message.getMessageProperties().getHeader(ID_NAME);
        //1、查询幂等表是否存在当前消息标识
        int count = userPointsIdempotentMapper.findById(id);
        //2、如果存在,直接return结束
        if(count == 1){
            log.info("消息已经被消费!!!无需重复消费!");
            return;
        }
        //3、如果不存在,插入消息标识到幂等表
        userPointsIdempotentMapper.save(id);
        //4、执行消费逻辑
        // 预扣除用户积分
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("扣除用户积分成功!!");
    }
}
4、测试功能效果

image.png

10.6 实现延迟取消订单状态

当客户端下单之后,会基于订单服务在数据库中构建一个订单信息,默认情况下,订单信息是待支付状态。

如果用户正常支付了,会将当前订单从待支付状态改为已支付/待发货状态。

如果超过一定的时间,用户没有支付,此时需要将订单状态从待支付改为已取消的状态。

基于RabbitMQ提供的死信队列来实现当前的延迟修改订单状态的功能,同时也可以采用延迟交换机插件的形式实现,But,因为当前业务中,延迟时间是统一的,不使用延迟交换机也是ok的。

1、准备订单表并修改订单服务的业务
  • 准备表结构
    CREATE TABLE `tb_order` (
      `id` varchar(36) NOT NULL AUTO_INCREMENT,
      `total` decimal(10,2) DEFAULT NULL,
      `order_state` int(11) DEFAULT '0' COMMENT '订单状态  0-待支付, 1-已支付,2-待发货,3-已发货,-1-已取消',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
  • 修改订单服务,将之前模拟数据库操作,改为真实的数据库操作
    • 导入依赖
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.47</version>
      </dependency>
      <dependency>
          <groupId>org.mybatis.spring.boot</groupId>
          <artifactId>mybatis-spring-boot-starter</artifactId>
          <version>2.2.2</version>
      </dependency>
      <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
      </dependency>
      
    • 编写配置信息
      spring:
        datasource:
          driver-class-name: org.gjt.mm.mysql.Driver
          url: jdbc:mysql:///rabbitmq
          username: root
          password: root
      
    • 启动类添加注解
      @MapperScan(basePackages = "com.mashibing.mapper")
      
    • 实现添加操作
      • 准备Mapper接口
        public interface TBOrderMapper {
        
            @Insert("insert into tb_order (id) values (#{id})")
            void save(@Param("id") String id);
        
        }
        
      • 准备Service层
        @Service
        public class TBOrderServiceImpl implements TBOrderService {
        
            @Resource
            private TBOrderMapper orderMapper;
        
        
            @Override
            public void save() {
        	String id = UUID.randomUUID().toString();
                orderMapper.save(id);
            }
        }
        
      • Controller调用Service层
        @RestController
        @Slf4j
        public class OrderManageController {
        
            @Autowired
            private TBOrderService orderService;
        
            @GetMapping("create")
            public void create() throws InterruptedException {
                orderService.save();
                log.info("创建订单成功!!");
            }
        
        }
        
      • 测试image.png
2、在订单服务中准备死信队列配置
  • 导入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 编写配置文件
    spring:
      rabbitmq:
        host: 114.116.226.76
        port: 5672
        username: rabbitmq
        password: rabbitmq
        virtual-host: rabbitmq
        listener:
          simple:
            acknowledge-mode: manual
    
  • 编写配置类完成死信队列的构建
    @Configuration
    public class RabbitMQConfig {
    
        public static final String ORDER_EXCHANGE = "order_exchange";
        public static final String ORDER_QUEUE = "order_queue";
    
        public static final String DEAD_EXCHANGE = "dead_exchange";
        public static final String DEAD_QUEUE = "dead_queue";
    
        @Bean
        public Exchange orderExchange(){
            return ExchangeBuilder.fanoutExchange(ORDER_EXCHANGE).build();
        }
    
        @Bean
        public Queue orderQueue(){
            return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE).build();
        }
    
        @Bean
        public Exchange deadExchange(){
            return ExchangeBuilder.fanoutExchange(DEAD_EXCHANGE).build();
        }
    
        @Bean
        public Queue deadQueue(){
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        @Bean
        public Binding orderBinding(Exchange orderExchange,Queue orderQueue){
            return BindingBuilder.bind(orderQueue).to(orderExchange).with("").noargs();
        }
    
        @Bean
        public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
            return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs();
        }
    
    }
    
3、完成订单构建成功后,发送消息到死信队列

前面的准备工作,没考虑到订单的主键需要作为消息的问题,将之前的主键自增的形式,更改为UUID作为主键,方便作为消息传递。

处理了两个问题:

  • 订单表的主键,为了方便作为消息,将之前主键自增的ID,设置为了自然主键,用UUID。
  • 发送消息后,发现队列没有收到消息,定位到是忘记在配置文件追加Binding信息。

完成消息发送

@Service
public class TBOrderServiceImpl implements TBOrderService {

    @Resource
    private TBOrderMapper orderMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Override
    @Transactional
    public void save() {
        // 生成主键ID
        String id = UUID.randomUUID().toString();
        // 创建订单
        orderMapper.save(id);
        // 订单构建成功~
        // 发送消息到RabbitMQ的死信队列
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "", id, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息的生存时间为15s,当然,也可以在构建队列时,指定队列的生存时间。
                message.getMessageProperties().setExpiration("15000");
                return message;
            }
        });
    }
}
4、声明消费者消费延迟取消订单的消息
  • 声明消费者:
    @Component
    public class DelayMessageListener {
    
        @Autowired
        private TBOrderService orderService;
    
        @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
        public void consume(String id, Channel channel, Message message) throws IOException {
            //1、 调用Service实现订单状态的处理
            orderService.delayCancelOrder(id);
    
            //2、 ack的干活~
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    
    }
    
  • 完善Service业务处理
    @Override
    @Transactional
    public void delayCancelOrder(String id) {
        //1、基于id查询订单信息。 for update
        int orderState = orderMapper.findOrderStateByIdForUpdate(id);
        //2、判断订单状态
        if(orderState != 0){
            log.info("订单已经支付!!");
            return;
        }
        //3、修改订单状态
        log.info("订单未支付,修改订单状态为已取消");
        orderMapper.updateOrderStateById(-1,id);
    }
    
  • 提供Mapper与数据库交互的业务
    public interface TBOrderMapper {
    
        @Select("select order_state from tb_order where id = #{id} for update")
        int findOrderStateByIdForUpdate(@Param("id") String id);
    
        @Update("update tb_order set order_state = #{orderState} where id = #{id}")
        void updateOrderStateById(@Param("orderState") int i, @Param("id") String id);
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1596142.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WIFI详解及周边拓展

一、WiFi协议简介 WiFi协议&#xff0c;也称为无线保真技术&#xff0c;是一种允许电子设备通过无线方式在局域网&#xff08;WLAN&#xff09;和互联网上进行通信的技术标准。WiFi协议是基于IEEE 802标准的子系列标准协议&#xff0c;由电气和电子工程师协会制定。随着移动设备…

HTML的路径

路径的分类&#xff1a; 1.绝对路径 ~用于引用别处的图片&#xff08;非本地的&#xff09; 2.相对路径 ~用于引入自己项目内的图片 绝对路径&#xff1a; 绝对路径&#xff0c;即使从网上搜到的图片的位置&#xff08;我们以图床的方式为例&#xff09; 相对路径&#xff1a; …

TypeScript-官方基础模板创建的小程序,如何创建js文件

如何创建JS文件&#xff0c;不需要寻找“js”文件类型&#xff0c;只需要创建一个新的“文件”即可。 第一步:先删除 ts文件;如 index.ts 第二步:右键点击项目&#xff0c;选择“新建”&#xff0c;然后选择“文件”。 第三步:在弹出的界面中&#xff0c;在“文件名”中输入“…

使用SquareLine Studio创建LVGL项目到IMX6uLL平台

文章目录 前言一、SquareLine Studio是什么&#xff1f;二、下载安装三、工程配置四、交叉编译 前言 遇到的问题&#xff1a;#error LV_COLOR_DEPTH should be 16bit to match SquareLine Studios settings&#xff0c;解决方法见# 四、交叉编译 一、SquareLine Studio是什么…

Java多线程的线程状态和线程池参数

一、线程状态 当线程被创建并启动以后&#xff0c;它既不是一启动就进入了执行状态&#xff0c;也不是一直处于执行状态。线程对象在不同的时期有不同的状态。Java中的线程状态被定义在了java.lang.Thread.State枚举类中&#xff0c;State枚举类的源码如下&#xff1a; publi…

不再写满屏import导入

密密麻麻的import语句不仅仅是一种视觉上的冲击&#xff0c;更是对代码组织结构的一种考验。 我们是如何做到让import“占领满屏“的了&#xff0c;又该如何优雅地管理这些import语句呢&#xff1f; 本文将从产生大量import语句的原因、可能带来的问题以及如何优化和管理impo…

【C++软件调试技术】C++软件开发维护过程中典型调试问题的解答与总结

目录 1、引发C软件异常的常见原因有哪些&#xff1f; 2、排查C软件异常的常用方法有哪些&#xff1f; 3、为什么要熟悉常见的异常内存地址&#xff1f; 4、调试时遇到调用IsBadReadPtr或者IsBadWritePtr引发的异常&#xff0c;该如何处理&#xff1f; 5、如何排查GDI对象泄…

JavaScript-2.对话框、函数、数组、Date、DOM

对话框 window对象封装了三个对话框用于与用户交互 提示框&#xff1a;alert(title);确认框&#xff1a;confirm(title);输入框&#xff1a;prompt(title); 确认框 包含两个按钮“确认”/“取消”&#xff0c;点击确定时&#xff0c;返回值为true // 确认框 var bool con…

Python学习笔记16 - 函数

函数的创建和调用 函数调用的参数传递 函数的返回值 函数的参数定义 变量的作用域 递归函数 斐波那契数列 总结

Vitis HLS 学习笔记--硬件卷积加速 Filter2DKernel

目录 加速器功能 Window2D()函数 实现代码 变量解释 ARRAY_PARTITION DEPENDENCE LOOP_TRIPCOUNT ramp_up 更新Window 更新LineBuffer Filter2D()函数 ARRAY_PARTITION window_stream.read() 计算过程 备注 加速器功能 硬件加速单元从全局内存&#xff08;DDR&a…

PP-LCNet:一种轻量级CPU卷积神经网络

PP-LCNet: A Lightweight CPU Convolutional Neural Network 最近看了一个新的分享&#xff0c;在图像分类的任务上表现良好&#xff0c;具有很高的实践意义。 论文&#xff1a; https://arxiv.org/pdf/2109.15099.pdf项目&#xff1a; https://github.com/PaddlePaddle/Padd…

javaweb在线拍卖系统

项目采用技术栈 htmlcssjs Vue2.js axios.js tomcat Servlet Mybatis Mysql 1.竞拍商品列表 实现多条件分页查询,头部根据是否登录作出不同的判断信息(登录或注销) 2.登录功能 3.竞拍页面 只有登录用户才能竞拍&#xff0c;出价记录需要实现关联用户查询 4.管理员登录增…

如何在Odoo 17 销售应用中使用产品目录添加产品

Odoo&#xff0c;作为一个知名的开源ERP系统&#xff0c;发布了其第17版&#xff0c;新增了多项功能和特性。Odoo 17包中的一些操作简化了&#xff0c;生产力提高了&#xff0c;用户体验也有了显著改善。为了为其用户提供新的和改进的功能&#xff0c;Odoo不断进行改进和增加新…

基于PCIe的智能处理系统研究

引言 人工智能是集合众多方向的综合性学科,在诸多应用领域均取得了显著成果。随着航空领域人工智能技术研究的不断深入,面向开放式机载智能交互场景,人工智能的应用可解决诸多问题。例如智能感知、辅助决策等,可利用人工智能算法对多源传感器捕获的海量信息进行快速处理,仅将处…

4、XTuner微调个人小助手(homework)

基础作业&#xff08;结营必做&#xff09; 训练自己的小助手认知&#xff08;记录复现过程并截图&#xff09; 1&#xff0c;环境安装 # 如果你是在 InternStudio 平台&#xff0c;则从本地 clone 一个已有 pytorch 的环境&#xff1a; # pytorch 2.0.1 py3.10_cuda11…

Grok-1.5 Vision:X AI发布突破性的多模态AI模型,超越GPT 4V

在人工智能领域&#xff0c;多模态模型的发展一直是科技巨头们竞争的焦点。 近日&#xff0c;马斯克旗下的X AI公司发布了其最新的多模态模型——Grok-1.5 Vision&#xff08;简称Grok-1.5V&#xff09;&#xff0c;这一模型在处理文本和视觉信息方面展现出了卓越的能力&#x…

李沐36_数据增广——自学笔记

数据增强 增强一个已有的数据集&#xff0c;使得有更多的多样性 1.在语言里面加入各种不同的背景噪音 2.改变图片的颜色和形状 一般是在线生成、随机增强 常见数据增强 1.左右翻转 2.上下翻转&#xff08;不总可行&#xff09; 3.切割&#xff1a;从图片中切割一块&…

OpenCV4.9图像金字塔

目标 在本教程中&#xff0c;您将学习如何&#xff1a; 使用 OpenCV 函数 pyrUp()和 pyrDown()对给定图像进行下采样或上采样。 理论 注意 下面的解释属于 Bradski 和 Kaehler 的 Learning OpenCV 一书。 通常&#xff0c;我们需要将图像转换为与原始图像不同的大小。为此…

函数的参数命名和默认参数

在Kotlin中&#xff0c;函数可以有多个参数&#xff0c;记住参数的顺序或者仅靠位置理解他们的作用可能会很具有挑战性&#xff0c;特别是对于接受多个参数或者有相同类型参数的函数。命名参数通过允许开发者指定传递给函数的每个参数的名称来解决这个问题。 有一个用来展示用户…

了解 Vue 工程化开发中的组件通信

目录 1. 组件通信语法 1.1. 什么是组件通信&#xff1f; 1.2. 为什么要使用组件通信&#xff1f; 1.3. 组件之间有哪些关系&#xff08;组件关系分类&#xff09;&#xff1f; 1.4. 组件通信方案有哪几类 &#xff1f; 2. 父子通信流程图 3. 父传子 3.1. 父传子核心流程…