RabbitMQ的几种通讯方式及其代码示例

news2025/1/8 3:48:17

在这里插入图片描述

文章目录

      • 一、引言
      • 二、RabbitMQ介绍
      • 三、RabbitMQ安装
      • 四、RabbitMQ架构
        • 4.1 官方的简单架构图
        • 4.2 RabbitMQ的完整架构图
        • 4.3 RabbitMQ 通讯方式
        • 4.4 Hello-World案例演示
        • 4.5 基本原理
      • 五、SpringBoot整合RabbitMQ的使用
        • 5.1 导入依赖
        • 5.2 在application.properties中增加配置
        • 5.3 Hello-World 简单队列
        • 5.4 Work 工作队列
        • 5.5 Publish/Subscribe 发布订阅模式
        • 5.6 Routing 路由模式
        • 5.7 Topic 主题模式
        • 5.8手动Ack
          • 5.8.1 原生方式测试
          • 5.8.2 SpringBoot中测试
      • 六、事务与confirm机制
        • 6.1 消息的可靠性
        • 6.2 RabbitMQ事务
        • 6.3 Confirm机制
          • 6.3.1 创建工具类
          • 6.3.2 单条消息确认
          • 6.3.2 批量消息确认
          • 6.3.3 回调方式确认
        • 6.4 Return机制
        • 6.5 SpringBoot实现
      • 七. 死信队列
          • 7.1 场景
          • 7.2 测试
      • 八. 避免消息重复消费
        • 8.1 幂等性
        • 8.2 解决方案
        • 8.3 在springboot中测试
          • 8.3.1 导入依赖
          • 8.3.2 配置application.yml
          • 8.3.3 这里以简单模式演示

一、引言

模块之间的耦合度过高,一旦一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。

RabbitMQ引言
在这里插入图片描述

二、RabbitMQ介绍


市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多门语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal,由Erlang语言开发(并发的编程语言)

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

​ RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。

​ Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。

​ AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。

优点

1、解耦:降低系统模块的耦合度

2、提高系统响应时间

3、异步消息

4、过载保护,流量削峰

1.应用解耦

场景:双11购物,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:

  • 当库存系统出现故障时,订单就会失败.
  • 订单系统和库存系统高耦合.

引入消息队列

  • 订单系统: 用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统: 订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失

在这里插入图片描述

2.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1. 串行的方式 2. 并行的方式

串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

在这里插入图片描述

并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间

在这里插入图片描述

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.

消息队列

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍

在这里插入图片描述

3.流量削峰

流量削峰一般在秒杀活动中应用广泛

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

在这里插入图片描述

这样我们就可以采用队列的机制来处理,如同我们在超市结算一样,并不会一窝蜂一样涌入收银台,而是排队结算,一个接着一个的处理,不能插队,因为同时结算就只能达到这么多。

三、RabbitMQ安装

docker安装


version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq
[root@192 ~]# cd /opt
[root@192 opt]# mkdir docker_rabbitmq
[root@192 opt]# cd docker_rabbitmq/
[root@192 docker_rabbitmq]# vim docker-compose.yml
[root@192 docker_rabbitmq]# docker-compose up -d
Creating network "docker_rabbitmq_default" with the default driver
Pulling rabbitmq (daocloud.io/library/rabbitmq:management)...
management: Pulling from library/rabbitmq
01bf7da0a88c: Pull complete
f3b4a5f15c7a: Pull complete
57ffbe87baa1: Pull complete
5ef3ef76b1b5: Pull complete
82a3ce07c0eb: Pull complete
1da219d9bd70: Pull complete
446554ac749d: Pull complete
8e4c09e200e7: Pull complete
7a8620611ebf: Pull complete
c70a2924b273: Pull complete
3b0b9e36b4e9: Pull complete
7619a9a42512: Pull complete
965a8e1f1b1c: Pull complete
Digest: sha256:4cc2267788b21e0f34523b4f2d9b32ee1c2867bf2de75d572158d6115349658c
Status: Downloaded newer image for daocloud.io/library/rabbitmq:management
Creating rabbitmq ... done

浏览器访问:http://ip:15672 (注:ip指当前云服务器的地址,云服务器记得开放 15672 和 5672 端口)

用户名和密码默认都是:guest

四、RabbitMQ架构


4.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

简单架构图
在这里插入图片描述

4.2 RabbitMQ的完整架构图

完整架构图

完整架构图

4.3 RabbitMQ 通讯方式

https://www.rabbitmq.com/getstarted.html

4.4 Hello-World案例演示

  1. 导入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>
  1. 创建生产者 Publisher
package com.guo.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

