一、RabbitMQ延时队列可以用于哪些场景
RabbitMQ延时队列可以用于以下场景:
-
订单处理:在电商网站中,订单处理是一个常见的业务流程。如果订单需要立即处理,可以使用RabbitMQ的延时队列来实现延迟处理。例如,可以将订单发送到一个延时队列中,并设置一个延迟时间(例如30分钟),然后在延迟时间到达后,将订单从队列中取出并进行处理。
-
消息推送:在移动应用或Web应用程序中,可以使用RabbitMQ的延时队列来实现消息推送。例如,可以将用户订阅的消息发送到一个延时队列中,并设置一个延迟时间(例如1小时),然后在延迟时间到达后,将消息从队列中取出并推送给用户。
-
定时任务:在分布式系统中,可以使用RabbitMQ的延时队列来实现定时任务。例如,可以将需要定期执行的任务发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将任务从队列中取出并执行。
-
数据备份:在数据库中,可以使用RabbitMQ的延时队列来实现数据备份。例如,可以将需要备份的数据发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将数据从队列中取出并进行备份。
-
优惠券发放:您可以设置一个延时队列,将优惠券发放任务添加到队列中,设置一定的延时时间,以保证优惠券在特定时间后才能被消费。
-
动态路由:您可以使用延时队列来实现动态路由的功能,将消息发送到延时队列中,并设置一定的路由规则,以实现消息在特定时间后被路由到不同的目标队列中。
二、RabbitMQ延时队列,如何实现定时任务
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
常用解决方案:
spring的 schedule 定时任务轮询数据库
缺点:
消耗系统内存、增加了数据库的压力、存在较大的时间误差
解决:
rabbitmq的消息TTL和死信Exchange结合
示例:
- 首先,需要安装并启动RabbitMQ服务器。
- 在Java代码中,使用RabbitMQ的Java客户端库(如amqp-client)连接到RabbitMQ服务器。
- 创建一个交换机(exchange),并设置其类型为“x-delayed-message”。
- 将需要延迟发送的消息发送到该交换机上,并指定一个唯一的键值(key)。
- 在发送消息时,将消息的优先级设置为较低的值(例如0或1)。
- RabbitMQ会根据消息的优先级和队列中的其他消息,将该消息放入一个名为“x-delayed-messages”的队列中。
- 在Java代码中,使用ScheduledExecutorService来定期执行任务。在每个任务中,从“x-delayed-messages”队列中取出消息并进行处理。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class RabbitMQDelayedQueueExample {
private static final String EXCHANGE_NAME = "delayed_queue";
private static final String ROUTING_KEY = "delayed_routing_key";
private static final int DELAY_SECONDS = 60; // 延迟1分钟
private static final AtomicInteger messageCount = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机与队列
channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false);
channel.queueDeclare(ROUTING_KEY, true, false, false, null);
// 绑定队列 到交换机
channel.queueBind(ROUTING_KEY, EXCHANGE_NAME, ROUTING_KEY);
// 发送 (0 or 1)
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
if (i % DELAY_SECONDS == 0) {
messageCount.incrementAndGet();
} else {
messageCount.incrementAndGet();
}
}
// 定时任务
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
Queue queue = channel.queueDeclare("x-delayed-messages", true, false, false, null).getQueue();
GetResponse response = channel.basicGet(queue, true);
if (response != null && response.getMessageProperties() != null && response.getMessageProperties().getDeliveryMode() == DeliveryMode.PERSISTENT) {
byte[] body = response.getBody();
String message = new String(body);
System.out.println("Processed message: " + message);
} else {
System.out.println("No messages available in the queue at this moment");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
scheduler.shutdownNow();
}
}, DELAY_SECONDS * TimeUnit.SECONDS, DELAY_SECONDS * TimeUnit.SECONDS, TimeUnit.SECONDS);
三、RabbitMQ如何设置消息的TTL
消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
-
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的
设置。超过了这个时间,我们认为这个消息就死了,称之为死信。 -
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队
列中,这个消息死亡的时间有可能不一样(不同的队列设置)。 -
这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。
在声明队列时设置 TTL:
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 设置 TTL 为 5 秒
Queue queue = channel.queueDeclare("my_queue", true, false, false, args).getQueue();
在发送消息时设置 TTL:
MessageProperties properties = new MessageProperties();
properties.setExpiration("10000"); // 设置 TTL 为 10 秒
byte[] body = "Hello, world!".getBytes();
channel.basicPublish("", "my_queue", properties, body);
四、RabbitMQ死信交换机与死信队列
1、概念
RabbitMQ中的死信交换机(Dead Letter Exchange,简称DLX)和死信队列(Dead Letter Queue,简称DLQ)是用于处理无法被消费者处理的消息的机制。当消息无法被消费时,可以将消息转发到DLX和DLQ中,以便后续处理。
-
DLX是一个特殊的交换机,它用于接收被其它交换机拒绝或超时等无法被消费者处理的消息。当消息被转发到DLX后,可以根据消息的路由键将其路由到相应的队列中进行处理。
-
DLQ是一个特殊的队列,它用于存储被消费者处理失败的消息。当消息无法被消费者处理时,可以将消息转发到DLX,然后再将消息存储到DLQ中。消费者可以在DLQ中处理这些消息,或者将它们重新发送到其它队列中尝试进行处理。
使用DLX和DLQ可以增强RabbitMQ的可靠性和容错性,确保消息能够被正确地处理或者在处理失败时得到适当的处理。在RabbitMQ中,可以通过声明DLX和DLQ来使用这个机制。
2、示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DeadLetterExample {
private final static String QUEUE_NAME = "my_queue";
private final static String EXCHANGE_NAME = "my_exchange";
private final static String ROUTING_KEY = "my_routing_key";
private final static String MESSAGE_BODY = "Hello, world!";
private final static String DLQ_NAME = "my_dlq";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
channel.queueDeclare(DLQ_NAME, true, false, false, null);
channel.queueBind(DLQ_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE_BODY.getBytes());
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
System.out.println("Received message: " + new String(delivery.getBody(), "UTF-8"));
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
// 模拟消费失败,将消息转发到DLX和DLQ中
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE_BODY.getBytes());
// 等待一段时间后关闭连接和通道
Thread.sleep(5000); // 等待5秒
channel.close();
connection.close();
}
}
在这个示例中,我们首先创建了一个交换机和一个队列,并将它们绑定在一起。然后,我们发送了一条消息到这个队列中,接着模拟了一个消费失败的场景,将消息转发到DLX和DLQ中。最后,我们等待5秒钟后关闭连接和通道。
五、RabbitMQ使用延时队列来关闭单据
1. 简单流程
- 首先,生产者在订单生成。
- 给 user.order.delay.exchange发送消息,路由键是order_delay
- 交换机把消息再发送给user.order.delay.queue,路由键是order_delay,这个队列设置三个参数:
x-dead-letter-exchange: user.order.exchange //死信队列
x-dead-letter-routing-key: order //死信路由键
x-message-ttl: 60000 //消息存活1分钟
- 1分钟后,交给 user.order.exchange死信交换机
- user.order.exchange再把消息存入到 user.order.queue死信队列
- 判断,如果没有支付,则关闭单据
2. 优化流程
将上面的进行优化,使用同一个交换机,就是将user.order.delay.exchange与user.order.exchange进行合并
如图:
- 首先,生产者在订单生成。
- 给 order-event- exchange发送消息,路由键是order.create.order
- 交换机把消息再发送给order.delay.queue,路由键是order.create.order,这个队列设置三个参数:
x-dead-letter-exchange: order-event-exchange
x-dead-letter-routing-key: order.release.order
x-message-ttl: 60000 //消息存活1分钟
- 1分钟后,交还给 order-event- exchange交换机,路由键order.release.order
- order-event- exchange再把消息存入到order.release.order. queue死信队列,路由键order.release.order
- 判断,如果没有支付,则关闭单据
3、优化流程代码实现
3.1 配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* 创建队列,交换机,延时队列,绑定关系 的configuration
* 1.Broker中的Queue、Exchange、Binding不存在的情况下,会自动创建(在RabbitMQ),不会重复创建覆盖
* 2.懒加载,只有第一次使用的时候才会创建(例如监听队列)
*/
@Configuration
public class MyRabbitMQConfig {
/**
* 延时队列
*/
@Bean
public Queue orderDelayQueue() {
/**
* Queue(String name, 队列名字
* boolean durable, 是否持久化
* boolean exclusive, 是否排他
* boolean autoDelete, 是否自动删除
* Map<String, Object> arguments) 属性【TTL、死信路由、死信路由键】
*/
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");// 死信路由
arguments.put("x-dead-letter-routing-key", "order.release.order");// 死信路由键
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
return new Queue("order.delay.queue", true, false, false, arguments);
}
/**
* 交换机(死信路由)
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
/**
* 死信队列
*/
@Bean
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
/**
* 绑定:交换机与订单解锁延迟队列
*/
@Bean
public Binding orderCreateBinding() {
/**
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
**/
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 绑定:交换机与订单解锁死信队列
*/
@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
/**
* 绑定:交换机与库存解锁
*/
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
}
3.2 发送消息
@ResponseBody
@GetMapping(value = "/test/createOrder")
public String createOrderTest() {
//订单下单成功
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());
//给MQ发送消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
return "ok";
}
3.3 监听消息
@Service
public class DeadQueueListener {
/**
* queues:声明需要监听的队列
* channel:当前传输数据的通道
* 获取实际消息内容有两种方式:
* 方式一:在方法参数列表中直接声明出来
* 方式二:从请求体中取出消息的二进制形式,然后通过JSON反序列化即可
*/
@RabbitListener(queues = {"order.release.order.queue"})
public void revieveMessage(Message message, OrderEntity entity, Channel channel) throws IOException {
System.out.println("接受到的消息内容" + entity);
}
}