高性能消息队列中间件MQ_part2

news2024/11/18 7:32:24

接上一篇part1的内容

RabbitMQ通配符模式_编写消费者

接下来我们编写通配符模式的消费者:

// 站内信消费者
public class Customer_Station {
    public static void main(String[] args)
throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory
= new ConnectionFactory();
      
connectionFactory.setHost("192.168.0.162");
        connectionFactory.setPort(5672);
      
connectionFactory.setUsername("itbaizhan");
     
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
11
12        //2.创建连接
13        Connection conn =
connectionFactory.newConnection();
14    
15        //3.建立信道
16        Channel channel =
conn.createChannel();
17    
18        // 4.监听队列
19      
channel.basicConsume("SEND_STATION3", true,
new DefaultConsumer(channel) {
20            @Override
21            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
22                String message = new
String(body, "utf-8");
23                System.out.println("发送站内
信:"+message);
24           }
25       });
26   }
27 }
// 邮件消费者
30 public class Customer_Mail {
31    public static void main(String[] args)
throws IOException, TimeoutException {
32        // 1.创建连接工厂
33        ConnectionFactory connectionFactory
= new ConnectionFactory();
34      
connectionFactory.setHost("192.168.0.162");
35        connectionFactory.setPort(5672);
36      
connectionFactory.setUsername("itbaizhan");
37      
connectionFactory.setPassword("itbaizhan");
38      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
39
40        //2.创建连接
41        Connection conn =
connectionFactory.newConnection();
42    
43        //3.建立信道
44        Channel channel =
conn.createChannel();
45    
46        // 4.监听队列
47        channel.basicConsume("SEND_MAIL3",true, new DefaultConsumer(channel) {
48            @Override
            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
50                String message = newString(body, "utf-8");
51                System.out.println("发送邮件:"+message);
52           }
53       });
54   }
55
56 }
57
58 // 短信消费者
59 public class Customer_Message {
60    public static void main(String[] args)throws IOException, TimeoutException {
61        // 1.创建连接工厂
62        ConnectionFactory connectionFactory= new ConnectionFactory();
63      connectionFactory.setHost("192.168.0.162");
64        connectionFactory.setPort(5672);
65      connectionFactory.setUsername("itbaizhan");
66      connectionFactory.setPassword("itbaizhan");
67      connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn =connectionFactory.newConnection();
    
        //3.建立信道
        Channel channel =conn.createChannel();
    
        // 4.监听队列
      
channel.basicConsume("SEND_MESSAGE3", true,
new DefaultConsumer(channel) {
            @Override
            public void
handleDelivery(String consumerTag, Envelopeenvelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
                String message = newString(body, "utf-8");
                System.out.println("发送短信:"+message);
           }
       });
   }
}

SpringBoot整合RabbitMQ_项目搭建

之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>

2.编写配置文件

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

SpringBoot整合RabbitMQ_创建对列和交换机

SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:

@Configuration
2 public class RabbitConfig {
3    private final String EXCHANGE_NAME ="boot_topic_exchange";
4    private final String QUEUE_NAME ="boot_queue";
5
6    // 创建交换机
7    @Bean("bootExchange")
8    public Exchange getExchange() {
9        return ExchangeBuilder
10               .topicExchange(EXCHANGE_NAME) // 交换机类型
11               .durable(true) // 是否持久化
12               .build();
   }
14
15    // 创建队列
16    @Bean("bootQueue")
17    public Queue getMessageQueue() {
18        return new Queue(QUEUE_NAME); // 队列名
19   }
20
21    // 交换机绑定队列
22    @Bean
23    public Binding bindMessageQueue(@Qualifier("bootExchange")Exchange exchange, @Qualifier("bootQueue")Queue queue) {
24        return BindingBuilder
25               .bind(queue)
26               .to(exchange)
27               .with("#.message.#")
28               .noargs();
29   }
30 }

SpringBoot整合RabbitMQ_编写生产者

SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息

@SpringBootTest
public class TestProducer {
    // 注入RabbitTemplate工具类
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage(){
        /**
         * 发送消息
         * 参数1:交换机
         *          * 参数2:路由key
         * 参数3:要发送的消息
         */
      
rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
   }
}

运行生产者代码,即可看到消息发送到了RabbitMQ中
在这里插入图片描述


SpringBoot整合RabbitMQ_编写消费者

