RabbitMQ从入门到精通之安装、通讯方式详解

news2025/1/11 2:27:34

文章目录

  • RabbitMQ
    • 一、RabbitMQ介绍
      • 1.1 现存问题
    • 一、RabbitMQ介绍
    • 二、RabbitMQ安装
    • 三、RabbitMQ架构
    • 四、RabbitMQ通信方式
      • 4.1 RabbitMQ提供的通讯方式
      • 4.2 Helloworld 方式
      • 4.2Work queues
      • 4.3 Publish/Subscribe
      • 4.4 Routing
      • 4.5 Topics
      • 4.6 RPC (了解)
    • 五、Springboot 操作RabbitMQ
    • 六、RabbitMQ保证消息可靠性
      • 6.1、 保证消息一定送达到Exchange
      • 6.2、保证消息可以路由到Queue中
      • 6.3、保证队列持久化消息
      • 6.4、保证消息者可以正常消费消息
      • 6.5 SpringBoot实现上述操作
        • 6.5.1 Confirm
      • 6.5.2 Return
      • 6.5.3 消息持久化
    • 七、RabbitMQ死信队列 & 延迟交换机
      • 7.4、准备Exchange&Queue
      • 7.5、实现效果
    • 八、RabbitMQ的集群


RabbitMQ

一、RabbitMQ介绍

1.1 现存问题

    • 服务异步调用: 服务A如何保证异步请求一定能被服务B接收到并处理
      在这里插入图片描述
    • 削峰: 海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?

在这里插入图片描述

    • 服务解耦: 如何尽量的降低服务之间的耦合问题,如果在订单与积分和商家服务解构,需要一个队列,而这个队列依然需要实现上述两个情况功能。

在这里插入图片描述

一、RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

AMQP协议:
在这里插入图片描述

Erlang:

Erlang在1991年由爱立信公司向用户推出了第一个版本,经过不断的改进完善和发展,在1996年爱立信又为所有的Erlang用户提供了一个非常实用且稳定的OTP软件库并在1998年发布了第一个开源版本。Erlang同时支持的操作系统有linux,windows,unix等,可以说适用于主流的操作系统上,尤其是它支持多核的特性非常适合多核CPU,而分布式特性也可以很好融合各种分布式集群。

二、RabbitMQ安装

docker-compose.yml

version: “3.1”
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- ./data/:/var/lib/rabbitmq/
ports:
- 5672:5672
- 15672:15672

在这里插入图片描述
在这里插入图片描述

docker-compose.yml文件内容:
在这里插入图片描述

镜像拉取完成后,直接在linux 内部执行: curl localhost:5672

在这里插入图片描述
执行后能够显示AMQP 字样的内容就说明执行成功了

执行 docker exec -it rabbitmq bash 命令 进入容器内部
cd opt/rabbitmq/ 目录下
在这里插入图片描述
执行cd plugins/sbin 命令进入目录下, 执行 ./rabbitmq-plugins enable rabbitmq_managent命令 启动rabbitmq 图形化界面
在这里插入图片描述
访问15672 端口,默认的账号密码是guest/guest
在这里插入图片描述

三、RabbitMQ架构

在这里插入图片描述

四、RabbitMQ通信方式

4.1 RabbitMQ提供的通讯方式

  • Hello World :为了入门操作
  • Work queues : 一个队列被多个消费者消费
  • Publish/Subscribe:手动创建Exchange(FANOUT)
  • Routing: 手动创建Exchange(DIRECT)
  • Topics : 手动创建Exchange(TOPIC)
  • RPC: RPC方式
  • Publisher Confirms

4.2 Helloworld 方式

在这里插入图片描述

//工具类
public class RabbitMQConnectionUtil {
    public static final String RABBITMQ_HOST ="172.16.177.133";
    public static final int RABBITMQ_POST =5672;
    public static final String RABBITMQ_USERNAME ="guest";
    public static final String RABBITMQ_PASSWORD ="guest";
    public static final String RABBITMQ_VIRTUAL_HOST ="/";

