RabbitMQ 队列参数
RabbitMQ在申明队列的时候,可以指定一些参数:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
本次主要讲解 arguments
参数。
arguments
如果RabbitMQ开启了管理插件(rabbitmq-plugins enable rabbitmq_management
),那么可以登录管理界面,在Queues下新增队列时可以看到参数。
Message TTL: 发布到队列的消息在被丢弃之前可以存在多长时间(毫秒)。(设置“x-message-ttl”参数。)
Auto expire: 队列在被自动删除之前可以使用多长时间(毫秒)。(设置“x-expires”参数。)
Max length: 一个队列在开始从头中丢弃消息之前可以包含多少(准备好的)消息。(设置“x-max-length”参数。)
Max length bytes: 队列在开始从头部丢弃消息之前所能包含的就绪消息的总正文大小。(设置"x-max-length-bytes"参数)
Overflow behaviour: 设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值为drop-head、reject-publish或reject-publish-dlx。仲裁队列类型只支持drop-head和拒绝-发布。
Dead letter exchange: 一个可选的死信交换机,如果消息被拒绝或过期,将重新发布到死信交换机。(设置“x-dead-letter-exchange”参数。)
Dead letter routing key: 当消息是死信时使用的可选替换路由键。如果未设置此值,则将使用消息的原始路由键。(设置“x-dead-letter-routing-key”参数。)
Single active consumer: 如果设置,确保每次只从队列中使用一个使用者,并在活动使用者被取消或死亡的情况下故障转移到另一个注册使用者。(设置“x-single-active-consumer”参数。)
Maximum priority: 队列所支持的最大优先级数;如果没有设置,队列将不支持消息优先级。(设置“x-max-priority”参数。)
Lazy mode : 将队列设置为延迟模式,将尽可能多的消息保存在磁盘上,以减少RAM的使用;如果没有设置,队列将在内存中保留一个缓存以尽可能快地传递消息。(设置“x-queue-mode”参数。)
Master locator: 将队列设置为主位置模式,确定在节点集群上声明队列主位置时所依据的规则。(设置"x-queue-master-locator"参数。)
Message TTL
官网TTLd的文档:https://rabbitmq.com/ttl.html#per-queue-message-ttl
过期时间是一个毫秒数,最小值是0,会导致消息在到达队列时过期,除非消息可以立即传递给使用者。
设置整个队列的TTL:
Map<String, Object> args = new HashMap<String, Object>();
// 设置队列中的所有消息最大生存时间为60s
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
设置单条消息的TTL
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 单条消息最大生存时间为60S
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
Auto expire
队列只有在没有被使用的情况下(例如,没有使用者)才会在一段时间后过期。此特性可以与自动删除队列属性一起使用。
x-expires
参数或过期策略的值expires
以毫秒为单位描述过期时间。它必须是一个正整数(与消息 TTL 不同,它不能是0)。因此,值为1000意味着一个未使用1秒钟的队列将被删除。
该队列在30分钟未使用之后过期:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
Max length & Max length bytes
官网文档:https://rabbitmq.com/maxlength.html
队列的最大长度可以限制为一组消息数或一组字节数(所有消息正文长度的总和,忽略消息属性和任何开销) ,或者两者兼而有之
对于任何给定的队列,都可以使用策略(强烈建议使用此选项)或客户端使用队列的可选参数来定义最大长度(任何类型的)。在有效队列策略和参数都指定最大长度的情况下,将使用这两个值中的最小值。
声明了一个最大长度为10条消息的队列:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);
申明一个最大长度为10字节的最大长度的队列
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length-bytes", 10);
channel.queueDeclare("myqueue", false, false, false, args);
如果两个参数都设置了,那么两个参数都将应用; 首先达到的限制将被强制执行。
可以设置消息溢出行为:
Map<String, Object> args = new HashMap<String, Object>();
// drop-head (default), reject-publish or reject-publish-dlx.
args.put("x-overflow", "drop-head");
channel.queueDeclare("myqueue", false, false, false, args);
可以在管理界面查看队列的最大长度限制
Overflow behaviour
使用溢出设置来配置队列溢出行为。如果溢出设置为reject-publish
或reject-publish-dlx
,则将丢弃最近发布的消息。此外,如果启用了发布者 confirm
,则将通过 basic.nack
消息通知发布者拒绝。如果一条消息被路由到多个队列并被其中至少一个队列拒绝,通道将通过 basic.nack
通知发布者。消息仍然会被发布到所有其他可以对其进行排队的队列中。reject-publish
和reject-publish-dlx
之间的区别在于,reject-publish-dlx
也是拒绝消息的死信。
Dead letter exchange
官网:https://rabbitmq.com/dlx.html
以下情况,消息会进入死信
- 消费者使用
basic.reject
或者basic.nack
且requeue
设置为false 时 - 消息过期了
- 消息达到队列设置的最大长度而被删除了
请注意,队列过期不会使队列中的消息死掉。
死信交换机也是正常的交换机。
设置队列的死信交换机,在声明队列时指定可选的 x-dead-letter-exchange
参数。该值必须是同一虚拟主机中的交换器名称。
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
Dead letter routing key
上面定义了死信交换机,我们还可以指定一个RoutingKey,方便在使用死信消息时路由到不同的队列。
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);