//生产者
public class Publisher {
    public static void main(String[] args) throws Exception{
        System.out.println("Publisher...");
        //配置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.25.132");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //获取连接
        Connection connection = factory.newConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //配置队列参数
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化,值为true时表示持久化,rabbitmq宕机或重启后,队列依然在
        //参数3:exclusive - 当前队列是否为排他队列,值为true时表示与当前连接(connection)绑定,连接关闭,队列消失,排他队列会对当前队列加锁,其他通道channel是不能访问的,否则会报异常
        //参数4:autoDelete - 当前队列是否自动删除,值为true时表示队列中的消息一旦被消费,该队列会消失
        //参数5:arguments - 指定当前队列的相关参数
        channel.queueDeclare("helloworldQueue",false,false,false,null);
        //发布消息到exchange,同时指定路由的规则
        // 参数1:指定exchange,目前测试没有创建交换机,使用""
        // 参数2:指定路由的规则,或者使用具体的队列名称
        // 参数3:指定传递的消息所携带的properties,目前测试不需要,使用null
        // 参数4:指定发布的具体消息,byte[]类型,目前测试需要,传递数据进行类型转换
        channel.basicPublish("","helloworldQueue",null,"helloworld".getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}
  1. 创建消费者 Consumer
package com.guo.rabbitmq;

import com.rabbitmq.client.*;
import java.io.IOException;

//消费者
public class Consumer {
    public static void main(String[] args)throws Exception {
        System.out.println("Consumer...");
        //配置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.25.132");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //获取连接
        Connection connection = factory.newConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("来自生产者的消息:"+new String(body));
            }
        };

        //消费消息
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true表示接收到消息后,会立即告知RabbitMQ,false表示不告知)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("helloworldQueue",true,defaultConsumer);
        
        //设置延迟关闭,否则可能无法显示相关信息,消费方一般不关,目的是为了可以及时处理消息
        Thread.sleep(5000);
        
        //关闭资源
        channel.close();
        connection.close();
    }
}

分别启动生产者和消费者进行测试(生产一次才能消费一次)

4.5 基本原理

RabbitMQ是消息队列的一种实现,那么一个消息队列到底需要什么?答案是队列,即Queue,那么接下来所有名词都是围绕这个Queue来拓展的。

​ 就RabbimtMQ而言,Queue是其中的一个逻辑上的实现,我们需要连接到RabbitMQ来操作队列进而实现业务功能,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。

​ 接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel

​ 再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的
但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程

至此,我们就把所有的名词贯通咯,接下来做个概要描述:

  • Broker:消息队列服务器实体,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输服务。
  • ConnectionFactory:与RabbitMQ服务器连接的管理器。
  • Connection:与RabbitMQ服务器的TCP连接。
  • Channel:与Exchange的连接,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ建议客户端线程之间不要共用Channel,但是建议尽量共用Connection。
  • Queue:消息的载体,每个消息都会被投到一个或多个队列。
  • Exchange:接受消息生产者的消息,并根据消息的RoutingKey和 Exchange绑定的BindingKey,以及Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
  • Message Queue:消息队列,用于存储还未被消费者消费的消息。
  • Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
  • RoutingKey:由Producer发送Message时指定,指定当前消息被谁接受。
  • BindingKey:由Consumer在Binding Exchange与Message Queue时指定,指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中。
  • Binding:联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
  • Server: 接受客户端连接,实现AMQP消息队列和路由功能的进程。
  • Virtual Host:其实是一个虚拟概念,类似于权限控制组,可以通过命令分配给用户Virtual Host的权限,默认的guest用户是管理员权限,初始空间有/,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host。

五、SpringBoot整合RabbitMQ的使用


5.1 导入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.2 在application.properties中增加配置

#对于rabbitMQ的支持
spring.rabbitmq.host=192.168.153.136
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

5.3 Hello-World 简单队列

一个生产者,一个默认的交换机,一个队列,一个消费者

结构图

1)创建配置类,用于创建队列对象

package com.guo.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SimpleQueueConfig {

    @Bean
    public Queue simple(){
        return new Queue("simpleQueue");
    }
}

2)创建生产者

package com.guo.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SimpleQueueProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        System.out.println("SimpleQueueProducer");
        //发送消息,第一个参数为队列名称,第二参数为消息内容
        rabbitTemplate.convertAndSend("simpleQueue","简单模式");
    }
}

3)创建消费者

package com.guo.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleQueueCustomer {
    
    @RabbitListener(queues="simpleQueue")//监听指定的消息队列
    public void receive(String content){
        System.out.println("SimpleQueueCustomer");
        System.out.println("来SimpleQueueProducer的信息:"+content);
    }
}

4)在src\test\java\com\guo\Rabbitmq01ApplicationTests.java进行测试

package com.guo;
import com.guo.simple.SimpleQueueProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Rabbitmq01ApplicationTests {
    
    @Test
    void contextLoads() {
    }
    
    @Autowired
    private SimpleQueueProducer simpleQueueProducer;

    @Test
    public void testSimpleQueueProducer(){
        simpleQueueProducer.send();
    }
}

如果传递的是 JavaBean 对象,该实体类需要实现序列化接口,具体流程如下:

  1. 导入lombok依赖,创建User类
package com.guo.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private String password;
}
  1. 修改生产者中的代码
package com.guo.simple;

import com.guo.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

//生产者
@Component
public class SimplePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        System.out.println("SimplePublisher...");
        //rabbitTemplate.convertAndSend("","simpleQueue","简单模式");
        rabbitTemplate.convertAndSend("","simpleQueue",new User("张三","123"));
    }
}

  1. 修改消费者中的代码
package com.guo.simple;

import com.guo.pojo.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//消费者
@Component
public class SimpleConsumer {

//    @RabbitListener(queues = "simpleQueue")
//    public void receive(String content){
//        System.out.println("SimpleConsumer...");
//        System.out.println("来自SimplePublisher的消息:"+content);
//    }