我们编写另一个SpringBoot项目作为RabbitMQ的消费者

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- rabbitmq起步依赖 -->
<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>

2.编写配置文件

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

3.编写消费者,监听队列

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String
message){
        System.out.println("发送短
信:"+message);
   }
}

4.启动项目,可以看到消费者会消费队列中的消息


消息的可靠性投递_概念

RabbitMQ消息投递的路径为:生产者 —> 交换机 —> 队列 —> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列

@Configuration
public class RabbitConfig {
    private final String
EXCHANGE_NAME="my_topic_exchange";
    private final String
QUEUE_NAME="my_queue";
    // 1.创建交换机
    @Bean("bootExchange")
    public Exchange getExchange(){
        return ExchangeBuilder
10               .topicExchange(EXCHANGE_NAME) // 交换机类型
11               .durable(true) // 是否持久化
12           .build();
13   }
14
15    // 2.创建队列
16    @Bean("bootQueue")
17    public Queue getMessageQueue(){
18        return QueueBuilder
19               .durable(QUEUE_NAME) // 队列持久化
20               .build();
21   }
22
23    // 3.将队列绑定到交换机
24    @Bean
25    public Binding bindMessageQueue(@Qualifier("bootExchange")Exchange exchange, @Qualifier("bootQueue")Queue queue){
26        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
27   }
28 }

消息的可靠性投递_确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下

1.生产者配置文件开启确认模式

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启确认模式
   publisher-confirm-type: correlated

2.生产者定义确认模式的回调方法

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testConfirm(){
8        // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
9      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
10            /**
11             * 被调用的回调方法
12             * @param correlationData 相关配置信息
13             * @param ack 交换机是否成功收到了消息
14             * @param cause 失败原因
15             */
16            @Override
17            public void confirm(CorrelationData correlationData,boolean ack, String cause) {
18                if (ack){
19                  System.out.println("confirm接受成功!");
20               }else{
21                  
System.out.println("confirm接受失败,原因为:"+cause);
22                    // 做一些处理。
23               }
24           }
25       });
26      
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
   }
}


消息的可靠性投递_退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

1 生产者配置文件开启退回模式

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启确认模式
   publisher-confirm-type: correlated
    # 开启回退模式
   publisher-returns: true

2.生产者定义退回模式的回调方法

@SpringBootTest
public class ProducerTest {
    @Autowired
        private RabbitTemplate rabbitTemplate;
5
6    @Test
7    public void testReturn(){
8        // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
9      rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
10            /**
11             * @param returned 失败后将失败信息封装到参数中
12             */
13            @Override
14            public void returnedMessage(ReturnedMessage returned)
{
15                System.out.println("消息对象:"+returned.getMessage());
16                System.out.println("错误码:"+returned.getReplyCode());
17                System.out.println("错误信息:"+returned.getReplyText());
18                System.out.println("交换机:"+returned.getExchange());
19                System.out.println("路由键:"+returned.getRoutingKey());
20                // 处理消息...
21           }
22       });
      rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
24   }
25 }

消息的可靠性投递_Ack

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

1 消费者配置开启手动签收

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启手动签收
   listener:
     simple:
       acknowledge-mode: manual

2.消费者处理消息时定义手动签收和拒绝签收的情况

@Component
public class AckConsumer {
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message
message, Channel channel) throws IOException, InterruptedException {
        // 消息投递序号,消息每次投递该值都会+1
        long deliveryTag =message.getMessageProperties().getDelivery
Tag();
        try {
            int i = 1/0; //模拟处理消息出现bug
            System.out.println("成功接受到消息:"+message);
            // 签收消息
            /**
             * 参数1:消息投递序号
             * 参数2:是否一次可以签收多条消息
             *              */
15          channel.basicAck(deliveryTag,true);
16       }catch (Exception e){
17            System.out.println("消息消费失败!");
18            Thread.sleep(2000);
19            // 拒签消息
20            /**
21             * 参数1:消息投递序号
22             * 参数2:是否一次可以拒签多条消息
23             * 参数3:拒签后消息是否重回队列
24             */
25          channel.basicNack(deliveryTag,true,true);
26       }
27   }
28 }

RabbitMQ高级特性_消费端限流

在这里插入图片描述
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:
1 生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
   }
}

2.消费端配置限流机制

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
       prefetch: 5

