RabbitMQ

news2025/1/23 6:05:12

RabbitMQ

1.MQ引言

MessageQueue: 消息队列

模块之间的耦合度多高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。

1587650344256

1.1什么是MQ

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

1.2MQ有哪些

主要的MQ产品包括:RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ、Kafka、IBM WebSphere 等。

市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal公司维护。

RabbitMQ严格的遵循AMQP协议,一种高级消息队列协议,帮助我们在进程之间传递异步消息。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等

1.3不同MQ的特点

特性ActiveMqRabbitMqRocketMQKafka
成熟度成熟成熟比较成熟成熟的日志领域
时效性微秒级毫秒级毫秒级
社区活跃度
单机吞吐量万级,吞吐量比RocketMQ和Kafka要低了一个数量级万级,吞吐量比RocketMQ和Kafka要低了一个数量级10万级,RocketMQ也是可以支撑高吞吐的一种MQ10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topictopic从几十个到几百个的时候,吞吐量会大幅度下降所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
可用性高,基于主从架构实现高可用性高,基于主从架构实现高可用性非常高,分布式架构非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据经过参数优化配置,可以做到0丢失经过参数优化配置,消息可以做到0丢失
功能支持MQ领域的功能极其完备基于erlang开发,所以并发能力很强,性能极其好,延时很低MQ功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
优劣势总结非常成熟,功能强大,在业内大量的公司以及项目中都有应用偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现维护越来越少,几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用erlang语言开发,性能极其好,延时很低;吞吐量到万级,MQ功能比较完备而且开源提供的管理界面非常棒,用起来很好用社区相对比较活跃,几乎每个月都发布几个版本分在国内一些互联网公司近几年用rabbitmq也比较多一些但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集

1.4RabbitMQ介绍

RabbitMQ is the most widely deployed open source message broker.

为什么最受欢迎,应用最广泛?

1.使用AMQP协议 支持很多业务场景 比如 点对点 交换机路由 发布订阅模式 能适用很多业务场景

  1. 使用 erlang 语言 这个语言的特点 叫做面向并发编程 自身并发能力强 对socket 编程 支持友好
  2. 和spring 无缝整合
  3. 对数据一致性 数据丢失 错误处理 非常友好 可以不丢失任何数据 对错误数据恢复

https://rabbitmq.com/

rabbit 是部署最广泛的消息中间件

2.RabbitMQ安装

2.1下载

官网下载地址:https://www.rabbitmq.com/download.html

image-20200902201442656

使用 docker-compose 安装 启动服务 进入服务内部 启动web 访问

cd /opt
mkdir docker_rabbitmq
cd docker_rabbitmq/
vim docker-compose.yml
# -d 后台作为守护进程启动
docker-compose up -d

docker-compose.yml 文件内容如下

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672     #rabbitmq  服务的端口号
      - 15672:15672   # rabbitmq 图形化界面的端口号
    volumes:
      - ./data:/var/lib/rabbitmq

打开浏览器:http://192.168.174.128:15672 用户名 guest 密码 guest

2.2RabbitMQ架构【重点

2.2.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange
  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息
  • Exchange - 交换机:和生产者建立连接并接收生产者的消息
  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
  • Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单的架构图
1587703812776

2.2.2 RabbitMQ的完整架构图

完整架构图
1587705504342

image-20200902200329661

image-20210322181722088

image-20210322181923032

查看图形化界面并创建一个Virtual Host

创建一个全新test用户,全新的Virtual Host,并且将test用户设置上可以操作/test的权限

image-20200905171744241

3.RabbitMQ的使用【重点

3.1 RabbitMQ的通讯方式

通讯方式
1589104462867
1587707774754
image-20200905173415121
image-20200909105131707

3.2java连接rabbitmq

3.2.1创建maven 项目

image-20200905172528262

3.2.2导入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
3.2.3创建连接工具类
public class RabbitMQUtil {
    public static Connection getConnection(){
        // 创建连接mq 的连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 rabbitmq 的主机
        factory.setHost("192.168.5.201");
        // 设置端口号
        factory.setPort(5672);
        // 设置用户名
        factory.setUsername("test");
        // 设置密码
        factory.setPassword("test");
        // 设置连接哪个虚拟机
        factory.setVirtualHost("/test");
        // 创建Connection
        Connection conn = null;
        try {
            conn = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 返回
        return conn;
    }

    public static void main(String[] args) {
        System.out.println(getConnection());
    }
}

image-20200905172646445

3.3 Hello-World

一个生产者,一个默认的交换机,一个队列,一个消费者

image-20200905172734283

创建生产者,创建一个channel,发布消息到默认exchange,指定路由规则。

创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

P:生产者 也就是要发送消息的程序

C:消费者 消息的接受者 会一直等待消息的到来

queue:消息队列 图中红色部分 类似一个邮箱 可以缓存消息 生产者向其中投递消息 消费者从其中取消息

package com.glls._1helloworld;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;

public class HelloWorldTest {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建Channel通道
        Channel channel = connection.createChannel();
        //3. 发布消息到exchange,同时指定路由的规则
        String msg = "Hello-World!";
        
        // 发布消息
        // 参数1:指定exchange 交换机 ,当前模式 是  生产者 ---队列----消费者 模式   
        // 参数2:指定路由的规则,使用具体的队列名称   队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。  
        // 比如:MessageProperties.PERSISTENT_BASIC 表示持久化消息
        // 参数4:指定发布的具体消息,byte[]类型
        //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
		//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
		//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
        
        channel.basicPublish("","HelloWorld",null,msg.getBytes());
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("生产者发布消息成功!");
        //4. 释放资源
        channel.close();
        connection.close();
    }


    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明队列-HelloWorld
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外conn.close()-当前队列会被自动删除,当前队列只能被一个消费者消费
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("HelloWorld",true,false,false,null);

	    /**
         * DefaultConsumer 是  Consumer 接口的实现类    接口中的定义的方法如下 
           这个不作为重点   了解即可
         * handleCancel:除了调用basicCancel的其他原因导致消息被取消时调用。
         * handleCancelOk:basicCancel调用导致的订阅取消时被调用。
         * handleConsumeOk:任意basicComsume调用导致消费者被注册时调用。
         * handleDelivery:消息接收时被调用。
         * handleRecoverOk:basic.recover-ok被接收时调用
         * handleShutdownSignal:当Channel与Conenction关闭的时候会调用。
         */
	    