    @RabbitListener(queues = "simpleQueue")
    public void receive(User user){
        System.out.println("SimpleConsumer...");
        System.out.println("来自SimplePublisher的消息:"+user);
    }
}
  1. 运行测试类即可!

5.4 Work 工作队列

一个生产者,一个默认的交换机,一个队列,两个消费者

结构图
在这里插入图片描述

1)创建配置类,用于创建队列对象

package com.guo.work;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkQueueConfig {
    
    @Bean
    public Queue work(){
        return new Queue("workQueue");
    }
}

2)创建生产者

package com.guo.work;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WorkQueueProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(){
        System.out.println("WorkQueueProducer");
        rabbitTemplate.convertAndSend("workQueue","工作队列模式");
    }
}

3)创建消费者,本案例创建两个消费者

package com.guo.work;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkQueueCustomer_01 {
    
    @RabbitListener(queues="workQueue")
    public void receive(String content){
        System.out.println("WorkQueueCustomer_01:"+content);
    }
}
package com.guo.work;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkQueueCustomer_02 {

    @RabbitListener(queues="workQueue")
    public void receive(String content){
        System.out.println("WorkQueueCustomer_02:"+content);
    }
}

4)在测试类中添加对象和方法进行测试

@Autowired
private WorkQueueProducer workQueueProducer;

@Test
public void testWorkQueueProducer(){

    for (int i = 0; i<100; i++){
        workQueueProducer.send();
    }
}

5.5 Publish/Subscribe 发布订阅模式

一个生产者,一个交换机,两个队列,两个消费者

结构图
在这里插入图片描述

使用该模式需要借助交换机,生产者将消息发送到交换机,再通过交换机到达队列.

有四种交换机:direct/topic/headers/fanout,默认交换机是direct,发布与订阅的实现使用第四个交换器类型fanout

使用交换机时,每个消费者有自己的队列,生产者将消息发送到交换机(X),每个队列都要绑定到交换机

本例中:

创建2个消息队列

创建一个fanout交换机对象

Bind交换机和队列

1)创建配置类

package com.guo.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    //创建两个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanoutQueue1");
    }

    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanoutQueue2");
    }

    //创建一个交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    //将两个队列绑定到交换机上
    @Bean
    public Binding bindingFanoutQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutQueue2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

2)创建生产者

package com.guo.fanout;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        System.out.println("FanoutProducer");
        //第一个参数是交换机的名称 ,第二个参数是routerKey 这里设置为空字符串即可,第三个参数是要发送的消息
        rabbitTemplate.convertAndSend("fanoutExchange","","发布/订阅");
    }
}

3)创建消费者,本案例创建两个消费者

package com.guo.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutCustomer_01 {

    @RabbitListener(queues = "fanoutQueue1")
    public void receive(String content){
        System.out.println("FanoutCustomer_01:"+content);
    }
}
package com.guo.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutCustomer_02 {

    @RabbitListener(queues = "fanoutQueue2")
    public void receive(String content){
        System.out.println("FanoutCustomer_02:"+content);
    }
}

4)在测试类中添加对象和方法进行测试

@Autowired
private FanoutProducer fanoutProducer;

@Test
public void testFanoutProducer(){
    fanoutProducer.send();
}

5.6 Routing 路由模式

一个生产者,一个交换机,两个队列,两个消费者

结构图
在这里插入图片描述

生产者将消息发送到direct交换机(路由模式需要借助直连交换机实现),在绑定队列和交换机的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。

本例中:

创建2个消息队列

创建一个direct交换机对象

Bind交换机和队列

1)创建配置类

package com.guo.direct;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    @Bean
    public Queue directQueue1(){
        return new Queue("directQueue1");
    }

    @Bean
    public Queue directQueue2(){
        return new Queue("directQueue2");
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    @Bean
    public Binding bingDirectQueue1(){
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("zhangsan");
    }

    @Bean
    public Binding bingDirectQueue2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("lisi");
    }

}

2)创建生产者

package com.guo.direct;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private DirectExchange directExchange;

    public void send(){
        System.out.println("DirectProducer");
        rabbitTemplate.convertAndSend(directExchange.getName(),"zhangsan","zhangsanContent");
        rabbitTemplate.convertAndSend(directExchange.getName(),"lisi","lisiContent");
    }
}

3)创建两个消费者

package com.guo.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectCustomer_01 {

    @RabbitListener(queues = "directQueue1")
    public void receive(String content){
        System.out.println("DirectCustomer_01:"+content);
    }
}
package com.guo.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectCustomer_02 {

    @RabbitListener(queues = "directQueue2")
    public void receive(String content){
        System.out.println("DirectCustomer_02:"+content);
    }
}

4)在测试类中添加对象和方法进行测试

@Autowired
private DirectProducer directProducer;

@Test
public void testDirectProducer(){
    directProducer.send();
}

5.7 Topic 主题模式

一个生产者,一个交换机,两个队列,两个消费者

结构图
在这里插入图片描述

又称通配符模式(可以理解为模糊匹配,路由模式相当于精确匹配)

使用直连交换机可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由。

在消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源。这时候可以使用topic交换机。

使用主题交换机时不能采用任意写法的路由键,路由键的形式应该是由点分割的有意义的单词。例如"goods.stock.info"等。路由key最多255字节。