3.消费者监听队列

@Component
2 public class QosConsumer{
3    @RabbitListener(queues = "my_queue")
4    public void listenMessage(Message
message, Channel channel) throws IOException, InterruptedException {
5        // 1.获取消息
6        System.out.println(new String(message.getBody()));
7        // 2.模拟业务处理
8        Thread.sleep(3000);
9        // 3.签收消息
10      channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
11   }
12 }

RabbitMQ高级特性_利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

使用方法如下:
1 生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
   }
}

2.消费端配置不公平分发

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理
的快谁拉取下一条消息,实现了不公平分发
       prefetch: 1

3.编写两个消费者

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
{
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收          
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
12   }
13    
14    // 消费者2
15    @RabbitListener(queues = "my_queue")
16    public void listenMessage2(Message message, Channel channel) throws Exception{
17        //1.获取消息
18        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
19        //2. 处理业务逻辑
20        Thread.sleep(3000);// 消费者2处理慢
21        //3. 手动签收
22      channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
23   }
24 }

RabbitMQ高级特性_消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1 在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {
    private final String
EXCHANGE_NAME="my_topic_exchange2";
    private final String
QUEUE_NAME="my_queue2";
    // 1.创建交换机
    @Bean("bootExchange2")
    public Exchange getExchange2(){
        return ExchangeBuilder
               
.topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean("bootQueue2")
    public Queue getMessageQueue2(){
        return QueueBuilder
               .durable(QUEUE_NAME)
               .ttl(10000) //队列的每条消息存活10s
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding
bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,
@Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2.生产者批量生产消息,测试存活时间

@Test
2 public void testSendBatch2() throws InterruptedException {
3    // 发送十条消息
4    for (int i = 0; i < 10; i++) {
5      rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "sendmessage..."+i);
6   }
7 }

设置单条消息存活时间

@Test
public void testSendMessage() {
    //设置消息属性
    MessageProperties messageProperties =
new MessageProperties();
    //设置存活时间
  
messageProperties.setExpiration("10000");
    // 创建消息对象
    Message message = new Message("send
message...".getBytes(StandardCharsets.UTF_8)
, messageProperties);
    // 发送消息
  
rabbitTemplate.convertAndSend("my_topic_exc
hange", "my_routing", message);
}

注意:
1 如果设置了单条消息的存活时间,也设置了队列的存活时
间,以时间短的为准。
2 消息过期后,并不会马上移除消息,只有消息消费到队列顶
端时,才会移除该消息。

@Test
public void testSendMessage2() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // 1.创建消息属性
            MessageProperties
messageProperties = new MessageProperties();
            // 2.设置存活时间
          
messageProperties.setExpiration("10000");
            // 3.创建消息对象
            Message message = new Message(("send message..." +i).getBytes(), messageProperties);
            // 4.发送消息
          
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
       } else {
          
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..." + i);
       }
   }
}

在以上案例中,i=5的消息才有过期时间,10s后消息并没有
马上被移除,但该消息已经不会被消费了,当它到达队列顶
端时会被移除。


RabbitMQ高级特性_优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

优先级队列用法如下:

1.创建队列和交换机

@Configuration
public class RabbitConfig3 {
    private final String
EXCHANGE_NAME="priority_exchange";
    private final String
QUEUE_NAME="priority_queue";
    // 1.创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return ExchangeBuilder
               
.topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
       // 2.创建队列
    @Bean(QUEUE_NAME)
    public Queue priorityQueue(){
        return QueueBuilder
               .durable(QUEUE_NAME)
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
               .maxPriority(10)
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding
bindPriority(@Qualifier(EXCHANGE_NAME)
Exchange exchange, @Qualifier(QUEUE_NAME)
Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2.编写生产者

@Test
public void testPriority() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // i为5时消息的优先级较高
                       MessageProperties
messageProperties = new MessageProperties();
          
messageProperties.setPriority(9);
            Message message = new Message(("send message..." +
i).getBytes(StandardCharsets.UTF_8),
messageProperties);
          
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
       } else {
          rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..."+ i);
       }
   }
}

3.编写消费者

@Component
2 public class PriorityConsumer {
3 @RabbitListener(queues ="priority_queue")
4    public void listenMessage(Message message, Channel channel) throws Exception
{
5        //获取消息
6        System.out.println(new String(message.getBody()));
7        //手动签收
8      channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
9   }
10 }

