【消息队列】六万字长文详细带你RabbitMQ从入门到精通

news2025/1/13 13:08:22

目录

  • 1、基础篇
    • 1.1 为什么要用消息队列MQ
    • 1.2 什么是消息队列?
    • 1.3 RabbitMQ体系结构介绍
    • 1.4 RabbitMQ安装
    • 1.5 Hello World
      • 1.5.1 目标
      • 1.5.2 具体操作
    • 1.6 RabbitMQ用法
      • 1.6.1 Work Queues
      • 1.6.2 Publish/Subscribe
      • 1.6.3 Routing
      • 1.6.4 Topics
      • 1.6.5 工作模式小结
  • 2. 进阶篇
    • 2.1 RabbitMQ整合SpringBoot:消费者
      • 2.1.1 配置pom
      • 2.1.2 YAML
      • 2.1.3 主启动类
      • 2.1.4 监听器
      • 2.1.5 @RabbitListener注解属性对比
    • 2.2 RabbitMQ整合SpringBoot:生产者
      • 2.2.1 配置POM
      • 2.2.2 YAML
      • 2.2.3 主启动类
      • 2.2.4 测试程序
    • 2.3 可靠性
      • 2.3.1 生产者端消息确认机制
      • 2.3.2 备份交换机
      • 2.3.3 持久化
      • 2.3.4 消费端消息确认
    • 2.4 消费端限流
      • 2.4.1 思路
      • 2.4.2 生产者端代码
      • 2.4.3 消费者端代码
      • 2.4.4 测试
    • 2.5 消息超时
      • 2.5.1 队列层面设置
    • 2.6 死信和死信队列
      • 2.6.1 测试相关准备
      • 2.6.2 消费端拒收消息
      • 2.6.3 消息数量超过队列容纳极限
      • 2.6.4 消息超时未消费
    • 2.7 延迟队列
      • 2.7.1 插件简介
      • 2.7.2 插件安装
      • 2.7.3 创建交换机
      • 2.7.4 代码测试
    • 2.8 事务消息
      • 2.8.1 测试代码
      • 2.8.2 执行测试
    • 2.8 惰性队列
      • 2.8.1 创建惰性队列
      • 2.8.2 实操演练
      • 2.8.3 测试
    • 2.9 优先级队列
      • 2.9.1 创建相关资源
      • 2.9.2 生产者发送消息
      • 2.9.3 消费端接收消息
  • 总结

1、基础篇

1.1 为什么要用消息队列MQ

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下单功能【同步

  • 问题1:功能耦合度高
  • 问题2:响应时间长
  • 问题3:并发压力传递
  • 问题4:系统结构弹性不足

下单功能【异步

  • 功能解耦
  • 快速响应
  • 削峰限流
  • 便于扩展,符合开闭原则(对修改关闭,对扩展开放)

注意:
并不是把所有交互方式都改成异步

  • 强关联调用还是通过OpenFeign进行
    同步调用
  • 弱关联、可独立拆分出来的功能使用
    消息队列进行异步调用

1.2 什么是消息队列?

在这里插入图片描述
消息队列底层实现的两大主流方式

  • 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要
  • 目前主流的消息队列通信协议标准包括:
    • AMQP(Advanced Message Queuing Protocol):通用协议,IBM公司研发
    • JMS(Java Message Service):专门为Java语言服务,SUN公司研发,一组由Java接口组成
      的Java标准

在这里插入图片描述
在这里插入图片描述

1.3 RabbitMQ体系结构介绍

  • 重要:对体系结构的理解直接关系到后续的操作和使用

在这里插入图片描述

1.4 RabbitMQ安装

# 拉取镜像
docker pull rabbitmq:3.13-management

# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management

输入:ip+15672

在这里插入图片描述
在这里插入图片描述

1.5 Hello World

1.5.1 目标

生产者发送消息,消费者接收消息,用最简单的方式实现

官网说明参见下面超链接:

RabbitMQ tutorial - “Hello World!” — RabbitMQ

在这里插入图片描述

1.5.2 具体操作

  1. 添加依赖
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.20.0</version>
    </dependency>
</dependencies>
  1. 发送消息

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

public class Producer {  

    public static void main(String[] args) throws Exception {  

        // 创建连接工厂  
        ConnectionFactory connectionFactory = new ConnectionFactory();  

        // 设置主机地址  
        connectionFactory.setHost("192.168.200.100");  

        // 设置连接端口号:默认为 5672
        connectionFactory.setPort(5672);

        // 虚拟主机名称:默认为 /
        connectionFactory.setVirtualHost("/");

        // 设置连接用户名;默认为guest  
        connectionFactory.setUsername("guest");

        // 设置连接密码;默认为guest  
        connectionFactory.setPassword("123456");

        // 创建连接  
        Connection connection = connectionFactory.newConnection();  

        // 创建频道  
        Channel channel = connection.createChannel();  

        // 声明(创建)队列  
        // queue      参数1:队列名称  
        // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  
        // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  
        // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  
        // arguments  参数5:队列其它参数  
        channel.queueDeclare("simple_queue", true, false, false, null);  

        // 要发送的信息  
        String message = "你好;小兔子!";  

        // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  
        // 参数2:路由key,简单模式可以传递队列名称  
        // 参数3:配置信息  
        // 参数4:消息内容  
        channel.basicPublish("", "simple_queue", null, message.getBytes());  

        System.out.println("已发送消息:" + message);  

        // 关闭资源  
        channel.close();  
        connection.close();  

    }  

}
  1. 接收消息

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

import com.rabbitmq.client.*;  

import java.io.IOException;  

public class Consumer {  

    public static void main(String[] args) throws Exception {  

        // 1.创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  

        // 2. 设置参数  
        factory.setHost("192.168.200.100");  
        factory.setPort(5672);  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");
        factory.setPassword("123456");  

        // 3. 创建连接 Connection        
        Connection connection = factory.newConnection();  

        // 4. 创建Channel  
        Channel channel = connection.createChannel();  

        // 5. 创建队列  
        // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  
        // 参数1. queue:队列名称  
        // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  
        // 参数3. exclusive:是否独占。  
        // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  
        // 参数5. arguments:其它参数。  
        channel.queueDeclare("simple_queue",true,false,false,null);  

        // 接收消息  
        DefaultConsumer consumer = new DefaultConsumer(channel){  

            // 回调方法,当收到消息后,会自动执行该方法  
            // 参数1. consumerTag:标识  
            // 参数2. envelope:获取一些信息,交换机,路由key...  
            // 参数3. properties:配置信息  
            // 参数4. body:数据  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

                System.out.println("consumerTag:"+consumerTag);  
                System.out.println("Exchange:"+envelope.getExchange());  
                System.out.println("RoutingKey:"+envelope.getRoutingKey());  
                System.out.println("properties:"+properties);  
                System.out.println("body:"+new String(body));  

            }  

        };  

