文章目录
- 一. 消息队列
- 1. 定义
- 2. 作用
- 2.1 流量消峰
- 2.2 应用解耦
- 2.3 异步处理
- 3. 分类
- 4. MQ的选择
- 5. RabbitMQ
- 5.1 概念
- 5.2 四大概念
- 5.3 六大模式
- 5.4 RabbitMQ 工作原理
- 5.5 安装
- 6. 代码实现
- 二. Hello World (简单模式)
- 1. 生产者代码
- 2. 消费者代码
- 三. Work Queues (工作队列模式)
- 1. 轮询分发消息
- 2. 消息应答
- 2.1 概念
- 2.2 自动应答
- 2.3 手动应答
- 2.3 消息应答重新入队
- 3. RabbitMQ持久化
- 3.1 队列持久化
- 3.2 消息持久化
- 3.3 不公平分发
- 3.4 预取值
- 四. 发布确认
- 1. 原理
- 2. 发布确认的策略
- 2.1 开启发布确认的方法
- 2.2 单个确认发布
- 2.3 批量确认发布
- 2.4 异步确认发布 (内置了一个监听器)
- 2.5 确认未处理的消息
- 2.6 对比
- 五. 交换机
- 1. 概述
- 2. fanout扇形交换机
- 3. direct直接交换机
- 4. Topic主题交换机
- 5. 自我总结
- 六. 死信队列
- 1. 死信的概念
- 2. 死信的来源
- 3. 实战
- 七. 延迟队列
- 1. 概念及应用 ***
- 2. RabbitMQ中的两种TTL
- 3. 延时队列基础模型(基于死信队列)
- 4. 优化(基于死信队列)
- 5. 插件实现延迟队列
- 八. 发布确认高级
- 1. 基于SpringBoot的基本代码及存在问题
- 2. 回退消息
- 3. 备份交换机
- 九. RabbitMQ补充知识 *****
- 1. 幂等性
- 1.1 概念
- 1.2 消息重复消费
- 1.3 消费端的幂等性保障
- 2. 优先队列
- 3.惰性队列
- 十. RabbitMQ集群
- 1. 搭建
- 2. 镜像队列
- 3. Haproxy + Keepalive实现高可用负载均衡
- 4. Federation Exchange/Queue
- 5. Shovel
一. 消息队列
1. 定义
2. 作用
2.1 流量消峰
2.2 应用解耦
2.3 异步处理
3. 分类
4. MQ的选择
大量数据:Kafaka;高并发:RocketMQ; 中小型数据量少:RabbitMQ
5. RabbitMQ
5.1 概念
快递站
5.2 四大概念
生产者,消费者,交换机,队列
交换机可以对应多个队列
5.3 六大模式
简单模式
工作模式
发布/订阅模式
路由模式
主题模式
发布确定模式
5.4 RabbitMQ 工作原理
5.5 安装
密码:123456
用户名:admin
6. 代码实现
有基于SpringBoot的代码 实现起来比较简单,参考资源下载。
二. Hello World (简单模式)
1. 生产者代码
- 导入依赖
- 代码
密码改成自己的
2. 消费者代码
package com.chent;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static final String QUEUE_NAME = "HELLO";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.86.130");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待消息被消费..");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
三. Work Queues (工作队列模式)
1. 轮询分发消息
- 直接用idea启动两个线程的技巧
右上角run旁边edict configuration 然后Build and Run最后 modify option选项 选择 allow mutiple instances
2. 消息应答
2.1 概念
2.2 自动应答
不靠谱
2.3 手动应答
- 参数Multiple的解释
- 代码实现
2.3 消息应答重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认, RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
默认的自动应答会使消息丢失(自动应答就是发送消息后即认为成功发射),要想实现消息不丢失,必须采用手动应答
3. RabbitMQ持久化
3.1 队列持久化
- 注意事项
已有消息队列改成持久化,必须先删除
3.2 消息持久化
3.3 不公平分发
轮训分发-公平分发,但是不合理
3.4 预取值
限制缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
通过使用 basic.qos 方法设置“预取计数” 值来完成的。 该值定义通道上允许的未确认消息的最大数量
四. 发布确认
1. 原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID( 从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了**,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。**
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
2. 发布确认的策略
2.1 开启发布确认的方法
**confirmSelect()**开启发布
2.2 单个确认发布
发布速度特别的慢
channel.waitForConfirms(): 确认订阅成功
2.3 批量确认发布
先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
2.4 异步确认发布 (内置了一个监听器)
一个监听器+两个回调函数(一个成功,一个失败返回)
完整代码:channel.addConfirmListener(成功返回函数,失败返回函数)
2.5 确认未处理的消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
package com.chent;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* 发布订阅模式:异步发布确认
*/
public class Publish {
public static void main(String[] args) throws Exception {
Publish.publishAsync();
}
public static void publishAsync() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.86.130");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "publishTest";
channel.queueDeclare(queueName,true,false,false,null);//申明队列
channel.confirmSelect();//开启发布确认
//异步发布的逻辑
//线程安全有序hashmap 适用于高并发情况
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
//消息成功确认回调函数
ConfirmCallback ackCallback = (var1,var3)->{
//2.删除确认的消息
if(var3){//VAR3表示处理批量情况
ConcurrentNavigableMap<Long, String> confirmMessage = map.headMap(var1);
confirmMessage.clear();}
else{
map.remove(var1);
}
System.out.println("成功确认:"+ var1);
};
//消息失败确认回调函数
ConfirmCallback nackCallback = (var1,var3)->{
//3.打印未确认的消息
String message = map.get(var1);
System.out.println("未确认的消息是:"+message);
};
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for(int i = 0;i<1000;i++){
String message = "消息" + i;
channel.basicPublish("",queueName,null,message.getBytes());
//1.记录所有要发送的消息
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("===========================================异步耗时:" + (end-begin)+"============================");
}
}
2.6 对比
五. 交换机
在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为 ”发布/订阅
1. 概述
- 作用
RabbitMQ 消息传递模型的核心思想是**: 生产者生产的消息从不会直接发送到队列**。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反, 生产者只能将消息发送到交换机(exchange),**交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。**交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定 - 交换机类型
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout) - 无名Exchange
- 临时队列
- 绑定Bind
2. fanout扇形交换机
将接收到的所有消息广播到它知道的所有队列中
- 消费者代码
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
// 1. 声明fanout交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
//2. 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
//3. 绑定交换机和队列的关系
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台打印接收到的消息"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 生产者代码
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
3. direct直接交换机
在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
队列只对它绑定的交换机的消息感兴趣。绑定用参数: routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
4. Topic主题交换机
- 引入
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型 - 命名规范
5. 自我总结
交换机和队列绑定后,可以指定消息发给对应的队列
扇形交换机会给全部绑定的队列发消息,routingKey为空
直接交换机会给对应RoutingKey的队列发消息
主题交换机会给对应类型的routingKey(在直接交换机基础上改进)的队列发消息
说白了三种交换机其实就是在改变routingKey参数
channel.exchangeDeclare(交换机名称,类型);
channel.queueBind(队列名称,交换机名称, routingKey);
六. 死信队列
1. 死信的概念
-
概念
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了, consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 -
应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
2. 死信的来源
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
3. 实战
- 代码架构图
- 模拟TTL过期
- 生产者代码
在生产者中设置过期时间
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel())
{ channel.exchangeDeclare(NORMAL_EXCHANGE,
BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,
message.getBytes());
System.out.println("生产者发送消息:"+message);
}
}
}
}
- 消费者C1代码
声明普通/死信队列和交换机,绑定routingKey关系;
核心部分:普通队列通过声明队列的参数 param绑定死信交换机的关系
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
}
}
- 消费者C2代码
只是简单的让C2消费死信
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
- 队列达到最长长度
如何设置队列长度:在队列声明中添加参数
七. 延迟队列
1. 概念及应用 ***
2. RabbitMQ中的两种TTL
3. 延时队列基础模型(基于死信队列)
-
定义
-
代码见资源下载(基于SpringBoot来实现)
-
代码架构
-
效果展示
-
存在的问题
如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
4. 优化(基于死信队列)
-
架构
新建一个队列,但是不设置TTL;在生产者端设置TTL即可 -
效果展示
-
缺点
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
5. 插件实现延迟队列
-
效果展示
-
实现步骤
-
安装插件
-
架构图及代码
-
延时队列的其他选择
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
八. 发布确认高级
1. 基于SpringBoot的基本代码及存在问题
交换机发出了确认回调,但实际上队列没有收到消息
2. 回退消息
- 概念:
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。 - 核心代码
- 效果展示
3. 备份交换机
- 概念
备份交换机可以理解为 RabbitMQ 中交换机的“备胎” ,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。 - 代码框架
- 代码实现
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
//声明备份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//声明确认 Exchange 交换机的备份交换机
@Bean("confirmExchange")
public DirectExchange
confirmExchange(){ExchangeBuilder
exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//设置该交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
// 声明警告队列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 声明报警队列绑定关系
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange
backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
// 声明备份队列
@Bean("backQueue")
public Queue backQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 声明备份队列绑定关系
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}
}
九. RabbitMQ补充知识 *****
1. 幂等性
1.1 概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
1.2 消息重复消费
- 问题描述
消费者在消费 MQ 中的消息时, MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。 - 解决方案
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
1.3 消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性。
- 方案一:唯一ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。 - 方案二:Redis原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
2. 优先队列
- 应用场景
- 实现原理
- 代码实现
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
//给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++){
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
}
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
{Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消费者启动等待消费..............");
DeliverCallback deliverCallback=(consumerTag, delivery)-
>{ String receivedMessage = new
String(delivery.getBody());System.out.println("接收到消
息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)-
>{System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}
3.惰性队列
- 定义
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。 - 应用场景
当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
- 两种设置模式
- 内存开销对比
十. RabbitMQ集群
1. 搭建
2. 镜像队列
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
3. Haproxy + Keepalive实现高可用负载均衡
Haproxy 实现负载均衡
Keepalived 实现双机(主备)热备