rabbitmq使用笔记

news2025/1/10 10:27:39

前言

mq的优点:异步提速、解耦、流量削峰

mq的缺点: mq宕机可能导致消息丢失、消费者未成功消费如果保证整个系统架构的事务安全、消息可能被重复消费出现幂等问题、消息未被消费到引发死信问题、消息出现消费延迟或消费异常引发的顺序消费错乱问题...

mq的使用建议:系统扛不住了,扩容太贵了,不得不使用了

以下将根据官网提供的主流工作模式描述mq使用流程

一、队列生产消费模式

1、简单模式

消费者创建

public class SimpleConsumer {

    public static final String QUEUE_NAME = "hello world";

    //队列的名称
    public static void consumeMessage() throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接rabbitmq队列
        factory.setHost("127.0.0.1");
        //设置用户名称
        factory.setUsername("guest");
        //设置密码
        factory.setUsername("guest");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        //声明 接受消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        //取消消息回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        //消费者消费消息:消费哪个队列、消费成功之后是否要自动应答(true:自动应答,false:手动应答)、消费失败的回调、取消消息的回调
        System.out.println(channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback));
    }
}

生产者创建 

public class SimpleProducer {

    public static final String QUEUE_NAME = "hello world";

    //消息发送
    public static void buildMessage() throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接rabbitmq队列
        factory.setHost("127.0.0.1");
        //设置用户名称
        factory.setUsername("guest");
        //设置密码
        factory.setUsername("guest");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        //创建队列:
        //    设置队列名称、是否持久化、是否共享队列消息的消费(true:共享消费,false:只有一个消费者消费)、是否自动删除(true:消息用完后自动删除、false:)、其它参数(延迟、死信消息处理用)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送消息
        String message = "hello world!!!";

        //消息发送
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }
}

2、工作队列模式

         工作队列(任务队列)主要是消费端注册多个消费者以加速消息消费,防止消息消费速度慢、延迟、死信等异常,适用于任务比较重的地方

                                        

模式特点:

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息

代码和简单模式没区别,额外要做的就是对消费者线程多开,可以对线程做些标记观察是否真的有轮询消费

3、Pub/Sub订阅模式

与之前的模式相比,该模式可以提供消息共享,即一个消息被多个消费者消费,但是一个队列中的消息仍然只能消费一次,那如何做到的多消费呢?答案是利用交换订阅技术,即由交换机来路由消息给多个队列从而实现多次消费 

Exchange(交换机)

        交换机的工作非常简单,一方面接受来自生产者的消息,另一方面将消息推到队列。关键的点在于交配机必须精确处理消息,即把消息推到哪个队列或者是否应该丢弃

        交换机类型描述:直接(direct/route)、主题(topic)、标题(headers)、扇出(fanout)

注:比如前面代码发布消息时channel.basicPublish方法指定的exchange是""或者null,实际走的是name为default的exchange,其类型为direct,routingkey默认是队列名称

临时队列

        未设置持久配置都是临时队列,即durable参数配置的值不是durable,哦還要看生成者是否配置了autodelete,如果autodelete參數配置的值是true也是臨時隊列,因为用完就把队列删除了嘛谁还管队列是否配置了持久

        创建临时队列的快速方法:String queueName = channel.queueDeclare().getQueue()

Bind

        交换机绑定queue

        

Fanout

        扇出这种类型就是广播,我不知道谁做的中文翻译,我*你**,就是消息广播到所绑定的所有队列,和routingkey没有关系

4、Direct订阅模式        

        这个和fanout模式差不多,区别在于exchange的type是direct,需要在bind队列时设置routing key,exchange根据routing key找到bind的queue,然后路由消息到这个queue

        

5、Topic        

        这个是对direct的扩展,direct的缺点在于需要硬编码到代码,如果以后要扩展别的队列还需要去生产者手动添加新routing key,当然我们可以通过服务配置来弥补这个问题,但是维护也需要人力,而topic可以很好的解决这个耦合问题

topic模式要求

        交换机的routing key不能随意写,必须是一个单词列表,以 . 分隔,单词列表长度<=255

        还有两个替换符,*代替一个单词,#代替零或多个单词,例如:*.orange.*;lazy.# ...

二、死信队列

