RabbitMQ应用问题——消息补偿机制以及代码示例

news2024/11/24 13:43:58

RabbitMQ应用问题——消息补偿机制以及代码示例

RabbitMQ应用问题

  • 消息可靠性的保障
    • 消息补偿机制

在这里插入图片描述

详细说明

这里使用了简单的代码进行演示,订单的消费者没有写,在订单的消费同时,发送一条增加积分消息到积分队列。

详细流程途中都有注明。

为了更加清楚代码这里进行表明功能。

在这里插入图片描述

gitee地址

1.创建mq-manager父工程

1.1导入依赖

 <packaging>pom</packaging>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.7</version>
    </dependency>

</dependencies>

2.创建mq-common子模块

2.1导入依赖

<dependencies>
    <!-- mybatis-plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.2</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.6</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

2.2编写实体类

2.2.1Order
package com.qf.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.util.Date;
/*
  { "bizNo": "20200803173145877",
    "status": 1,
    "price": 34.12,
    "goodId": 1002,
    "userId": 100
  }

*/

//订单
@TableName("orders")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @TableField(value = "biz_no")
    private String bizNo; //业务编号

    @TableField(value = "status")
    private Integer status;

    @TableField(value = "price")
    private BigDecimal price;

    @TableField(value = "create_time")
    private Date createTime;

    @TableField(value = "pay_time")
    private Date payTime;

    @TableField(value = "good_id")
    private Integer goodId;

    @TableField(value = "user_id")
    private Integer userId;

    //exist = false:该属性不使用
    @TableField(value = "num", exist = false)
    private Integer num;
}
2.2.2Integral
package com.qf.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

//积分
@TableName("integral")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Integral{

    @TableId(type = IdType.AUTO)
    private Integer id;
    @TableField("user_id")
    private Integer userId;
    private Long score;
    private String msg;

    @TableField("create_time")
    private Date createTime;
}
2.2.3Msg
package com.qf.entity;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

//消息
@TableName("message")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Msg {

    @TableId(value = "id")
    private String id;

    @TableField(value = "exchange")
    private String exchange;

    @TableField(value = "routing_key")
    private String routingKey;

    @TableField(value = "content")
    private String content; // 消息的内容

    @TableField
    private Integer status; // 消息的状态

    @TableField(value = "try_count")
    private int tryCount; //尝试次数

    @TableField(value = "create_time")
    private Date createTime;
}

2.3编写公共参数

2.3.1IntegralConstant
package com.qf.contant;

//设置系统中的参数
public class IntegralConstant {

    // 积分系统队列
    public final static String INTEGRAL_QUEUE = "integral_queue";

    // 积分系统交换机
    public final static String INTEGRAL_EXCHANGE = "integral_exchange";

    // 积分系统的 routing-key
    public final static String INTEGRAL_ROUTING_KEY= "integral_routing_key";
}
2.3.2DeadConstant
package com.qf.contant;

//死信
public class DeadConstant {
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    //死信路由键
    public static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
}

3.编写mq-order子模块

3.1导入公共模块

<dependencies>
    <dependency>
        <groupId>com.qf</groupId>
        <artifactId>mq-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

3.2编写配置文件

server:
  port: 8080

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.129
    port: 5672
    publisher-confirm-type: simple
    publisher-returns: true

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #驼峰形式
    map-underscore-to-camel-case: true

3.3编写启动类

package com.qf;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.qf.mapper")
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class,args);
    }
}

3.4编写OrderController

package com.qf.controller;

import com.qf.entity.Order;
import com.qf.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("insertOrder")
    public String insertOrder(@RequestBody Order order){
        orderService.insertOrder(order);
        return "success";
    }

}

3.5编写OrderService

package com.qf.service;

import com.qf.entity.Order;

public interface OrderService {
    /**
     * 插入订单
     * @param order
     */
    void insertOrder(Order order);
}

3.6编写OrderServiceImpl

package com.qf.service.impl;