    /**
     * 构建RabbitMQ的连接对象
     * @return
     */
    public static Connection getConnection() throws Exception{
        //1.创建connection    工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RABBITMQ_HOST);
        connectionFactory.setPort(RABBITMQ_POST);
        connectionFactory.setUsername(RABBITMQ_USERNAME);
        connectionFactory.setPassword(RABBITMQ_PASSWORD);
        connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
        //2.设置RabbitMQ的连接信息
        Connection connection = connectionFactory.newConnection();
        //3. 返回连接对象

        return connection;
    }
//生产者
 @Test
    public void consume() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback =new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息者获取消息"+new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,true,callback);

        System.out.println("开始监听");
        System.in.read();

    }

//消费者
    @Test
    public void publish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4. 发布消息
        String message="hello word!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送成功");
        System.in.read();
    }

4.2Work queues

在这里插入图片描述
一个队列中的消息,只会被一个消费者成功的消费,默认情况下,RabbitMQ的队列会将消息以轮询的方式交给不同的消费者消费,消费者拿到消息后,需要给RabbitMQ一个ack,RabbitMQ认为消费者已经拿到消息了

//消费者
 @Test
    public void consume() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        //3.1 设置消息的控制,一次拿几个消息
//#2        channel.basicQos(1);
        //4. 监听消息
        DefaultConsumer callback =new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                	//模拟业务执行时间
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消息者获取消息"+new String(body,"UTF-8"));
//#1				channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
        System.out.println("开始监听");
        System.in.read();
 //消费者2
 @Test
    public void consume2() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
        //3.1 设置消息的控制,一次拿几个消息
//#2        channel.basicQos(1);
        //4. 监听消息 
        DefaultConsumer callback =new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                	//模拟业务执行时间
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消息者获取消息"+new String(body,"UTF-8"));
                //basicAck(标识,是否批量操作)
//#1				channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);

        System.out.println("开始监听");
        System.in.read();

    }
//生产者
 public static final String QUEUE_NAME="work";
    @Test
    public void publish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4. 发布消息
        for (int i=0;i<10;i++){
            String message="hello word!"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
         }
        System.out.println("消息发送成功");
        System.in.read();
    }

当两台消费者的消费能力不相同的时候,为了提高效率,就不能以轮询的方式进行分发,而是以消费者消费完成后手动传递ack 的方式进行下一个消息的分发,将== #1 #2 的代码 ==打开即可
操作步骤:

  • 操作#1 让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
  • 操作#2 设置每次拿几个消息

4.3 Publish/Subscribe

自行创建路由器,并绑定队列
在这里插入图片描述
如何构建一个自定义交换机,并指定类型是FANOUT,让交换机和多个Queue绑定到一起

//生产者 
public static final String EXCHANGE_NAME="pubsub";
    public static final String QUEUE_NAME1="pubsub-one";
    public static final String QUEUE_NAME2="pubsub-two";
    @Test
    public void pulish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channal
        Channel channel = connection.createChannel();
        //3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        //5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
        //6.发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe!".getBytes());
        System.out.println("消息成功发送");
        
    }

4.4 Routing

DIRECT 类型的交换机,在绑定Exchange和Queue时,需要指定好routingKey同时在发送消息的时候,也指定routingkey,只有在routingkey 一致时,才会把指定的消息路由到指定的队列
在这里插入图片描述

public static final String EXCHANGE_NAME="routing";
    public static final String QUEUE_NAME1="routing-one";
    public static final String QUEUE_NAME2="routing-two";
    @Test
    public void pulish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channal
        Channel channel = connection.createChannel();
        //3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.DIRECT
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        //5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");
        //6.发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());


        System.out.println("消息成功发送");

    }

4.5 Topics

topics 模式支持模糊匹配RoutingKey,就像是sql中的 like子句模糊查询,而路由模式等同于sql中的where子句等值查询

在这里插入图片描述

通过模糊路由到队列。该方式的Routing key必须具有固定格式:以 . 间隔的一串单词,比如:quick.orange.rabbit,Routing key 最多不能超过255byte。

交换机和队列的Binding key用通配符来表示,有两种语法:

  • * 可以替代一个单词;
  • # 可以替代 0 或多个单词;