RabbitMQ死信队列_概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

在这里插入图片描述消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

RabbitMQ死信队列_代码实现

@Configuration
public class RabbitConfig4 {
    private final String DEAD_EXCHANGE =
"dead_exchange";
    private final String DEAD_QUEUE =
"dead_queue";
    private final String NORMAL_EXCHANGE =
"normal_exchange";
    private final String NORMAL_QUEUE =
"normal_queue";
    // 死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
               
.topicExchange(DEAD_EXCHANGE)
               .durable(true)
               .build();
   }
    // 死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
               .durable(DEAD_QUEUE)
               .build();
   }
       // 死信交换机绑定死信队列
    @Bean
    public Binding
bindDeadQueue(@Qualifier(DEAD_EXCHANGE)
Exchange
exchange,@Qualifier(DEAD_QUEUE)Queue queue){
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("dead_routing")
               .noargs();
   }
    // 普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
               
.topicExchange(NORMAL_EXCHANGE)
               .durable(true)
               .build();
   }
    // 普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
               .durable(NORMAL_QUEUE)
               
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
               
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
53               .ttl(10000) // 消息存活10s
54               .maxLength(10) // 队列最大长度10
55               .build();
56   }
57
58    // 普通交换机绑定普通队列
59    @Bean
60    public Binding
bindNormalQueue(@Qualifier(NORMAL_EXCHANGE)
Exchange
exchange,@Qualifier(NORMAL_QUEUE)Queue
queue){
61        return BindingBuilder
62               .bind(queue)
63               .to(exchange)
64               .with("my_routing")
65               .noargs();
66   }
67 }

测试死信队列

1.生产者发送消息

@Test
public void testDlx(){
    // 存活时间过期后变成死信
    //       
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    // 超过队列长度后变成死信
    //       for (int i = 0; i < 20; i++)
{
    //           
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    //       }
    // 消息拒签但不返回原队列后变成死信
  
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}

2.消费者拒收消息

@Component
public class DlxConsumer {
    @RabbitListener(queues ="normal_queue")
    public void listenMessage(Message
message, Channel channel) throws IOException {
        // 拒签消息
      
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
   }
}

RabbitMQ延迟队列_概念

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单
在这里插入图片描述
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
在这里插入图片描述


RabbitMQ延迟队列_死信队列实现

接下来我们使用死信队列实现延迟队列

1 创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。

<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>
<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2.编写配置文件

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  
# 日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

@Configuration
public class RabbitConfig {
    // 订单交换机和队列
    private final String ORDER_EXCHANGE =
"order_exchange";
    private final String ORDER_QUEUE =
"order_queue";
    // 过期订单交换机和队列
    private final String EXPIRE_EXCHANGE =
"expire_exchange";
    private final String EXPIRE_QUEUE =
"expire_queue";
    // 过期订单交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange deadExchange(){
            return ExchangeBuilder
14               
.topicExchange(EXPIRE_EXCHANGE)
15               .durable(true)
16               .build();
17   }
18    // 过期订单队列
19    @Bean(EXPIRE_QUEUE)
20    public Queue deadQueue(){
21        return QueueBuilder
22               .durable(EXPIRE_QUEUE)
23               .build();
24   }
25    // 将过期订单队列绑定到交换机
26    @Bean
27    public Binding
bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE)
Exchange exchange,@Qualifier(EXPIRE_QUEUE)
Queue queue){
28        return BindingBuilder
29               .bind(queue)
30               .to(exchange)
31               .with("expire_routing")
32               .noargs();
33   }
34
35    // 订单交换机
36    @Bean(ORDER_EXCHANGE)
37    public Exchange normalExchange(){
38        return ExchangeBuilder
39               
.topicExchange(ORDER_EXCHANGE)
              .durable(true)
               .build();
   }
    // 订单队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
               .durable(ORDER_QUEUE)
               .ttl(10000) // 存活时间为10s,模拟30min
               
.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
               
.deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字
               .build();
   }
    // 将订单队列绑定到交换机
    @Bean
    public Binding
bindNormalQueue(@Qualifier(ORDER_EXCHANGE)
Exchange exchange,@Qualifier(ORDER_QUEUE)
Queue queue){
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("order_routing")
               .noargs();
   }
}

4.编写下单的控制器方法,下单后向订单交换机发送消息