import com.alibaba.fastjson.JSON;
import com.qf.contant.IntegralConstant;
import com.qf.entity.Integral;
import com.qf.entity.Msg;
import com.qf.entity.Order;
import com.qf.mapper.MsgMapper;
import com.qf.mapper.OrderMapper;
import com.qf.service.OrderService;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Service
@Transactional
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private MsgMapper msgMapper;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void insertOrder(Order order) {
        //插入订单
        orderMapper.insert(order);

        //插入积分,是把积分信息发送到消息队列中
        Integral integral = new Integral();
        integral.setUserId(order.getUserId());//积分对应的用户id就是下订单的用户id
        integral.setScore(10L);
        integral.setMsg("购物积分");
        integral.setCreateTime(new Date());
        //把积分对象,转换为Json类型,发送到消息队列中
        String integralJson = JSON.toJSONString(integral);

        //创建消息对象,如果消息消费成功了,再去删除对应的消息
        Msg msg = new Msg();
        //分布式环境下,id必须是唯一的,解决方案:百度的uid-generator,美团开源项目Leaf
        String uuid = UUID.randomUUID().toString();
        msg.setId(uuid);
        msg.setExchange(IntegralConstant.INTEGRAL_EXCHANGE);//积分对应的交换机
        msg.setRoutingKey(IntegralConstant.INTEGRAL_ROUTING_KEY);//积分对象的路由的key
        msg.setContent(integralJson);//积分的Json对象
        msg.setStatus(-1);//状态
        msg.setTryCount(0);//尝试次数
        msg.setCreateTime(new Date());//时间
        //插入消息
        msgMapper.insert(msg);

        //发送消息,需要把Msg对象的id(就是uuid)传过来,一旦消息消费成功,还要去Msg对应表中把该消息删除
        CorrelationData correlationData = new CorrelationData(uuid);
        System.out.println("uuid:" + uuid);
        System.out.println("correlationData.getId():" + correlationData.getId());
        //发送
        rabbitTemplate.convertAndSend(
                IntegralConstant.INTEGRAL_EXCHANGE,
                IntegralConstant.INTEGRAL_ROUTING_KEY,
                buildMessage(integralJson,uuid),
                correlationData
        );
    }

    //构建消息
    private Message buildMessage(String body,String messageId){
        //获取MessagePropertiesBuilder对象
        MessagePropertiesBuilder messagePropertiesBuilder = MessagePropertiesBuilder.newInstance();
        //获取MessageProperties对象
        MessageProperties messageProperties = messagePropertiesBuilder.build();
        messageProperties.setMessageId(messageId);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化

        Message message = new Message(body.getBytes(),messageProperties);
        System.out.println("message传递的内容:" + new String(message.getBody()));
        System.out.println("message传递的uuid:" + message.getMessageProperties().getMessageId());

        return message;
    }
}

3.7编写OrderMapper

package com.qf.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Order;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderMapper extends BaseMapper<Order> {
}

3.8编写MsgMapper

package com.qf.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Msg;
import org.springframework.stereotype.Repository;

@Repository
public interface MsgMapper extends BaseMapper<Msg> {
}

3.9编写配置类

3.9.1DeadConfig
package com.qf.config;

import com.qf.constant.DeadConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadConfig {

    //创建队列
    @Bean
    public Queue createDeadQueue(){
        return new Queue(DeadConstant.DEAD_LETTER_QUEUE);
    }

    //创建交换机
    @Bean
    public DirectExchange createDeadExchange(){
        //交换机默认持久化
        return new DirectExchange(DeadConstant.DEAD_LETTER_EXCHANGE);
    }

    //绑定:交换机中的消息可以发送到不同的队列
    @Bean
    public Binding bindingDeadQueue(){
        //需要设置routingKey
        return BindingBuilder.bind(createDeadQueue()).to(createDeadExchange())
                .with(DeadConstant.DEAD_LETTER_ROUTING_KEY);//和发送消息时的routingKey一致
    }
}
3.9.2IntegralConfig
package com.qf.config;

import com.qf.constant.DeadConstant;
import com.qf.constant.IntegralConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

//路由模式
@Configuration
public class IntegralConfig {

    //创建队列
    @Bean
    public Queue createIntegralQueue(){

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DeadConstant.DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DeadConstant.DEAD_LETTER_ROUTING_KEY);

