说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。
这里用SpringBoot maven:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:
package com.esint.configs;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* TTL延迟队列配置文件类
*
*/
@Configuration
public class TtlQueueConfig {
//
//普通交换机的名称 X
public static final String X_EXCHANGE = "X";
//死信交换机名称 Y
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列QA QB
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信队列名称QD
public static final String DEAD_LETTER_QUEUE = "QD";
//
//声明X_EXCHANGE
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明死信交换Y_DEAD_LETTER_EXCHANGE
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列 QA
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey (死信后充当了消费者的发送路由)
arguments.put("x-dead-letter-routing-key","YD");
//消息过期时间
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
//声明队列 QB
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey (死信后充当了消费者的发送路由)
arguments.put("x-dead-letter-routing-key","YD");
//消息过期时间
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
//声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
//捆绑
//绑定队列QA与交换机X_EXCHANGE
@Bean
public Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//绑定队列QB与交换机X_EXCHANGE
@Bean
public Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//绑定队列QD与交换机Y_Exchange
@Bean
public Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange")DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;
//发送延迟消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/senMsg/{message}")
public void sendMes(@PathVariable String message){
log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
}
}
消费者与死信队列创建一个监听者示例:
package com.esint.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 队列TTL消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//接受消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);
}
}
rabbitmq的配置文件:
spring:
rabbitmq:
host: *.*.*.*
port: 5672
username: guest
password: guest
接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置
队列:
交换机:
点开详细后,也能考到他们之间的绑定关系:
消息发布测试:
生产者发送消息:
浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice
通过生产者发送:nice
当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice
消费者在10s后和40秒分别收到了消息:
拓展:是不是有一种可能,如果再队列中不设置过期时间,在生产者发送消息时设置过期时间 来实现过期时间自由设定,而延迟自由?
结论是不能:
rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。
示例验证:
增加一个无过期时间约束的队列,以routing-key为XC绑定X交换机,过期后以routing-key为YD绑定Y交换机。
过期时间放生产者发送时设定。
在的rabbitMQ配置类中增加QC 绑定前(X routing-key=XC)后(Y routing-key=YD)交换机:
// 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
public static final String QUEUE_C ="QC";
//声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
@Bean("queueC")
public Queue queueC(){
Map<String,Object> arguments = new HashMap<>(2);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置routing-key
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
//绑定队列QC与交换机X_EXCHANGE 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
@Bean
public Binding queueCBindXExchange(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
生产者:
@GetMapping("/sendttl/{message}/{ttlTime}")
public void sendMes(@PathVariable String message,@PathVariable String ttlTime){
/**
* 死信队列做延迟时的缺陷:
* rabbitMQ只会检查第一个消息是否过期带来的问题就是,如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。
*/
log.info("当前时间:{},发送一条ttl为{}ms的消息给QC队列:{}",new Date().toString(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC",message,mes->{
mes.getMessageProperties().setExpiration(ttlTime);
return mes;
});
}
消费者不变,启动服务!
生产者发送消息:第一条 3000ms
http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000
http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000
结论:第二条虽然早早过期,它依然需要等待第一条过期后,才能排到他。rabbitMQ的队列过期检查机制。