        //4. 开启监听Queue
        //简易版自定义 Consumer
		//只需要重写DefaultConsumer 的handleDelivery方法即可取出消息,额外属性新增属性等操作
        DefaultConsumer consume = new DefaultConsumer(channel){
        // 重写DefaultConsumer 的handleDelivery 的方法  方法中获取消息
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到消息:" + new String(body,"UTF-8"));
        }
    };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK 开启自动确认机制 (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 消费回调接口
        channel.basicConsume("HelloWorld",true,consume);
        System.out.println("消费者开始监听队列!");
		// 不要让消费者线程 结束  否则看不到监听效果了   如果没有这行代码  下面不要关闭通道和连接
        System.in.read();  
        //5. 释放资源
        channel.close();    // 不建议关闭  通道和连接
        connection.close();
    }
}

点对点的模型应用场景:

比如在注册的时候发送短信验证就可以以消息队列的形式 调用短信服务接口

再比如签到送积分的功能 可以用此模型 向积分服务 发送请求

3.3.1参数细节说明-durable

使用consume 这行代码来说明

# 参数1  队列名称
# 参数2  队列是否持久化  注意 是队列持久化  不是 队列中的数据持久化  
# 当前设置为 false 启动消费者,在web页面会看到这个队列
channel.queueDeclare("HelloWorld",false,false,false,null);

image-20200905213651479

启动消费者,web页面看到这个HelloWorld 队列

此时 停止消费者,这个记录依然存在, 停止docker中的rabbit ,再重启,这条记录就没有了,

image-20200905214129170

image-20200905214152740

# 参数2  是否持久化  设置为 true
channel.queueDeclare("HelloWorld",true,false,false,null);

image-20200905214417798

此时 再停止 rabbit服务 重启rabbit 服务,这条记录依然存在,这就是 队列持久化,但是队列持久化 ,却不是队列中的数据持久化

要想让队列中的数据持久化,需要在生产者发布消息时 设置 队列中消息的属性

# 参数3  设置队列中消息的持久化  
# 设置队列持久化  队列中消息持久化,rabbit 服务重启,队列 和 队列中的消息依然存在
channel.basicPublish("","HelloWorld",MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
3.3.2参数细节说明-exclusive

参数3 是否独占队列 当前队列 只允许当前连接可用 其他连接不能使用这个队列 true 独占队列 false 不独占 一般不独占

或者说 参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) 一般不希望独占

#参数3 是否独占队列   一般不独占 设置为false ,也就是除了当前连接,其他消费者也可以 连接这个队列
channel.queueDeclare("HelloWorld",true,false,false,null);
#参数3 是否独占队列   如果设置为 true ,则当前队列 被当前连接独占 运行状态  如下
channel.queueDeclare("HelloWorld",true,true,false,null);

image-20200905215927328

此时 如果停掉 consumer 这个队列 就会消失, 另外需要注意一点 ,生产者 此时 就不能声明队列了,因为 生产者再声明队列 就和消费者中的队列 出现排他性问题 报异常 ,只有把 生产者中 声明 队列 //channel.queueDeclare(“HelloWorld”,true,true,false,null); 这行代码 注释掉, 才能正常发送 消息。

3.3.3参数细节说明-autoDelete

如果这个队列没有消费者在消费,队列自动删除

参数4

# 消费者 这行代码  设置 第四个 参数  没有消费者的时候   是否自动删除 true 自动删除 false 不自动删除
channel.queueDeclare("HelloWorld",true,false,true,null);

image-20200905225211583

此时 停止消费者线程 或者 关闭连接 ,这条记录会消失 ,即 删除了这个 队列

HelloWorld模型总结:生产者发送消息 到 消息队列 ,消费者监听消息队列 消息的变化,一旦有消息 ,消费者就去消费,这种模型比较简单,一个生产者 对应一个消费者,也是一些简单的业务用的比较多的场景,消息队列在中间 就类似一个邮箱 一个缓存,生产者把消息发送到消息队列,被消费者监听到 就去处理消息 依次来完成对应的业务操作。

这种模型是最简单的模型,可能应对不了某些特殊的场景。比如 消费者在处理某些消息时 因为业务逻辑的复杂 或者 消费者处理过慢,会造成消息队列中的消息 不断造成消息的堆积,所以 我们希望 生产者生产的消息 可以给更多的消费者消费 ,这样就提高了 消费者处理消息 的 效率 ,当然 我们要保证 不同的消费者处理的消息是不同的 不然会造成业务的重复处理, 这样 就可以 处理消息快一些 不堵塞消息队列。基于这种需求 就是下面的Work模型

3.4Work

一个生产者,一个默认的交换机,一个队列,两个消费者

work queues 也称为task queues ,任务模型 。

HelloWorld 模型的不足 :当消息处理比较耗时时,可能生产消息的速度会远远大于消息的消费速度,长此以往 消息堆积的越来越多,无法及时处理 此时 就可以考虑work 模型,让多个消费者绑定到同一个队列,共同消费队列中的消息。队列中的消息 一旦被消费 就会消失 因此 任务是不会被重复执行的。

image-20200905173141743

只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing

P:生产者 消息的发送者

C1:消费者 领取任务 并完成任务

C2: 消费者2 领取任务并完成任务

queue: 红色部分 队列

Qos: (Quality of Service)QoS 是消息的发送⽅(Sender)和接受⽅(Receiver)之间达成的⼀个协议

ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。

消费者指定Qoa和手动ack

package com.glls._2work;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 *  Work 模型: 按劳分配  能者多劳
 *
 * 现在有两个消费者:消费者1处理消息处理的慢2秒一个
 * 消费者2处理消息处理的快1秒一个 ,但是 在自动确认模式下
 *
 *      channel.basicConsume("Work",true,consumer);   Auto:ack  true  ,生产者发送了100条消息  会一下子全被 接受   在web 页面看不到被消费的过程
 *      而且 消费者轮流依次消费 没有出现谁消费的快 就多消费一点的情况
 *      如果改成 手动确认模式生产者发送的100 条消息 会逐渐被消费 在web页面能看到被消费的过程而且哪个消费者消费的快就会多消费
 *
 *      实际场景 我们希望 能者多劳 处理消息快的消费者 多处理一些
 *
 *      所以不建议使用消息的自动确认应该改为手动确认
 *
 */
