RabbitMQ系列文章,前几篇介绍了基础概念,AMQP 0-9-1 协议,和服务端安装,准备工作都完成后,就开始着手开发了。这篇文章主要介绍RabbitMQ生产者的开发,包括Producer、Message常见的参数,读完这篇文章,基本掌握了Procuder端日常开发和使用。
这篇文章的所有代码实例都是基于AMQP 0-9-1,需要了解AMQP 0-9-1 协议的详细内容的小伙伴们,请参考AMQP 0-9-1 协议介绍 这篇文章。
这篇文章中设计的代码会列出关键部分,如果想看完整的Producer客户端开发的代码,可以参考RabbitMQ简单使用这篇文章,只需要在此基础上,加上这篇文章给出的关键代码,稍作修改就可以实现相关的功能。
mandatory参数
Producer发布的消息并不能百分百保证准确无误的路由到队列,当交换器无法将消息路由到队列时,消息就有可能丢失。mandatory参数可以解决消息丢失的问题,只需要把mandatory设置为true,当出现上述问题时,RabbitMQ会将消息返回给Producer,让Producer自己根据实际业务处理。如果设置为false,出现上述问题,消息就会被丢弃。
具体实现方式很简单,只需要在发布消息是将mandatory参数设置为true,并给channel添加ReturnListener监听器。代码实现如下。
// 添加监听器,当消息无法路由时,就会监听到RabbitMQ返回的消息
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(new String(body));
}
});
String message = "mandatory message";
// 发布消息时,第三个参数mandatory设为true
channel.basicPublish(exchange, "aaaaaaaa", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
备份Exchange
当Producer发布的消息无法路由到队列,如果mandatory设置false,消息会丢失,mandatory设置true,则会增加编程复杂度。备份队列可以解决这个问题,业务队列绑定备份队列后,当消息无法路由到队列时,此消息经过备份交换器,路由到备份队列,消息不会丢失,在合适的时候再处理备份队列中的消息,这种方式让程序更简单且更具灵活性。
下面是备份交换器的代码实现。
// 普通交换器
String normalExchange = "normal_exchange";
// 普通队列
String normalQueue = "normal_queue";
// 普通路由键
String normalRoutingKey = "normal_routing_key";
// 备份交换器
String aeExchange = "ae_exchange";
// 备份队列
String aeQueue = "ae_queue";
// 设置参数,申明备份交换器
Map<String, Object> args = new HashMap<>();
args.put("alternate-exchange", aeExchange);
// 申请普通交换器,将参数设置到普通交换器上
channel.exchangeDeclare(normalExchange, BuiltinExchangeType.DIRECT, false, false, args);
// 申明备份交换器(备份交换器也需要创建)
channel.exchangeDeclare(aeExchange, BuiltinExchangeType.FANOUT, false, false, null);
// 申明普通队列
channel.queueDeclare(normalQueue, false, false, false, null);
// 申明备份队列
channel.queueDeclare(aeQueue, false, false, false, null);
// 绑定
channel.queueBind(normalQueue, normalExchange, normalRoutingKey);
channel.queueBind(aeQueue, aeExchange, "");
只有发布消息时的路由键是normal_routing_key,消息才会路由到 normal_queue,当发布消息的路由键不是normal_routing_key 时,找不到队列,此时消息会进入ae_queue。下面列出一张图帮助理解。
这里的备份交换器类型是fanout,也可以设置为direct或者topic,但是需要注意,此时备份交换器上的绑定键需要和普通交换器上的绑定键一致,消息才能进入备份队列,以上面的代码举例:
普通路由键是normal_routing_key,备份交换器时 ae_routing_key,当Producer发布的消息,带的路由键是other_routing_key,无法路由到普通队列,就会进入备份交换器,但是备份交换器上的绑定键 ae_routing_key,无法和发布消息携带的 other_routing_key 匹配,消息还是被丢弃。
所以,建议备份交换器的类型设置为fanout,确实需要使用direct或者topic的话,注意路由键。如果不了解交换器类型的小伙伴们,可以参考AMQP 0-9-1 协议介绍 或者RabbitMQ相关概念及代码示例 这两篇文章。
消息过期时间(TTL)
给消息设置TTL,在超过TTL值后,消息就会变成dead message(死信),订阅此队列的消费者无法消费(也不是绝地的,后续文章会介绍解决办法)。只需要在申明队列的时候,设置x-message-ttl 值即可。下面是代码实现。
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5 * 1000);
channel.queueDeclare(queue, true, false, false, args);
如果不设置消息的ttl,消息不会过期;如果ttl设置为0,除非可以直接投递给消费者,否则消息会被丢弃。
每条消息可以设置不同的TTL,所以每条消息在被投递到消费者之前,才会判断消息是否过期,这样就会存在一种情况,后面的消息比前面的先过期,但是消费者依然不能消费到后面的消息,必须前面的消息先被投递到消费者,RabbitMQ就是采用这种方案的。下面用一张图帮助理解。
队列过期时间(TTL)
RabbitMQ不仅支持消息的TTL,还支持队列级别的TTL,可以通过x-expires 参数控制在队列删除之前处于未使用状态的时间,比如设置为1000,表示队列在1s之内,没有被使用,就会被删除。注意,队列级别的TTL不能设置为0。下面是代码实现。
HashMap<String, Object> args = new HashMap<>();
args.put("x-expires", 20000);
channel.queueDeclare(queue, true, false,false, args);
队列级别的TTL和消息级别的TTL不一样,因为不用考虑每条消息的TTL,只要队列到了TTL,就可以被删除。
好了,以上就是基于AMQP 0-9-1 协议,关于Producer的常用API使用第一部分的分享。
RabbitMQ系列文章会陆续更新,欢迎各位小伙伴关注后面的技术分享。