例如 #.com.#
#可以表示0级或多级。xx.com、com.xx、com、xx.com.xx.xx、xx.xx.com.xx都可以

例如 *.com. *
*表示一级,xx.com.xx 可以 ,com.xx 不可以,前面缺少一级,xx.com.xx.xx不可以,com后面只能有一级xx,最终格式必须是 xx.com.xx

 public static final String EXCHANGE_NAME="topics";
    public static final String QUEUE_NAME1="topics-one";
    public static final String QUEUE_NAME2="topics-two";
    @Test
    public void pulish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2. 构建Channal
        Channel channel = connection.createChannel();
        //3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.TOPIC
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        //5. 绑定 交换机 和 队列,
        // Topic类型的交换机,在和队列绑定时,需要以aaa.bbb.ccc 方式编写routingKey
        // 其中有两个特殊字符: *(相当于占位符),# (相当通配符)
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");
        //6.发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙子兔子!".getBytes());//匹配1、2
        channel.basicPublish(EXCHANGE_NAME,"small.while.rabbit",null,"小兔子!".getBytes());//匹配1、2
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog",null,"懒狗狗狗狗!".getBytes());//匹配 3


        System.out.println("消息成功发送");

    }

4.6 RPC (了解)

因为两个服务在交互时,可以尽量做到Client和server的结偶,通过RabbitMQ进行结藕操作
需要让client 发送消息时,携带两个属性,

  • replyto告知server将相应信息放到哪个队列
  • correlationId告知server 发送相应消息时,需要携带位置标识来告知client响应的消息

在这里插入图片描述

public class Publisher {
    public static final String QUEUE_PUBLISHER = "rpc_publisher";
    public static final String QUEUE_CONSUMER = "rpc_consumer";

    @Test
    public void publish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
        //4. 发布消息
        String message="hello rpc!";
        String uuid = UUID.randomUUID().toString();
        AMQP.BasicProperties prop =new AMQP.BasicProperties()
            .builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();
        channel.basicPublish("",QUEUE_PUBLISHER,prop,message.getBytes());

        channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String id = properties.getCorrelationId();
                if(id!=null && id.equals(uuid)){
                    System.out.println("接收到服务端的响应:"+new String(body));
                }
            channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

        System.out.println("消息发送成功");
        System.in.read();
    }
}

public class Consumer {
    public static final String QUEUE_PUBLISHER = "rpc_publisher";
    public static final String QUEUE_CONSUMER = "rpc_consumer";
    @Test
    public void consume() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
        //4. 监听消息
        DefaultConsumer callback =new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息者获取消息"+new String(body,"UTF-8"));
                String resp = "获取到client发出的请求,这里是响应的信息";
                String respQueueName = properties.getReplyTo();
                String uuid = properties.getCorrelationId();
                AMQP.BasicProperties prop =new AMQP.BasicProperties()
                .builder().correlationId(uuid).build();
                channel.basicPublish("",respQueueName,prop,resp.getBytes());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_PUBLISHER,false,callback);

        System.out.println("开始监听");
        System.in.read();

    }
}

五、Springboot 操作RabbitMQ


  • 创建项目
  • 导入依赖
	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 配置rabbitmq信息
spring:
  rabbitmq:
    host: 172.16.177.133
    password: guest
    username: guest
    port: 5672
    virtual-host: /

配置类声明队列

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE="boot-exchange";
    public static final String QUEUE="boot-queue";
    public static final String ROUTING_KEY="*.black.*";

    @Bean
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE).build();
    }

    @Bean
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    @Bean
    public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }
}

生产者配置

@SpringBootTest
class SpringbootRabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
    }

    @Test
    public void publisher(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
    }

    @Test
    public void publiWithProps(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setCorrelationId("123");
                return message;
            }
        });
        System.out.println("消息发送成功2");
    }
}

消费者配置

@Component
public class ConsumeListener {