public class WorkTest {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建Channel
        Channel channel = connection.createChannel();
        //3. 发布消息到exchange,同时指定路由的规则
        String msg = "Hello-Work!";
        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型
        for(int i=0;i<100;i++){
            channel.basicPublish("","Work",null,(i+msg).getBytes());
        }
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("生产者发布消息成功!");
        //4. 释放资源
        channel.close();
        connection.close();
    }


    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明队列-HelloWorld
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("Work",true,false,false,null);


        //1 指定当前消费者,一次消费多少个消息,没有过来的消息,还在队列中保存,这样设置,不会造成消息丢失
        //因为假如不指定一次消费一条消息就有可能有多条消息到达消费者此时消费者一旦宕机到达消费者的消息也就丢了
        // 所以消息从队列到消费者一次来一条免得过来多条消息在半路丢了
        channel.basicQos(1);    // 不要一次性的把消息都给消费者容易丢失一次给一条安全

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                // 参数1:long类型  标识队列中哪个具体的消息     
                // 参数2:boolean 类型 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
                // 处理完了消息  手动确认一下  队列再删除这个消息   这种机制保证消息永不丢失
                // 队列给消费者 一条消息  消费者收到消息 处理完了之后 手动确认,  确认了之后  队列才把消息删除 保证消息永不丢失
                // 而且 消费者 确认一个消息  队列发送一个消息  消费者确认的快  队列发送的快   能者多劳
             }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume("Work",false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }


    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明队列-HelloWorld
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("Work",true,false,false,null);

        //1 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
                //2. 手动ack
                // 参数1:long类型,标识队列中哪个具体的消息
                // 参数2:boolean 类型 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume("Work",false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }

}

3.5 Publish/Subscribe

fanout 扇出 也称为广播

一个生产者,一个交换机,多个队列,多个消费者

可以有多个消费者 每个消费者都有自己的 queue

每个队列都要绑定到Exchange 交换机

生产者发送消息 只能发送到交换机 交换机决定要发给哪个队列 生产者无法决定

交换机把消息发送给绑定过得所有队列

队列的消费者都能拿到消息 实现一条消息被多个消费者消费

image-20200905173859838

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。

让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

消费者还是正常的监听某一个队列即可。

使用场景:比如 注册成功 发送一个消息 短信服务 邮件服务 积分服务 这些服务 作为消费者

来消费接受这个消息 生产实践用的也较多

package com.glls._3pubsub;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
/**
 * 发布订阅模型  广播模型  要使用交换机了
 *
 * fanout  : 扇出  广播
 *
 * */
public class PubSubTest {
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建通道对象
        Channel channel = connection.createChannel();
        //3. 发布消息到exchange,同时指定路由的规则
        String msg = "Hello-PubSub!";
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        //3. 创建交换机   - 绑定某一个队列
        //参数1: exchange  交换机的名称
        //参数2: 指定exchange  交换机的类型  
        // FANOUT - pubsub  广播类型 ,   DIRECT - Routing , TOPIC - Topics
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);

        /**
         *
         * 广播模式下路由key 是没有用的   routingKey  没有意义  所以空着不写
         * */
        //第一次 发布消息 到 交换机
        channel.basicPublish("pubsub-exchange","",null,msg.getBytes());
        //第二次 发布消息 到 交换机
        channel.basicPublish("pubsub-exchange","",null,msg.getBytes());

        System.out.println("生产者发布消息成功!");
        //4. 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);

        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queueName,"pubsub-exchange","");

        //1 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume(queueName,false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明 交换机
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);

        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定 交换机 和 队列
        channel.queueBind(queueName,"pubsub-exchange","");

        //1 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume(queueName,false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }

}

3.6 Routing

Routing之订阅模型-Direct

在Fanout模式中,一条消息 会被所有订阅的队列都消费。但是在某些场景下 我们希望不同的消息被不同的队列消费。这时 就要用到Direct 类型的Exchange

在Direct模型下:

1.队列和交换机的绑定 不能是任意绑定了 ,而是要指定一个RoutingKey (路由key)

2.消息的发送方在向Exchange 发送消息时 ,也必须指定消息的RoutingKey

3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key 进行判断,只有队列的RoutingKey 与消息的RoutingKey 完全一致 ,才会接收到消息

image-20200905174142792

P:生产者 向Exchange 发送消息 发送消息时 会指定一个 RoutingKey

X:Exchange (交换机) 接受生产者的消息 然后把消息传递给 与RoutingKey 完全匹配的队列

C1 消费者 其所在队列指定了需要RoutingKey 为error 的消息

C2 消费者 其所在队列 指定了需要RoutingKey 为 info error 的消息

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

消费者没有变化

package com.glls._4routing;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * Exchange   ---  direct
 *
 * */
public class RoutingTest {
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建连接通道对象
        Channel channel = connection.createChannel();

        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。

        //3. 创建exchange - 绑定某一个队列
        //参数1: 交换机的名称
        //参数2: 路由模式   指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
        
        //4. 发布消息到exchange,同时指定路由的规则
        channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

        System.out.println("生产者发布消息成功!");
        //5. 释放资源
        channel.close();
        connection.close();
    }


    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明 交换机
        channel.exchangeDeclare("routing-exchange",BuiltinExchangeType.DIRECT);

        //创建临时 队列
        String queueName = channel.queueDeclare().getQueue();

        //基于 路由key 绑定  队列 和  交换机
        channel.queueBind(queueName,"routing-exchange","ERROR");

        //1 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume(queueName,false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3.声明交换机
        channel.exchangeDeclare("routing-exchange",BuiltinExchangeType.DIRECT);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();

        //基于 路由key 绑定  队列 和  交换机
        channel.queueBind(queueName,"routing-exchange","ERROR");  //绑定路由Key 为ERROR 的
        channel.queueBind(queueName,"routing-exchange","INFO");  // 绑定路由Key 为INFO 的

        //1 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume(queueName,false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }
}