*号代表一个单词

#号代表0个或多个单词

1)创建配置类

package com.guo.topic;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    @Bean
    public Queue topicQueue1(){
        return new Queue("topicQueue1");
    }

    @Bean
    public Queue topicQueue2(){
        return new Queue("topicQueue2");
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding bingTopicQueue1(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("wangwu.*");
    }

    @Bean
    public Binding bingTopicQueue2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("zhaoliu.#");
    }

}

2)创建生产者

package com.guo.topic;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange topicExchange;

    public void send(){
        System.out.println("TopicProducer");
 rabbitTemplate.convertAndSend(topicExchange.getName(),"wangwu.abc","wangwuContent");    rabbitTemplate.convertAndSend(topicExchange.getName(),"zhaoliu.xyz.qwer","zhaoliuContent");
    }
}

3)创建两个消费者

package com.guo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicCustomer_01 {

    @RabbitListener(queues = "topicQueue1")
    public void receive(String content){
        System.out.println("TopicCustomer_01:"+content);
    }
}
package com.guo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicCustomer_02 {

    @RabbitListener(queues = "topicQueue2")
    public void receive(String content){
        System.out.println("TopicCustomer_02:"+content);
    }
}

4)在测试类中添加对象和方法进行测试

@Autowired
private TopicProducer topicProducer;

@Test
public void testTopicProducer(){
    topicProducer.send();
}

5.8手动Ack

RabbitMQ中的Ack: 主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除,spring-boot-data-amqp 是自动ACK机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK,然后删除队列中的消息,这样会存在一些问题:如果消费者处理消息需要较长时间,或者在消费消息的时候出现异常,都会出现问题,手动Ack可以避免消息重复消费。

5.8.1 原生方式测试

1.以简单模式为例,只需要修改消费者即可,启动生产者进行测试

package com.guo.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消费者
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("消费者启动...");

        //创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("192.168.25.134");
        factory.setPort(5672);//浏览器访问的是:15672

        //获取连接
        Connection connection = factory.newConnection();

        //获取Channel
        Channel channel = connection.createChannel();

        //回调,创建Consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //获取消息
                    System.out.println(new String(body, "UTF-8"));
                    //模拟异常
                    int i = 1/0;
                    //手动ack
                    //参数1:deliveryTag:投递过来消息的标签,由MQ打的,从1开始,1,2,3...
        			//参数2:multiple:是否批量确认之前已经消费过的消息,一般为false
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {//捕获所有异常
                    //第三个参数:requeue -> true表示重新放入队列,false -> 放弃该消息
                    //channel.basicNack(envelope.getDeliveryTag(), false, true);
                    //抛弃此条消息
                    channel.basicNack(envelope.getDeliveryTag(), false, false);
                    e.printStackTrace();
                    //关闭,否则一直循环当前操作
                    connection.close();
                }
            }
        };

        //获取消息
        //1.队列名称(从哪个队列中获取消息)
        //2.true表示自动ack(消费完消息之后,自动告诉rabbitmq)
        //  false表示手动ack,需要自己收到调用方法
        //3.回调
        channel.basicConsume("helloworld",false,defaultConsumer);

        //由于channel会回调DefaultConsumer中的handleDelivery方法,直接关闭会报错,可以在关闭之前调用Thread.sleep();
        //或者可以不调用关闭方法(消费方一般不关闭,有消息过来可以及时处理)
        //channel.close();
        //connection.close();
    }
}
5.8.2 SpringBoot中测试

1.在 application.properties 中添加配置

#配置手动Ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

2.在之前测试的任意模式中添加 AckCustomer 演示

package com.guo.simple;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

//消费者:监听队列中的消息,然后获取消息
@Component
@RabbitListener(queues = "simpleQueue")
public class AckCustomer {

    //处理消息(获取消息,进一步操作)
    @RabbitHandler
    public void receive(String message,Channel channel,Message msg){
        System.out.println("AckCustomer...");
        //获取消息内容
        if(message!=null && message.length()>0){
            try {
                System.out.println("获取消息:"+message);
                int i = 1/0;
                //手动确认
                long deliveryTag = msg.getMessageProperties().getDeliveryTag();
                System.out.println("deliveryTag:"+deliveryTag);
                channel.basicAck(deliveryTag,false);
            } catch (Exception e) {//捕获所有异常
                System.out.println("消息处理...");
                try {
                    //重新放入队列中
                    //channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);
                    //放弃消息
                    channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
                    e.printStackTrace();
                    //关闭
                    channel.getConnection().close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }

        }else{
            System.out.println("没有消息");
        }
    }
}

注释或打开异常进行测试即可!

六、事务与confirm机制


6.1 消息的可靠性

思考?

1.如果消息已经到达RabbitMQ,RabbitMQ宕机了,消息是不是就丢失了?

可以使用Queue的持久化机制

2.消费者在消费消息的时候,程序执行到一半,消费者宕机了怎么办?

可以手动Ack

3.生产者发送消息时,由于网络问题,导致消息没有发送到RabbitMQ怎么办?

RabbitMQ提供了事务操作和Confirm以及Return机制

保证消息的传递可以使用RabbitMQ中的事务,事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息,但是事务操作效率太低。

RabbitMQ中除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

在这里插入图片描述

6.2 RabbitMQ事务

RabbitMQ的事务是对AMQP协议的实现,通过设置Channel 的模式来完成,具体操作如下:

channel.txSelect();  //开启事务
// ....本地事务操作
channel.txCommit();  //提交事务
channel.txRollback(); //回滚事务

特别说明:RabbitMQ的事务机制是同步操作,会极大的降低RabbitMQ的性能。

6.3 Confirm机制

由于RabbitMQ的事务性能的问题,于是就又推出了发送方确认模式。

6.3.1 创建工具类
package com.guo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtils {
    /**
     * 1. 创建连接工厂(ConnectionFactory)
     * 2. 创建连接 (Connection)
     * 3. 创建通道  (Channel)
     Connection conn = connectionFactory.newConnection();
     Channel channel = conn.createChannel();
    */
    private static ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("192.168.25.134");
    }