死信:

        死信是指无法被消费的消息,一般来说produer将消息投递到queue,consumer从queue取出消息进行消费,但是某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就是死信消息,为了保存这些死信消息就有了死信队列。

应用场景:

        为了保证订单业务的消息数据不丢失,需要使用rabbitmq的死信队列机制,当消息消费发生异常,将消息投入死信队列中。        

死信来源:

        消息ttl过期

        队列达到最大长度(队列满了,无法再添加数据到mq)

        消息被拒绝(basic.reject或者basic.nack)并且requeue=false

        消费者都宕机了

三.延迟队列

        延迟队列是用来存放需要在指定时间被处理的元素的队列 

1.应用场景

        外卖订单规定在15min之内下单,否则失效

        新创建的店铺,如果十天内没有上传商品,自动发送消息提醒

        用户注册账户成功,如果三天内没有登陆进行短信提醒

        用户发起退款,如果三天内没有得到处理会通知线下服务人员

        预定会议后,在预定时间的前15分钟消息提醒参会人员

2.代码操作

2.1.框架图

创建两个队列 QA 和 QB,两个队列 TTL 分别设置为 10s 和 40s,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

2.2.配置信息 

@Configuration
public class TtlQueueConfig {

    /**
     * 普通交换机名称
     */
    public static final String X_EXCHANGE = "X";

    /**
     * 死信交换机名称
     */
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    /**
     * 普通队列名称
     */
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    /**
     * 死信队列名称
     */
    public static final String DEAD_LETTER_QUEUE = "QD";

    /**
     * 声明 XExchange
     */
    @Bean
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 声明 yExchange
     */
    @Bean
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明队列QA
     */
    @Bean
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信路由键
        arguments.put("x-dead-letter-routing-key", "YD");
        // 设置过期时间
        arguments.put("x-message-ttl", 10000);
        return new Queue(QUEUE_A, true, false, false, arguments);
    }

    /**
     * 声明队列QB
     */
    @Bean
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信路由键
        arguments.put("x-dead-letter-routing-key", "YD");
        // 设置过期时间
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    /**
     * 死信队列QD
     */
    @Bean
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }


    /**
     * 绑定
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueBBindingX(){
        return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);
    }

    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

2.3.消息生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable String message){
        log.info("当前时间:{}发送一条消息{}给两个队列", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s队列QA:"+message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s队列QB:"+message);
        return "发送成功";
    }
}

2.4.消息消费者代码

@Slf4j
@Component
public class DeadLetterConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg);
    }
}

2.5.测试

        发送一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
在这里插入图片描述

第一条消息在 10s 后变成了死信消息,然后被消费者消费掉了,第二条消息在 40s 之后变成了死信消息,然后被消费掉,这样一个延时队列就完成了。

2.6.缺点

        queueA和queueB都是通过queue设置ttl,如此一来生产者无法灵活设置消息的ttl

2.7.优化

        让生产者也参与设置ttl,即生产者在生产消息时设置ttl,如此一来queue即使没有设置ttl也可以实现延时操作,扩展性大幅提高。

        【注:】如果生产者和queue同时设置ttl,则以最短的ttl为有效值

        如图,新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

2.8.配置信息

@Component
public class MsgTtlQueueConfig {

    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    private static final String QUEUE_C = "QC";

    @Bean("queueC")
    public Queue queueC(){
        Map<String, Object> arguments = new HashMap<>(2);
        // 声明当前队列绑定的死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 声明当前队列的私信路由key
        arguments.put("x-dead-letter-routing-key", "YD");
        return new Queue(QUEUE_C, false, false, false, arguments);
    }

    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

2.9.消费生产者

@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration(ttlTime);
            return message;
        }
    };
    rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
    return "发送成功";
}

        将程序执行,然后发送请求:
                http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
                http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

在这里插入图片描述
        两条消息的过期时间一致,过期时间短的那条消息,在过期时间到了以后并没有立即被消费,而是和过期时间长的那条消息一起被消费了。所以,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

2.10.Rabbitmq 插件实现延迟队列

        上面提到的问题,确实是一个问题,如果不能实现TTL的细粒度,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。

原理:

        以往的ttl都是由队列处理,而插件延迟功能是由exchange来管理,当ttl达到截至时间再发送到对应的queue

安装:

        这里不做演示,可百度去官网下载和安装对应版本的插件(好文推荐),安装完成后重启如下所示

        在这里插入图片描述 

        新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
在这里插入图片描述

        我们自定义的交换机中,这是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。 

2.11.配置信息 

@Configuration
public class DelayedQueueConfig {

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    @Bean("delayedQueue")
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    /**
     * 自定义交换机 定义一个延迟交换机
     *  不需要死信交换机和死信队列,支持消息延迟投递,消息投递之后没有到达投递时间,是不会投递给队列
     *  而是存储在一个分布式表,当投递时间到达,才会投递到目标队列
     * @return
     */
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>(1);
        // 自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