3.7 Topic

Routing之订阅模型-Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey 把消息路由到不同的队列,只不过Topic 类型的Exchange可以让队列在绑定RoutingKey 的时候使用通配符!这种模型RoutingKey 一般都是由一个或多个单词组成,多个单词之间以 “.” 分割, 例如 item.insert

image-20200905174354708

#通配符 
    * (star) can substitute  for exactly  one word .     匹配不多不少恰好一个词
    #  (hash)  can  substitute for zero  or more words .  匹配一个或多个词 
# 如:
	audit.#      匹配  audit.irs.corporate   或者  audit.irs  等
	audit.*      只能匹配   audit.irs    

生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。

消费者只是监听队列,没变化。

package com.glls._5topic;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

public class TopicTest {

    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建Channel
        Channel channel = connection.createChannel();
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        //3. 创建exchange - 绑定某一个队列
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);

        //绑定多个队列    在消费者中 指定 临时队列
        //channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
        //channel.queueBind("topic-queue-2","topic-exchange","fast.#");
        //channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");

        //3. 发布消息到exchange,同时指定路由的规则
        channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
        channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
        channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());

        System.out.println("生产者发布消息成功!");
        //4. 释放资源
        channel.close();
        connection.close();
    }


    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
       //3.声明交换机
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
        //4.创建临时队列
        String queueName = channel.queueDeclare().getQueue();

        //5.绑定队列和交换机
        channel.queueBind(queueName,"topic-exchange","*.red.*");
        //6 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);
        //7. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume(queueName,false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }


    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQUtil.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
        //4.创建临时队列
        String queueName = channel.queueDeclare().getQueue();

        //5.绑定队列和交换机
        channel.queueBind(queueName,"topic-exchange","*.*.rabbit");
        channel.queueBind(queueName,"topic-exchange","fast.#");

        //6 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);

        //7. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));

                //2. 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        //3. 指定手动ack
        channel.basicConsume(queueName,false,consumer);
        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();
        //5. 释放资源
        channel.close();
        connection.close();
    }
}

4.SpringBoot整合RabbitMQ【重点

4.1 SpringBoot整合RabbitMQ

4.1.1 创建SpringBoot工程
4.1.2 导入依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
4.1.3 编写配置文件
spring:
  rabbitmq:
    host: 192.168.5.201
    port: 5672
    username: test
    password: test
    virtual-host: /test
4.1.4 Hello-World

第一种使用配置类定义队列的方式

//===========配置类===========
@Configuration
public class RabbitMQConfig {

    @Bean
    Queue simpleQueue(){
        return new Queue("simpleQueue");
    }
}

//=============消费者=============
@Component
public class HelloConsume {

    // 在配置类中创建queue在这里引用即可从simpleQueue队列中消费消息
    // @RabbitListener也可以在类上使用
    @RabbitListener(queues = "simpleQueue")
    public void receive(String msg){
        System.out.println("消费者接收到的消息是:" + msg);
    }

}

//==========生产者========
@SpringBootTest
class SpringbootMqApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // 生产者到simpleQueue对列发布消息
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("simpleQueue","SpringBoot整合MQ发送的消息");
    }

}

第二种使用注解的方式定义队列

//=========消费者==========
@Component
public class HelloConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(queuesToDeclare = @Queue(name = "simpleQueue"))
    public void receive(String msg){
        System.out.println("消费者接收到的消息是:" + msg);
    }

}

//===========生产者============
@SpringBootTest
class SpringbootMqApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // 生产者到simpleQueue对列发布消息
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("simpleQueue","SpringBoot整合MQ发送的消息");
    }

}
4.1.5Work
// 生产者
@Test
public void testWork(){
    // Work 模型
    for(int i=0;i<20;i++){
        rabbitTemplate.convertAndSend("work","work模型:   "+i);
    }
}
-----------------------------------------------------------------------------
@Component
public class WorkConsumer {

    @RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
    public void getMessage(Object message){
        System.out.println("接收到消息1:" + message);
    }

    @RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
    public void getMessage2(Object message){
        System.out.println("接收到消息2:" + message);
    }
}

4.1.6Pub/Sub
// 生产者
@Test
public void testFanout(){
    // 生产发布模型 广播模型
    rabbitTemplate.convertAndSend("boot-pubsub-exchange","","广播模式");
}
  -----------------------------------------------------------
// 消费者
@Component
public class PubSubConsumer {
      @RabbitListener(bindings = {
      @QueueBinding(value = @Queue,   // 创建临时队列
       exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout")) // 绑定的交换机
      })
      public void getMessage(Object message){
          System.out.println("消费者1:"+message);
      }
    
      @RabbitListener(bindings = {
      @QueueBinding(value = @Queue,   // 创建临时队列
      exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout")) // 绑定的交换机
      })
      public void getMessage2(Object message){
          System.out.println("消费者2:"+message);
      }
}
4.1.7route
// 生产者
@Test
public void testRouting(){
    // 路由模式
    rabbitTemplate.convertAndSend("boot-route-exchange","info","发送的是info的key的路由信息");
}
// ------------------------------------------------------
// 消费者
@Component
public class RouteConsumer {

    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue,    // 创建临时队列
     exchange = @Exchange(value = "boot-route-exchange",type = "direct"),
                  key = {"info","error"})})
    public void getMessage1(Object message){
        System.out.println("消费者1:"+message);
    }

    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue,    // 创建临时队列
    exchange = @Exchange(value = "boot-route-exchange",type = "direct"),key = {"info"})})
    public void getMessage2(Object message){
        System.out.println("消费者2:"+message);
    }
    
}   
4.1.8topic
// 生产者
@Test
void testTopic2(){
    //rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
    rabbitTemplate.convertAndSend("boot-topic-exchange","black.dog.and.cat","黑色狗和猫!!");
}
//消费者
package com.glls.bootrabbitmqdemo1._5topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                            key = {"*.red.*","black.*.#"}
                    )
            }
    )
    public void getMessage1(Object message){
        System.out.println("接收到消息1:" + message);
    }


    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                            key = {"black.*.#"}
                    )
            }
    )
    public void getMessage2(Object message){
        System.out.println("接收到消息2:" + message);
    }

}