    public static Connection getConnection() {
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }

    public static void close(Channel channel, Connection connection) {
        try {
            if(null != channel) {
                channel.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
6.3.2 单条消息确认
channel.confirmSelect(); //开启发送方确认模式

1.在RabbitMq控制台页面,创建一个direct类型的交换机,再创建一个队列并绑定

channel.waitForConfirms(); //对于单条消息的确认,true表示成功

package com.guo.confirm;

import com.guo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ConfirmTest1 {
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //开启confirm
        channel.confirmSelect();
        //发送消息(前提:队列已经通过routingKey绑定到该交换机上)
        channel.basicPublish("myExchange","my",null,"消息内容".getBytes());

        //判断消息到达交换机,true表示到达,若没有交换机则系统会直接报错
        if(channel.waitForConfirms()){
            System.out.println("消息已到达交换机");
        }
        
        //设置延迟关闭,否则可能无法显示相关信息
        Thread.sleep(5000);

        //关闭
        RabbitMQUtils.close(channel,connection);
    }
}
6.3.2 批量消息确认

channel.waitForConfirmsOrDie(); //批量消息确认,如果有一条消息没有发送成功,会抛出异常

package com.guo.confirm;

import com.guo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ComfirmTest2 {
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //开启confirm
        channel.confirmSelect();
        //发送消息(前提:队列已经通过routingKey绑定到该交换机上)
        for (int i = 0; i < 10; i++) {
            //判断
            if(i == 5){
                //前提是系统中没有名为myExchange2的交换机
                channel.basicPublish("myExchange2", "my", null, ("消息内容"+ i).getBytes());
                continue;
            }
            //发送消息
            channel.basicPublish("myExchange", "my", null, ("消息内容"+ i).getBytes());
        }

        //确定批量操作是否成功
        //当发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常
        channel.waitForConfirmsOrDie();
        
        //设置延迟关闭,否则可能无法显示相关信息
        Thread.sleep(5000);

        //关闭
        RabbitMQUtils.close(channel,connection);
    }
}
6.3.3 回调方式确认
package com.guo.confirm;

import com.guo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class ComfirmTest3 {
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //开启confirm
        channel.confirmSelect();

        //发送消息(前提:队列已经通过routingKey绑定到该交换机上)
        channel.basicPublish("myExchange","my",null,"消息内容".getBytes());

        //开启异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("成功达到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有到达交换机");
            }
        });
        
        //设置延迟关闭,否则可能无法显示相关信息
        Thread.sleep(5000);

        //关闭
        RabbitMQUtils.close(channel,connection);
    }
}

6.4 Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue

而且exchange是不能持久化消息的,queue是可以持久化消息

采用Return机制来监听消息是否从exchange送到了指定的queue中

开启Return机制,在发送消息时,需要指定mandatory为true

package com.guo.confirm;

import com.guo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnTest {
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //开启confirm
        channel.confirmSelect();

        //发送消息
        //注意:指定mandatory参数为true,设置没有绑定的routingkey
        channel.basicPublish("myExchange","my2",true,null,"消息内容".getBytes());

        //开启异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("成功达到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有到达交换机");
            }
        });

        //没有到达队列的时候触发,设置错误的路由的key的名称进行测试
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {

                System.out.println("没有到达队列");
                //如果消息没有到达队列,我们可以获取到消息的相关信息
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("body:" + new String(body));
                //然后人工干预,考虑编写一个补偿机制,把消息保存到redis或者mysql中,保证消息不丢失
                //....
            }
        });
        
        //设置延迟关闭,否则可能无法显示相关信息
        Thread.sleep(5000);

        //关闭
        RabbitMQUtils.close(channel,connection);
    }
}

6.5 SpringBoot实现

1.在 application.properties 中添加配置

spring.rabbitmq.publisher-confirm-type 对应值的说明

  • NONE :禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • SIMPLE:两种效果
  1. 和CORRELATED值一样会触发回调方法
  2. 在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
# 配置开启Confirm和Return
spring.rabbitmq.publisher-confirm-type: simple
spring.rabbitmq.publisher-returns: true

2.创建配置类

package com.guo.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;


@Configuration
public class PublisherConfirmAndReturnConfig implements
        RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }


    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("已到达交换机");
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("未到达队列");
        //考虑人工干预,获取消息信息进行保存
        String exchange = returnedMessage.getExchange();
        String routingKey = returnedMessage.getRoutingKey();
        Message message = returnedMessage.getMessage();

        System.out.println("exchange:" + exchange);
        System.out.println("routingKey:" + routingKey);
        System.out.println("message:" + new String(message.getBody()));
    }
}

