2.8日学习打卡----初学RabbitMQ(三)

news2025/1/16 15:56:11

2.8日学习打卡

一.springboot整合RabbitMQ

在这里插入图片描述
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用
SpringBoot整合RabbitMQ,简化代码编写

创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>

编写配置文件

spring:
 rabbitmq:
   host: 192.168.66.100
   port: 5672
   username: jjy
   password: jjy
   virtual-host: /
# 日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,

写法如下:

package com.jjy.springrabbitmqdemo;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";
    //创建交换机
    @Bean("bootExchange")
    public Exchange getExchange(){
      return ExchangeBuilder
              .topicExchange(EXCHANGE_NAME)//交换机类型
              .durable(true)//是否持久化
              .build();
    }
    //创建队列
    @Bean("bootQueue")
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 队列名
    }
    //交换机绑定队列
    @Bean
    public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.message.#")
                .noargs();
    }
}

SpringBoot整合RabbitMQ_编写生产者

在这里插入图片描述
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送
消息,编写生产者时只需要注入RabbitTemplate即可发送消息。

package com.jjy.springrabbitmqdemo;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestProducer {
    //注入RabbitTemplate工具类
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送的消息
     */
    public void testSendMessage(){
        rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
    }
}

SpringBoot整合RabbitMQ_编写消费者

在这里插入图片描述

消费者

package com.jjy.rabbitmqcosspring.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//消费者
@Component
public class Consumer {
    //监听队列
    @RabbitListener(queues = "boot_queue")
    public void listenMessage(String message){
        System.out.println("监听的消息: "+message);
    }
}

整合后的代码,就是不用自己去实例化(创建连接工厂,连接,信道);让spring容器来控制实例的创建到销毁。
代码的实现有生产者和消费者、还有配置类(创建交换机跟队列及其绑定操作),都独立为一个类(共3个类),yml文件中配置rabbitmq的一些属性。

Direct类型(默认,匹配发送)

它会把消息路由到那些binding key与routing key完全匹配的Queue中。

它是一个一对一的模型,一条消息一定会被发到指定的一个队列(完全匹配)。

配置代码

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitDirectConfig {

    @Bean
    public Queue directQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("directQueue-One",false,false,false,null);
    }

    @Bean
    public Queue directQueue2(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("directQueue-Two",false,false,false,null);
    }

    @Bean
    public DirectExchange directExchange(){
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new DirectExchange("MqSendService-One",false,false,null);
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(directQueue())   //绑定队列
                .to(directExchange())       //队列绑定到哪个交换器
                .with("One");         //绑定路由key,必须指定
    }

    @Bean
    public Binding bingExchange2(){
        return BindingBuilder.bind(directQueue2())   //绑定队列
                .to(directExchange())       //队列绑定到哪个交换器
                .with("Two");         //绑定路由key,必须指定
    }
}

Topic类型(拓展匹配发送)

它是Direct类型的一种扩展,提供灵活的匹配规则。

  • routing key为一个句点号 " . " 分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如"One.Two"
  • binding key与routing key一样也是句点号 " . " 分隔的字符串
  • binding key中可以存在两种特殊字符 " * " 与 " # " ,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicConfig {
    @Bean
    public Queue topicQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("topicQueue-One",false,false,false,null);
    }

    @Bean
    public Queue topicQueue2(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("topicQueue-Two",false,false,false,null);
    }

    @Bean
    public TopicExchange topicExchange(){
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new TopicExchange("Topic-Ex",false,false,null);
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(topicQueue())   //绑定队列
                .to(topicExchange())       //队列绑定到哪个交换器
                .with("*.Two.*");        //路由key,必须指定
    }

    @Bean
    public Binding bingExchange2(){
        return BindingBuilder.bind(topicQueue2())   //绑定队列
                .to(topicExchange())       //队列绑定到哪个交换器
                .with("#");         //路由key,必须指定
    }
}

Fanout 类型(广播发送)

它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

它是一种一对多的类型,无法指定Binding Key,发送的一条消息会被发到绑定的所有队列。

配置代码


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitFanoutConfig {

    @Bean
    public Queue fanoutQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("fanoutQueue-One",false,false,false,null);
    }

    @Bean
    public Queue fanoutQueue2(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("fanoutQueue-Two",false,false,false,null);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new FanoutExchange("Fanout-Ex",false,false,null);
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(fanoutQueue())   //绑定队列
                .to(fanoutExchange());       //队列绑定到哪个交换器
    }

    @Bean
    public Binding bingExchange2(){
        return BindingBuilder.bind(fanoutQueue())   //绑定队列
                .to(fanoutExchange());       //队列绑定到哪个交换器
    }

}