4.2 手动ack

要在消息消费完之后才告诉 rabbitmq 这个消息消费了,而不是还没消费就确认。

避免消息消费失败了但是消息已经被自动确认了 那么这个消息就相当于丢了 即丢消息

实现步骤:

1.在yml 配置文件 指定 手动 配置

spring:
  rabbitmq:
    host: 192.168.5.201
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual    #  手动指定 ack
  1. 在 消费者的方法参数中 指定参数
@RabbitListener(
       bindings = {
               @QueueBinding(
                       value = @Queue,
                       exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                       key = {"black.*.#"}
               )
       }
)
public void getMessage3(String msg, Channel channel, Message message) throws IOException {
   System.out.println("接收到消息3:" + msg);

  // 手动  ack
   channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

5.RabbitMQ的其他操作

RabbitMQ的消息确认机制 保证了 消息的可靠性传递

RabbitMQ的消息确认有两种。

一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 确认是否到达交换机 使用 confirm 机制 确认是否到达 queue 使用 return 机制

第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。 即消费者的 ack

消息确认的作用是什么?

为了防止消息丢失。消息丢失分为发送丢失和消费者处理丢失,相应的也有两种确认机制。

5.1消息的可靠性

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

image-20201001114157249

5.1.1普通Confirm方式

package com.glls._6confirm;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 *
 * 普通  confirm
 * 普通Confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。
 * 实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,
 * 客户端进行消息重传;
 * */
public class HelloWorldTest {

    private final static String QUEUE_NAME = "confirm";

    @Test
    public void publish() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        // 开启 confirm
        channel.confirmSelect();
        
        String msg = "This is a confirm message!";
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);

        for (int i = 0; i < 500; i++) {
            channel.basicPublish("",QUEUE_NAME,null,("普通Confirm模式, 第" + (i + 1) + "条消息").getBytes());
            // 判断消息是否发送成功
            if(channel.waitForConfirms()){
               // System.out.println("消息发送成功");
            }else{
                System.out.println("消息发送失败");
                // 可以在这里进行重发操作
            }
        }

        //4. 关闭频道和连接
        channel.close();
        connection.close();
    }


    // 消费者不变
    @Test
    public void consume() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);

        DefaultConsumer consume = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consume);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.2 批量Confirm方式。

package com.glls._6confirm;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 *
 * 批量Confirm方式
 * 批量Confirm模式:批量Confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,
 * 这种批量确认的模式极大的提高了Confirm效率,但是如果一旦出现Confirm返回false或者超时的情况,
 * 客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
 * */
public class HelloWorldTest2 {
    private final static String QUEUE_NAME = "confirm many";

    @Test
    public void publish() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();
        // 开启 confirm
        channel.confirmSelect();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //这里主要更改代码为发送批量消息后 再进行等待服务器确认   把确认消息  放在 for 循环 外面
        for (int i = 0; i < 500; i++) {
            String msg = "批量确认!" + i;
            channel.basicPublish("",QUEUE_NAME,null,(" 批量Confirm模式, 第" + (i + 1) + "条消息").getBytes());
        }
        //3.3 确定批量操作是否成功

        // 该方法会等到最后一条消息得到确认或者得到nack才会结束
        // 也就是说在waitForConfirmsOrDie处会造成当前程序的阻塞
        // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
        channel.waitForConfirmsOrDie();     
        //4. 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        DefaultConsumer consume = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consume);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        channel.close();
        connection.close();
    }

}

5.1.3 异步Confirm方式。

package com.glls._6confirm;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 *
 * 异步Confirm方式。
 * 异步Confirm模式:提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。
 *
 * */
public class HelloWorldTest3s {
    private final static String QUEUE_NAME = "asynch confirm ";
    @Test
    public void publish() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();
        // 开启 confirm
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        long start = System.currentTimeMillis();
        //3.3 开启异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                //   deliveryTag:是该消息的index
                System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
                // multiple的值为true,true确认所有将比第一个参数指定的 delivery-tag 小的消息都得到确认
            }

            //handleNack 3s 10s xxx.. 重试
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
            }
        });
        System.out.println("执行异步确认耗费时间: "+(System.currentTimeMillis()-start)+"ms");

        //批量发送消息
        for (int i = 0; i < 500; i++) {
            channel.basicPublish("",QUEUE_NAME,null,(" 异步Confirm模式, 第" + (i + 1) + "条消息").getBytes());
        }


        /**
         *  可以看到,虽然我们还是发送了500条消息,同样我们并没有收到500个ack消息 ,只收到较少的ack消息,
         *  并且这些个ack消息的multiple域都为true,你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,
         *  说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;
         *
         * */

        // 卡住程序 不结束  就可以看到 异步监听的效果了
        System.in.read();  
        //4. 释放资源
        channel.close();
        connection.close();
    }
    @Test
    public void consume() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        DefaultConsumer consume = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("接收到消息:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consume);
        System.out.println("消费者开始监听队列!");
         System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.4 Return机制

Confirm只能监听到消息是否到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

1587801910926

开启Return机制,并在发送消息时,指定mandatory为true

package com.glls._7return;

import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 *
 * return 机制
 * confirm 机制 只能确认 消息 发送到了  exchange 不能保证到 消息队列了
 * 所以  开启 return 机制 监听  是否消息 到了 queue  中
 *
 * */
public class HelloWorldTest4 {

    private final static String QUEUE_NAME = "unsynch confirm  and return";
    @Test
    public void publish() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();
        // 开启 confirm
        channel.confirmSelect();
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);

        //3.3 开启异步回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
            }
        });

        // 1. 开启return机制   2. 发送消息时  指定 mandatory  参数
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 当消息没有送达到queue时,才会执行。
                System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
            }
        });

        //批量发送消息
        for (int i = 0; i < 500; i++) {
                // 2. 发送消息时  指定 mandatory为true  第三个参数
            channel.basicPublish("",QUEUE_NAME,true,null,(" 异步Confirm和return, 第" + (i + 1) + "条消息").getBytes());
        }

        System.in.read();  // 卡住程序 不结束  就可以看到 异步监听的效果了
        //4. 释放资源
        channel.close();
        connection.close();
    }
    @Test
    public void consume() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        System.out.println(connection);
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,true,null);

        DefaultConsumer consume = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("接收到消息:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consume);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.2 springboot 实现rabbitmq消息的可靠性

5.2.1配置文件

编写配置文件 开启Confirm 和 Return 机制

spring:
  rabbitmq:
    host: 192.168.5.201
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual    #  手动指定 ack
    publisher-confirm-type: simple    #  确认
    publisher-returns: true           #  消息可靠性

指定RabbitTemplate 对象 开启 Confirm 和 Return 并编写回调方法

生产者 消费者 没有什么变化

package com.qf.springbootmq.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author zed
 * @date 2022/6/8 6:46
 */
@Component
public class MqConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标识"+correlationData);
        System.out.println("确认结果"+ack);
        System.out.println("失败原因"+cause);
        if(ack){
            System.out.println("消息已经送到了exchange");
        }else{
            System.out.println("消息没有送到exchange");
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 如果执行了这个方法  说明消息没有送到 queue 中
        System.out.println("消息没有送到 queue ");
    }
}