        // 参数1. queue:队列名称  
        // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  
        // 参数3. callback:回调对象  
        // 消费者类似一个监听程序,主要是用来监听消息  
        channel.basicConsume("simple_queue",true,consumer);  

    }  

}
  1. 控制台打印
    在这里插入图片描述

1.6 RabbitMQ用法

  • RabbitMQ官网,通过教程的形式,给我们列举了7种RabbitMQ用法
  • 网址:https://www.rabbitmq.com/getstarted.html
    在这里插入图片描述

1.6.1 Work Queues

本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:

  • 生产者只有一个
  • 发送一个消息
  • 消费者也只有一个,消息也只能被这个消费者消费
    所以HelloWorld也称为简单模式。
    现在我们还原一下常规情况:
  • 生产者发送多个消息
  • 由多个消费者来竞争
  • 谁抢到算谁的
  • 在这里插入图片描述

结论:

  • 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
  • Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务
    可以提高消息处理的效率。
  1. 封装工具类
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

public class ConnectionUtil {  

    public static final String HOST_ADDRESS = "192.168.200.100";  

    public static Connection getConnection() throws Exception {  

        // 定义连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  

        // 设置服务地址  
        factory.setHost(HOST_ADDRESS);  

        // 端口  
        factory.setPort(5672);  

        //设置账号信息,用户名、密码、vhost  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");  
        factory.setPassword("123456");  

        // 通过工程获取连接  
        Connection connection = factory.newConnection();  

        return connection;  
    }  



    public static void main(String[] args) throws Exception {  

        Connection con = ConnectionUtil.getConnection();  

        // amqp://guest@192.168.200.100:5672/  
        System.out.println(con);  

        con.close();  

    }  

}
  1. 编写生产者代码
import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  

public class Producer {  

    public static final String QUEUE_NAME = "work_queue";  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  

        for (int i = 1; i <= 10; i++) {  

            String body = i+"hello rabbitmq~~~";  

            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  

        }  

        channel.close();  

        connection.close();  

    }  

}
  1. 编写消费者代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。

import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  

import java.io.IOException;  

public class Consumer1 {  

    static final String QUEUE_NAME = "work_queue";  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  

        Consumer consumer = new DefaultConsumer(channel){  

            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

                System.out.println("Consumer1 body:"+new String(body));  

            }  

        };  

        channel.basicConsume(QUEUE_NAME,true,consumer);  

    }  

}

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。

如果已经运行过生产者程序,则手动把work_queue队列删掉。

  1. 运行效果

最终两个消费端程序竞争结果如下:

在这里插入图片描述
在这里插入图片描述
生产者生产了10条消息,两个消费者之间存在竞争关系

1.6.2 Publish/Subscribe

引入新角色:交换机

生产者不是把消息直接发送到队列,而是发送到交换机

  • 交换机接收消息,而如何处理消息取决于交换机的类型
  • 交换机有如下3种常见类型
    • Fanout:广播,将消息发送给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
  • 注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因
    此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那
    么消息会丢失

在这里插入图片描述

  • 组件之间关系:
    • 生产者把消息发送到交换机
    • 队列直接和交换机绑定
  • 工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列
  • 理解概念:
    • Publish:发布,这里就是把消息发送到交换机上
    • Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系
  1. 生产者代码
import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

public class Producer {  

    public static void main(String[] args) throws Exception {  

      // 1、获取连接  
        Connection connection = ConnectionUtil.getConnection();  

      // 2、创建频道  
        Channel channel = connection.createChannel();  

        // 参数1. exchange:交换机名称  
        // 参数2. type:交换机类型  
        //     DIRECT("direct"):定向  
        //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  
        //     TOPIC("topic"):通配符的方式  
        //     HEADERS("headers"):参数匹配  
        // 参数3. durable:是否持久化  
        // 参数4. autoDelete:自动删除  
        // 参数5. internal:内部使用。一般false  
        // 参数6. arguments:其它参数  
        String exchangeName = "test_fanout";  

        // 3、创建交换机  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  

        // 4、创建队列  
        String queue1Name = "test_fanout_queue1";  
        String queue2Name = "test_fanout_queue2";  

        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  

        // 5、绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //     如果交换机的类型为fanout,routingKey设置为""  
        channel.queueBind(queue1Name,exchangeName,"");  
        channel.queueBind(queue2Name,exchangeName,"");  

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";  

        // 6、发送消息  
        channel.basicPublish(exchangeName,"",null,body.getBytes());  

        // 7、释放资源  
        channel.close();  
        connection.close();  

    }  

}
  1. 消费者代码

消费者1号

import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  

public class Consumer1 {  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        String queue1Name = "test_fanout_queue1";  

        channel.queueDeclare(queue1Name,true,false,false,null);  

        Consumer consumer = new DefaultConsumer(channel){  

            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

                System.out.println("body:"+new String(body));  
                System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  

            }  

        };  

        channel.basicConsume(queue1Name,true,consumer);  

    }  

}

消费者2号

import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  

public class Consumer2 {  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        String queue2Name = "test_fanout_queue2";  

        channel.queueDeclare(queue2Name,true,false,false,null);  

        Consumer consumer = new DefaultConsumer(channel){  

            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

                System.out.println("body:"+new String(body));  
                System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  

            }  

        };  

        channel.basicConsume(queue2Name,true,consumer);  

    }  

}
  1. 运行效果

在这里插入图片描述

在这里插入图片描述
4. 小结

交换机和队列的绑定关系如下图所示:

在这里插入图片描述
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机
  • 发布订阅模式绑定指定交换机
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

1.6.3 Routing

  • 通过『路由绑定』的方式,把交换机和队列关联起来
  • 交换机和队列通过路由键进行绑定
  • 生产者发送消息时不仅要指定交换机,还要指定路由键
  • 交换机接收到消息会发送到路由键绑定的队列
  • 在编码上与 Publish/Subscribe发布与订阅模式的区别:
    • 交换机的类型为:Direct
    • 队列绑定交换机的时候需要指定routing key。

在这里插入图片描述

  1. 生产者代码
import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

public class Producer {  