Headers(键值对匹配,不常用)

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对;当消息发送到ExchangeRabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

该类型不常用,暂不提供代码。

Message(消息)

当执行诸如 basicPublish() 之类的操作时,内容作为字节数组参数传递,而其他属性作为单独的参数传入。

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
    
    ...
}

MessageProperties 接口定义了几个常见的属性,例如“messageId”“timestamp”、“contentType”等等。 还可以通过调用 setHeader(String key, Object value) 方法扩展这些属性

二. 消息的可靠性投递

在这里插入图片描述
RabbitMQ消息投递的路径为:
生产者 —> 交换机 —> 队列 —> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

三种模式刚好监听完RabbitMQ的一整套流程。即我们能够由这三种模式得到消息的传递及处理的结果。

确认模式(confirm)

在这里插入图片描述
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机

生产者配置文件开启确认模式

 rabbitmq:
   host: 192.168.66.100
   port: 5672
   username: jjy
   password: jjy
   virtual-host: /
    # 开启确认模式
   publisher-confirm-type: correlated
package com.jjy.rabbitproducer;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    private final String EXCHNAGE_NAME="my_topic_exchange";
    private final String QUEUE_NAME="my_queue";
    @Bean("bootExchange")
    public Exchange getExchange(){
        return ExchangeBuilder
                .topicExchange(EXCHNAGE_NAME)//交换机类型
                .durable(true)
                .build();
    }
    // 2.创建队列
    @Bean("bootQueue")
    public Queue getMessageQueue(){
        return QueueBuilder
                .durable(QUEUE_NAME) // 队列持久化
                .build();
    }
    @Bean
    public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("my_routing")
                .noargs();
    }
}

@SpringBootTest
public class ProduceTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendMessage(){
        // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 被调用的回调方法
             * @param correlationData 相关配置信息
             * @param ack 交换机是否成功收到了消息
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("confirm接受成功!");
                }else{
                    System.out.println("confirm接受失败,原因为:"+cause);
                    // 做一些处理。
                }
            }
        });

        rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
    }
}

退回模式(return)

在这里插入图片描述
退回模式(return)可以监听消息是否从交换机成功传递到队列,
使用方法如下:

生产者配置文件开启退回模式

spring:
 rabbitmq:
   host: 192.168.66.100
   port: 5672
   username: jjy
   password: jjy
   virtual-host: /
    # 开启确认模式
   publisher-confirm-type: correlated
    # 开启回退模式
   publisher-returns: true
package com.jjy.rabbitproducer;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    private final String EXCHNAGE_NAME="my_topic_exchange";
    private final String QUEUE_NAME="my_queue";
    @Bean("bootExchange")
    public Exchange getExchange(){
        return ExchangeBuilder
                .topicExchange(EXCHNAGE_NAME)//交换机类型
                .durable(true)
                .build();
    }
    // 2.创建队列
    @Bean("bootQueue")
    public Queue getMessageQueue(){
        return QueueBuilder
                .durable(QUEUE_NAME) // 队列持久化
                .build();
    }
    @Bean
    public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("my_routing")
                .noargs();
    }
}
@Test
    public void testReturn(){
        // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
            /**
             * @param returned 失败后将失败信息封装到参数中
             */
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("消息对象:"+returned.getMessage());
                System.out.println("错误码:"+returned.getReplyCode());
                System.out.println("错误信息:"+returned.getReplyText());
                System.out.println("交换机:"+returned.getExchange());
                System.out.println("路由键:"+returned.getRoutingKey());
            }
        });
        rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
    }

消费者消息确认(Ack)

在这里插入图片描述
在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当
中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

消费者配置开启手动签收

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启手动签收
   listener:
     simple:
       acknowledge-mode: manual
package com.jjy.rabbitconsumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class AckConsumer {
//    @RabbitListener(queues = "my_queue")
//    public void listenMessage(String Mesage){
//        int i=1/0;
//        System.out.println("成功接收到消息:"+Mesage);
//    }
//@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
    //消息投递序号,消息每次投递该值都会+1
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try{
        int i=1/0;
        System.out.println("成功接收到消息:"+message);
       // 签收消息
        /**
         * 参数1:消息投递序号
         * 参数2:是否一次可以签收多条消息
         */
        channel.basicAck(deliveryTag,true);
    } catch (Exception e){
        System.out.println("消息消费失败!");
        Thread.sleep(2000);
        // 拒签消息
        /**
         * 参数1:消息投递序号
         * 参数2:是否一次可以拒签多条消息
         * 参数3:拒签后消息是否重回队列
         */
        channel.basicNack(deliveryTag,true,true);
    }

}

}