@RestController
public class OrderController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //下单
    @GetMapping("/place/{orderId}")
    public String placeOrder(@PathVariable
String orderId){
        System.out.println("处理订单数据...");
        // 将订单id发送到订单队列
      
rabbitTemplate.convertAndSend("order_exch
ange", "order_routing", orderId);
        return "下单成功,修改库存";
   }
}

5.编写监听死信队列的消费者

// 过期订单消费者
@Component
public class ExpireOrderConsumer {
    // 监听队列
    @RabbitListener(queues ="expire_queue")
    public void listenMessage(String
orderId){
        System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
   }
}

RabbitMQ延迟队列_插件实现

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
在这里插入图片描述
RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
在这里插入图片描述

安装延迟队列插件

1 使用rz将插件上传至虚拟机
2 安装插件

# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-
3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable
rabbitmq_delayed_message_exchange

3.重启RabbitMQ服务

#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached

此时登录管控台可以看到交换机类型多了延迟消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/195259.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot引入flink,maven打包插件需替换

目录说明说明 springboot引入flink后&#xff0c;如果要打包&#xff0c;传统的maven不行&#xff0c;要更换指定插件 <build><finalName>flink</finalName><plugins><plugin><groupId>org.apache.maven.plugins</groupId><art…

CMake 混编c和c++代码

