一、什么是MQ
1、什么是MQ
MQ(message queue),本质是个队列,FIFO先入先出。只不过队列中放的是message,是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游解耦的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
2、为什么使用MQ
- 流量消峰
当一个系统只能够承受一万次点击的时候,这时候来了两万次点击,服务器就会宕机(采取方式是限制第一万次以上的访问)。而现在使用MQ来做缓冲,可以取消这个限制,把一秒内的访问分散成一段时间来处理(排队),缺点是,排队就要时间,所以收到访问结果会慢,但是这比不能访问要好。
- 应用解耦
以电商应用为例,用户下单,通过订单系统调用库存系统、物流系统、支付系统,如果耦合调用,当任何一个子系统出现了故障,都会造成下单操作异常。而当转变为基于消息队列的方式后(在调用之间加上队列),当一个子系统出现故障需要几分钟修复,在这几分钟内,用户的下单操作依然可以正常进行(对这个子系统要处理的内存会被缓存在消息队列中),等待故障的子系统修复好后就从缓存中获取去处理。这样用户基本感受不到子系统出现故障(因为可以正常下单操作),提升系统的可用性
- 异步处理
异步调用服务时,例如A调用B,B需要花费很长时间去执行,但是A需要知道B什么时候执行完成。以前的方式,是通过A过一段时间去调用B的查询api查询,或者A提供一个callback api,当B执行完就调用这个api来通知A。现在使用mq,A调用B后,只需要监听mq转发的消息。当B完成后,会发送一条信息给MQ,MQ会将此信息转发给A服务。这样A服务能及时得到异步处理成功的消息
简而言之,就是通过一个第三方,来接受B的完成信息,然后转发给A
3、MQ分类
- ActiveMQ
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较
低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
- Kafka
为大数据而生的消息中间件。
优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界KafkaManager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消
息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,
但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
- RocketMQ
阿里巴巴的开源产品,用Java语言实现
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分
布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅
读源码,定制自己公司的 MQ
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ
核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
- RabbitMQ
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最
主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
缺点:商业版需要收费,学习成本较高
4、MQ选择
- Kafka
大型公司建议用,如果有日志采集功能,首选kafka
- RocketMQ
为金融互联网领域而生。适用于对于可靠性要求很高的场景,在稳定上
- RabbitMQ
性能好时效性微秒级,社区活跃度高,如果数据量不大,中小型公司建议选择
二、RabbitMQ入门
2.1、概念
1、RabbitMQ
RabbitMQ是一个消息中间件:接收并转发消息。
2、核心概念
- 生产者:产生数据发生消息到程序是生产者
- 交换机:一方面接收来自生产者的消息,另一方面将消息推送到队列中。对于消息的处理由交换机类型决定
- 队列:是一种内部使用的数据结构。本质是一个大的消息缓冲区
- 消费者:等待接收消息的程序
3、安装
# 设置rabbitmq服务开机自启动
systemctl enable rabbitmq-server
#启动
systemctl start rabbitmq-server
#查看状态
systemctl status rabbitmq-server
# 关闭rabbitmq服务
systemctl stop rabbitmq-server
# 重启rabbitmq服务
systemctl restart rabbitmq-server
- 开启web管理插件
rabbitmq-plugins enable rabbitmq_management
访问 http://ip地址:15672/
创建账号和密码
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p ]
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
查看当前用户和角色
rabbitmqctl list_users
2.2、简单队列模式
Java代码
- 消息生产者
/**
* 生产者--发送信息
*/
public class Producer {
//队列名称
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
//创建连接工厂,设置连接属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.180.100");
factory.setUsername("adminx");
factory.setPassword("123456");
//从连接工厂中获取连接
Connection connection = factory.newConnection();
//在连接中创建信道,mq中的所有操作都是在信道中完成
Channel channel = connection.createChannel();
/**
* 生成一个队列,各参数含义:
* 1、队列名称 2、队列里面消息是否持久化(持久化就是存入磁盘),true是持久化,false则是存入内存
* 3、是否进行消息共享(多个消费者消费),ture是进行共享,false是只能一个消费者消费
* 4、是否自动删除(最后一个消费者断开连接以后,该队列是否自动删除)false是不自动删除
* 5、其他参数 null
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发送的消息内容
String message = "hello world";
/**
* 通过信道发送信息
* 1、发送到哪个交换机,
* 2、路由的key值是哪个,本次是队列的名称
* 3、其他参数信息
* 4、发送信息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
- 消息消费者
/**
* 消费者:接收信息
*/
public class Consumer {
//队列名称,和生产者发送时的队列名称一致
public static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws Exception{
//创建连接工厂,设置连接属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.180.100");
factory.setUsername("adminx");
factory.setPassword("123456");
//从工厂中获得连接
Connection connection = factory.newConnection();
//在连接中创建信道,接收信息是在信道中发生
Channel channel = connection.createChannel();
/**
* RabbitMQ推送给消费者回调接口,在该接口中用于编写如何对消息进行处理
*consumerTag,消费者注册到mq之后,mq会生成一个该消费者的唯一标识
* message,推送过来的信息
*/
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
} ;
/**
* mq取消该消费者对信道中队列的订阅时,调用的回调接口
* 当我们在mq管理界面手动删除该队列时,就会调用该接口
*/
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断" + consumerTag);
};
/**
* 接收信息
* 1、消费哪个队列
* 2、消费成功是否要自动应答,true--自动挡
* 3、信息推送给消费者的回调
* 4、消费者取消消费者的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
2.3、工作队列模式
工作队列(又称任务队列)的主要思想是避免立即执行大量任务 ,导致不得不等待它完成而耗费时间 。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
当生产者发送大量消息到队列时,接收消息的消费者变成多个工作线程,一条消息只会被一个工作线程所处理,rabbitmq采用轮询的方式将消息平均发送给工作线程,线程在处理完某条信息才会接收到下一个信息
- 模拟两个工作线程
将创建信道的代码抽取出来
package com.xqh.rabbitmq.two;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 将创建连接工程以及创建信道的操作抽取出来
*/
public class RabbitMqUtils {
//静态方法,可以直接通过类名访问
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.180.100");
factory.setUsername("adminx");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
工作线程1和2
package com.xqh.rabbitmq.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (comsumerTag,message)->{
String receivedMessage = new String(message.getBody());
System.out.println("接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消订阅消息接口回调逻辑");
};
System.out.println("c1消费者启动等待消费...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
可以利用idea的配置,勾选允许多个实例允许
修改输出为出 c2启动等待消费,然后再运行一次,就可以得到第二个工作线程
启动一个发送线程
package com.xqh.rabbitmq.two;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 发送线程
*/
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完成:"+message);
}
}
}
从控制台接收要发送的消息
测试结果
rabbitmq将大量信息采用轮询方式分配给工作线程。例如,当输入发送信息aa,假设线程1接收到信息,那么再输入发送信息bb,就会分配给线程2处理,以此类推。你一条我一条,轮询分配
一个消息只能处理一次,不能处理多次
2.4、消息应答
1、消息应答
rabbitMQ在发送一条信息给消费者,一旦传递给消费者就会立马把信息删除,假如此时消费者并没有处理完这条信息就挂掉了,那么这条信息就会丢失 。为了保证消息在发送过程不丢失,rabbitMQ引入消息应答机制,消费者在接收到消息并且处理该消息之后,需要告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了
即rabbitmq在消费者应答之后再进行消息删除
2、自动应答
- 消息发送后立即被认为已经传送成功,自动给出应答。 这种模式需要在高吞吐量和数据传输安全性方面做权
衡。(优点是可以处理高吞吐量的信息,缺点是传输安全性不好)
因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,这样就使得信息的安全传输得不到保证。
虽然这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,但是有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死。
- 所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
自动应答是以接收到消息为准,然而接收到消息后还需要处理,但是消费者一接收到信息就会立马给出应答。
3、手动应答(推荐)
- 手动应答的三个方法
A.Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack (用于否定确认),C.Channel.basicReject (用于否定确认):不处理该消息了直接拒绝,可以将其丢弃了
- 手动应答的好处
可以批量应答并且减少网络拥堵。消息在手动应答时是不丢失,如果出现消费者失去连接会自动放回队列中重新消费、
- Multiple的解释
批量应答。
multiple的true和false代表不同意思
例如channel上有传输信息5,6,7,8,(队列先进先出)当前要传输的是8
如果是true:那么信道上5~8的这些还未应答的消息都会被确认收到消息应答
如果是false:那么只应答当前的8的消息,5,6,7这三个消息依然不会收到消息应答
- 代码实现
启动一个生产者线程
package com.xqh.rabbitmq.three;
import com.rabbitmq.client.Channel;
import com.xqh.rabbitmq.two.RabbitMqUtils;
import java.util.Scanner;
/**
*生产者
*/
public class Task2 {
public static final String Task_QUEUQ_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(Task_QUEUQ_NAME,false,false,false,null);
//从控制台扫描要发送的信息
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish("",Task_QUEUQ_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
启动消费者线程1
package com.xqh.rabbitmq.three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xqh.rabbitmq.two.RabbitMqUtils;
/**
* 消息消费者
*/
public class Work03 {
public static final String Task_QUEUQ_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c1等待接收信息处理时间较短");
//采用手动应答
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag,message)->{
//沉睡一秒
SleepUtils.sleep(1);
System.out.println("接收到的信息:"+new String(message.getBody(),"UTF-8"));
/**
* 手动应答 channel.basicAck()
* 参数:
* 1、消息的标记 tag
* 2、是否批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
}
}
启动消费者线程2
package com.xqh.rabbitmq.three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xqh.rabbitmq.two.RabbitMqUtils;
/**
* 消息消费者
*/
public class Work04 {
public static final String Task_QUEUQ_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c2等待接收信息处理时间较长");
//采用手动应答
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag,message)->{
//沉睡一秒
SleepUtils.sleep(30);
System.out.println("接收到的信息:"+new String(message.getBody(),"UTF-8"));
/**
* 手动应答 channel.basicAck()
* 参数:
* 1、消息的标记 tag
* 2、是否批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
}
}
模拟场景:
- 当生产者发送信息aa , 消费者线程1接收到信息aa
- 当生产者发送信息bb,消费者线程2接收到信息bb (很慢)
- 当生产者发送信息cc,消费者线程1接收到信息cc
- 当生产者发送信息dd,在消费者线程2处理信息dd的过程,将线程2关闭,此时,信息会自动重新入列,此时发现消费者线程1可以接受信息,于是把信息发送给消费者线程1,----线程1接收到信息dd
说明,当消费者失去连接时,传给他的信息并不会丢失,mq会将它重新入列排队,此时其他消费者可以处理会重新分配给其他消费者。
4、消息自动重新入列
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
2.5、持久化
1、概念
如何保障当RabbitMQ服务停掉以后,生产者发来的消息不会丢失呢?默认情况下,一旦rabbitmq退出或由于某种原因崩溃时,它会忽视队列和消息—造成消息丢失。要确保消息不丢失,我们需要将队列和消息都标记为持久化
2、队列实现持久化
之前我们创建的队列都是非持久化的,一旦rabbitmq重启,队列就会被删除。要实现持久化,需要在队列声明时将参数durable设为true
boolean durable = true;
channel.queueDeclare(Task_QUEUQ_NAME,durable,false,false,null);
3、消息实现持久化
//设置消息持久化(消息保存到磁盘)MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",Task_QUEUQ_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了
2.6、不公平分发
1、显而易见,采用轮询分配看似你接收一条我接受一条很公平,但其实在一些情况下是“不公平”的。例如当工作线程1处理任务快,而工作线程2处理任务很慢时,如果仍然采用轮询,那么会出现线程1大部分时间空闲着,而线程2又一直在处理的情况。
2、为了避免这种情况,让“能力强的多干”,可以设置参数channle.basicQos(),默认是0—轮询,设置为1就是不公平分发
在消费者端修改代码
//改为不公平分发
channel.basicQos(1);
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
测试结果表明,当处理慢的线程,正在处理消息时,mq会将传来消息分配给空闲的线程,不再遵循轮询。
3、预取值
-
因为消息的发送是异步的,任何时候,信道肯定不止只有一个信息,消费者的确认信息同样也是异步。因此这里存在一个未确认消息的缓冲区。我们可以限制缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
-
通过设置basic.qos方式设置预取值,该值定义信道上允许的未确认消息的最大数量 。一旦到达配置的数量,RabbitMQ将停止在信道上传递更多信息,只有当未处理的消息被确认,才会继续传递消息。
-
通过设置预取值,在多个工作线程的情景下,我们可以设置这些工作线程的确认消息的数量。例如在上面一个线程处理快一个线程处理慢的情况下,我们可以设置处理消息快的线程预取值大一点,而处理消息慢的线程预取值则小一点。
-
消息应答和qos预取值对用户吞吐量有重大影响 。增加预取值将提高消费者传递消息的速度。虽然自动应答的传输效率是最高、吞吐量是最大的,但是这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗。应该小心使用具有无限预处理的自动确认模式或手动确认模式 。预取值太大------导致消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是反复试验的过程。不同的负载该值取值不同,100-300范围内的值通常可提供最佳的吞吐量。
在前面的两个工作线程加上预取值
线程1 (快)
channel.basicQos(4);
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
线程2 (慢)
channel.basicQos(2);
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
和前面设置basicQos为1,区别在于,前面慢的线程在处理完信息前,信道只能有一个消息。此时再分配,也是给空闲的快的线程。而如果给慢线程设置(2),那么信道可以放2个未确认消息,它就可以抢占到两条消息,其余的分配给快线程。而抢占到哪两条,是根据你一条我一条,例如输入 11 、22 、33 、44 、55 、66 ,假设快线程确认消息11,那么22分配给慢线程,33分配给快线程,44分配给慢线程,55给快线程,此时如果慢线程两条都还没确认(信道已经达到预取值最大),那么66还是给快线程。
(感觉设置预取值对慢线程有意义,可以限制信道上未确认的消息的数量。而对于快线程意义不大,因为快线程处理消息速度很快,信道上未确认的消息堆积慢,处理很快。除非预取值设置过大,所以预取值也不能设置太大。要根据消费者处理消息的能力)
三、发布确认
1、简单来说,就是,在前面的持久化操作中,我们是为了防止rabbitmq突然宕机,导致发送给rabbitmq的消息丢失,于是采取将队列持久化、将消息持久化,来防止消息丢失。但是,这是建立在消息传到rabbitmq然后存入磁盘后,才能实现持久化。生产者发送消息给rabbitmq,如果在存入磁盘过程中就rabbitmq就宕机,那么消息依旧会丢失。 所以采用发布确认,当生产者将消息发给rabbitmq,只有当rabbitmq将消息写入磁盘后,才会发送一个确认发布的消息给生产者 ,只有达到发布确认这一步,才能完全保证消息不会丢失。
因此要保证消息不丢失要这三步:
- 设置要求队列持久化
- 设置要求队列中的消息持久化
- 发布确认
2、开启发布确认的方法
发布确认默认是关闭的,要开启的话需要在信道上调用方法confirmSelect(),
3、单个确认发布
- 这是一种同步确认发布的方式。发布一个消息之后,只有它被确认发布,后续的消息才能继续发布。
- 显而易见,这种发布确认的方式会导致发布特别的慢。没有确认发布的消息就会阻塞所有后续消息的发布。
/**
*1、单个确认耗时
*/
public static void publishMessageIndividually() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开始发布确认
channel.confirmSelect();
//记录开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0;i<MESSAGE_COUNT;i++){
String message = i+" ";
channel.basicPublish("",queueName, null,message.getBytes());
//发一条确认一次
//channel.waitForConfirms(),消息确认成功才返回true
if (channel.waitForConfirms()){
System.out.println("消息发送成功");
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条单独确认消息,花费"+(end-begin)+"ms");
}
4、批量确认发布
先发布一批信息然后一起确认可以极大提高吞吐量 ,缺点是:当发生故障导致发布问题时,不知道哪个消息出现问题。并且,这种方式依然是同步的,同样会阻塞消息的发布(阻塞这一批要确认发布的消息后面的消息)
/**
* 2、批量确认耗时
*/
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开始发布确认
channel.confirmSelect();
//记录开始时间
long begin = System.currentTimeMillis();
//批量确认发布消息的容量
int batchSize = 100; //每一百条确认发布一次
//批量发送消息,批量确认发布
for (int i = 0;i<MESSAGE_COUNT;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//达到100条批量确认一次
if (i%batchSize==0){
channel.waitForConfirms();
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条批量确认消息,花费"+(end-begin)+"ms");
}
虽然批量确认发布消息的时间快了很多,但是弊端非常明显,当发生故障导致发布问题,无法查验是哪条消息出现错误
5、异步确认发布
- 在消息生产者将消息发送到信道,信道在对发来的的消息保存时同时进行编号----实现map来进行编号,key值为消息序号,value值为消息内容。从channel发送到broker,如果rabbitmq确认收到,则回调ackCallback函数来通知生产者哪条消息确认发布;当rabbitmq未确认收到,也会回调nackCallback函数通知生产者哪条消息未确认。 并且,生产者的发布消息,和broker回调确认/未确认函数是异步的,作为生产者无需考虑发布的消息是否被确认,只需要负责发布,确认与否由broker回调函数去通知哪些出问题或者哪些没出问题。
- 之前的两种方式都是同步确认,生产者发送一条或者发送多条,就需要等待rabbitmq确认,现在不用,生产者只管发布,哪些有问题或者没问题会稍后通知生产者,这就是异步确认
- 显然这种方式才是性价比最高的,既能保证效率,又可以保证可靠性,可以查出哪条具体哪条消息出现问题。
/**
* 异步确认发布
* @throws Exception
*/
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启确认发布
channel.confirmSelect();
/**
* 准备一个线程安全有序的哈希表(hashmap),适用于高并发情况下。来保存未确认的消息。
* 1、轻松的将序号与消息进行关联保存
* 2、批量删除条目,根据序号
* 3、支持高并发(多线程)
* 能被发布线程访问
*/
//key是消息编号,value是消息内容
ConcurrentSkipListMap<Long,String>outstandingConfirms = new ConcurrentSkipListMap<>();
//准备消息监听器,监听哪些消息成功了,哪些消息失败了(监听从broker发来的确认发布的消息)
//消息确认成功,回调函数。第一个参数是消息的标记,第二个标记是否为批量确认
ConfirmCallback ackCallback = (deliveryTag,multiple)->{
/**
* 头部是已确认消息
* multiple为true,则调用headMap来获取已确认消息
*headMap(deliveryTag)
* confirmed--返回小于等于当前消息序号的消息----已确认消息
* 将这部分消息清除,得到的就是未确认
* 如果multiple为false,则直接删除当前序号
*/
if(multiple){
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:"+deliveryTag);
};
//消息确认未成功,回调函数---后续可以对确认失败的消息操作
ConfirmCallback nackCallback = (deliveryTag,multiple)->{
//打印未确认消息
String confirmMessage = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息是:"+confirmMessage);
};
//可以选择只监听一种(成功或者失败的),也可以选择两种都监听(建议)
channel.addConfirmListener(ackCallback,nackCallback);//异步通知
//记录发布前的时间
long begin = System.currentTimeMillis();
//批量发布消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"消息";
channel.basicPublish("",queueName,null,message.getBytes());
//记录所有要发送的消息
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条异步确认消息,花费"+(end-begin)+"ms");
}
从结果的打印也可以看出这是异步的。发布耗时非常短,生产者一端不用管是否发布确认成功,只需要发布,过一会rabbitmq会发送成功与否的通知。
- 处理未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 监听线程confirmcallbacks 与发布线程之间进行消息的传递。
准备一个哈希表来保存未确认的消息
如何得到未确认消息?
首先将全部发布的消息存入哈希表,然后减去确认发布的消息,得到的就是未确认的消息
6、三种发布确认的方式
public static void main(String[] args) throws Exception{
// ConfirmMessage.publishMessageIndividually();//830ms
// ConfirmMessage.publishMessageBatch(); //127ms
ConfirmMessage.publishMessageAsync(); //52ms
}
- 单独发布消息
同步等待确认,简单,但吞吐量非常有限。
- 批量发布消息
批量同步等待确认,简单,合理的吞吐量,但是一旦出现问题但很难推断出是那条消息出现了问题。
- 异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
四、交换机
4.1、概念
1、概念
生产者生产的消息并不是直接发送到队列,而只能将消息发送到交换机(exchange),交换机的工作内容是:一方面接收来自生产者的信息,另一方面将它们推入队列。交换机对于消息处理分为这三种:将消息放到特定队列、将消息放到许多队列、丢弃消息,对于消息的处理由交换机的类型来决定。
在之前的学习中,我们没有设置交换机,但并不说消息就直接发送给了队列,而是rabbitmq采用默认交换机。
2、类型
- 直接(direct)—路由模式
- 主题(topic)
- 标题(headers)
- 扇出(fanout)—发布订阅模式
3、创建临时队列
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue(); //创建一个临时队列,队列名随机
4、绑定(bindings)
binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。交换机通过RoutingKey与队列进行绑定 。通过routingKey,可以决定发给哪个队列(前提是与交换机进行了绑定的队列)。
4.2、Fanout
1、介绍
Fanout这种类型非常简单,它是将接收到的所有消息广播 到它知道的所有队列中。系统中有默认的fanout类型的交换机
2、实战
两个接收消息的线程,一个发送消息的线程。采用Fanout类型的交换机
- 接收消息的线程1
public class ReceiveLogs01 {
//交换机的名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//创建一个临时队列,队列名称随机。当消费者断开队列连接,自动删除
String queue = channel.queueDeclare().getQueue();
/**
* 绑定交换机和队列
* 队列名、交换机名、RoutingKey
*/
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到的消息打在屏幕上...");
//接收消息
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("ReceiveLogs01接收到:"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println(consumerTag+"消费者取消消息时回调接口逻辑");
};
channel.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}
- 接收消息的线程2
/**
* 一个发送消息,两个接收消息
* 消息的接收1
*/
public class ReceiveLogs02 {
//交换机的名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//创建一个临时队列,队列名称随机。当消费者断开队列连接,自动删除
String queue = channel.queueDeclare().getQueue();
/**
* 绑定交换机和队列
* 队列名、交换机名、RoutingKey
*/
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到的消息打在屏幕上...");
//接收消息
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("ReceiveLogs02接收到:"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println(consumerTag+"消费者取消消息时回调接口逻辑");
};
channel.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}
- 发布消息
/**
* 发送消息给交换机
*/
public class EmitLog {
//交换机的名字
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 可以不加,交换机已经声明过在消息接收里
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
结果是,一旦发布消息线程发布消息,两个接收消息的线程都会接收到消息。这就是交换机将消息广播给了绑定的两个队列,两个队列将消息发送到消费者。于是两个消费者都接收到了消息
4.3、Direct
1、Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去。
通过交换机与不同队列绑定时不同的routingKey,来让消息到对应的队列中。 消息绑定键符合的去到对应的队列,而不符合的则将被丢弃。
2、当使用direct类型的交换机,但是绑定的多个队列的routingKey却相同,这种情况下表现和fanout类似,和广播差不多
3、代码实现
与fanout模式相比,只需要在队列与交换机绑定时绑定上routingKey,然后在发布消息时,将消息绑定routingKey
- 接收消息
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("console",false,false,false,null);
//绑定队列和交换机
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
- 发布消息
String message = sc.next();
//想让哪个队列接收,就填那个队列绑定的routingKey
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
4.4、Topics
1、虽然之前的direct类型让接收消息变得更灵活,但仍然有局限性。因为direct类型只能让绑定了对应routingKey的消息只进入绑定了routingKey的队列。如果现在需要接收二者结合的消息,direct则办不到。比方说我们想接收的日志类型有info.base 和 info.advantage,而有一个队列只能传送 info.base 的消息,那这个时候 direct 就办不到了。就只能使用 topic 类型
2、Topic要求
- 必须是单词列表,以点号分隔开
- *可以代替一个单词,#可以替代0个或多个单词
- 当一个队列绑定键是#,那么这个队列将接收所有数据,有点像fanout(#可以接受所有,所以其他队列接收到的他都能收到,广播)
- 当队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct
3、实战
更改代码就是在绑定的routingKey上
- 发布消息
/**
* channel.queueBind("Q1",EXCHANGE_NAME,"lazy.#");
* channel.queueBind("Q1",EXCHANGE_NAME,"*.*.rabbit");
* channel.queueBind("Q2",EXCHANGE_NAME,"*.orange.*");
* 通过map,将消息和routingKey绑定
* 发布消息时,通过getKey得到RoutingKey,通过getValue得到信息内容
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q1 接收一次");//Q1匹配度更高
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q1");
//将map集合遍历出来
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
- 接收消息1
channel.queueBind("Q1",EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind("Q1",EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收消息...");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收队列:"+"Q1"+"绑定键"+ message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println(consumerTag+"消费者取消消息时回调接口逻辑");
};
channel.basicConsume("Q1",true,deliverCallback,cancelCallback);
- 接收消息2
channel.queueDeclare("Q2",false,false,false,null);
channel.queueBind("Q2",EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接收消息...");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收队列:"+"Q2"+"绑定键"+ message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println(consumerTag+"消费者取消消息时回调接口逻辑");
};
channel.basicConsume("Q2",true,deliverCallback,cancelCallback);
结果显示,只有匹配了绑定的消息,会被相对应的队列接收传给消费者。而不满足任何绑定的消息,会被舍弃掉。
五、死信队列
1、概念
无法被消费的消息。一般来说,consumer从queue取出消息进行消费,但某些时候由于特定的原因,导致queue中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就会变成死信,为了保证这部分信息不丢失,rabbitmq的死信队列机制,当消息消费发送异常,将消息投入死信队列。例如用户在商场下单并点进去支付在指定时间未支付时自动失效。
2、来源
- 消息TTL过期(消息存活时间)
- 队列达到最大长度
- 消息被拒绝
3、模拟
创建两个消费者,一个是普通消费者,消费普通队列的消息。而另一个则是消费死信队列的消息。当生产者发布带有过期时间的消息,发布到普通队列里,(关闭c1消费者线程,模拟消息在队列里过期),消息ttl过期后就会进入死信队列,启动c2,发现c2接收到了死信。
- consumer01(消费普通队列里的消息的消费者)
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明队列
/**
* 声明队列时,当消息一旦过期,就会传给死信队列,通过死信交换机
* 所以声明正常队列时,要设置过期时间,设置死信交换机,设置死信routingKey。这些参数通过map设置
*/
Map<String, Object> arguments = new HashMap<>();
//设置过期时间
//arguments.put("x-message-ttl",10000);可以由生产者发布消息时设置,更加方便
//设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routingKey
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定队列和交换机
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息");
DeliverCallback deliverCallback =(consumerTag,message)->{
String ms = new String(message.getBody(),"UTF-8");
System.out.println(new String("consumer01接收到的消息是:"+ ms));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});
}
- consumer02(消费死信队列里的消息)
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息");
DeliverCallback deliverCallback =(consumerTag,message)->{
String ms = new String(message.getBody(),"UTF-8");
System.out.println(new String("consumer02接收到的消息是:"+ ms));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
}
- producer
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
//设置消息的 TTL 时间
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
message.getBytes());
System.out.println("生产者发送消息:"+message);
}
}
}
}
六、延迟队列
1、简单来说,延迟队列有点像前面死信队列的消息TTL过期的情况----当发布者发布一条带有过期时间的消息,消息传到普通队列,普通消费者c1挂机,消息到期后传到死信队列,然后被c2消费。那么,对于生产者和c2之间来说,就是存在一个过期时间的延迟队列。
2、延时队列的内部是有序的,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列 。
3、在很多场景下,延时队列都得到应用:
- 订单十分钟之内未支付自动取消
- 用户注册成功后三天内没激活进行信息提醒
- 预定会议后,在预定的时间点前十分钟通过各个与会人员参加会议等等
这些场景都是需要在某个事件发生之后或之前的指定时间点完成一个任务。
4、TTL
- TTL是RabbitMQ中一个消息或者队列的属性。表明一条消息或者该队列中所有的消息的最大存活时间,ms
- 如果一条消息在TTL时间内没有被消费就会进入死信队列
- 如果一条消息本身设置了TTL值然后又进入了设置了TTL的队列,那么较小的那个TTL会被使用
5、整合SpringBoot
- 导入依赖
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
- Swagger配置类
- 写一个配置类,配置的是队列和交换机之间的绑定关系。例如绑定队列A到x交换机,并为它设置死信交换性、死信routingKey、ttl
/**
* 声明队列 A, ttl 为 10s 并绑定到对应的死信交换机
* @return
*/
//声明队列A
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
如图,只定义符合目前所需要的延迟时间的队列是不完善的,因为后续如果要补充其他延迟时间的消息,难道又要去创建一个一个的对应延迟时间的队列吗?所以在这里创建一个没有ttl的队列,用来传递那些自定义ttl的消息,使更灵活的实现延迟队列。
- 生产者发布消息
@Slf4j
@RequestMapping("ttl")
@RestController
/**
* 发送延迟消息
*/
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
//发送给两个队列
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
}
//发布带有ttl的消息
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void senMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条时长为{}毫秒的信息给队列QC:{}", new Date(),ttlTime, message);
rabbitTemplate.convertAndSend("X","XC",message,msg->{
//发送的消息的延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
}
- 接收死信队列的消息的消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
- 输出结果:
当前时间:Tue Jul 11 22:03:16 CST 2023,发送一条时长为50000毫秒的信息给队列QC:你好3
2023-07-11 22:03:22.562 INFO 2636 --- [nio-8080-exec-5] c.e.r.controller.SendMsgController : 当前时间:Tue Jul 11 22:03:22 CST 2023,发送一条时长为20000毫秒的信息给队列QC:你好4
2023-07-11 22:04:06.222 INFO 2636 --- [ntContainer#0-1] c.e.r.c.c.DeadLetterQueueConsumer : 当前时间:Tue Jul 11 22:04:06 CST 2023,收到死信队列信息你好3
2023-07-11 22:04:06.222 INFO 2636 --- [ntContainer#0-1] c.e.r.c.c.DeadLetterQueueConsumer : 当前时间:Tue Jul 11 22:04:06 CST 2023,收到死信队列信息你好4
通过一个不设置ttl的队列来传递带有tt的消息,这样确实可以满足更多的不同延迟时间的消息。但是可以看出,这样也有一个很大的弊端。带有ttl的消息进入死信队列同样要排队处理,假如排在前面的消息的延迟时间长,而排在后面的消息延迟时间短,那么死信队列仍旧先处理前面的,即等前面的ttl到期再让消费者接收,然后再处理后面的,这样后面的消息的ttl就没有意义了。
6、优化
通过rabbitmq的插件。新增一种交换机类型—延迟交换机类型
利用插件可以解决前面的问题。
7、总结
- 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
七、发布确认高级
1、正常流程,是生产者发送消息到交换机,然后交换机将消息转给相应的队列,队列再发送给消费者。
当交换机出现问题,但生产者还不知道,这个时候发布消息给交换机,消息就会丢失。
为了解决这个问题,rabbitmq有发布确认机制,通过交换机收到消息后发送确认回调,通知生产者 ,如果交换机没有接收到,这时消息会放入缓存,而缓存可以设置定时任务对未成功发送的消息重新传递给交换机,当交换机能够接收到,那么就会在缓存中将对应的消息删除。
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE
2、回调接口
交换机收到消息或者没收到消息都会调用回调接口通知生产者。
- 交换机收到消息
correlationData :保存回调消息的ID及相关消息
ack = true
cause null
- 交换机没收到消息
correlationData :保存回调消息的ID及相关消息
ack = false
cause 失败的原因
3、回退消息
正常情况下,生产者将消息发送给交换机,交换机接收到消息后会直接给生产者发送确认消息,交换机通过绑定的路由key将消息发送到队列上,队列再发送给消费者。但是,当消息不可路由时,也就是消息没有发送到队列时 ,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃的。
那么如何让无法被路由的消息被处理?通过设置mandatory参数,可以在当消息传递过程中无法到达目的地时将消息返回给生产者
在配置文件当中需要添加
spring.rabbitmq.publisher-returns = true
4、备份交换机
前面通过消息回退可以将没有被路由的消息回退给生产者,但是生产者拿到后并不能很好的处理,这些未达目的地的消息我们不知道该如何处理,最多打个日志,但是通过日志来处理是很不优雅的,特别是当生产者所在服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置mandatory会增加生产者的复杂性那么有什么更好的解决办法吗?既能不丢失消息又可以不增加生产者的负担?
答案是有的。对于这些没有进入到队列的消息,在rabbitmq中可以备份交换机,当为某一个交换机声明了备份交换机后,就会为它创建一个备胎,当交换机接收到一条不可路由的消息(没有对应的队列)时,将会把这条消息转发给备份交换机中,备份交换机的类型为fanout,在备份交换机下建立队列,这样那些无法被路由得到消息都会进入这个队列 ,我们还可以建立一个报警队列(也能接收到没被路由的消息,因为备份交换机的类型是fanout),由独立的消费者来进行监测和报警。
- mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先
级高,经过上面结果显示答案是备份交换机优先级高 。所以如果二者都设置开启了,那么没被路由的消息会进入备份交换机。
八、其他知识点
1、幂等性
用户对同一个操作发起的一次请求或者多次请求的返回结果一致,不会因为多次点击而不同。例如支付,当用户支付时,支付扣款成功,此时返回结果时网络异常,钱已经扣了,用户再次点击付款会进行第二次扣钱。在以前的单应用系统中,我们只需要将数据操作放入事务即可,发生错误立即回滚,但是再响应客户端的过程也有可能出现网络中断。
在mq中,消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,
故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但
实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。如何解决?
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过
2、优先级队列
在我们系统中有一个订单催付的场景,当用户在下单后设定时间内未付款,那么会给用户推送一条短信提醒。但是,对于大客户(公司团购之类的)和小客户,要区分对待。对于大客户的订单应当优先处理,而之前使用的redis来存放是使用定时轮询,这样并不能创造出一个优先级的队列。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
原理:
当使用优先级队列传递消息时,消费者在获取优先级队列的消息时,并不是根据消息发来的顺序获取,而是会先根据消息的优先级去排一个(优先级越高的在前面)顺序。从而达到优先级高的消息先被消费的目的
3、惰性队列
- 使用场景
惰性队列会尽可能将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标就是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种原因使得长时间不能消费消息造成堆积时,惰性队列就很有必要了。
-
惰性队列其实就是消息的一个保存位置。正常情况下,消息是保存在内存,而惰性队列中的消息则是保存在磁盘
-
内存开销对比
在发送一百万条消息,每条消息大概占1kb的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB。但是惰性队列速度非常慢,因为要从磁盘里读取