3.修改生产者发送消息方法中的routingKey,然后启动测试类测试即可!

七. 死信队列


死信队列并不是一个特殊的队列,只是一个普通的队列,只是我们把他们取名叫做死信队列。

死信队列的设计是在某个队列的头信息中设定x-dead-letter-exchange (死信交换机)和x-dead-letter-routing-key(死信路由键)即可。关联到一个绑定到某个死信交换机的队列上。然后给该队列指定过期时间或者指定的消息的过期时间,那么该消息到期后会自动到达死信队列中。

7.1 场景

场景一:未支付订单在规定的时间取消。实现的方式为,将订单消息放入到一个队列中,并指定其过期时间。当过期时间到了之后,就进入到了死信队列,那么可以直接在死信队列的消费端取出对应的消息即可。

场景二:某条消息在消费端曾多次尝试消费,但是均未消费成功,那么就进入死信队列,让人工干预。

7.2 测试

提前创建RabbitMQUtils工具类并测试即可

package com.guo.dead;

import com.guo.utils.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.HashMap;
import java.util.Map;

public class DeadQueueTest {

    //死信交换机
    private static final String dead_letter_exchange = "dead_letter_exchange";
    //死信路由键
    private static final String dead_letter_routing_key = "dead_letter_routing_key";
    //死信队列
    private static final String dead_letter_queue = "dead_letter_queue";

    private static final String people_exchange = "people_exchange";
    private static final String people_routing_key = "people_routing_key";
    private static final String people_queue = "people_queue";

    public static void main(String[] args) throws Exception{
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 创建一个死信的交换机
        channel.exchangeDeclare(dead_letter_exchange, "direct");
        // 创建死信队列
        channel.queueDeclare(dead_letter_queue, true, false, false, null);
        // 将死信队列绑定到死信交换机,路由键为 "dead_letter_routing_key"
        channel.queueBind(dead_letter_queue, dead_letter_exchange, dead_letter_routing_key);

        //设置队列参数
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", dead_letter_exchange);
        arguments.put("x-dead-letter-routing-key", dead_letter_routing_key);

        //创建当前交换机,队列以及路由键
        channel.exchangeDeclare(people_exchange, "direct");
        //最后一个参数是当前队列的属性
        channel.queueDeclare(people_queue, true, false, false, arguments);
        channel.queueBind(people_queue, people_exchange, people_routing_key);

        //设置消息的过期时间,单位:毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("15000").build();

        //发送消息
        channel.basicPublish(people_exchange, people_routing_key, properties, "dead_message".getBytes());

        RabbitMQUtils.close(channel, connection);
    }
}

八. 避免消息重复消费


8.1 幂等性

所有的消息中间件都会存在这样一个问题,那就是消息的重复消费问题,所以我们必须做幂等性设计,所谓幂等性设计就是,一条消息无论消费多少次所产生的结果都是相同的。

重复消费消息,是对非幂等性操作造成问题,重复消费消息的原因是因为消费者没有给RabbitMQ一个Ack

在这里插入图片描述

为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

id-0(正在执行业务)

id-1(执行业务成功)

如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0表示有一个消费者正在执行业务逻辑,如果是1,表示消费者已经消费完毕,直接ack。

极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

8.2 解决方案

方案一:为每条消息生成全局唯一ID,将ID和业务数据放在同一个事务中,每次消费消息之后都将ID在表中插入一条数据,每次消费之前先查询ID是否存在,如果不存在就执行对应的逻辑;如果存在则直接确认。

方案二(推荐):利用redis+数据库的方案来实现幂等性的设计,实现的思路与redis的缓存击穿方案类似;当插入数据的时候,将唯一ID同时插入数据库,然后放入到redis中,设置过期时间,每次从redis中判断。

8.3 在springboot中测试

8.3.1 导入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
8.3.2 配置application.yml
#配置rabbitmq
spring:
  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.131
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual #表示手动Ack
    #confirm和return配置
    publisher-confirm-type: simple
    publisher-returns: true

  #配置redis
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    lettuce:
      pool:
        max-active: 8
8.3.3 这里以简单模式演示

1.编写配置类

package com.guo.config;

import org.ietf.jgss.MessageProp;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author yangl
 * @version 1.0
 * @date 2022/12/1 16:55
 */

@Configuration
public class SimplestConfig {

    @Bean
    public Queue simplest(){
        return new Queue("simplestQueue");
    }

    //返回MessagePostProcessor,获取消息id
    @Bean
    public MessagePostProcessor getMessagePostProcessor(){

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                return message;
            }

            @Override
            public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
                //获取消息参数
                MessageProperties messageProperties = message.getMessageProperties();
                //获取消息id
                String correlationDataId = ((CorrelationData) correlation).getId();
                System.out.println("配置类中的消息id:" + correlationDataId);
                messageProperties.setCorrelationId(correlationDataId);

                return message;
            }
        };

        return messagePostProcessor;
    }
}

2.编写生产者

package com.guo.simplest;

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 生产者
 */
