发布确认
- 发布确认原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。此外 broker 也可以设置 basic.ack的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于它是异步的。一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。 - 开启发布确认的方法
发布确认默认是没有开启的,开启需要调信道 confirmSelect 方法。
- 单个发布确认
同步确认发布的方式,也就是发布一个消息之后被确认发布后,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,若在指定的时间范围内没有确认则抛出异常。
缺点:发布速度特别慢,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
//单个发布确认
public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"";
channel.basicPublish("", queueName, null, message.getBytes());
//单个消息就马上进行发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("单独确认耗时:"+(end-begin)+"ms");
}
- 批量发布确认
先发布一批消息后,一起确认可以极大提高吞吐量。缺点:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案也是同步的,也一样阻塞消息的发布。
//批量发布确认
public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量确认消息大小
int batchSize = 100;
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"";
channel.basicPublish("", queueName, null, message.getBytes());
//判断达到100条信息的时候批量确认一次
if((i+1)%batchSize==0){
channel.waitForConfirms();
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("批量确认耗时:"+(end-begin)+"ms");
}
- 异步发布确认
异步发布确认虽然编程逻辑比上面连个要复杂,但是性价比最高,无论是可靠性还是效率都没的说,它是利用回调函数来达到消息可靠性传递的。
//异步发布确认
public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//消息确认成功回调
/**
* 1.消息的标记
* 2.是否为批量确认
*/
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
System.out.println("确认的消息:"+deliveryTag);
};
//消息确认失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
System.out.println("未确认的消息:"+deliveryTag);
};
/**
* 准备消息的监听器 监听哪些消息成功了,哪些失败了
*/
channel.addConfirmListener(ackCallback, nackCallback);
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"";
channel.basicPublish("", queueName, null, message.getBytes());
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("异步确认耗时:"+(end-begin)+"ms");
}
- 如何处理异步未确认消息
最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
//异步发布确认
public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况下
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目
* 3.支持高并发(多线程)
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
//消息确认成功回调
/**
* 1.消息的标记
* 2.是否为批量确认
*/
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
//2.删除已经确认的消息,剩下的就是未确认的消息
if(multiple){
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else{
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:"+deliveryTag);
};
//消息确认失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
//3.打印一下未确认的消息都有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息:"+deliveryTag+":"+message);
};
/**
* 准备消息的监听器 监听哪些消息成功了,哪些失败了
*/
channel.addConfirmListener(ackCallback, nackCallback);
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i+"";
channel.basicPublish("", queueName, null, message.getBytes());
//1.此处记录下所有要发送的消息 消息的总和
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("异步确认耗时:"+(end-begin)+"ms");
}
交换机
上述工作队列中,每个任务交给一个消费者。如果将消息传达给多个消费者,这种模式叫“发布/订阅”。
Exchanges
RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。相反,生产者只能将消息发送到交换机(Exchange),交换机工作的内容就是接受来着生产者的消息,并把消息推入队列。交换机怎么处理消息,由交换机类型决定。
-
Exchange 类型
直接(direct):也叫路由类型。
主题(topic)
标题(headers):也叫头类型,已经不常用了。
扇出(fanout):就是发布/订阅模式。 -
无名 Exchange
默认交换机,通过空字符串(“”)进行标识。消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定key指定的,无名交换机时routingKey默认是队列名称。
临时队列
未持久化的随机名称的队列,一旦断开了消费者的连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue();
绑定(binding)
binding 就是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。下图中 X 与 Q1 和 Q2 进行了绑定。
Fanout 扇出(发布/订阅模式)
将接收到的所有消息广播到它知道的所有队列中。RoutingKey相同。系统中有个默认有 Fanout 类型 Exchange。
- Fanout 实战
ReceiveLogs01、ReceiveLogs02 将接收消息打印在控制台
package com.ql.rabbitmq.five;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息接收
*/
public class ReceiveLogs01 {
//交换机的名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个队列 临时队列
/**
* 生成一个临时队列、队列的名称是随机的
* 当消费者断开连接后队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机与队列
*/
channel.queueBind(queueName, EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("ReceiveLogs01接收到的消息:"+new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
}
}
EmitLog 发送消息给两个消费者
package com.ql.rabbitmq.five;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 发消息 交换机
*/
public class EmitLog {
//交换机的名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+ message);
}
}
}
运行测试
Direct exchage 直接交换机(路由模式)
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示,也可称该参数为 binding key。
创建绑定:channel.queueBind(queueName, EXCHANGE_NAME, routingKey);绑定之后的意义由其交换机类型决定。
Direct exchage 工作方式:消息只去到它绑定的 routingKey 队列中去。
图中,生产者发布消息到 Exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black、green 的消息会被发布到队列 Q2。
- 多重绑定
Exchage 的绑定的类型是 direct ,但是它绑定的多个队列的 key 如果相同,表现和 fanout 有点类似。 - 实战
package com.ql.rabbitmq.six;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("console", false, false, false, null);
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("ReceiveLogsDirect01接收到的消息:"+new String(message.getBody(), "UTF-8"));
};
channel.basicConsume("console", true, deliverCallback, consumerTag->{});
}
}
package com.ql.rabbitmq.six;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("disk", false, false, false, null);
channel.queueBind("disk", EXCHANGE_NAME, "error");
System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("ReceiveLogsDirect02接收到的消息:"+new String(message.getBody(), "UTF-8"));
};
channel.basicConsume("disk", true, deliverCallback, consumerTag->{});
}
}
package com.ql.rabbitmq.six;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 发消息 交换机
*/
public class DirectLogs {
//交换机的名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+ message);
}
}
}
运行测试
Topics
- 概念
发送到类型为 Topic 交换机的消息的 RoutingKey 不能随意写,必须是一个单词列表,以点号隔开。如“stock.used.jhhd”。单词列表最多不能超过255个字节。
*(星号)可以代替一个单词。
#(井号)可以替代零个或多个单词。
上图,如 lazy.orange.jkljkl 被队列Q1和Q2收到。
当一个队列绑定建是 #,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有 # 和 * 出现,那么该队列绑定类型就是 direct 了。 - 实战
消费者
package com.ql.rabbitmq.seven;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 声明主题交换机 及相关队列
* 消费者C1
*/
public class ReceiveLogsTopic01 {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
}
}
package com.ql.rabbitmq.seven;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 声明主题交换机 及相关队列
* 消费者C2
*/
public class ReceiveLogsTopic02 {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
}
}
生产者
package com.ql.rabbitmq.seven;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class EmitLogTopic {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("lazy.orange.jkljkl", "被队列Q1和Q2收到");
bindingKeyMap.put("asdf.orange.jkljkl", "被队列Q1收到");
bindingKeyMap.put("lazy.asdf.jkljkl", "被队列Q2收到");
bindingKeyMap.put("lazy.ordddd.rabbit", "被队列Q2收到一次");
bindingKeyMap.put("lazy.sadf.jkljkl.adsf", "被队列Q2收到");
bindingKeyMap.put("sdaf.asdf.jkljkl.adsf", "被丢弃");
for (Map.Entry<String,String> bindingKeyEntry:
bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
运行测试
死信队列
死信,顾名思义就是无法被消费的消息。
producer 将消息投递到 broker 或者直接到 queue 里了,有些时候由于特定原因导致 queue 中的某些消息无法被消费,这些消息若没有后续的处理,就变成了死信,有了死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
死信的来源
- 消息 TTL(存活时间)过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false。
死信实战
- 消息TTL过期
消费者1
package com.ql.rabbitmq.eight;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* 死信队列 实战
* 消费者1
*/
public class Consumer01 {
//普通交换机名
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列名
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明普通和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
Map<String, Object> arguments = new HashMap<>();
//过期时间 10s=10000ms
//arguments.put("x-message-ttl", 10*1000);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定普通交换机和普通队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
//绑定死信交换机和死信队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("Consumer01接收的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag->{});
}
}
生产者
package com.ql.rabbitmq.eight;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 死信队列之 生产者
*/
public class Producer {
//普通交换机名
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//死信消息 设置TTL时间 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = "info"+(i+1);
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
}
}
}
消费者2
package com.ql.rabbitmq.eight;
import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 死信队列 实战
* 消费者2
*/
public class Consumer02 {
//死信队列名
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("Consumer02接收的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag->{});
}
}
先运行消费者1,把交换机和队列声明好,然后停止消费者1。
然后运行生产者发消息。
10s过后消息以为TTL过期到死信队列。
然后运行消费者2接收消息。
- 队列达到最大长度
把生产者的TTL参数注释掉
然后把消费者1 里设置正常队列限制的长度
因为对正常队列的参数重新设置,需要先删除之前声明的正常队列
运行消费者1,然后再停止。可以看到重新声明的队列有三个参数
然后运行生产者,发了10个消息,4个被移到死信队列
再运行消费者2 把4个消息消费掉
- 消息被拒绝
修改消费者1
然后删掉正常队列后启动消费者1,和生产者
然后启动消费者2消费info5消息