        return new Queue(IntegralConstant.INTEGRAL_QUEUE,true,false,false,arguments);
    }

    //创建交换机
    @Bean
    public DirectExchange createIntegralExchange(){
        //交换机默认持久化
        return new DirectExchange(IntegralConstant.INTEGRAL_EXCHANGE);
    }

    //绑定:交换机中的消息可以发送到不同的队列
    @Bean
    public Binding bindingIntegralQueue(){
        //需要设置routingKey
        return BindingBuilder.bind(createIntegralQueue()).to(createIntegralExchange())
                .with(IntegralConstant.INTEGRAL_ROUTING_KEY);//和发送消息时的routingKey一致
    }

}
3.9.3PublisherConfirmAndReturnConfig
package com.qf.config;

import com.qf.entity.Msg;
import com.qf.mapper.MsgMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;


@Slf4j
@Configuration
public class PublisherConfirmAndReturnConfig implements
        RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MsgMapper msgMapper;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        //判断
        if(ack){
            log.info("已到达broker");
            log.info("correlationData id is {}",correlationData.getId());
            //删除存到数据库中的消息
            //msgMapper.deleteById(correlationData.getId());//通过id删除
            //设置删除条件
            HashMap<String, Object> map = new HashMap<>();
            map.put("id",correlationData.getId());
            map.put("status",-1);
            //多条件删除
            msgMapper.deleteByMap(map);

        }else{
            log.info("没有到达broker,实际上消息已经保存到mysql中,也可以保存到redis中");
        }
    }

    //return机制,该方法比confirm先执行,只要未到达队列的时候才执行
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息未到达队列");
        //如果消息到达队列不执行以下代码

        //消息已经从数据库被删除了
        //考虑人工干预,获取消息信息进行保存
        String exchange = returnedMessage.getExchange();
        String routingKey = returnedMessage.getRoutingKey();
        Message message = returnedMessage.getMessage();

        //创建消息对象,如果消息消费成功了,再去删除对应的消息
        Msg msg = new Msg();

        msg.setId(message.getMessageProperties().getMessageId());
        msg.setExchange(exchange);//积分对应的交换机
        msg.setRoutingKey(routingKey);//积分对象的路由的key
        msg.setContent(new String(message.getBody()));//积分的Json对象
        msg.setStatus(-2);//状态可以设置为和之前不一样
        msg.setTryCount(0);//尝试次数
        msg.setCreateTime(new Date());//时间
        //插入消息
        msgMapper.insert(msg);
        //做进一步处理:给管理员发邮件,发短信....

    }
}

4.创建mq-integral子模块

4.1导入依赖

<dependencies>
    <dependency>
        <groupId>com.qf</groupId>
        <artifactId>mq-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    
    <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.4</version>
        </dependency>
</dependencies>

4.2编写配置文件

server:
  port: 8081

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.134
    port: 5672
    listener:
      simple:
        retry:
          enabled: true #开启消息重试
          max-attempts: 3 #最大重试次数
          initial-interval: 2000ms #每次重试的时间间隔
          multiplier: 2 #每次重试时间乘以当前倍数
        #重试机制必须是自动ack,才能放到死信队列中
        acknowledge-mode: auto

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #驼峰形式
    map-underscore-to-camel-case: true

4.3编写启动类

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.qf.mapper")
public class IntegralApplication {
    public static void main(String[] args) {
        SpringApplication.run(IntegralApplication.class,args);
    }
}

4.4编写IntegralImpl

package com.qf.service.impl;

import com.alibaba.fastjson.JSON;
import com.qf.constant.IntegralConstant;
import com.qf.entity.Integral;
import com.qf.mapper.IntegralMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Service
public class IntegralImpl {