    public static void main(String[] args) throws Exception {  

      Connection connection = ConnectionUtil.getConnection();  

      Channel channel = connection.createChannel();  

      String exchangeName = "test_direct";  

      // 创建交换机  
      channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  

      // 创建队列  
      String queue1Name = "test_direct_queue1";  
      String queue2Name = "test_direct_queue2";  

      // 声明(创建)队列  
      channel.queueDeclare(queue1Name,true,false,false,null);  
      channel.queueDeclare(queue2Name,true,false,false,null);  

      // 队列绑定交换机  
      // 队列1绑定error  
      channel.queueBind(queue1Name,exchangeName,"error");  

      // 队列2绑定info error warning  
      channel.queueBind(queue2Name,exchangeName,"info");  
      channel.queueBind(queue2Name,exchangeName,"error");  
      channel.queueBind(queue2Name,exchangeName,"warning");  

        String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";  

        // 发送消息  
        channel.basicPublish(exchangeName,"warning",null,message.getBytes());  
        System.out.println(message);  

      // 释放资源  
        channel.close();  
        connection.close();  

    }  

}
  1. 消费者代码

消费者1

import com.hanson.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author hanson
 * @date 2024/6/21 13:45
 */
public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer1 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue1Name,true,consumer);

    }

}

消费者2

import com.hanson.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author hanson
 * @date 2024/6/21 13:46
 */
public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue2Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer2 将日志信息存储到数据库.....");

            }

        };

        channel.basicConsume(queue2Name,true,consumer);

    }

}
  1. 运行结果

绑定关系

在这里插入图片描述
消费消息

在这里插入图片描述
在这里插入图片描述
生产者的routing key是warning,所以c2收到了消息,c1没有收到消息。

我将Producer中的routing key设置为error

在这里插入图片描述
重新运行Producer程序

在这里插入图片描述
在这里插入图片描述
两个消费者都收到了消息

1.6.4 Topics

  • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队
    列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用
    通配符
  • Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,
    例如:item.insert
  • 通配符规则:
    • #:匹配零个或多个词
    • *:匹配一个词

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  1. 生产者代码
import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

public class Producer {  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        String exchangeName = "test_topic";  

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);  

        String queue1Name = "test_topic_queue1";  
        String queue2Name = "test_topic_queue2";  

        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  

        // 绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //      如果交换机的类型为fanout ,routingKey设置为""  
        // routing key 常用格式:系统的名称.日志的级别。  
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  
        channel.queueBind(queue1Name,exchangeName,"#.error");  
        channel.queueBind(queue1Name,exchangeName,"order.*");  
        channel.queueBind(queue2Name,exchangeName,"*.*");  

        // 分别发送消息到队列:order.info、goods.info、goods.error  
        String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";  
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());  

        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";  
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());  

        body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";  
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());  

        channel.close();  
        connection.close();  

    }  

}
  1. 消费者代码

消费者1监听队列1:

import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  

public class Consumer1 {  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        String QUEUE_NAME = "test_topic_queue1";  

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  

        Consumer consumer = new DefaultConsumer(channel){  

            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

                System.out.println("body:"+new String(body));  

            }  

        };  

        channel.basicConsume(QUEUE_NAME,true,consumer);  

    }  

}

消费者2监听队列2:

import com.hanson.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  

public class Consumer2 {  

    public static void main(String[] args) throws Exception {  

        Connection connection = ConnectionUtil.getConnection();  

        Channel channel = connection.createChannel();  

        String QUEUE_NAME = "test_topic_queue2";  

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  

        Consumer consumer = new DefaultConsumer(channel){  

            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

                System.out.println("body:"+new String(body));  

            }  

        };  

        channel.basicConsume(QUEUE_NAME,true,consumer);  

    }  

}
  1. 运行效果

队列1:

在这里插入图片描述
队列2:
在这里插入图片描述

1.6.5 工作模式小结

  • 直接发送到队列:底层使用了默认交换机
  • 经过交换机发送到队列
    • Fanout:没有Routing key直接绑定队列
    • Direct:通过Routing key绑定队列,消息发送到绑定的队列上
      • 一个交换机绑定一个队列:定点发送
      • 一个交换机绑定多个队列:广播发送
    • Topic:针对Routing key使用通配符

2. 进阶篇

2.1 RabbitMQ整合SpringBoot:消费者

  • 搭建环境
  • 基础设定:交换机名称、队列名称、绑定关系
  • 发送消息:使用RabbitTemplate
  • 接收消息:使用@RabbitListener注解

2.1.1 配置pom

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>3.1.5</version>
</parent>

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
   </dependency>
</dependencies>

2.1.2 YAML

增加日志打印的配置:

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
logging:
  level:
    com.hanson.mq.listener.MyMessageListener: info

2.1.3 主启动类

仿照生产者工程的主启动类,改一下类名即可

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Module03SpringbootConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(Module03SpringbootConsumerApplication.class, args);
    }

}

2.1.4 监听器

import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyMessageListener {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";  
    public static final String QUEUE_NAME  = "queue.order";  

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT),
            key = {ROUTING_KEY}
    ))
    public void processMessage(String dateString,
                               Message message,
                               Channel channel) {
        log.info(dateString);
    }

}

2.1.5 @RabbitListener注解属性对比

①bindings属性

  • 表面作用:
    • 指定交换机和队列之间的绑定关系
    • 指定当前方法要监听的队列
  • 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

②queues属性

@RabbitListener(queues = {QUEUE_ATGUIGU})
  • 作用:指定当前方法要监听的队列
  • 注意:此时框架不会创建相关交换机和队列,必须提前创建好

2.2 RabbitMQ整合SpringBoot:生产者

2.2.1 配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2.2.2 YAML

spring: 
  rabbitmq: 
    host: 192.168.200.100
    port: 5672 
    username: guest 
    password: 123456 
    virtual-host: /

2.2.3 主启动类

import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  

@SpringBootApplication
public class RabbitMQProducerMainType {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);  
    }

}

2.2.4 测试程序


@SpringBootTest
class Module04SpringbootProducerApplicationTests {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test01SendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello Rabbit!SpringBoot!");
        
    }
}

结果:
在这里插入图片描述

2.3 可靠性

正常下单流程
在这里插入图片描述

提出问题:

问题一:消息没有发送到消息队列上
后果:消费者拿不到消息,业务功能缺失,数据错误
在这里插入图片描述
问题二:消息成功存入消息队列,但是消息队列服务器宕机了
原本保存在内存中的消息丢失
即使服务器重新启动,消息也找不回来了
后果:消费者拿不到消息,业务功能缺失,数据错误
在这里插入图片描述
问题三:
消息成功存入消息队列,但是消费端出现问题,例如:宕机、抛异常等等
后果:业务功能缺失,数据错误
在这里插入图片描述

解决思路:

  • 故障情况1:消息没有发送到消息队列
    • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
    • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
  • 故障情况2:消息队列服务器宕机导致内存中消息丢失
    • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
  • 故障情况3:消费端宕机或抛异常导致消息没有成功被消费
    • 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
    • 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)

