messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延迟消息级别
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
//事务消息处理
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 如果是延迟消息
if (msg.getDelayTimeLevel() > 0) {
// 如果设置的值过大,则设置为最大延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 修改Topic
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// 根据延迟级别,决定要将其投递到那个队列中
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 记录原始的 topic 和 队列信息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 修改topic和队列信息
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
RocketMQ的Broker端在存储生产者写入消息时,首先将其写入CommitLog里,为了不让用户立刻就能消费到这条消息,
这里先将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并且根据设置的延迟级别选择将消息投放到哪一个队列里。
整个流程;
- 生产者发送延迟消息到Broker里
- 把消息转发到SCHEDULE_TOPIC_XXXX主题下的队列中
- 延迟服务定期消费SCHEDULE_TOPIC_XXXX主题下的消息,到时间了就把它拿到CommitLog中
- 消息重新被投放到目标Topic里
- 消费者消费延迟消息