写作目的
第一个原因:最近玩哔哩哔哩遇到一个RocketMQ的Contributor,一开始不知道他是Contributor,后来问到延迟消息的时候这块还不是很了解,他告诉我学习要系统,你既然了解事务消息那我理解应该也了解延迟消息,事实我不了解,所以这块想通过看源码的方式了解一下。
第二个原因:好久没写文章了,需要水一篇,也需要不断学习,所以搞一下。
源码分析
延迟消息配置
消息的延时级别level一共有18级,分别为:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
延迟消息发送
生产延迟消息的代码如下
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
System.out.println("----------------");
String topic = "DelayDemo";
Message msg =
new Message(
topic /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ ")/* Message body */
.getBytes(RemotingHelper.DEFAULT_CHARSET) );
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);//下标有0开始
SendResult sendResult = producer.send(msg, 1000000000);
System.out.printf("%s%n", sendResult);
// Shut down once the producer instance is not longer in use.
producer.shutdown();
}
核心的话就是设置延迟消息等级的参数
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);//下标有0开始
那么设置的这个参数的意思是什么呢?可以跟进源码看一下。原来是给这个消息设置了一个KV,仅仅是打一个tag,后面会用到。
public void setDelayTimeLevel(int level) {
//String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
broker消息接收
目的:我们要定位是到消息生产者Clinet生产的消息被broker接收到走的哪块源码。
首先我们要找到程序的入口代码,如下图所示,就是一个broker的main方法启动。
跟进start方法,然后一步一步往下跟,会跟到NettyRemotingServer的start方法
NettyRemotingServer的start方法里会就是Netty创建ServerBootstrap了,那么很自然的就会想到自定义的handler,也就是NettyServerHandler。
NettyServerHandler#channelRead0方法是处理接收到的程序,进一步跟到NettyRemotingAbstract#processRequestCommand方法,我们可以推断出根据消息的code找到具体的NettyRequestProcessor,就可以知道具体的消息存储逻辑了。
那么发现消息的code是什么呢?
消息生产者发送消息的时候一直跟源码,就会跟到下图的这个地方
就可以拿到
public static final int SEND_MESSAGE_V2 = 310;
根据这个code就可以定位到处理消息的processor,即SendMessageProcessor
延迟消息存储到CommitLog
从上面的逻辑中我们已经定位到SendMessageProcessor,那么接下来看一下消息存储的粗略逻辑
从SendMessageProcessor#processRequest方法开始跟,如下图所示
最后跟到CommitLog的asyncPutMessage方法,其中里面有一个分支如下图所示
接下来就是正常的存储了
延迟消息构建Consumequeue
Consumequeue的构建在RocketMQ中msg&tag的生命周期4.2小节有讲过。
接下来看一下延迟消息构建过程。
核心在ReputMessageService#doReput方法里的构建DispatchRequest方法
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
checkMessageAndReturnSize就是构建DispatchRequest ,当然也会构建tagsCode。如下面的代码所示,如果是延迟消息,则tagsCode=存储时间+延迟时间
延迟消息定时任务
源码剖析RocketMQ延时消息原理第3小节中讲的很详细。