2.12.生产消息

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{
            messagePostProcessor.getMessageProperties().setDelay(delayTime);
            return messagePostProcessor;
        });
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message);
        return "发送成功";
    }
}

2.13.消费消息

@Slf4j
@Component
public class DeadLetterConsumer {

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);
    }
}

发起请求:
        http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
        http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
在这里插入图片描述
        第二条消息被先消费掉了,符合预期 

2.14.总结

        延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好地利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列,来保证消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好要的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。 

四、发布确认        

        如果由于异常重启rabbitmq,那么中间的消息将会投递失败且丢失,只能数据补偿。生产环境如何保证异常下的数据不会丢失?

        最好的方法只能是消息先行磁盘存储,如果服务异常或者重启仍然可以追回消息

发布确认使用方式

        单消息发布确认:channel发布消息,然后channel执行waitForConfirms(),虽然简单但是低效,1000个简单消息处理耗时1s

        批量消息发布确认:channel发布一批消息(例如for循环发布),然后channel执行waitForConfirms(),1000个简单消息处理耗时0.15s

        异步发布确认: channel发布消息,然后channel执行 addReturnListener(ReturnCallback returnCallback),1000个简单消息处理耗时0.06s,不过需要你实现里面的两个回调接口并作为参数传入,一个是发布确认接口,另一个是不可达消息处理接口

springBoot下发布确认使用方式

        spring-boot-starter-amqp提供了配置信息:spring.rabbitmq.publisher-confirm-type,主要用于设置发布确认类型

        correlated:成功发布消息到交换机,可以异步触发回调

        simple:支持rabbittemplate使用waitForConfirms方法进行同步确认

        none:禁用发布模式

异步处理不可达消息方式

        在仅开启生产者确认机制的情况下,交换机接收到消息后会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息被直接丢弃,生产者也感知不到消息被丢弃。这是一种不可靠的情况。解决方法就是设置mandatory参数,即生产者设置回退消息回调方法用来处理回退消息。

        springBoot的mandatory设置方式:spring.rabbitmq.publisher-returns=true

代码演示

        发布确认和不可达消息处理写一块了,如下 

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入发布确认回调接口
        rabbitTemplate.setConfirmCallback(this);