2.3.1 生产者端消息确认机制

  1. 配置POM
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
  1. 主启动类

没有特殊设定:

import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  

@SpringBootApplication
public class RabbitMQProducerMainType {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);  
    }

}
  1. YAML

注意publisher-confirm-typepublisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认
logging:
  level:
    com.hanson.mq.config.MQProducerAckConfig: info
  1. 创建配置类

1、目标

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法所需对象类型
setConfirmCallback()ConfirmCallback接口类型
setReturnCallback()ReturnCallback接口类型

2、API说明

①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

    /**
     * A callback for publisher confirmations.
     *
     */
    @FunctionalInterface
    public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

    }

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机
  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

    /**
     * A callback for returned messages.
     *
     * @since 2.3
     */
    @FunctionalInterface
    public interface ReturnsCallback {

        /**
         * Returned message callback.
         * @param returned the returned message and metadata.
         */
        void returnedMessage(ReturnedMessage returned);

    }

注意:接口中的returnedMessage()方法在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称

3、配置类代码

①要点1

加@Component注解,加入IOC容器

②要点2

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。

操作封装到了一个专门的void init()方法中。

为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。

关于@PostConstruct注解大家可以参照以下说明:

@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。

使用@PostConstruct注解的方法必须满足以下条件:

  1. 方法不能有任何参数
  2. 方法必须是非静态的
  3. 方法不能返回任何值

当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。

③代码

有了以上说明,下面我们就可以展示配置类的整体代码:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @author hanson
 * @date 2024/6/21 17:42
 */
@Slf4j
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息发送到交换机成功或者失败时调用这个方法
        log.info("confirm()回调函数打印 CorrelationData:" + correlationData);
        log.info("confirm()回调函数打印 ack:" + ack);
        log.info("confirm()回调函数打印 cause:" + cause);
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        // 发送到队列失败才调用这个方法
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
    }
}

4、发送消息

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Module05ConfirmProducerApplicationTests {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test01SendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT+ "~", ROUTING_KEY, "Message Test Confirm~~~");
    }

}

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确
  • 交换机正确、路由键不正确,无法发送到队列
  • 交换机不正确,无法发送到交换机

2.3.2 备份交换机

在这里插入图片描述

一、创建备份交换机

1、创建备份交换机

注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键

在这里插入图片描述

在这里插入图片描述

2、创建备份交换机要绑定的队列

①创建队列

在这里插入图片描述
②绑定交换机

注意:这里是要和备份交换机绑定

在这里插入图片描述
3、针对备份队列创建消费端监听器

 public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
    public static final String QUEUE_NAME_BACKUP  = "queue.order.backup";

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
            key = {""}
    ))
    public void processMessageBackup(String dateString,
                                     Message message,
                                     Channel channel) {
        log.info("BackUp: " + dateString);
    }

二、设定备份关系

1、原交换机删除

在这里插入图片描述
2、重新创建原交换机

在这里插入图片描述
3、原交换机重新绑定原队列

在这里插入图片描述
三、测试

  • 启动消费者端
  • 发送消息,但是路由键不对,于是转入备份交换机

2.3.3 持久化

一、测试非持久化交换机和队列

1、创建非持久化交换机

在这里插入图片描述
创建之后,可以在列表中看到:

在这里插入图片描述

2、创建非持久化队列

在这里插入图片描述
创建之后,可以在列表中看到:

在这里插入图片描述

3、绑定

在这里插入图片描述
4、发送消息

    public static final String EXCHANGE_TRANSIENT = "exchange.transient.user";
    public static final String ROUTING_KEY_TRANSIENT = "user";

    @Test
    public void testSendMessageTransient() {
        rabbitTemplate.convertAndSend(
                EXCHANGE_TRANSIENT,
                ROUTING_KEY_TRANSIENT,
                "Hello atguigu user~~~");
    }

5、查看已发送消息

在这里插入图片描述
结论:临时性的交换机和队列也能够接收消息,但如果RabbitMQ服务器重启之后会怎么样呢?

6、重启RabbitMQ服务器

docker restart rabbitmq

重启之后,刚才临时性的交换机和队列都没了。在交换机和队列这二者中,队列是消息存储的容器,队列没了,消息就也跟着没了。

二、持久化的交换机和队列

我们其实不必专门创建持久化的交换机和队列,因为它们默认就是持久化的。接下来我们只需要确认一下:存放到队列中,尚未被消费端取走的消息,是否会随着RabbitMQ服务器重启而丢失?

1、发送消息

运行以前的发送消息方法即可,不过要关掉消费端程序

2、在管理界面查看消息

在这里插入图片描述
3、重启RabbitMQ服务器

docker restart rabbitmq

4、再次查看消息

仍然还在:

在这里插入图片描述
三、结论

在后台管理界面创建交换机和队列时,默认就是持久化的模式。

此时消息也是持久化的,不需要额外设置。

2.3.4 消费端消息确认

deliveryTag:交付标签机制

在这里插入图片描述
消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。

提问:如果交换机是Fanout模式,同一个消息广播到了不同队列,deliveryTag会重复吗?

答:不会,deliveryTag在Broker范围内唯一

在这里插入图片描述

multiple参数:

在这里插入图片描述
在这里插入图片描述

一、ACK

ACK是acknowledge的缩写,表示已确认

二、默认情况

默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息

但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了

所以还是要修改成手动确认

三、创建消费端module

1、配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、YAML

增加针对监听器的设置:

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认

3、主启动类

没有特殊设定:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQConsumerMainType {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQConsumerMainType.class, args);
    }

}

四、消费端监听器

1、创建监听器类

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyMessageListener {
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME = "queue.order";

    @RabbitListener(queues = {QUEUE_NAME})
    public void processMessage(String dataString, Message message, Channel channel) {
        // 核心操作
        log.info("消费端 消息内容:" + dataString);
    }
}

2、在接收消息的方法上应用注解

// 修饰监听方法
@RabbitListener(
        // 设置绑定关系
        bindings = @QueueBinding(

            // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
            value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),

            // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
            exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),

            // 配置路由键信息
            key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {

}

3、接收消息方法内部逻辑

  • 业务处理成功:手动返回ACK信息,表示消息成功消费
  • 业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:
    • 把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍
    • 不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止

4、相关API

下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口

①basicAck()方法

  • 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息

②basicNack()方法

  • 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列

③basicReject()方法

  • 方法功能:根据指定的deliveryTag,对该消息表示拒绝
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列
  • basicNack()和basicReject()有啥区别?
    • basicNack()有批量操作
    • basicReject()没有批量操作