@Component
public class SimplestProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessagePostProcessor messagePostProcessor;

    public void send(){
        System.out.println("SimplestProducer生产者");
        //发送消息
        //参数一:队列名称(或是路由规则)
        //参数二:消息内容
        CorrelationData correlationData = new CorrelationData();
        String id = correlationData.getId();
        System.out.println("生产者中的消息id:" + id);

        rabbitTemplate.convertAndSend("","simplestQueue","简单队列",messagePostProcessor,correlationData);
    }

}

3.编写消费者

package com.guo.simplest;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 *
 * 消费者
 */

@Component
public class SimplestCustomer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //监听队列中的消息
    @RabbitListener(queues = "simplestQueue")
    public void recrive(String content, Channel channel, Message message) throws Exception {
        //获取消息id
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("消费者中的消息id:" + correlationId);
        //存到redis中,使用setIfAbsent方法,防止死锁
        Boolean flag = stringRedisTemplate.opsForValue()
                .setIfAbsent(correlationId, "0", 600, TimeUnit.SECONDS);
        //判断
        if(flag){
            System.out.println("SimplestCustomer消费者");
            System.out.println("接收来自simplestQueue中的消息:" + content);
            //业务执行完毕了,使用set方法修改key的值
            stringRedisTemplate.opsForValue()
                    .set(correlationId, "1", 600, TimeUnit.SECONDS);
            //手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            return;
        }

        //如果key没有设置成功,意味着redis中已经有这个消息的记录了
        //判断,1表示已经执行完毕
        if("1".equalsIgnoreCase(stringRedisTemplate.opsForValue().get(correlationId))){
            System.out.println("消息已被消费过,直接Ack");
            //直接手动Ack就可以
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }

}

4.测试类测试即可

package com.guo;

import com.guo.simplest.SimplestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Springboot17RabbitmqApplicationTests {

	@Test
	void contextLoads() {
	}

	//简单模式
	@Autowired
	private SimplestProducer simplestProducer;
	@Test
	public void testSimplestProducer(){
		simplestProducer.send();
	}
}

测试成功后,到redis中查看即可,此时id已经生成,并且值为1,然后把SimplestCustomer消费者类中的第一次手动Ack的方法注释掉,再执行,这是消息已经消费掉,但是rabbitmq队列中依然会有消息,然后再启动该工程的启动类,会加载消费者自动调用监听队列的方法,则执行判断redis中值为1的方法,直接手动Ack.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/542018.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端学习--Vue(2)

一、Vue简介 1.1 概念 Vue是一套用于构建用户界面的前端框架 框架&#xff1a;现成解决方案&#xff0c;遵守规范去编写业务功能 指令、组件、路由、Vuex、vue组件库 1.2 特性 数据驱动视图 vue连接页面结构和数据&#xff0c;监听数据变化&#xff0c;自动渲染页面结构…

Vue--》Vue 3 路由进阶——从基础到高级的完整指南

目录 Vue3中路由讲解与使用 路由的安装与使用 路由模式的使用 编程式路由导航 路由传参 嵌套路由 命名视图 重定向与别名 Vue3中路由讲解与使用 Vue 路由是 Vue.js 框架提供的一种机制&#xff0c;它用于管理网页上内容的导航。Vue 路由可以让我们在不刷新页面的情况下…

【ChatGPT】通过 ChatGPT 用文字描述来绘制插画

点击上方“独立开发者杂谈” 喜欢本文&#xff0c;请置顶或星标 使用文字描述绘制插画具有以下好处 无需绘画技巧&#xff0c;体验与AI结合&#xff0c;创意灵活性&#xff0c;节省时间。 使用 Figma 工具 Figma &#xff08;https://www.figma.com&#xff09;是一款流行的设计…

Linux:iptables防火墙

Linux&#xff1a;iptables防火墙 一、iptables防火墙概述1.1 iptables防火墙1.2 netfilter/iptables 关系 二、Linux防火墙基础2.1 iptables的表、链结构2.2 数据包控制的匹配流程 三、编写防火墙规则3.1 基本语法、控制类型3.2 添加、查看、删除规则等3.3 规则的匹配条件3.3.…

黑马Redis原理篇

黑马Redis原理篇 1、数据结构1.1、动态字符串SDS1.2、IntSet1.3、Dict1.4、ZipList1.5、QuickList1.6、SkipList1.7、RedisObject1.8、五种数据结构1. String&#xff08;小EMBSTR,大RAW (SDS),少量整数INT&#xff09;2. List&#xff08;Redis3.2之后使用QuickList实现&#…

CSDN周赛52期及53期浅析

好久没写题解了&#xff0c;没办法&#xff0c;C站的题目更新的速度太慢了&#xff0c;重复考过去的老题已经不能再进步了。52期还混了个名次&#xff0c;总要写篇文章完成一下任务。而53期就惨了去了&#xff0c;三道选择题全蒙错了。 反正我个人觉得在现在C站的OJ环境里考选…

手撸鉴权系统——SpringBoot2+Vue2(一定惊喜满满,万字长文)

初衷&#xff1a; 一直不太理解整个前后端的鉴权&#xff0c;跨域等问题&#xff0c;抽空两个晚上整理出万字文章&#xff0c;也是对于自己的一个交代&#xff0c;现在共享出来&#xff0c;希望大家也能受益&#xff0c;将使用过程在这里一一详述&#xff0c;还是多说一句&…