//        注入失败回退回调接口
        rabbitTemplate.setReturnsCallback(this);
    }
    /**
     * 交换机确认回调方法
     * 1.发送消息 交换机收到消息 回调
     *  correlationData 保存回调消息的ID和相关信息
     *  交换机收到消息 ack = true
     *  cause null
     * 2.发送消息 交换机接受失败 回调
     *  correlationData 保存回调消息的ID和相关信息
     *  交换机收到消息 ack = false
     *  cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData!=null ? correlationData.getId() : "";
        if(ack){
            log.info("确认回调:id:{}", id);
        }else {
            log.info("交换机未接受到消息,id:{},原因:{}", id, cause);
        }
    }

    //将不可达的消息传递给生产者
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息:{};\r\n退回消息交换机:{};\r\n退回原因:{};\r\nrouting key:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
    }
}

 

五、备份交换机

        有了mandatory和回退消息回调方法,我们有机会去处理不可达的消息,但是有时候我们并不知道该如何处理这些不可达的消息,最多进行日志打印,然后触发告警,最后补偿数据,为了更方便地解决这个问题催生了备份交换机

        备份交换机是指为普通交换机指定一个备用交换机,当交换机无法路由消息时就转给备份交换机来处理

【注:】

        如果同时设置了备份交换机和mandatory设置,会先执行备份交换机处理,最后再由mandatory设置的回退消息回调方法来兜底

六、其它

1.幂等

         幂等就是对同一操作的重复执行,如删除操作和查询操作,天然兼容幂等,但是新增和修改必须解决幂等影响,比如查询某个用户的流水,不管怎么重复操作都不会影响流水数据,删除也一样,但是新增就麻烦了,比如付款,对当前订单的付款如果重复付款肯定是业务异常的,修改也是如此

         消费端的幂等保障一般是:1 唯一ID+指纹码机制,利用数据库去重;2 利用redis原子性实现。唯一码是一些规则或者时间戳加别的服务信息拼接成的唯一码,要么就是redis执行setnx指令,天然支持幂等

2.优先级队列

        在系统中有一个订单催付的业务,商城会在订单有效期内推送消息给客户作为一个下单提醒,很细节的功能,但是展开来看还是有点东西的,比如系统会区分大小客户,比如高价订单会被优先处理,这涉及到队列的优先级设置,简单系统可以利用定时任务轮询,但是发杂系统还是要用消息队列,rabbitmq就提供了一个优先级配置参数

        2.1.优先级队列控制台设置方式

        标记会有个pri,表示该队列有权限参数配置

       2.2.优先级队列代码设置方式

        配置信息   

@Configuration
public class PriorityConfig {

    @Bean
    public Queue queuePri(){
//      优先级以小为优先,设置范围0-255,不用设置太大消耗内存
        return QueueBuilder.durable("priority_queue").withArgument("x-max-priority", 10).build();
    }


}

         消费端

@Slf4j
@Component
public class PriorityConsumer {

    @RabbitListener(queues = "priority_queue")
    public void receiveConfirmMessage(Message message){
        log.warn("排序处理!消息:{}", new String(message.getBody()));
    }

}

         生产消息测试

@RestController
@RequestMapping("/priority")
public class PrioritySendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    private String sendMsg(@PathVariable("message") String message){
        MessageProperties messageProperties = new MessageProperties();
        Message msg = null;
        for(int i=1; i<11; i++){
            if(i==5){
                //优先级以小为优先
                messageProperties.setPriority(1);
            }else {
                messageProperties.setPriority(10);
            }
            msg = new Message((message+":"+i).getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend("","priority_queue",msg.getBody());
        }

        return "发布成功!!!";
    }
}

3.惰性队列

        从3.6.0开始引入惰性队列,功能是将消息最大限度存储到磁盘,当消息被消费时才被加载进内存。但是默认情况,生产者发送消息到队列时会最大限度存储到内存,这是为了更高效处理消息

        优点:有磁盘兜底可以大量地存储消息、使消息更加可靠(比如防止宕机丢失)

        缺点:在被写入磁盘时依然会在内存备份,当mq释放内存时会将内存的消息换页到磁盘,这个操作比较消耗内存,也会阻塞队列导致接受不到消息,尤其消息量高时问题明显

        总结:常规消息量高效处理可以用默认队列(default mode);解决消息堆积可以用惰性队列(lazy mode),好文推荐icon-default.png?t=N6B9https://blog.csdn.net/TheWindOfSon/article/details/130808424?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=5

        控制台配置信息​​​​​​ 

        

        代码配置信息

@Configuration
public class LazyModeConfig {

    @Bean
    public Queue queueLazyMode(){
        return QueueBuilder.durable("lazy_mode_queue").withArgument("x-queue-mode", "lazy").build();
    }
}

         消息消费

@Slf4j
@Component
public class LazyModeConsumer {

    @RabbitListener(queues = "lazy_mode_queue")
    public void receiveConfirmMessage(Message message){
        log.warn("懒惰队列!消息:{}", new String(message.getBody()));
    }

}

        消息生产

@RestController
@RequestMapping("/lazyMode")
public class LazyModeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // lazyMode队列的可执行测试
    @GetMapping("/sendMsg/{message}")
    private String sendMsg(@PathVariable("message") String message){
        //obj参数如果是非Message类型实例则默认执行消息持久化
        rabbitTemplate.convertAndSend("","lazy_mode_queue",message);
        return "发布成功!!!";
    }

    // lazyMode队列消息是否可持久化测试
    @GetMapping("/sendMsg2/{message}")
    private String sendMsg2(@PathVariable("message") String message){
        //obj参数设置Message类型实例,交付模式为非持久化。如果mq宕机恢复后消息依然存在则说明lazyMode队列确实是做了消息的持久化
        //测试流程:
//            1.关闭消费者LazyModeConsumer(测试结束后恢复开启)
//            2.执行接口
//            3.查看控制台,是否有待消费消息
//            4.如果有,stop mq服务
//            5.start mq服务,观察待消费消息是否依然存在
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        Message msg =  new Message((message).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend("","lazy_mode_queue",msg.getBody());
        return "发布成功!!!";
    }
}

        创建Message类型实例发送消息,通过模拟宕机测试出的结论:消息不会丢失,持久化得到了实际验证 

6.4.交付模式(delivery mode)

        delivery mode是指生产者发送消息的属性是否支持持久化,要么是none_persistent、要么是persistent

七、集群

        91-Shovel_哔哩哔哩_bilibili

        待续         

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/752070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 修改网卡 MAC 地址

使用 iproute2 修改网卡 MAC 地址 1. 使用如下命令查看当前所有网卡及其 MAC 地址&#xff1b; sudo ip link show2. 如笔者这里想要修改网卡 ens224 的 MAC 地址&#xff0c;先使用如下命令关闭该网卡&#xff1b; sudo ip link set dev ens224 down3. 设置该网卡的 MAC 地…

阿里云:机器学习平台及OpenSearch

机器学习流程 相关项目 BladeDISC-AI编译优化 EasyRec-推荐算法库 EasyCV-视觉图像算法库 EasyNLP-NLP/多模态算法库 模型开发中算法团队面临的工程挑战 Develop platform OpenSearch 向量检索库

亚马逊云科技迁移只需5个简单步骤(2023年迁移到云)

您是否正在考虑亚马逊云科技迁移&#xff0c;并将本地项目迁移到云中&#xff1f; 但是不知道从哪里开始以及如何去做&#xff1f; 在这篇文章中&#xff0c;我将指导您完成亚马逊云科技迁移。 什么是亚马逊云科技&#xff1f; 亚马逊云科技或亚马逊网络服务是最受欢迎的云平…

jar程序部署的外部依赖和按名传参和shellUtil传参json串及返回pid问题

文章目录 指定jar程序运行的外部依赖指定参数名称传参给程序shellUtil命令传参JSON串shellUtil获取回调nohub启动程序后的pid 指定jar程序运行的外部依赖 nohup java -Djava.ext.dirs./lib/ -cp DataSourceAccessPage.jar com.sitech.adapter.JsonAdapter arg0 arg1java -cp 命…

10_SPI_Flash 连续写实验

10_SPI_Flash 连续写实验 1. 实验目标2. 连续写方法3. 操作时序4. 流程框图4.1 顶层模块4.2 连续写模块 5. 波形图6. RTL6.1 flash_seq_wr_ctrl6.2 spi_flash_seq_wr 7. Testbench 1. 实验目标 使用页写指令&#xff0c;将串口发送过来的连续不定量数据写入 Flash。本实验中&a…

Linux开发环境的搭建

文章目录 系统安装工具软件安装Xshell远程登录VScode远程登录Linux 下GCC安装 系统安装 &#xff08;虚拟机安装、云服务器&#xff09;Ubuntu18.04 网络类型&#xff1a;桥接模式网络、NAT&#xff08;network access transation)网络地址转换模式、仅主机模式 注意&#xff…

模拟电压与数字脉冲占空比控制的应用与发展前景

摘要&#xff1a;本文将讨论模拟电压控制和数字脉冲占空比控制在嵌入式控制方面的应用场景、共同点和不同点&#xff0c;并探讨它们在未来发展中的前景。 引言&#xff1a; 模拟电压控制和数字脉冲占空比控制都是嵌入式系统中常用的控制方式。模拟电压控制将电压作为控制信号&…

electron 应用优雅的配置 about 信息

使用 electron 的 dialog tray 托盘栏菜单优雅简单的配置 about 关于本应用的信息&#xff0c;效果下图所示。 项目依赖 {"electron": "^24.4.1","electron-builder": "^23.6.0","electron-builder-squirrel-windows": &q…

线程那些事

线程锁 线程锁&#xff08;Thread Lock&#xff09;&#xff0c;也被称为互斥锁&#xff08;Mutex Lock&#xff09;&#xff0c;是一种用于多线程编程中的同步机制。它用于保护共享资源在多个线程之间的访问&#xff0c;以避免出现竞态条件&#xff08;Race Condition&#x…

企业内容建站系统 ModStartCMS v6.8.0 内容页面自定义模板,内容区块功能增强

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用&#xff0c;支持后台一键快速安装&#xff0c;让开发者能快的实现业务功能开发。 系统完全开源&#xff0c;基于 Apache 2.0 开源协议&#xff0c;免费且不限制商业使用。 功能特性 丰富的模块市…

【状态估计】一维粒子滤波研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

虚函数表的地址

结论 1. c多态的实现是靠虚函数表来实现的&#xff0c;有虚函数的类有虚函数表&#xff0c;没虚函数的类就没有虚函数表 2. 虚函数表是类的所有对象共用&#xff0c;切记是共同所有&#xff0c;不是一个对象所有 3. 每个虚函数成员占据虚函数表的一行&#xff0c;是个指针&a…

机械设计制造及其自动化专业向PLC方向发展的可行性

是的&#xff0c;机械设计制造及其自动化专业往PLC&#xff08;可编程逻辑控制器&#xff09;方向发展是可行的。PLC是一种用于控制和自动化各种机械设备和工业过程的计算机控制系统。它被广泛应用于工业自动化领域&#xff0c;包括制造业、能源行业、交通运输等。 我这里刚好…

001- database - 数据库

1、新的数据库进入默认有四个数据库&#xff0c;一般不要轻易删除&#xff1b; -- 创建数据库 CREATE DATABASE 数据库名 -- 查询所有数据库 SHOW DATABASES -- 使用数据库 -- USE 数据库名 -- 查询当前使用的数据库 SELECT DATABASE() -- 删除数据库 DROP DATABASE 数据库名

Rdkit|分子3D构象生成与优化

github; 地址 文章目录 Rdkit|分子3D构象生成与优化构象生成算法概述基于距离&#xff08;distance-based&#xff09;代码示例 距离几何算法生成3D结构距离几何ETKDG生成3D构象距离几何ETKDG生成多构象将Conformer类转为Mol类手动对齐 距离几何ETKDGMMFF生成3D构象距离几何ETK…

Sevlet规范:HttpServlet类 和 HttpServletRequest接口 源码解析

1. HTTP协议解读 什么是协议&#xff1f; 协议实际上是某些人&#xff0c;或者某些组织提前制定好的一套规范&#xff0c;大家都按照这个规范来&#xff0c;这样可以做到沟通无障碍。 协议就是一套规范&#xff0c;就是一套标准。由其他人或其他组织来负责制定的。 我说的话你…

PyCharm 自动添加作者信息、创建时间等信息

PyCharm 自动添加作者信息、创建时间等信息‘ 第一步 找到settings 第二步&#xff0c;找到下图所示位置输入下面代码&#xff0c;作者改成你自己的缩写&#xff0c;你也可以添加其他的 Project &#xff1a;${PROJECT_NAME} File &#xff1a;${NAME}.py IDE &…

【iOS】编译与链接

前言 计算机语言分为机器语言、汇编语言和高级语言。 可以将高级语言分为两种&#xff1a;编译语言和解释型语言&#xff08;直译式语言&#xff09;。 解释型语言&#xff08;逐步进行解释执行&#xff09; 解释语言编写的程序在每次运行时都需要通过解释器对程序进行动态…

【Leetcode】142.环形链表II

题意&#xff1a; 给定一个链表&#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 为了表示给定链表中的环&#xff0c;使用整数 pos 来表示链表尾连接到链表中的位置&#xff08;索引从 0 开始&#xff09;。 如果 pos 是 -1&#xff0c;则…

多媒体开发之cgo

go语言作为近十年来优秀的现代开发语言的代表&#xff0c;由于继承了c语言的简洁和很多现代语言的表达方式&#xff0c;在广泛的应用场景中得到众多爱好者的喜爱&#xff0c;如何将go和c、c进行联合开发&#xff0c;拓展整个开发生态&#xff0c;不用重复造轮子&#xff0c;掌握…