5、完整代码示例

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class MyMessageListener {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME  = "queue.order";

    // 修饰监听方法
    @RabbitListener(
            // 设置绑定关系
            bindings = @QueueBinding(

                // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
                value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),

                // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
                exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),

                // 配置路由键信息
                key = {ROUTING_KEY}
    ))
    public void processMessage(String dataString, Message message, Channel channel) throws IOException {

        // 1、获取当前消息的 deliveryTag 值备用
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // 2、正常业务操作
            log.info("消费端接收到消息内容:" + dataString);

            // System.out.println(10 / 0);

            // 3、给 RabbitMQ 服务器返回 ACK 确认信息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {

            // 4、获取信息,看当前消息是否曾经被投递过
            Boolean redelivered = message.getMessageProperties().getRedelivered();

            if (!redelivered) {
                // 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
                channel.basicNack(deliveryTag, false, true);
            } else {
                // 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
                channel.basicReject(deliveryTag, false);
            }

        }
    }

}

五、要点总结

  • 要点1:把消息确认模式改为手动确认
  • 要点2:调用Channel对象的方法返回信息
    • ACK:Acknowledgement,表示消息处理成功
    • NACK:Negative Acknowledgement,表示消息处理失败
    • Reject:拒绝,同样表示消息处理失败
  • 要点3:后续操作
    • requeue为true:重新放回队列,重新投递,再次尝试
    • requeue为false:不放回队列,不重新投递
  • 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据

六、流程梳理

在这里插入图片描述
七、多啰嗦一句
消费端如果设定消息重新放回队列,Broker重新投递消息,那么消费端就可以再次消费消息,这是一种“重试”机制,这需要消费端代码支持“"幂等性 ”——这属于前置知识,不展开了。

2.4 消费端限流

在这里插入图片描述
在这里插入图片描述

2.4.1 思路

  • 生产者发送100个消息
  • 对照两种情况:
    • 消费端没有设置prefetch参数:100个消息被全部取回
    • 消费端设置prefetch参数为1:100个消息慢慢取回

2.4.2 生产者端代码

@Test  
public void testSendMessage() {
    for (int i = 0; i < 100; i++) {
        rabbitTemplate.convertAndSend(
                EXCHANGE_DIRECT,
                ROUTING_KEY,
                "Hello hanson" + i);
    }
}

2.4.3 消费者端代码

// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);

// System.out.println(10 / 0);
TimeUnit.SECONDS.sleep(1);

// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);

2.4.4 测试

1、未使用prefetch

  • 不要启动消费端程序,如果正在运行就把它停了
  • 运行生产者端程序发送100条消息
  • 查看队列中消息的情况:
ReadyUnackedTotal
1000100
  • 说明:

    • Ready表示已经发送到队列的消息数量
    • Unacked表示已经发送到消费端但是消费端尚未返回ACK信息的消息数量
    • Total未被删除的消息总数
  • 接下来启动消费端程序,再查看队列情况:

ReadyUnackedTotal
09696
  • 能看到消息全部被消费端取走了,正在逐个处理、确认,说明有多少消息消费端就并发处理多少

2、设定prefetch

①YAML配置

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息

②测试流程

  • 停止消费端程序
  • 运行生产者端程序发送100条消息
  • 查看队列中消息的情况:
ReadyUnackedTotal
1000100
  • 接下来启动消费端程序,持续观察队列情况:
ReadyUnackedTotal
95196
ReadyUnackedTotal
80181
ReadyUnackedTotal
65166
  • 能看到消息不是一次性全部取回的,而是有个过程

2.5 消息超时

  • 消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
  • 我们可以从两个层面来给消息设定过期时间:
    • 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
    • 消息本身:给具体的某个消息设定过期时间
  • 如果两个层面都做了设置,那么哪个时间短,哪个生效

2.5.1 队列层面设置

1、设置

在这里插入图片描述
别忘了设置绑定关系:

在这里插入图片描述
2、测试

  • 不启动消费端程序
  • 向设置了过期时间的队列中发送100条消息
  • 等10秒后,看是否全部被过期删除

在这里插入图片描述

二、消息层面设置

1、设置

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

@Test  
public void testSendMessageTTL() {  

    // 1、创建消息后置处理器对象  
    MessagePostProcessor messagePostProcessor = (Message message) -> {  

        // 设定 TTL 时间,以毫秒为单位
        message.getMessageProperties().setExpiration("5000");  

        return message;
    };

    // 2、发送消息  
    rabbitTemplate.convertAndSend(    
            EXCHANGE_DIRECT,     
            ROUTING_KEY,     
            "Hello hanson", messagePostProcessor);    
}

2、查看效果

这次我们是发送到普通队列上:

在这里插入图片描述

2.6 死信和死信队列

  • 概念:当一个消息无法被消费,它就变成了死信。
  • 死信产生的原因大致有下面三种:
    • 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
    • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信
    • 超时:消息到达超时时间未被消费
  • 死信的处理方式大致有下面三种:
    • 丢弃:对不重要的消息直接丢弃,不做处理
    • 入库:把死信写入数据库,日后处理
    • 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)

2.6.1 测试相关准备

1、创建死信交换机和死信队列

常规设定即可,没有特殊设置:

  • 死信交换机:exchange.dead.letter.video
  • 死信队列:queue.dead.letter.video
  • 死信路由键:routing.key.dead.letter.video

2、创建正常交换机和正常队列

注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机

在这里插入图片描述

  • 正常交换机:exchange.normal.video
  • 正常队列:queue.normal.video
  • 正常路由键:routing.key.normal.video

全部设置完成后参照如下细节:

在这里插入图片描述
3、Java代码中的相关常量声明

public static final String EXCHANGE_NORMAL = "exchange.normal.video";  
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";  

public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";  
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";  

public static final String QUEUE_NORMAL = "queue.normal.video";  
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";

2.6.2 消费端拒收消息

1、发送消息的代码

@Test  
public void testSendMessageButReject() {  
    rabbitTemplate  
            .convertAndSend(  
                    EXCHANGE_NORMAL,  
                    ROUTING_KEY_NORMAL,  
                    "测试死信情况1:消息被拒绝");  
}

2、接收消息的代码

①监听正常队列

@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
    // 监听正常队列,但是拒绝消息
    log.info("★[normal]消息接收到,但我拒绝。");
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}

②监听死信队列

@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {  
    // 监听死信队列  
    log.info("★[dead letter]dataString = " + dataString);
    log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

3、执行结果

在这里插入图片描述

2.6.3 消息数量超过队列容纳极限

1、发送消息的代码

@Test  
public void testSendMultiMessage() {  
    for (int i = 0; i < 20; i++) {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_NORMAL,  
                ROUTING_KEY_NORMAL,  
                "测试死信情况2:消息数量超过队列的最大容量" + i);  
    }  
}