5.3避免消息的重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

1587820070938

为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

id-0(正在执行业务)

id-1(执行业务成功)

如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

5.4避免消息的重复消费-springboot 实现

5.4.1 导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

5.4.2 编写配置文件

spring:
  redis:
    host: 192.168.199.109
    port: 6379

5.4.3 修改生产者

    @Test
    void testRepeat(){
        rabbitTemplate.convertAndSend("boot-topic-exchange-repeat","black.dog.and.cat","黑色狗和猫!!",new CorrelationData());
        //System.in.read();
    }

5.4.4 修改消费者

package com.glls.bootrabbitmqdemo1._8repeat;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Component
public class TopicRepeatConsumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange-repeat",type = "topic"),
                            key = {"black.*.#"}
                    )
            }
    )
    public void getMessage4(String msg, Channel channel, Message message) throws IOException {
        //0. 获取MessageId
        String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        //1. 设置key到Redis
        if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
            //2. 消费消息
            System.out.println("接收到消息:" + msg);
            //3. 设置key的value为1
            redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
            //4.  手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }else {
            //5. 获取Redis中的value即可 如果是1,手动ack
            if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
        }

    }


}

5.5 高级特性

5.5.1 TTL

Time to live 过期时间 设置消息的过期时间 有两种方式

  1. 指定一条消息的过期时间
  2. 给队列设置消息过期时间,队列中所有的消息都有同样的过期时间

应用场景:比如 下单未支付 则订单自动删除的实现

5.5.1.1 设置消息的过期时间

发送一条设置了过期时间的消息

细节: 过期时间 指的是 消息在 队列中的存活时间,所以 此时 为了看到效果 不用设置消费者监听队列 一直消费消息,如果 一直监听队列 消费消息的话 就看不到消息过期之后 消息从队列中消失的的效果了

所以 创建 交换机 创建 队列 别创建消费者

创建一个工具类 封装 交换机名字 队列名字 等 这些常量值

public class RabbitMQCommonConfig {

    //ttl-direct-exchange 交换机
    public static final String TTL_DIRECT_EXCHANGE = "ttldirectExchange";
    //ttl-direct-queue 队列
    public static final String TTL_DIRECT_QUEUE = "ttldirectQueue";
    //ttl-direct-routingkey  路由key
    public static final String TTL_DIRECT_ROUTINGKEY = "ttl_direct_routingkey";


}

在消费者端 创建 交换机 队列 Binging 这些bean ,注意 不创建消费者 是为了看到消息的过期时间属性效果

@Configuration
public class TTLConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange(RabbitMQCommonConfig.TTL_DIRECT_EXCHANGE);
    }

    @Bean
    public Queue ttlDirectQueue(){
        return new Queue(RabbitMQCommonConfig.TTL_DIRECT_QUEUE);
    }


    @Bean
    public Binding ttlDirectBinding(){
        return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with(RabbitMQCommonConfig.TTL_DIRECT_ROUTINGKEY);
    }

}

在生产者端 发送一条 设置了 过期时间的消息

// 发送 ttl 消息   
// 第一种方式 设置消息的 属性 
@Test
public void sendTTLMessage(){

    String msg = "测试设置了过期时间的消息"+new Date().toLocaleString();
    MessageProperties messageProperties=new MessageProperties();   // 消息属性对象
    messageProperties.setMessageId(UUID.randomUUID().toString());
    messageProperties.setExpiration("10000");  // 设置消息的过期时间为10秒

    Message message = new Message(msg.getBytes(),messageProperties);
    rabbitTemplate.send(RabbitMQCommonConfig.TTL_DIRECT_EXCHANGE,RabbitMQCommonConfig.TTL_DIRECT_ROUTINGKEY,message);
}

测试 发现 消息在队列 10 秒后 消失

5.5.1.2 设置队列的过期时间

直接 在 队列上 设置消息的过期时间 这样 队列中的消息过期时间也都跟 队列设置的过期时间相同, 如果 消息也设置了过期时间 谁小谁优先级高

@Bean
public Queue ttlDirectQueue(){
    // 在队列上 设置  此队列中  消息的过期时间
    Map<String,Object> map=new HashMap<>();
    // 队列中 所有的消息的过期时间 为 20秒
    //map.put("x-message-ttl",20000);
    // 队列中 所有的消息的过期时间 为 5秒
    map.put("x-message-ttl",5000);
    //return new Queue(TTL_DIRECT_QUEUE,true,false,false);
    return new Queue(TTL_DIRECT_QUEUE,true,false,false,map);
}

说明: 如果同时指定了Message的TTL 和 Queue 的 TTL ,则优先较小的那一个

所以 最佳实践 是 在 队列上设置过期时间

注意点:

TTL的延时队列存在一个问题,就是同一个队列里的消息延时时间最好一致,比如说队列里的延时时间都是1小时,千万不能队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去,很坑!!!

举个栗子:延时队列里先后进入A,B,C三条消息,存活时间是3h,2h,1h,结果到了1小时C不会死,到了2hB不会死,到了3小时A死了,同时B,C也死了,意味着3h后A,B,C才能消费,很坑!!!

7.2 死信队列

image-20210605220216190

队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:,