【图论(1)】图的存储、遍历与拓扑排序

5月16-5月18日学习内容 文章目录 一、图是什么二、图的存储1、直接存边法2、邻接矩阵法3、邻接表法4、链式前向星时间复杂度分析 三、图的遍历DFSBFS 四、拓扑排序&#xff08;今天实在没时间写了&#xff0c;明天写&#xff09; 一、图是什么 这是oi.wiki给的定义 简而言之…

ENVI制图——土地利用专题图

ENVI制图 0 前言1 数据准备2 数据预处理2.1 先在arcmap中把数据导出为tif格式&#xff0c;然后加入envi&#xff08;别问我为什么要先用arcmap处理&#xff0c;因为这是envi制图教程 :( &#xff09;2.2 直接把tif格式的数据拖进envi&#xff0c;此时看不到数据类别&#xff0c…

低代码+AI:助力企业数字化转型,揭示未来发展趋势

问个问题&#xff1a;你有没有亲自去了解、使用近期爆火的生成式人工智能——ChatGPT&#xff1f; 如果答案是否定的&#xff0c;作为企业数字化转型的前沿先锋小Z&#xff0c;建议你一定要去亲自尝试它。虽然人工智能技术已经发展多年&#xff0c;但OpenAI所发布的ChatGPT&…

瑞吉外卖 - 新增菜品功能(16)

某马瑞吉外卖单体架构项目完整开发文档&#xff0c;基于 Spring Boot 2.7.11 JDK 11。预计 5 月 20 日前更新完成&#xff0c;有需要的胖友记得一键三连&#xff0c;关注主页 “瑞吉外卖” 专栏获取最新文章。 相关资料&#xff1a;https://pan.baidu.com/s/1rO1Vytcp67mcw-PD…

(十)Spring源码阅读:finishBeanFactoryInitialization方法

一、概述 该方法是实例化bean的主要方法&#xff0c;它实现的主要流程如下图所示。 这是方法执行的主要流程图。 具体执行流程如下图&#xff0c;我们将按照具体执行流程一个个介绍具体的方法。 二、主要方法 finishBeanFactoryInitialization内部调用了getBean方法。 getBea…

AIGC+机器人=具身智能?硅谷最酷的两个男人不谋而合预演“下个浪潮”

收集整理|小鱼新的AI题材层出不穷&#xff0c;这次轮到“机器人AI"融合而成的具身智能概念。 “硅谷钢铁侠"马斯克和热爱黑色皮衣的"显卡教父”黄仁勋均作出积极表态&#xff0c;可谓不谋而合。 当地时间5月16日&#xff0c;特斯拉2023年年度股东大会召开&…

unity DoTween动画插件的使用(最全)

DOTween是最常用的动画插件之一,比使用Unity自带脚本写动画,方便很多。 插件获取 untiy商店插件地址 https://assetstore.unity.com/packages/tools/animation/dotween-hotween-v2-27676 DOTween商城地址,开发文档 http://dotween.demigiant.com 导入和设置 DOTween首次使…

离散数学_九章:关系 —— 拓扑排序

拓扑排序 背景知识相容一个引理什么是拓扑排序 拓扑排序算法&#xff08;伪代码&#xff09;几个实例例1例2 假设一个项目由20个不同的任务构成。某些任务只能在其他任务结束之后完成。如何找到关于这些任务的顺序&#xff1f; 为了对这个问题建模&#xff0c;我们在任务的集合…

mysql强制修改mysql数据库密码(无需原密码)

1.创建新记事本new_password.txt 定位到记事本绝对路径&#xff0c;我直接放桌面 C:\Users\bao123\Desktop 为用户为 root 新密码 123456为例 ALTER user rootlocalhost identified by 123456; 为用户为 mytest 新密码 88888888为例 ALTER user mytestlocalhost identifi…

opencv_c++学习(十三)

一、创建滑动条 createTrackbar(const String & trackbarname, const String & winname, int* value, int count, onChange , TrackbarCallback 0, void * userdata 0)trackbarname:滑动条的名称。 winname:创建滑动条窗口的名称。 value:指向整数变量的指针&#xff0…

Harbor在arm架构下亲测编译成功

先安装好Docker,docker-compose 华为云arm架构安装Docker arm架构安装docker-compose Harbor官网: https://github.com/goharbor/harbor/releases Harbor官方没有提供arm架构的包,需要自己编译,我编译好的版本是:harbor-1.9.1 大家可以按我的博客,自己去编译,或评论留言给…

Vivado 下 LED 流水灯实验

目录 Vivado 下 LED 流水灯实验 1、实验简介 2、实验环境 3、实验原理 3.1、LED硬件电路 3.2、程序设计 4、Vivado 工程 4.1、创建工程 4.2、编写流水灯的 verilog代码 1. 点击 Project Manager 下的 Add Sources 图标&#xff08;或者使用快捷键 AltA&#xff09;。…

基于协同过滤的推荐算法

基于协同过滤的推荐算法 基于协同过滤&#xff08;CF&#xff09;的推荐基于近邻的协同过滤基于用户的协同过滤&#xff08;User-CF&#xff09;基于物品的协同过滤&#xff08;Item-CF&#xff09;User-CF 和 Item-CF 的比较基于协同过滤的推荐优缺点基于模型的协同过滤隐语义…