    @Autowired
    public IntegralMapper integralMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

//    //第一种方式    
//    @RabbitListener(
//            bindings = @QueueBinding(
//                    value = @Queue(name = IntegralConstant.INTEGRAL_QUEUE,
//                    durable = "true"
//            ),
//            key = {IntegralConstant.INTEGRAL_ROUTING_KEY},
//            exchange = @Exchange(name = IntegralConstant.INTEGRAL_EXCHANGE, durable = "true")
//    ))
//    public void insertIntegral(Message message){
//        //积分信息
//        String integralJson = new String(message.getBody());
//        System.out.println(integralJson);
//
//        //类型转换
//        Integral integral = JSON.parseObject(integralJson, Integral.class);
//        System.out.println(integral);
//        //插入数据库
//        integralMapper.insert(integral);
//    }

//    //第二种方式  
//    @RabbitListener(queues = IntegralConstant.INTEGRAL_QUEUE)
//    public void insertIntegral(Message message) throws InterruptedException {
//        //积分信息
//        String integralJson = new String(message.getBody());
//        System.out.println(integralJson);
//
//        //运行成功
//        //类型转换
//        Integral integral = JSON.parseObject(integralJson, Integral.class);
//        System.out.println(integral);
//        //插入数据库
//        integralMapper.insert(integral);
//
//        //测试消息重复消费
//        //直到方法执行完毕,才会ack
//        Thread.sleep(500000);
//
//        //测试消息重试
//        //System.out.println("当前系统时间:"+System.currentTimeMillis());
//        //运行失败
//        //throw new RuntimeException("消息消费异常...");
//    }

    
    @RabbitListener(queues = IntegralConstant.INTEGRAL_QUEUE)
    public void receiveIntegralMessage(Message message){
        //获取要被消息的消息id
        String messageId = message.getMessageProperties().getMessageId();
        //判断,如果redis中没有这个消息id的key,则是第一次消费该消息
        if(!stringRedisTemplate.hasKey(messageId)){
            //获取积分信息
            String integralJson = new String(message.getBody());
            //类型转换
            Integral integral = JSON.parseObject(integralJson, Integral.class);
            //插入到数据库
            integralMapper.insert(integral);
            //使用hutool工具类生成随机数
            int randomInt = RandomUtil.randomInt(10, 100);
            //往redis中也存一份
            stringRedisTemplate.opsForValue().setIfAbsent(messageId,
                    String.valueOf(randomInt),600, TimeUnit.SECONDS);

        }

}

4.5编写IntegralMapper

package com.qf.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Integral;
import org.springframework.stereotype.Repository;

@Repository
public interface IntegralMapper extends BaseMapper<Integral> {
}

5.创建mq-compensate子工程

5.1导入依赖

<dependencies>
    <dependency>
        <groupId>com.qf</groupId>
        <artifactId>mq-order</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

4.2编写配置文件

server:
  port: 8082

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.134
    port: 5672
    publisher-confirm-type: simple
    publisher-returns: true

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #驼峰形式
    map-underscore-to-camel-case: true

4.3编写启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class CompensateApplication {
    public static void main(String[] args) {
        SpringApplication.run(CompensateApplication.class,args);
    }
}

4.4编写MassageCompensateTask

package com.qf.task;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.qf.contant.IntegralConstant;
import com.qf.entity.Msg;
import com.qf.mapper.MsgMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.util.List;

/**
 *
 * 消息补偿:隔一段时间,去数据库查询未消费掉的消息,再次执行
 */

@Configuration
public class MassageCompensateTask {

    @Autowired
    private MsgMapper msgMapper;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Scheduled(cron = "10 * * * * ?")
    public void compensateTask(){
        //设置查询条件
        QueryWrapper<Msg> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("status","-2");
        queryWrapper.lt("try_count","3");
        //查询消息
        List<Msg> msgList = msgMapper.selectList(queryWrapper);
        //判断
        if(msgList.size() > 0){
            for(Msg msg : msgList){

                System.out.println("数据库中的消息id:" + msg.getId());
                //发送
                rabbitTemplate.convertAndSend(
                        msg.getExchange(),
                        msg.getRoutingKey(),
                        buildMessage(msg.getContent(),msg.getId()),
                        new CorrelationData(msg.getId())
                );

                //设置尝试次数
                msg.setTryCount(msg.getTryCount() + 1);
                //修改数据库中的消息
                msgMapper.updateById(msg);

            }
        }
    }

