基于 RabbitMQ 优先级队列的订阅推送服务详细设计方案
一、架构设计
-
分层架构:
- 订阅管理层(Spring Boot)
- 消息分发层(RabbitMQ Cluster)
- 推送执行层(Spring Cloud Stream)
- 数据存储层(Redis + MySQL)
-
核心组件:
+-------------------+ +-------------------+ +-------------------+ | 订阅配置管理模块 | | 消息优先级路由器 | | 推送执行引擎 | | (Spring Boot) |------>| (RabbitMQ Exchange)|------>| (Spring Cloud Stream) +-------------------+ +-------------------+ +-------------------+ | | | v v v +-------------------+ +-------------------+ +-------------------+ | 订阅规则数据库 | | 优先级队列集群 | | 推送状态监控中心 | | (MySQL) | | (x-max-priority=10)| | (Prometheus+Grafana) +-------------------+ +-------------------+ +-------------------+
二、优先级队列实现方案
- 队列定义:
// 紧急队列(优先级5-10)
Map<String, Object> urgentArgs = new HashMap<>();
urgentArgs.put("x-max-priority", 10); // 支持10级优先级
urgentArgs.put("x-queue-mode", "lazy"); // 惰性队列防止内存溢出
Queue urgentQueue = new Queue("urgent_queue", true, false, false, urgentArgs);
// 普通队列(优先级0-4)
Map<String, Object> normalArgs = new HashMap<>();
normalArgs.put("x-max-priority", 4);
Queue normalQueue = new Queue("normal_queue", true, false, false, normalArgs);
- 消息路由策略:
public class PriorityMessageRouter {
private static final int URGENT_THRESHOLD = 5;
// 根据业务规则自动判断优先级
public String determineRoutingKey(Message message) {
String bidType = message.getHeader("bid_type");
LocalDateTime deadline = message.getHeader("deadline");
if ("EMERGENCY".equals(bidType) ||
LocalDateTime.parse(deadline).isBefore(LocalDateTime.now().plusHours(2))) {
return "urgent_queue";
}
return "normal_queue";
}
}
三、消息生产端优化
- 消息封装规范:
public class PriorityMessageBuilder {
public static Message buildMessage(Object payload, int priority) {
MessageProperties props = new MessageProperties();
props(priority);
props.setHeader("retry_count", 0);
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return new