三.RabbitMQ高级特性

消费端限流

在这里插入图片描述
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
1 生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
   }
}

2 消费端配置限流机制

spring:
 rabbitmq:
   host: 192.168.66.100
   port: 5672
   username: jjy
   password: jjy
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5
条才会继续拉取消息。
       prefetch: 5

3.消费者监听队列

@Component
public class OosConsimer {
   //@RabbitListener(queues ="my_queue")
   public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
       // 1.获取消息
       System.out.println(new String(message.getBody()));
       // 2.业务处理
       Thread.sleep(3000);
       //3.签收

       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       channel.basicAck(deliveryTag,true);
   }
}

就是说从生产端发送过来的消息,在队列等待消费端接收,如果消费端处理消息业务的速度相对较慢,积累的消息过多从而处理不过来(资源耗尽),会导致系统性能降低或瘫痪。
因为消费端每秒处理消息的条数有限,所以我们需要在消费端进行一个限流,故而限制了队列消息的投递。
即消费端限流也就是限制队列投递到消费端的流,也可以说是在队列与消费端之间进行一个限流。

利用限流实现不公平分发

在这里插入图片描述
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

公平分发则不能在yml文件中设置限流(prefetch),公平分发即给多个消费者平分消息进行消费。这样会导致处理快的消费者在等待,故而浪费资源,降低性能。

不公平分发则需要在yml文件中设置限流(prefetch),并且prefetch: 1(即设置为1);不公平分发即每次拉取一条消息,谁处理得快就继续处理,这样可以极大的节约资源,从而提高性能。

1 生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
   }
}

2 消费端配置不公平分发

spring:
 rabbitmq:
   host: 192.168.66.100
   port: 5672
   username: jjy
   password: jjy
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理
的快谁拉取下一条消息,实现了不公平分发
       prefetch: 1

3 编写两个消费者

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
    {
        //1.获取消息
        System.out.println("消费者1:"+new
                String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

        // 消费者2
        @RabbitListener(queues = "my_queue")
        public void listenMessage2(Message
        message, Channel channel) throws Exception
        {
            //1.获取消息
            System.out.println("消费者2:"+new
                    String(message.getBody(),"UTF-8"));
            //2. 处理业务逻辑
            Thread.sleep(3000);// 消费者2处理慢
            //3. 手动签收

            channel.basicAck(message.getMessageProper
                    ties().getDeliveryTag(),true);
        }
    }

消息存活时间

在这里插入图片描述
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间

设置队列所有消息存活时间
就是说需要在配置类(RabbitConfig)中设置队列所有消息的存活时间;

return QueueBuilder
                .durable(QUEUE_NAME)//队列持久化
                .ttl(10000)//设置队列的所有消息存活10s
                .build();

即在创建bean队列时,就要设置队列所有消息的存活时间。

**设置某条消息的存活时间 **

就是说只需要在发送的时候指定它的存活时间即可。
实现比较稍微麻烦一点,创建消息属性并设置存活时间,然后创建消息对象,消息对象 将消息属性作为参数,并且传入发送的消息,最后再将消息对象作为参数传给交换机,即可实现对单条消息设置存活时间。

//发送消息,并设置该消息的存活时间
    @Test
    public void testSendMessage()
    {
        //1.创建消息属性
        MessageProperties messageProperties = new MessageProperties();
        //2.设置存活时间
        messageProperties.setExpiration("10000");
        //3.创建消息对象
        Message message = new Message("sendMessage...".getBytes(),messageProperties);
        //4.发送消息
        rabbitTemplate.convertAndSend("my_topic_exchange1","my_routing",message);
    }

若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。
因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。

在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准。

消息过期后,并不会马上移除消息,只有消息消费到队列顶
端时,才会移除该消息。

 @Test

    public void testSendMessage2() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                // 1.创建消息属性
                MessageProperties messageProperties = new MessageProperties();
                // 2.设置存活时间

                messageProperties.setExpiration("10000 ");
                // 3.创建消息对象
                Message message = new Message(("send message..." + i).getBytes(), messageProperties);
                // 4.发送消息

                rabbitTemplate.convertAndSend("my_topi c_exchange", "my_routing", message);
            } else {
                rabbitTemplate.convertAndSend("my_topi c_exchange", "my_routing", "send message..." + i);
            }
        }
    }