2、接收消息的代码

消息接收代码不再拒绝消息:

@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
    // 监听正常队列
    log.info("★[normal]消息接收到。");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

重启微服务使代码修改生效。

3、执行效果

正常队列的参数如下图所示:

在这里插入图片描述
生产者发送20条消息之后,消费端死信队列接收到前10条消息:

在这里插入图片描述

2.6.4 消息超时未消费

1、发送消息的代码

@Test
public void testSendMessageTimeout() {
    rabbitTemplate
            .convertAndSend(
                    EXCHANGE_NORMAL,
                    ROUTING_KEY_NORMAL,
                    "测试死信情况3:消息超时");
}

2、执行效果

队列参数生效:

在这里插入图片描述
因为没有消费端监听程序,所以消息未超时前滞留在队列中:

在这里插入图片描述

消息超时后,进入死信队列:

在这里插入图片描述

2.7 延迟队列

在这里插入图片描述
实现思路

  • 方案1:借助消息超时时间+死信队列(就是刚刚我们测试的例子)
  • 方案2:给RabbitMQ安装插件

在这里插入图片描述
这种方法在上面死信队列有提到,这里不做讲解,主要来看看方案二

2.7.1 插件简介

  • 官网地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  • 延迟极限:最多两天

2.7.2 插件安装

1、确定卷映射目录

docker inspect rabbitmq

运行结果:

"Mounts": [
            {
                "Type": "volume",
                "Name": "rabbitmq-plugin",
                "Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data",
                "Destination": "/plugins",
                "Driver": "local",
                "Mode": "z",
                "RW": true,
                "Propagation": ""
            },
            {
                "Type": "volume",
                "Name": "debf9d83ff537f2d88f33ac9c773f9a63b341b1d0f5f2f3f541c7e4f6b289499",
                "Source": "/var/lib/docker/volumes/debf9d83ff537f2d88f33ac9c773f9a63b341b1d0f5f2f3f541c7e4f6b289499/_data",
                "Destination": "/var/lib/rabbitmq",
                "Driver": "local",
                "Mode": "",
                "RW": true,
                "Propagation": ""
            }
        ]

和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data

2、下载延迟插件

官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

下载插件安装文件:

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data

3、启用插件

# 登录进入容器内部
docker exec -it rabbitmq /bin/bash

# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出Docker容器
exit

# 重启Docker容器
docker restart rabbitmq

4、确认

确认点1:查看当前节点已启用插件的列表:

在这里插入图片描述
确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了

在这里插入图片描述

2.7.3 创建交换机

rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以,创建方式如下:

在这里插入图片描述

关于x-delayed-type参数的理解:

原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?

这里就额外使用x-delayed-type来指定交换机本身的类型

2.7.4 代码测试

1、生产者端代码

@Test
public void testSendDelayMessage() {
    rabbitTemplate.convertAndSend(
            EXCHANGE_DELAY,
            ROUTING_KEY_DELAY,
            "测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",
            messageProcessor -> {

                // 设置延迟时间:以毫秒为单位
                messageProcessor.getMessageProperties().setHeader("x-delay", "10000");

                return messageProcessor;
            });
}

2、消费者端代码

①情况A:资源已创建

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;  

@Component  
@Slf4j
public class MyDelayMessageListener {

    public static final String QUEUE_DELAY = "queue.delay.video";

    @RabbitListener(queues = {QUEUE_DELAY})
    public void process(String dataString, Message message, Channel channel) throws IOException {  
        log.info("[生产者]" + dataString);
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

②情况B:资源未创建

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.*;  
import org.springframework.stereotype.Component;  

import java.io.IOException;  
import java.text.SimpleDateFormat;  
import java.util.Date;  

@Component  
@Slf4j
public class MyDelayMessageListener {  

    public static final String EXCHANGE_DELAY = "exchange.delay.video";
    public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
    public static final String QUEUE_DELAY = "queue.delay.video";

    @RabbitListener(bindings = @QueueBinding(  
        value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),  
        exchange = @Exchange(  
                value = EXCHANGE_DELAY,   
                durable = "true",   
                autoDelete = "false",   
                type = "x-delayed-message",   
                arguments = @Argument(name = "x-delayed-type", value = "direct")),  
        key = {ROUTING_KEY_DELAY}  
    ))  
    public void process(String dataString, Message message, Channel channel) throws IOException {  
        log.info("[生产者]" + dataString);  
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));  
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  
    }  

}

3、执行效果

①交换机类型

在这里插入图片描述

②生产者端效果

注意:使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行

在这里插入图片描述

③消费者端效果

在这里插入图片描述

2.8 事务消息

事务消息的机制说明:生产者端

在这里插入图片描述

在这里插入图片描述

  • 总结:
    • 在生产者端使用事务消息和消费端没有关系
    • 在生产者端使用事务消息仅仅是控制事务内的消息是否发送
    • 提交事务就把事务内所有消息都发送到交换机
    • 回滚事务则事务内任何消息都不会被发送
    • 事务控制对消费端无效!!!

2.8.1 测试代码

1、引入依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、yaml配置

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /

3、主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerMainType {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);
    }

}

4、相关配置

import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Data
public class RabbitConfig {

    @Bean
    public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
}

5、测试代码

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
public class RabbitMQTest {

    public static final String EXCHANGE_NAME = "exchange.tx.dragon";
    public static final String ROUTING_KEY = "routing.key.tx.dragon";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessageInTx() {
        // 1、发送第一条消息
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");

        // 2、抛出异常
        log.info("do bad:" + 10 / 0);

        // 3、发送第二条消息
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
    }

}

2.8.2 执行测试

1、未使用事务

抛出异常前的消息发送了,抛异常后的消息没有发送:

在这里插入图片描述
在这里插入图片描述
为了不影响后续操作,我们直接在管理界面这里把这条消息消费掉:

在这里插入图片描述
2、使用事务

①说明

因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控

②测试提交事务的情况

@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {
    // 1、发送第一条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");

    // 2、发送第二条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}

在这里插入图片描述
在这里插入图片描述
③测试回滚事务的情况

@Test
@Transactional
@Rollback(value = true)
public void testSendMessageInTx() {
    // 1、发送第一条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)");

    // 2、发送第二条消息
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)");
}

在这里插入图片描述

2.8 惰性队列

  • 创建队列时,在Durability这里有两个选项可以选择
    • Durable:持久化队列,消息会持久化到硬盘上
    • Transient:临时队列,不做持久化操作,broker重启后消息会丢失