1,消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)

2,消息由于消息有效期(per-message TTL)过期

3,消息由于队列超过其长度限制而被丢弃

注意,队列的有效期并不会导致其中的消息过期

x-dead-letter-exchange 指定死信交换机

x-dead-letter-routing-key 指定死信路由key

消费者端 创建 死信队列

@Configuration
public class DLXConfig {

    // DirectExchange 类型的交换机
    @Bean
    public DirectExchange dlxDirectExchange(){
        return new DirectExchange(RabbitMQCommonConfig.DLX_DIRECT_EXCHANGE);
    }

    @Bean
    public Queue dlxDirectQueue(){

        return new Queue(RabbitMQCommonConfig.DLX_DIRECT_QUEUE,true,false,false);
    }


    @Bean
    public Binding dlxDirectBinding(){
        return BindingBuilder.bind(dlxDirectQueue()).to(dlxDirectExchange()).with(RabbitMQCommonConfig.DLX_DIRECT_ROUTINGKEY);
    }

}

@Component
public class DLXConsumer {

    @RabbitListener(queues = {RabbitMQCommonConfig.DLX_DIRECT_QUEUE})
    public void getMessage(Channel channel, Message message) throws IOException {

        System.out.println("死信队列测试接受延迟消息"+ new String(message.getBody()) +  ":" +new Date().toLocaleString());

        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

}

7.3 延迟队列

什么是延迟队列?

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列使用场景:

那么什么时候需要用延时队列呢?考虑一下以下场景:

订单在十分钟之内未支付则自动取消。

新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

账单在一周内未支付,则自动结算。

用户注册成功后,如果三天内没有登陆则进行短信提醒。

用户发起退款,如果三天内没有得到处理则通知相关运营人员。

预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

延迟队列的实现方式:

1,利用TTL+死信队列

生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

image-20210606093916208

这种方式的弊端,无法做到通用性,每搞一个新的延迟任务,都要去实现一个实现的TTL+死信队列,比较麻烦;

2,利用RabbitMQ插件实现

安装一个插件即可:https:/https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。

把下载的插件 放到 容器内的 /plugins 目录内

image-20210606095728590

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装插件

重启 rabbitmq 容器

package com.ww.rabbitmq._9delay;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**

 */
@Configuration
public class DelayConfig {

    /**
     * delayedDirect交换机名称
     */
    public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";

    /**
     * delayed direct队列名称
     */
    public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue";


    /**
     * delayed_direct路由Key
     */
    public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";

    /**
     * 配置一个DELAYED Direct类型的交换机
        自定义类型  交换机
     * @return
     */

    @Bean
    public CustomExchange delayedCustomExChange(){

        Map<String, Object> args = new HashMap<>();

        args.put("x-delayed-type", "direct");

        return new CustomExchange(DELAYED_DIRECT_EXCHANGE,"x-delayed-message", true, false, args);

    }
    /**
     * 定义一个DELAYED direct队列
     * @return
     */
    @Bean
    public Queue delayedDirectQueue(){
        return new Queue(DELAYED_DIRECT_QUEUE);
    }

    /**
     * 配置一个delayed交换机和队列的绑定
     * @return
     */

    @Bean
    public Binding delayedDirectBinding(){
        return BindingBuilder.bind(delayedDirectQueue()).to(delayedCustomExChange()).with(DELAYED_DIRECT_ROUTINGKEY).noargs();
    }

}

package com.ww.rabbitmq._9delay;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

/**

 */

@Component
public class DelayConsumer {

    /**
     * delayedDirect交换机名称
     */
    public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";

    /**
     * delayed direct队列名称
     */
    public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue";


    /**
     * delayed_direct路由Key
     */
    public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";