在以上案例中,i=5的消息才有过期时间,10s后消息并没有马上被移除,但该消息已经不会被消费了,当它到达队列顶端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列

优先级队列用法如下:
1 创建队列和交换机


@Configuration
public class RabbitmqConfig3 {
    private final String EXCHANGE_NAME="priority_exchange";
    private final String QUEUE_NAME="priority_queue";
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return  ExchangeBuilder
                .topicExchange(EXCHANGE_NAME)//交换机类型
                .durable(true)//是否持久化
                .build();
    }
    @Bean(QUEUE_NAME)
    public Queue producerQueue(){
        return QueueBuilder
                .durable(QUEUE_NAME)//队列持久化
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
                .maxPriority(10)
                .build();
    }
    @Bean
    public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
    }
}

2 编写生产者

 @Test
    public void testPriority() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                // i为5时消息的优先级较高
                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setPriority(9);
                Message message = new Message(("send message..." + i).getBytes(StandardCharsets.UTF_8), messageProperties);
                rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
            } else {
                rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
            }
        }
    }

3 编写消费者


@Component
public class PriorityConsumer {
    @RabbitListener(queues = "priority_queue")
    public void listenMessage(Message message, Channel channel) throws IOException {
        System.out.println(new String(message.getBody(),"utf-8"));
        //手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);


    }
}

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1442358.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关节点检测

https://www.bilibili.com/video/BV19g4y1777q/?p2&spm_id_frompageDriver 关节点检测全流程 YOLO:单阶段&#xff0c;快&#xff1b; MMPose&#xff1a;双阶段&#xff0c;准&#xff1b; 标注工具Labelme 用Labelme标注样本数据集

利用Pybind11封装Python版的WiringPi!

原版的WiringPi是一个用于树莓派的GPIO库&#xff0c;用C语言开发&#xff0c;仓库地址&#xff1a;https://github.com/WiringPi/WiringPi。该库允许用户以编程方式访问和控制树莓派的GPIO引脚。而随着Python在嵌入式设备上的快速发展&#xff0c;其对底层引脚的操作也变得越来…

OpenAI给DALL-E 3来了个新动作,加入了全新水印技术

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

老胡的周刊(第128期)

老胡的信息周刊[1]&#xff0c;记录这周我看到的有价值的信息&#xff0c;主要针对计算机领域&#xff0c;内容主题极大程度被我个人喜好主导。这个项目核心目的在于记录让自己有印象的信息做一个留存以及共享。 &#x1f3af; 项目 coze-discord-proxy[2] 代理 Discord-Bot 对…

018 Linux

文章目录 操作系统定义分类Linux系统构成 Linux文件系统Linux常用命令基础操作命令文件操作压缩解压权限管理显示展示命令其他命令 vi编译器操作使用 添加用户基本概念用户管理命令 ubuntu软件安装ssh服务终端启动Python服务 操作系统 定义 操作系统是管理计算机硬件与软件资…

【超高效!保护隐私的新方法】针对图像到图像(l2l)生成模型遗忘学习:超高效且不需要重新训练就能从生成模型中移除特定数据

针对图像到图像生成模型遗忘学习&#xff1a;超高效且不需要重新训练就能从生成模型中移除特定数据 提出背景如何在不重训练模型的情况下从I2I生成模型中移除特定数据&#xff1f; 超高效的机器遗忘方法子问题1: 如何在图像到图像&#xff08;I2I&#xff09;生成模型中进行高效…

archlinux 使用 electron-ssr 代理 socks5

提前下载好 pacman 包 https://github.com/shadowsocksrr/electron-ssr/releases/download/v0.2.7/electron-ssr-0.2.7.pacman 首先要有 yay 和 aur 源&#xff0c;这个可以参考我之前的博客 虚拟机内使用 archinstall 安装 arch linux 2024.01.01 安装依赖 yay 安装的&#…

堆排序----C语言数据结构

目录 引言 堆排序的实现**堆的向下调整算法** 对排序的时间复杂度建堆的时间复杂度&#xff1a;排序过程的时间复杂度&#xff1a;总体时间复杂度&#xff1a; 引言 堆排序&#xff08;Heap Sort&#xff09;是一种基于比较的排序算法&#xff0c;利用堆的数据结构来实现。它的…

数据湖的整体思路

