死信队列通过设置延迟时间TTL能实现延迟队列的效果,但是
由于队列先入先出的特性, 如果队列头的消息过期时间很长, 后面的消息过期时间很短, 会导致后面的消息过期后不能及时被消费掉
基于死信队列的缺点,基于插件实现的延迟队列就很好地解决了这个问题。
安装RabbitMq延迟插件见此文章:delayed_message_exchange插件(实现插件延迟队列)
1.创建生产者,基于springBoot框架
package jot.jothot.testMq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送延迟队列消息--插件实现延迟队列
* @param message
*/
@PostMapping("/sendMessageDelayed")
public void sendMessageDelayed(String message,int delayTime){
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.
DELAYED_ROUTING_NAME, message,msg->{
//设置延迟时间
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
log.info("当前时间:{},发送一条信息给队列:{}",new Date().toString(),message);
}
}
2.设置交换机、队列等信息
package jot.jothot.testMq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 插件实现延迟队列
*/
@Configuration
public class DelayedQueueConfig {
//队列
public static final String DELAYED_QUEUE_NAME = "delayedQueue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";
//routingKey
public static final String DELAYED_ROUTING_NAME = "delayedRouting";
//声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//声明交换机 CustomExchange-自定义交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
//设置路由模式 direct模式
arguments.put("x-delayed-type", "direct");
/**
* 1.交换机的名称
* 2.交换机的类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5,其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
//绑定交换机,队列和routingKey
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue")Queue queue,@Qualifier("delayedExchange")CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_NAME).noargs();
}
}
3.消费者
package jot.jothot.testMq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class ReceiveMessage {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME ,containerFactory = "rabbitListenerContainerFactory")
public void handleMessage1(Message message, Channel channel) throws Exception {
// 处理消息
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
}
}
运行调用接口发送消息,基于插件实现了延迟队列的效果
发送两条消息
1.第一条延迟10s的消息
2.第二条延迟2s的消息
结果如下:延迟2s的数据2s后被消费,避免了死信队列的设置TTl延迟队列的不足