    /**
     *
     * @param msg
     * @param channel 前提是配置好spring.rabbitmq.listener.simple.acknowledge-mode: manual #开启手动ack
     * @param message
     * @throws IOException
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列消息为:"+msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("标识为:"+correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}
  • 声明交换机&队列

六、RabbitMQ保证消息可靠性

confirm机制
可以通过confirm效果保证消息一定送达到Exchange,官方提供了三种,选择了对于效率影响最低的异步回调的效果

6.1、 保证消息一定送达到Exchange

使用confirm机制

 public static final String QUEUE_NAME="confirms ";
    @Test
    public void publish() throws Exception {
        //1.获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();
        //2.创建Channel 通道
        Channel channel = connection.createChannel();
        //3.构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //3.1 开启confirms 的异步回调
        channel.confirmSelect();
        String message="hello word!";
        //3.2 设置confirms的异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到Exchange");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作");
            }
        });
        //3.3 设置return回调,确认消息是否路由到了队列
       channel.addReturnListener(new ReturnListener() {
           @Override
           public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
               System.out.println("消息没有到指定队列时,做其他的补偿措施!!");
           }
       });
        //3.4、设置消息持久化
        AMQP.BasicProperties prop =new AMQP.BasicProperties().builder()
                .deliveryMode(2)//消息持久化
               .build();

        //4. 发布消息
        channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());
        System.out.println("消息发送成功");
        System.in.read();
    }

6.2、保证消息可以路由到Queue中

使用return 机制
为了保证Exchange上的消息一定可以送达到Queue

//6.2设置return 回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
           @Override
           public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
               System.out.println("消息没有到指定队列时,做其他的补偿措施!!");
           }
       });
//7.在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return	 回调

6.3、保证队列持久化消息

DeliveryMode设置消息持久化

//6.3、设置消息持久化
        AMQP.BasicProperties prop =new AMQP.BasicProperties().builder()
                .deliveryMode(2)//消息持久化
               .build();
 //4. 发布消息
        channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());

6.4、保证消息者可以正常消费消息

详情看WorkQueue模式

6.5 SpringBoot实现上述操作

6.5.1 Confirm

  • 编写配置文件开启Confirm机制

spring:
rabbitmq:
publisher-confirm-type: correlated # 新版本 开启confirm机制
publisher-confirms: true # 老版本

  • 在发送消息时,配置RabbitTemplate
@Test
    public void publishWithConfirms() throws IOException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b){
                    System.out.println("消息已经送达到交换机");
                }else{
                    System.out.println("消息没有送到到Exchange,需要做一些补偿操作!");
                }
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");

        System.in.read();
    }

6.5.2 Return

  • 编写配置文件开启Return机制

spring:
rabbitmq:
publisher-returns: true # 开启return机制

  • 在发送消息时,配置RabbitTemplate
@Test
    public void publishWithReturn() throws IOException {
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                String msg = new String(message.getBody());
                System.out.println("消息失败:"+msg+"路由队列失败!!做补救操作");
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");

        System.in.read();
    }

6.5.3 消息持久化

//3.4、设置消息持久化
AMQP.BasicProperties prop =new AMQP.BasicProperties().builder()
                .deliveryMode(2)//消息持久化 #1
               .build();
        //4. 发布消息,  将参数mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
        channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());

七、RabbitMQ死信队列 & 延迟交换机

###7.1、 消息被消费者拒绝,requeue设置为false
###7.2.1、发送消息时设置消息的生存时间,如果生存时间到了,还没有被消费。
###7.2.2、 也可以指定某个队列中所有消息的生存时间,如果生存时间到了,还没有被消费
###7.3、队列已经达到消息的最长长度后,再路由过来的消息直接变成死信
在这里插入图片描述

7.4、准备Exchange&Queue

@Configuration
public class DeadLetterConfig {
    public static final String NORMAL_EXCHANGE="normal-exchange";
    public static final String NORMAL_QUEUE="normal-queue";
    public static final String NORMAL_ROUTING_KEY="normal.#";


    public static final String DEAD_EXCHANGE="dead-exchange";
    public static final String DEAD_QUEUE="dead-queue";
    public static final String DEAD_ROUTING_KEY="dead.#";
    /**普通交换机*/
    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }
    /**普通队列*/
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列
                .build();
    }
    /**普通队列绑定路由*/
    @Bean
    public Binding normalBingding(Queue normalQueue,Exchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(DEAD_ROUTING_KEY).noargs();
    }

    /**死信交换机*/
    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    /**死信队列*/
    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE ).build();
    }
    /**绑定死信队列和交换机*/
    @Bean
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}