湖本质上是一个集中化&#xff0c;中心化的&#xff0c;一体化的存储技术&#xff0c;并且在其之上追求技术架构的统一化&#xff0c;如流批一体&#xff0c;服务分析一体化。 当数据湖成为中心&#xff0c;那么就可以围湖而建“数据服务环”&#xff0c;环上的服务包括了数仓、…

基于tomcat运行jenkins常见的报错处理

目录 1.jenkins.util.SystemProperties$Listener错误 升级jdk11可能遇到的坑 2.java.lang.RuntimeException: Fontconfig head is null, check your fonts or fonts configuration 3.There were errors checking the update sites: UnknownHostException:updates.jenkins.i…

JAVA Web 学习(五)Nginx、RPC、JWT

十二、反向代理服务器——Nginx 支持热部署&#xff0c;几乎可以做到 7 * 24 小时不间断运行&#xff0c;即使运行几个月也不需要重新启动&#xff0c;还能在不间断服务的情况下对软件版本进行热更新。性能是 Nginx 最重要的考量&#xff0c;其占用内存少、并发能力强、能支持…

初识Solidworks:我的第一份作业的感想

从来没用CAD软件画过机械设计图。但我脑子里有一种概念&#xff0c;无非就是把尺规作图软件化&#xff0c;更方便画图、更方便修改、更方便打印一些。但第一份 Solidworks 作业就颠覆了我的认知&#xff0c;考虑到这个软件的上市时间&#xff0c;让我意识到自己对 CAD 软件的认…

【大厂AI课学习笔记】【1.5 AI技术领域】(8)文本分类

8,9,10&#xff0c;将分别讨论自然语言处理领域的3个重要场景。 自然语言处理&#xff0c;Natual Language Processing&#xff0c;NLP&#xff0c;包括自然语言识别和自然语言生成。 用途是从非结构化的文本数据中&#xff0c;发掘洞见&#xff0c;并访问这些信息&#xff0…

C++——二叉树

引入 map和set特性需要先铺垫二叉搜索树&#xff0c;而二叉搜索树也是一种树形结构 二叉搜索树的特性了解&#xff0c;有助于更好的理解map和set的特性 1.二叉搜索树的概念及优缺点 1.1二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或…

CSP-202109-1-数组推导

CSP-202109-1-数组推导 解题思路 如果 currentValue 与 previousValue 相同&#xff0c;说明这个值不是一个独特的新值&#xff0c;因此只将它加到 sumTotal 上。如果 currentValue 与 previousValue 不相同&#xff0c;说明这是一个新的独特值&#xff0c;因此既将它加到 su…

逐行拆解Guava限流器RateLimiter

逐行拆解Guava限流器RateLimiter 常见限流算法 计数器法 设置一个时间窗口内允许的最大请求量&#xff0c;如果当前窗口请求数超过这个设定数量&#xff0c;则拒绝该窗口内之后的请求。 关键词&#xff1a;时间窗口&#xff0c;计数器。 举个例子&#xff0c;我们设置1秒钟…

面试经典150题 -- 栈(总结)

总的链接 面试经典 150 题 - 学习计划 - 力扣&#xff08;LeetCode&#xff09;全球极客挚爱的技术成长平台 关于栈 -- stack 的学习链接 c的STL中的栈 -- stack-CSDN博客 20 . 有效的括号 这题直接用栈模拟就好了; 这里用一种取巧的方法 , 当遇见左括号&#xff0c;加入右…

【Langchain+Streamlit】旅游聊天机器人

【LangchainStreamlit】打造一个旅游问答AI-CSDN博客 项目线上地址&#xff0c;无需openai秘钥可直接体验&#xff1a;http://101.33.225.241:8502/ github地址&#xff1a;GitHub - jerry1900/langchain_chatbot: langchainstreamlit打造的一个有memory的旅游聊天机器人&…

Linux 命令基础

Shell概述 Linux操作系统的Shell作为操作系统的外壳&#xff0c;为用户提供使用操作系统的接口。它是命令语言、命令解释程序及程序设计语言的统称。 Shell是用户和Linux内核之间的接口程序&#xff0c;如果把硬件想象成一个球体的中心&#xff0c;内核围绕在硬件的外层管理着…

【JS逆向九】逆向某混淆网站源码,模拟 加密,解密,密钥生成

逆向日期&#xff1a;2024.02.09 使用工具&#xff1a;Node.js 是否有混淆&#xff1a;源代码混淆 加密方法&#xff1a;AES标准库 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 可使用AES进行解密处理&#xff08;直接解密即可&#xff0…