在这里插入图片描述
惰性队列:未设置惰性模式时队列的持久化机制

  • 那么Durable队列在存入消息之后,是否是立即保存到硬盘呢?

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 比较下面两个说法是否是相同的意思:
    • 立即移动到硬盘
    • 尽早移动到硬盘
  • 我认为不一样:
    • 立即:消息刚进入队列时
    • 尽早:服务器不繁忙时

惰性队列:应用场景

原文翻译:使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。
由于各种原因,排队可能会变得很长:

  • 消费者离线/崩溃/停机进行维护
  • 突然出现消息进入高峰,生产者的速度超过了消费者
  • 消费者比正常情况慢

2.8.1 创建惰性队列

1、官网说明

在这里插入图片描述

队列可以创建为默认惰性模式,模式指定方式是:

  • 使用队列策略(建议)
  • 设置queue.declare参数

如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。

2、基于策略方式设定

# 登录Docker容器
docker exec -it rabbitmq /bin/bash

# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量

  • set_policy是子命令,表示设置策略

  • Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的

  • "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置

  • '{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"

  • –-apply-to参数指定该策略将应用于队列(queues)级别

  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列

如果需要修改队列模式可以执行如下命令(不必删除队列再重建):

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

3、在声明队列时使用参数设定

  • 参数名称:x-queue-mode
  • 可用参数值:
    • default
    • lazy
  • 不设置就是取值为default

Java代码原生API设置方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Java代码注解设置方式:

@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
    @Argument(name = "x-queue-mode", value = "lazy")
})

2.8.2 实操演练

1、生产者端代码

①配置POM

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

②配置YAML

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /

③主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQLazyProducer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQLazyProducer.class, args);
    }

}

④发送消息

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    public static final String EXCHANGE_LAZY_NAME = "exchange.hanson.lazy";
    public static final String ROUTING_LAZY_KEY = "routing.key.hanson.lazy";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, "I am a message for test lazy queue.");
    }

}

2、消费者端代码

①配置POM

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

②配置YAML

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /

③主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQLazyConsumerMainType {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQLazyConsumerMainType.class, args);
    }

}

④监听器

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyLazyMessageProcessor {

    public static final String EXCHANGE_LAZY_NAME = "exchange.hanson.lazy";
    public static final String ROUTING_LAZY_KEY = "routing.key.hanson.lazy";
    public static final String QUEUE_LAZY_NAME = "queue.hanson.lazy";

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {
            @Argument(name = "x-queue-mode", value = "lazy")
        }),
        exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),
        key = {ROUTING_LAZY_KEY}
    ))
    public void processMessageLazy(String data, Message message, Channel channel) {
        log.info("消费端接收到消息:" + data);
    }

}

2.8.3 测试

  • 先启动消费端

  • 基于消费端@RabbitListener注解中的配置,自动创建了队列

在这里插入图片描述

  • 发送消息

2.9 优先级队列

优先级队列:机制说明

  • 默认情况:基于队列先进先出的特性,通常来说,先入队的先投递
  • 设置优先级之后:优先级高的消息更大几率先投递
  • 关键参数:x-max-priority

优先级队列:消息的优先级设置

  • RabbitMQ允许我们使用一个正整数给消息设定优先级
  • 消息的优先级数值取值范围:1~255
  • RabbitMQ官网建议在1~5之间设置消息的优先级(优先级越高,占用CPU、内存等资源越多)
  • 队列在声明时可以指定参数:x-max-priority
  • 默认值:0 此时消息即使设置优先级也无效
  • 指定一个正整数值:消息的优先级数值不能超过这个值

2.9.1 创建相关资源

1、创建交换机

exchange.test.priority

在这里插入图片描述

2、创建队列

queue.test.priority

x-max-priority

在这里插入图片描述

在这里插入图片描述
3、队列绑定交换机

在这里插入图片描述

2.9.2 生产者发送消息

1、配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、配置YAML

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /

3、主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQPriorityProducer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQPriorityProducer.class, args);
    }

}

4、发送消息

  • 不要启动消费者程序,让多条不同优先级的消息滞留在队列中
  • 第一次发送优先级为1的消息
  • 第二次发送优先级为2的消息
  • 第三次发送优先级为3的消息
  • 先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息

①第一次发送优先级为1的消息

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
    public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
            message.getMessageProperties().setPriority(1);
            return message;
        });
    }

}

②第二次发送优先级为2的消息

@Test
public void testSendMessage() {
    rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{
        message.getMessageProperties().setPriority(2);
        return message;
    });
}

③第三次发送优先级为3的消息

@Test
public void testSendMessage() {
    rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{
        message.getMessageProperties().setPriority(3);
        return message;
    });
}

在这里插入图片描述

2.9.3 消费端接收消息

1、配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、配置YAML

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /

3、主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQPriorityConsumer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQPriorityConsumer.class, args);
    }

}