    // 方式2  利用 插件  接受延迟消息
    @RabbitListener(queues = {DELAYED_DIRECT_QUEUE})
    public void receiveMessage2(Channel channel, Message message) throws IOException {
        System.out.println("接收延迟消息:"+new String(message.getBody())+":"+new Date().toLocaleString());

        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

    /**
     * delayedDirect交换机名称
     */
    public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";

    /**
     * delayed_direct路由Key
     */
    public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";

    // 利用插件   发送延迟消息
    @Test
    public void sendDelayMessage(){

        String message="测试延时消息插件" + new Date().toLocaleString();
        Integer delayTime = 10000;     // 毫秒
        rabbitTemplate.convertAndSend(DELAYED_DIRECT_EXCHANGE,DELAYED_DIRECT_ROUTINGKEY,message,a->{
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }

6.MQ的应用场景

6.1异步处理

image-20200909220500294

image-20200909220517778

image-20200909220840680

6.2应用解耦

image-20200909220935835

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X9C1hdoH-1668231169770)(${Pictures}/image-20200909221025226.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ThJrPYeJ-1668231169770)(${Pictures}/image-20200909221101175.png)]

6.3流量削峰

image-20200909221842847

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YKMzgIjc-1668231169771)(${Pictures}/image-20200909221915223.png)]

7.练习

创建消息服务 发送阿里云手机短信验证码

image-20210323160232573

腾讯COS 京东万象

::: details 本文档代码

代码仓库

:::

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/5086.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android Studio App开发实战项目之广告轮播(附源码 可用于大作业)

需要图片集和源码请点赞关注收藏后评论区留言即可~~~ 电商App的首页上方&#xff0c;都在明显位置放了一栏广告条&#xff0c;并且广告条会轮播&#xff0c;非常吸引眼球&#xff0c;这种广告轮播的功能&#xff0c;为推广热门事物出力甚大。 轮播视频已上传至我的主页&#x…

【云原生】docker 搭建ElasticSearch7

前言 本篇演示如何基于docker环境快速搭建起es7的环境 安装es7.6 1、拉取镜像 docker pull elasticsearch:7.6.2 2、执行下面的命令进行安装 docker run -p 9200:9200 -p 9300:9300 -e "discovery.typesingle-node" -e ES_JAVA_OPTS"-Xms512m -Xmx512m"…

Android Studio App开发实战项目之计时器(附源码 简单易懂,适合新手学习)

运行有问题或需要源码请点赞关注收藏后评论区留言~~~ 一、Handler的延迟机制 活动页面的Java代码通常是串行工作的&#xff0c;而且App界面很快就加载完成容不得半点延迟&#xff0c;不过偶尔也需要某些控件时不时的动一下&#xff0c;好让界面呈现动画效果更加活泼&#xff0…

shiro框架04会话管理+缓存管理+Ehcache使用

目录 一、会话管理 1.基础组件 1.1 SessionManager 1.2 SessionListener 1.3 SessionDao 1.4 会话验证 1.5 案例 二、缓存管理 1、为什么要使用缓存 2、什么是ehcache 3、ehcache特点 4、ehcache入门 5、shiro与ehcache整合 1&#xff09;导入相关依赖&#xff0…

2019银川F,ccpc威海D - Sternhalma 2022

1401D - Maximum Distributed Tree 求每个边经过的次数&#xff0c;假设求u,v这条边的次数&#xff0c;边的左端是u这个集合一共有n-siz[v]个点&#xff0c;右端是v这个集合有siz[v]个端点&#xff0c;经过这条边的次数就是siz[v]*(n-siz[v]),然后再按照次数多的乘以大的质因数…

【附源码】Python计算机毕业设计汽车租赁管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Go 语言中的 Moduels 管理(Let‘s Go 三十四)

在 Go 1.11以前使用包管理一直被开发者所诟病。既然GOPATH这种包管理引起了一线开发者的一片骂声&#xff0c;所以&#xff0c;Go官方体恤一线开发者对GOPATH这种包管理的情绪&#xff0c;一直致力努力提供对一线开发者友好的包管理解决方法而奋斗。从最初的GOPATH到GO VENDOR&…

基于遗传算法、元胞自动机邻域和随机重启爬山混合优化算法(GA-RRHC)的柔性车间调度研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

LeetCode50天刷题计划第二季(Day 27 — 寻找旋转排序数组中的最小值(9.50- 11.20)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、题目寻找旋转排序数组中的最小值示例提示&#xff1a;二、思路三、代码前言 芜湖 一、题目 寻找旋转排序数组中的最小值 已知一个长度为 n 的数组&#…

web前端期末大作业——基于HTML+CSS+JavaScript实现中国茶文化(30页)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

大数据开发是做什么的?怎样入门?

其实现在有很多小伙伴看中了大数据的发展前景&#xff0c;但是其实不知道大数据开发具体是做什么的&#xff0c;又该怎么学习&#xff1f;学习了之后又该做什么&#xff1f; 下面具体给你分析下大数据开发是做什么的&#xff0c;又需要学习和掌握哪些技能~ 大数据开发做什么&a…

致远OA ajax.do 任意文件上传 (CNVD-2021-01627) 漏洞复现

为方便您的阅读&#xff0c;可点击下方蓝色字体&#xff0c;进行跳转↓↓↓01 漏洞描述02 影响范围03 验证方式04 利用方式05 修复方案01 漏洞描述 致远OA是一套办公协同管理软件。由于致远OA旧版本某些接口存在未授权访问&#xff0c;以及部分函数存在过滤不足&#xff0c;攻…

大数据实战之前戏

开发背景 因为要开发一套通话详单系统。该系统上每天产生1亿条通话话单&#xff0c;要保存一个月的通话话单。也就是保存30亿条通话&#xff0c;能够做到准实时的通话详单查询。于是采用大数据架构进行话单的保存和查询。 服务器规划 为了验证系统的可用性&#xff0c;我先搭…

从零学习 InfiniBand-network架构(七) ——IB协议中数据如何传输

从零学习 InfiniBand-network架构&#xff08;七&#xff09; —— IB协议中数据如何传输 &#x1f508;声明&#xff1a; &#x1f603;博主主页&#xff1a;王_嘻嘻的CSDN主页 &#x1f511;未经作者允许&#xff0c;禁止转载 &#x1f6a9;本专题部分内容源于《InfiniBand-n…

C++ 智能指针最佳实践源码分析

智能指针在 C11 标准中被引入真正标准库&#xff08;C98 中引入的 auto_ptr 存在较多问题&#xff09;&#xff0c;但目前很多 C开发者仍习惯用原生指针&#xff0c;视智能指针为洪水猛兽。但很多实际场景下&#xff0c;智能指针却是解决问题的神器&#xff0c;尤其是一些涉及多…

QT之Windows开发及源码调试环境搭建

QT之Windows开发及源码调试环境搭建1. QT 安装2. 配置源码调试2.1 QTCreator2.2 Visual Studio 20193. 参考1. QT 安装 QT对5.15以及以上版本提供在线安装工具 官方链接清华镜像&#xff0c;但是这里面没由Windows的在线安装工具(2022/11/11查看的时候没有) 这里安装以QT 5.…

全是狠活!SpringBoot文档也太那个了,图文并茂详尽讲解

前沿 SpringBoot是由Pivotal团队提供的在Spring框架基础之上开发的框架&#xff0c;其设计目的是用来简化应用的初始搭建以及开发过程。 SpringBoot本身并不提供Spring框架的核心特性以及扩展功能&#xff0c;只是用于快速、敏捷地开发新一代基于Spring框架的应用程序。也就是…

【JavaSE】继承

文章目录1. 如何继承2. 子类如何访问父类的成员2.1 子类中访问父类的成员变量2.2 子类中访问父类的成员方法3. super关键字4. 如何写构造方法前言&#xff1a;为方便讲解一个java文件写多个类。 1. 如何继承 class Cat {public String name;public int age;public String sex;…

c++基础篇(一篇文章带你进入c++的新手村)

前沿&#xff1a; c作为目前比较的流行的语言之一&#xff0c;在就业上也是运用比较广泛的语言之一&#xff0c;并且经过这么多年的历练久经不衰&#xff0c;所以说选择学c是一个不错的选择^_^&#xff0c;前面看到一个段子&#xff0c;如何在21天精通c&#xff0c;我动态里有这…

git 记录

git 工作区介绍 workspace&#xff1a;工作区&#xff0c;就是平时存放项目代码的地方。Index/Stage&#xff1a;暂存区&#xff0c;用于临时存放你的改动&#xff0c;事实上只是一个文件&#xff0c;保存即将提交到文件列表信息。Repository&#xff1a;仓库区&#xff08;或版…