7.5、实现效果

  • 基于消费者进行reject 或者 nack 实现死信效果
@Component
public class DeadListener {
    /**
     *
     * @param msg
     * @param channel 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual
     * @param message 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual
     */
    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void comsume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到normal队列的消息:"+msg);
//        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}
  • 消息的生存时间

八、RabbitMQ的集群

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/978966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《C和指针》读书笔记(第十三章 高级指针话题)

目录 0 简介1 进一步探讨指向指针的指针2 高级声明3 函数指针3.1 回调函数3.2 转移表 4 命令行参数4.1 传递命令行参数4.2 处理命令行参数 5 字符串常量6 总结 0 简介 众所周知&#xff0c;指针是C语言的灵魂&#xff0c;所以本书&#xff08;《C和指针》&#xff09;才会将较…

为安全带来光明:光耦继电器的 10 种救生应用

在安全性和可靠性至关重要的世界中&#xff0c;光耦继电器已成为推动各行业进步的关键技术。这些卓越的设备经常在主流新闻中被忽视&#xff0c;但它们一直在默默地为保障生命和提高整体运营效率的关键系统提供动力。今天&#xff0c;我们重点介绍光耦继电器的十种救生应用&…

设计模式-01简单工厂模式详解 详细代码对比

目录 ChatGpt问答原生代码简单工厂模式代码 简单工厂模式&#xff08;Simple Factory Pattern&#xff09;新增boat 对比两种方法原生代码为什么使用强制转换&#xff1f;简单工厂模式 简单工厂方法总结与原生代码的区别&#xff1a;优点:缺点&#xff1a; 参考 本文将介绍什么…

golang指针的学习笔记

package main // 声音文件所在的包&#xff0c;每个go文件必须有归属的包 import ("fmt" )// 引入程序中需要用的包&#xff0c;为了使用包下的函数&#xff0c;比如&#xff1a;Printin// 字符类型使用 func main(){ // 基本数据类型&#xff0c;变量存的就是值&am…

做tiktok怎么运营?

一、揭开tiktok的神秘面纱 说到tiktok&#xff0c;你可能想到的是那些精彩的短视频&#xff0c;以及那些在几秒钟内就能吸引无数粉丝的创作者。然而&#xff0c;tiktok的魅力远不止于此。这个全球最受欢迎的短视频社交平台&#xff0c;正以惊人的速度改变着社交媒体的面貌。 二…

【IR】Vision-Language Tracking

调研&#xff1a;视觉-语言跟踪 0x01 Transformer vision-language tracking via proxy token guided cross-modal fusion, PRL2023AbstractIntroductionContribution效果Conclusion 0x02 Divert More Attention to Vision-Language Object Tracking, NeurIPS2022AbstractIntro…

起飞!Python 3.11的10个高效新特性

性能有巨大的提升是Python 3.11的一个重要的改进&#xff0c;除此以外Python 3.11还有增加了许多新的特性。在本文中我们将介绍Python 3.11新特性&#xff0c;通过代码示例演示这些技巧如何提高生产力并优化代码。 1、模式匹配 Python 3.11引入了模式匹配&#xff0c;可以简化…

OpenAI 函数调用教程

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 什么是OpenAI函数调用&#xff1f; OpenAI API 非常擅长以系统的方式生成响应。只需几行代码即可管理提示、优化模型输出以及执行、生成和语言应用程序。 即使有这么多好东西&#xff0c;OpenAI API对开发人员和工程…

【双指针】移动零

双指针-移动零 283. 移动零 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1…

03-JVM内存模型剖析与优化

1. JDK体系结构 2. Java语言的跨平台特性 3. JVM整体结构及内存模型 补充一个问题&#xff1a; 在minor gc过程中对象挪动后&#xff0c;引用如何修改&#xff1f; 对象在堆内部挪动的过程其实是复制&#xff0c;原有区域对象还在&#xff0c;一般不直接清理&#xff0c;JVM内部…

【C++基础】类与对象(上):访问限定符、类作用域、类实例化、类对象模型、this指针

​&#x1f47b;内容专栏&#xff1a; C/C编程 &#x1f428;本文概括&#xff1a; C基础语法。访问限定符、类作用域、类实例化、类对象模型、this指针等。 &#x1f43c;本文作者&#xff1a; 阿四啊 &#x1f438;发布时间&#xff1a;2023.9.6 面向过程和面向对象初识 C语…

【网络爬虫笔记】爬虫Robots协议语法详解

Robots协议是指一个被称为Robots Exclusion Protocol的协议。该协议的主要功能是向网络蜘蛛、机器人等搜索引擎爬虫提供一个标准的访问控制机制&#xff0c;告诉它们哪些页面可以被抓取&#xff0c;哪些页面不可以被抓取。本文将进行爬虫Robots协议语法详解&#xff0c;同时提供…

MQ 消费者和队列对应关系

参考 Consumer and Consumer Group Load Balancing https://rocketmq.apache.org/docs/4.x/consumer/01concept2 旧版本MQ结论 消费者应用和topic队列一对多的关系。 &#xff08;一个消费组consumer group里&#xff0c;一个消费者应用可以消费多个队列的消息。一个队列的消…

Podman安装与使用

1.Podman简介 Podman是一个无守护进程的容器引擎&#xff0c;用于在Linux系统上开发、管理和运行OCI容器。 Podman的主要功能包括&#xff1a; 创建和管理容器&#xff1a;Podman可以创建、启动、停止和删除容器&#xff0c;以及管理容器的生命周期。容器镜像管理&#xff1…

华为云云服务器评测|云耀云服务器L实例快速部署MySQL使用指南

文章目录 前言云耀云服务器L实例介绍什么是云耀云服务器L实例&#xff1f;产品优势智能不卡顿价优随心用上手更简单管理更省心 快速购买查看优惠卷购买 安装MySQL重置密码安装更新apt的软件源列表安装MySQL 设置用户名、密码、权限配置安全组 总结 前言 哈喽大家好&#xff0c…

lumion电脑速度太慢怎么办?还不是试试云电脑高效上云设计

在设计与渲染领域&#xff0c;Lumion是一款广受欢迎的3D软件&#xff0c;然而&#xff0c;使用本地电脑进行Lumion设计和渲染存在电脑卡顿崩溃&#xff0c;导致效率慢问题。本文将介绍Lumion设计师使用云电脑的优势&#xff0c;以及如何利用云电脑提高创作效率、释放无限创意。…

李宏毅-21-hw3:对11种食物进行分类-CNN

一、代码慢慢阅读理解总结内化&#xff1a; 1.关于torch.nn.covd2d()的参数含义、具体用法、功能&#xff1a; &#xff08;1&#xff09;参数含义&#xff1a; 注意&#xff0c;里面的“padding”参数&#xff1a;《both》side所以是上下左右《四》边都会加一个padding数量…

Support for password authentication was removed on August 13, 2021 解决方案

打开你的github&#xff0c;Setting 点击Developer settings。 点击generate new token 按照需要选择scope 生成token&#xff0c;以后复制下来。 给git设置token样式的remote url git remote set-url origin https://你的tokengithub.com/你的git用户名/仓库名称.git然后就可…

MySQL 连接查询和存储过程

一、连接查询 mysql的连接查询&#xff0c;通常都是将来自两个或多个表的记录行结合起来&#xff0c;基于这些表之间的共同字段&#xff0c;进行数据的拼接 首先&#xff0c;要确定一个主表作为结果集&#xff0c;然后将其它表的行有选择性的连接到选定的主表结果上&#xff…

算法训练day41|动态规划 part03(LeetCode343. 整数拆分、96.不同的二叉搜索树)

文章目录 343. 整数拆分思路分析代码实现 96.不同的二叉搜索树思路分析代码实现 343. 整数拆分 题目链接&#x1f525;&#x1f525; 给定一个正整数 n&#xff0c;将其拆分为至少两个正整数的和&#xff0c;并使这些整数的乘积最大化。 返回你可以获得的最大乘积。 示例 1: …