前言
在前面consumer拉取消息的博客中,有说过,对于consumer,在拉取消息的时候,是需要指定code码的,在consumer去broker拉取消息的时候,指定的code码是:PULL_MESSAGE,所以这篇博客,我们简单看下broker在接收到netty请求之后,是如何处理的
源码
在broker这边,是PullMessageProcessor 这个类来处理拉取消息请求的,我们先说为什么是这个类来处理
这是netty服务端在接收到client的请求之后,通过handler进行处理
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
这里有个关键的逻辑,我们可以看到:
1、先是从一个map集合中,根据key找到了一个处理类,然后判断处理类如果不为null,就使用map中获取到的processor
2、然后调用processor.processRequest()方法
这里我们先看下processorTable这个map是在哪里赋值的
这个registerProcessor()方法是在broker启动的时候,调用的,不再贴这个的调用链路了
在这里会根据code码以及对应的processor,调用了NettyRemotingService的registerProcessor方法
可以看到,这里会把code和对应的processor放到了这个map集合中
那接下来,我们来看具体解析请求参数,并拉取消息的逻辑
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel,
org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
对于这里处理拉取消息请求的逻辑,我还没有完全看明白所有的代码逻辑,所以我先把主核心的链路捞出来;我目前只看懂了两步
1、从commitLog根据size和offset拉取消息
2、将消息组装到response对象中
从commitLog拉取消息
在尝试拉取消息的时候,会看到,底层是通过commitLog来拉取消息的,将拉取到的消息,设置到返回对象中
将消息设置到返回参数中
这里可以看到,是根据从commitLog中取出来的数据,转换成了byte[]数组,然后把数组放到response的body字段中;
这里的body就是真正存放消息的地方,我们可以从consumer拿到response之后的解析逻辑来反向验证
consumer在接收到broker返回的请求之后,会把body中的数据,放到了messageBinary字段上
我们再往前看,对messageBinary字段是怎么用的
这里看到,会把messageBinary转换为msgList去使用;所以broker在返回参数的body字段中,设置的是消息体信息
总结
这里看起来,是比较简单明了的,就是根据当前请求中的offset和size,从commitLog中拉取消息数据,然后返回就可以了;当然,中间的很多细节都没有看,因为逻辑太多了,实在看不过来,先把核心逻辑串起来 后面再慢慢补中间的细节