4、监听器

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyMessageProcessor {

    public static final String QUEUE_PRIORITY = "queue.test.priority";

    @RabbitListener(queues = {QUEUE_PRIORITY})
    public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
        log.info(data);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

5、测试效果

对于已经滞留服务器的消息,只要消费端一启动,就能够收到消息队列的投递,打印效果如下:

在这里插入图片描述

总结

RabbitMQ目前有了一个初步的了解,下一篇文章深入了解RabbitMQ集群搭建

文章代码:GitHub

如果这篇文章对您,希望可以点赞、收藏支持一下
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1858386.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

维度建模中的事实表

在维度建模中&#xff0c;根据粒度划分&#xff0c;数据表主要分为维度表和事实表。而事实表又可以分为三种类型&#xff1a;事务型事实表、周期快照型事实表和累计快照型事实表。本文将详细介绍这三种事实表&#xff0c;并提供相应的代码示例。 目录 1. 事务型事实表示例 2. 周…

推荐系统三十六式学习笔记:原理篇.模型融合13|经典模型融合办法:线性模型和树模型的组合拳

目录 为什么要融合&#xff1f;“辑度组合”原理逻辑回归梯度提升决策树GBDT二者结合 总结 推荐系统在技术实现上一般划分为三个阶段&#xff1a;挖掘、召回、排序 。 为什么要融合&#xff1f; 挖掘的工作是对用户和物品做非常深入的结构化分析&#xff0c;各个角度各个层面…

硬盘监控和分析工具:Smartctl

文章目录 1. 概述2. 安装3. 使用4. smartctl属性信息介绍 1. 概述 Smartctl&#xff08;S.M.A.R.T 自监控&#xff0c;分析和报告技术&#xff09;是类Unix系统下实施SMART任务命令行套件或工具&#xff0c;它用于打印SMART自检和错误日志&#xff0c;启用并禁用SMRAT自动检测…

PR模板 | RGB特效视频标题模板Titles | MOGRT

RGB特效视频标题模板mogrt免费下载 4K分辨率&#xff08;38402160&#xff09; 支持任何语言 友好的界面 输入和输出动画 快速渲染 视频教程 免费下载&#xff1a;https://prmuban.com/39055.html 更多pr模板视频素材下载地址&#xff1a;https://prmuban.com

初见:AntDB智能运维“三剑客“之ADC

引言 6月15日&#xff0c;PostgreSQL数据库技术峰会广州站圆满落幕。峰会上&#xff0c;亚信安慧数据库智能运维产品负责人李志龙介绍了AntDB的6大数据库引擎和3大工具产品能力。 这里的3大工具分别指&#xff1a; AntDB数据库迁移工具包 MTK 数据库智能运维平台 ACC AntDB数据…

Modbus协议在工业自动化中的应用

Modbus协议介绍 Modbus是一种常用的工业现场总线通信协议,被广泛应用于工业自动化领域。它是一种简单、易实现的主从式通信协议,具有高度的可靠性和通用性。本文将从Modbus协议的基本概念、通信模式、数据格式、常见应用场景等方面进行全面介绍,并通过图文并茂的方式帮助读者更…

ardupilot开发 --- 视觉伺服 篇

风驰电掣云端飘&#xff0c;相机无法对上焦 视觉伺服分类视觉伺服中的坐标系成像模型推导IBVS推导参考文献 视觉伺服分类 控制量是在图像空间中推导得到还是在欧式空间中推导得到&#xff0c;视觉伺服又可以分类为基于位置(PBVS)和基于图像的(IBVS)视觉伺服。 视觉伺服中的坐…

关于docker存储overlay2相关问题

报错如下&#xff1a; 报错原因&#xff1a;使用rm -rf 清理overlay2导致的&#xff0c;非正常清理。 正常清理命令如下&#xff1a; # 清理Docker的所有构建缓存 docker builder prune# 删除旧于24小时的所有构建缓存 docker builder prune --filter "until24h"#删…

node.js环境安装以及Vue-CLI脚手架搭建项目教程

目录 ▐ vue-cli 搭建项目的优点 ▐ 安装node.js环境 ▐ 搭建vue脚手架项目 ▐ 项目结构解读 ▐ 常用命令 ▐ 创建组件 ▐ 组件路由 ▐ vue-cli 搭建项目的优点 传统的前端项目架构由多个html文件&#xff0c;且每个html文件都是相互独立的&#xff0c;导入外部组件时需…

wireshark常用过滤命令

wireshark常用过滤命令 wireshark抓包介绍单机单点&#xff1a;单机多点&#xff1a;双机并行&#xff1a; wireshark界面认识默认布局调整布局(常用)显示FCS错误 wireshark常见列Time回包数据报对应网络模型 wireshark基本操作结束抓包再次开始抓包 **wireshark常用过滤命令**…

【实物资料包】基于STM32智能台灯设计

【实物资料包】基于STM32智能台灯设计 需要资料的请在文章结尾获取哦~~~~&#xff08;如有问题私信我即可&#xff09; 1.介绍 1 添加wifi模块模块&#xff0c;可通过wifi模块APP或者手动按钮切换自动/手动模式 2 自动模式下&#xff0c;台灯可以感应是否有人落座&#xff0…

【BSCP系列第2期】XSS攻击的深度剖析和利用(文末送书)

文章目录 前言一、官方地址二、开始&#xff08;15个&#xff09;1&#xff1a;Lab: DOM XSS in document.write sink using source location.search inside a select element2&#xff1a;Lab: DOM XSS in AngularJS expression with angle brackets and double quotes HTML-e…

猫头虎分享已解决Bug:Array Index Out of Bounds Exception

&#x1f42f; 猫头虎分享已解决Bug&#xff1a;Array Index Out of Bounds Exception &#x1f42f; 摘要 大家好&#xff0c;我是猫头虎&#xff0c;今天我们要聊聊后端开发中经常遇到的一个问题&#xff1a;Array Index Out of Bounds Exception&#xff0c;即 java.lang.…

哪里找好用的商城系统源码?

很多企业在挑选商城系统时&#xff0c;由于不懂源码&#xff0c;很难选择到高质量源码的商城系统&#xff0c;那么哪里找好用的商城系统源码?如何选择?接下来就跟着启山智软小编一起来看看吧&#xff0c;以下为选择源码时的四看&#xff1a; 1.一看源码公司行业动态 可以查…

【linux】详解——库

目录 概述 库 库函数 静态库 动态库 制作动静态库 使用动静态库 如何让系统默认找到第三方库 lib和lib64的区别 /和/usr/和/usr/local下lib和lib64的区别 环境变量 配置相关文件 个人主页&#xff1a;东洛的克莱斯韦克-CSDN博客 简介&#xff1a;C站最萌博主 相关…

[FreeRTOS 内部实现] 信号量

文章目录 基础知识创建信号量获取信号量释放信号量信号量 内部实现框图 基础知识 [FreeRTOS 基础知识] 信号量 概念 创建信号量 #define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) #define semSEMAPHORE_QUEUE_ITEM_LENGTH ( ( uint8_t ) 0U ) #define xSe…

C++精解【6】

文章目录 eigenMatrix基础例编译时固定尺寸运行指定大小 OpenCV概述 eigen Matrix 基础 所有矩阵和向量都是Matrix模板类的对象。向量也是矩阵&#xff0c;单行或单列。Matrix模板类6个参数&#xff0c;常用就3个参数&#xff0c;其它3个参数有默认值。 Matrix<typename…

LGY-110G零序电压继电器 导轨安装 约瑟JOSEF

110系列零序电压电流继电器 系列型号: LGY-110零序过电压继电器&#xff1b;LGY-110/AC零序过电压继电器&#xff1b; LGL-110零序过电流继电器&#xff1b;LGL-110/AC零序过电流继电器&#xff1b; 1 应用 DY-110 型低电压继电器用于判别线路和电力设备的电压降低&#xf…

达梦数据守护集群部署

接上篇 达梦8单机规范化部署 https://blog.csdn.net/qq_25045631/article/details/139898690 1. 集群规划 在正式生产环境中&#xff0c;两台机器建议使用统一配置的服务器。使用千兆或千兆以上网络。 两台虚拟机各加一块网卡&#xff0c;仅主机模式&#xff0c;作为心跳网卡…

伸展树(数据结构篇)

数据结构之伸展树 伸展树 概念&#xff1a; 伸展树是一颗对任意一个节点被访问后&#xff0c;就经过一系列的AVL树的旋转操作将该节点放到根上的特殊二叉查找树。伸展树能保证对树操作M次的时间复杂度为O(MlogN)&#xff0c;而当一个查找树的一个节点刚好处于查找树最坏的情…