准备工作 wsl 或者 有linux 系统(购买阿里云或者其他云服务器&#xff09;cmake, gcc, git 等一些必要的软件安装 环境 windows 下 的 wsl wsl 安装下载 例子 拿 Unix网络编程 举例, 作者对原生接口进行了封装, 我们需要编译使用在自己的工程 1. 创建空文件 cd E:\githu…

网络流量监控对OA系统性能分析案例

需求简介 某外高桥公司的OA系统是其重要的业务系统&#xff0c;OA系统负责人表示&#xff0c;部分用户反馈&#xff0c;访问OA系统时比较慢。需要通过分析系统看一下实际情况。 报告内容 本报告内容主要为&#xff1a;OA性能整体分析 分析时间 报告分析时间范围为&#xf…

同一条好友邀请信息给大量的人发,会导致领英账号被封吗?

做外贸的领英新人经常有一个问题&#xff1a;领英上添加好友时&#xff0c;同一条好友邀请信息给大量的人发&#xff0c;会导致领英账号被封吗&#xff1f; 这是一个被一部分人所忽略&#xff0c;也在被一部分人所担心的问题&#xff0c;因为很多领英新手都是在复制粘贴发送相…

游戏开发者的视觉盲区

本文首发于微信公众号&#xff1a; 小蚂蚁教你做游戏。欢迎关注领取更多学习做游戏的原创教程资料&#xff0c;每天学点儿游戏开发知识。嗨&#xff01;大家好&#xff0c;我是小蚂蚁。前天我刚发布了一个新的游戏作品——经典宝石方块。仍然是掌机模式&#xff0c;仍然是简约风…

JAVA开发(Web应用境外访问慢问题)

背景&#xff1a; 最近公司做的小程序出现在香港地区访问慢的问题。因为我们的应用是部署在大陆的腾讯服务器&#xff08;北京&#xff09;上&#xff0c;所以在香港地区访问大陆应用会比较慢。初步体验是4-5秒的响应速度。 影响的原因&#xff1a; 1、网络的原因&#xff0…

【HBase高级】7. HBase调优、常见问题处理

HBase调优 6.1 通用优化 NameNode的元数据备份使用SSD 定时备份NameNode上的元数据 每小时或者每天备份&#xff0c;如果数据极其重要&#xff0c;可以5~10分钟备份一次。备份可以通过定时任务复制元数据目录即可。 为NameNode指定多个元数据目录 使用dfs.name.dir或者dfs…

4.5--贪心--单源最短路径问题

设置顶点集合S并不断地作贪心选择--&#xff08;不属于这个集合S中距离"源"最短的顶点&#xff09;来扩充这个集合--更新最短距离 这张图需要放在最前面&#xff0c;就是经典dijkstra的主要思想。 为什么这样贪心是对的&#xff1f; 1、问题描述 给定带权有向图G (…

C++ 入门

C是在C的基础之上&#xff0c;容纳进去了面向对象编程思想&#xff0c;并增加了许多有用的库&#xff0c;以及编程范式等 文章目录一、命名空间二、输入输出三、缺省参数四、函数重载五、引用1. 引用的用法2. 常引用3. 引用的使用场景4. 引用的底层实现六、内联函数七、auto 关…

数组

循环队列中元素个数计算方法是固定的&#xff0c;即(尾-头)%长度&#xff0c;但是由于是循环队列所以尾可能会小于头&#xff0c;所以要加上长度&#xff0c;使尾-头保持是正整数&#xff0c;然后再对长度求余&#xff0c;即元素个数。循环队列中&#xff1a;头指针指向队列头元…

基于Java+Spring+Html的图书借阅管理系统详细设计和实现

博主介绍&#xff1a;✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

seo:百度统计

一、百度统计官网https://tongji.baidu.com/web5/welcome/login二、理解百度统计个人理解&#xff0c;添加这段代码到网站首页&#xff0c;有人访问该网站&#xff0c;即会加载这段代码&#xff0c;接着把信息发送到百度统计id 对应的百度统计账号&#xff0c;可从百度统计查看…

OpenMMLab AI实战课笔记 -- 第2节课

OpenMMLab AI实战课笔记 -- 第2节课1. 第二节课(图像分类)1.1 深度学习模型1.2 网络进化过程1.3 ResNet &#xff08;残差网络&#xff09;1.4 卷积的参数量1.5 卷积的计算量&#xff08;乘加次数&#xff09;1.6 降低模型参数量和计算量的方法1.7 可分离卷积1.8 注意力机制 At…

常见正则表达式使用参考

目录 一、正则函数 1.REGEXP 2.regexp_replace 3.regexp_extract 二、正则表达式 三、特殊字符转义 一、正则函数 1.REGEXP 语法格式&#xff1a; A REGEXP B &#xff08;A是需要匹配的字符串&#xff0c;B是正则表达式字符串&#xff09; 操作类型: strings 描述: …

UniTask详解

前言 UniTask为Unity提供一个高性能&#xff0c;0GC的async/await异步方案。 基于值类型的UniTask和自定义的 AsyncMethodBuilder 来实现0GC使所有 Unity 的 AsyncOperations 和 Coroutines 可等待基于 PlayerLoop 的任务( UniTask.Yield, UniTask.Delay, UniTask.DelayFrame…

哈工大机器学习复习笔记(四)

本篇文章是在参考西瓜书、PPT课件、网络上相关博客等资料的基础上整理出的机器学习复习笔记&#xff0c;希望能给大家的机器学习复习提供帮助。这篇笔记只是复习的一个参考&#xff0c;大家一定要结合书本、PPT来进行复习&#xff0c;有些公式的推导最好能够自己演算一遍。由于…

Scala 简单实现数据库连接池

在使用JDBC的时候&#xff0c;连接池是非常宝贵的资源。为了复用这些资源&#xff0c;可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接&#xff0c;则可以在一定时间内等待&#xff0c;直到队列中有可用的连接&#xff0c;否则将抛出…

浅谈估值模型:PB指标与剩余收益估值

摘要及声明 1&#xff1a;本文简单介绍PB指标的推导以及剩余收益的估值方式&#xff1b; 2&#xff1a;本文主要为理念的讲解&#xff0c;模型也是笔者自建&#xff0c;文中假设与观点是基于笔者对模型及数据的一孔之见&#xff0c;若有不同见解欢迎随时留言交流&#xff1b…

【HTML】HTML 标签 ① ( 骨架标签 | 双标签和单标签 | 嵌套关系和并列关系 | 文档类型 | 页面语言 | 编码字符集 )

文章目录一、HTML 标签简介二、HTML 骨架标签三、双标签和单标签四、嵌套关系和并列关系五、文档类型六、页面语言七、编码字符集一、HTML 标签简介 HTML 英文全称 " HyperText Mark-up Language " , 中文名称是 " 超文本标记语言 " ; 多媒体 : 超文本 指…

小 C 爱观察(observe)

小 C 爱观察&#xff08;observe&#xff09;题目描述输入格式输出格式样例输入数据#1输出数据#1解释#1输入数据#2输出数据#2输入数据#3输出数据#3题目描述 小 C 非常喜欢树。上次后院的蚂蚁看腻了&#xff0c;这次准备来观察树。 小 C 每天起得早早的&#xff0c;给小树浇水…