    //构建消息
    private Message buildMessage(String body, String messageId){
        //获取MessagePropertiesBuilder对象
        MessagePropertiesBuilder messagePropertiesBuilder = MessagePropertiesBuilder.newInstance();
        //获取MessageProperties对象
        MessageProperties messageProperties = messagePropertiesBuilder.build();
        messageProperties.setMessageId(messageId);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化

        Message message = new Message(body.getBytes(),messageProperties);
        System.out.println("message传递的内容:" + new String(message.getBody()));
        System.out.println("message传递的uuid:" + message.getMessageProperties().getMessageId());

        return message;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/539581.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32+ov7725+ESP8266实现无线图传-完成上位机图像显示

一、需求 stm32f407探索者开发板和STM32F103ZET6战舰开发板。接正点原子ov5640、OV7725、OV2640摄像头,通过esp8266Wi-Fi模块(透传模式)将摄像头采集到的rgb565格式图片通过tcp/ip协议上传到上位机显示。 二、设计思路 【1】使用QT开发上位机,建立TCP服务器,接收ESP8266…

DistilPose: Tokenized Pose Regression with Heatmap Distillation

论文名字&#xff1a;DistilPose&#xff1a;使用热图蒸馏的令牌化姿势回归 论文地址&#xff1a;2303.02455.pdf (arxiv.org)https://arxiv.org/pdf/2303.02455.pdf项目地址&#xff1a;yshMars/DistilPose: Implementation for: DistilPose: Tokenized Pose Regression with…

科幻风的卡片视频播放

上一篇博文展示了卡片中的VR展示&#xff0c;那篇主要是卡片的3D转动来展示未显示的部分图片。这篇&#xff0c;我们来点科幻的。 我们在卡片中播放视频的同时来拖动卡片或转动它。像下面那样&#xff1a; 这个主要依赖了两个库&#xff0c;具体代码如下&#xff1a; <!D…

智能问答支持自定义问答

# -*- coding: utf-8 -*- # Time : 2023-5-12 14:15 # Author : shenzh # FileName: chat_bot_v1.py # Software: PyCharm """Description:一期智能机器人设计完成&#xff0c;支持自定义问题和答案随时增加功能""" import json import jie…

NOV Diagram for .NET Crack

NOV Diagram for .NET Crack 增加了对Microsoft.NET 7.0的支持-NOV现在完全支持.NET Core 7.0&#xff0c;此外还支持Microsoft.NET Framework 4.7.2、.NET Core 5.0和.NET Core 6.0的内部版本。 添加了对读取Microsoft Visio 2003-2010绘图(VSD文件)的支持。 改进了SVG导出。 …

哈希表应用——位图

应用场景&#xff1a;海量数据处理&#xff08;这里的海量是指一般数据量非常大如以亿为单位的数据量&#xff09; 目录 面试题 位图概念 位图的实现 位图的应用 应用一 应用二 位图应用变形 面试题 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&…

Java之多线程进阶

目录 一.上节内容复习 1.线程池的实现 2.自定义一个线程池,构造方法的参数及含义 3.线程池的工作原理 4.拒绝策略 5.为什么不推荐系统提供的线程池 二.常见的锁策略 1.乐观锁和悲观锁 2.轻量级锁和重量级锁 3.读写锁和普通互斥锁 4.自旋锁和挂起等待锁 5.可重入锁和…

精彩回顾 | Fortinet Accelerate 2023·中国区巡展厦门站

Fortinet Accelerate 2023中国区 5月16日&#xff0c;Fortinet Accelerate 2023中国区巡展来到魅力“鹭岛”——厦门&#xff0c;技术、产品和业务专家&#xff0c;携手亚马逊云科技、唯一网络等云、网、安合作伙伴&#xff0c;与交通、物流、金融等各行业典型代表客户&#x…

GPT大语言模型Vicuna本地化部署实践(效果秒杀Alpaca) | 京东云技术团队

​ 背景 上一篇文章《[GPT大语言模型Alpaca-lora本地化部署实践]》介绍了斯坦福大学的Alpaca-lora模型的本地化部署&#xff0c;并验证了实际的推理效果。 总体感觉其实并不是特别理想&#xff0c;原始Alpaca-lora模型对中文支持并不好&#xff0c;用52k的中文指令集对模型进…

信息安全工程实验——口令攻击和钓鱼攻击(自用)

目录 实验目的 实验原理 实验内容 练习1windows口令破解 1、基本操作 2、思考与总结 练习2&#xff1a;QQ 邮箱的钓鱼攻击 1、构造钓鱼页面 2、接收钓鱼所得的账号和密码&#xff08;分档&#xff09; 3、实验验证 4、思考与总结 实验目的 &#xff08;1&#xff09…

网络安全实验——信息收集与主机发现

目录 实验目的 实验原理 实验内容 1.信息搜集 1.ping探测 2. Nmap扫描 3. 探测总结 2.主机发现程序开发 3.主机发现 实验总结 实验目的 1.了解信息搜集的一般步骤。 2.学会熟练使用ping命令。 3.学会利用Nmap等工具进行信息搜集。 4.了解IP助手函数。 5.掌握Sen…

Python学习29:存款买房(B)

描述‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‭‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‮‬‪‬‪‬‪‬‪‬‪‬‮‬‭‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‪‬‪‬‪‬‪‬‪‬‪‬‮‬‭‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‪‬ 你刚刚大学毕业&#xff0c;…

李莫愁给张无忌朋友圈点赞?详解SpringBoot事件机制

Spring Boot的事件机制是基于Spring框架的事件机制实现的。Spring Boot中的事件机制可以让我们在应用程序中监听和响应特定的事件&#xff0c;例如应用程序启动、关闭、上下文刷新等。 接下来&#xff0c;我们通过一个案例&#xff0c;来讲解具体怎么使用。 这个案例就是李莫…

一文看懂增值税发票识别OCR:从技术原理到 API Java 示例代码接入

引言 增值税发票识别OCR API是一项重要的技术创新&#xff0c;它在如今信息化的商业环境中发挥着重要作用。通过利用该API&#xff0c;企业和机构能够实现增值税发票的自动化识别和信息提取&#xff0c;从而在财务管理、票据核对、报销流程等方面带来许多好处。 本文将详细介…

Istio virtual service 超时和重试

在使用xshell去远程连接服务器的时候没有反应&#xff0c;这样可能等了几分钟&#xff0c;这样按下crtlc终止就行了。 有些时候微服务是多个服务组成的&#xff0c;a服务会去调用b服务&#xff0c;可能因为网络问题或者连接问题&#xff0c;没有连接成功&#xff0c;那么会尝试…

怎么把音频的声音调大?

怎么把音频的声音调大&#xff1f;我们平时会经常使用到音频文件&#xff0c;但声音大小不一&#xff0c;可能会让我们感到不适应。如果太大&#xff0c;甚至会使人吓一跳&#xff1b;如果太小&#xff0c;则难以听清楚。为了轻松聆听音频&#xff0c;我们需要将声音调整到合适…

ConvNeXt网络详解,最新ConvNeXt结合YOLO,催生YOLOv5目标检测巨变

目录 引言一、ConvNeXt的介绍1、目标检测的重要性2、YOLOv5的介绍3、ConvNeXt原理和特点4、ConvNeXt结构 二、相关研究综述1、目标检测的基础原理和流程2、YOLOv5的特点与局限性3、ConvNeXt技术在目标检测中的应用现状 三、ConvNeXt在YOLOv5中的应用与改进1、安装PyTorch和torc…

什么是栈,为什么函数式编程语言都离不开栈?

文章目录 一、什么是栈&#xff0c;什么是FILO二、栈的作用是什么&#xff0c;为什么编程语言函数调用都选择用栈&#xff1f;三、使用C模拟实现解析栈1.结构体的定义2.栈的创建及销毁3.实现入栈操作4.获取栈顶元素及出栈操作5.获取栈中有效元素个数 源代码分享 一、什么是栈&a…

临时被拉去已经在进行中的项目组「救火」,该怎么开展管理?

当你被临时拉去参与正在进行中的项目组&#xff0c;需要进行所谓的「救火」工作时&#xff0c;你需要注意的是如何开展管理&#xff0c;以确保项目能够成功完成。 首先&#xff0c;你需要了解项目的当前状态。了解项目的进展情况、目标和计划&#xff0c;以及团队成员的角色和…

Vmware Linux磁盘空间扩容

Linux磁盘空间扩容 VMware虚拟机中配置&#xff08;1&#xff09;进入虚拟机设置界面&#xff0c;选择扩展磁盘容量。&#xff08;2&#xff09; 本次是在原来30G的基础上扩展为50G。 Linux中设置&#xff08;1&#xff09; 可以看出sda3是根分区&#xff0c;下面按照博客提示&…