前言
这篇笔记,记录producer发送消息的相关源码
我们以最简单的demo为例
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
我们要看的是:producer.send()这个方法
源码
从send()方法开始,中间有几层代码的调用,但是逻辑比较简单,就不贴代码了,最后会调用到
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
在这个方法中,大致有四个逻辑
- 校验消息的合法性
- 获取topic路由信息
- 根据当前topic所有的messageQueue和负载均衡策略,选取一个messageQueue
- 通过netty发送send请求
第一点:校验消息合法性这里,主要是校验消息中指定的topic格式是否合法,消息体的长度是否符合要求(最大不能超过4M)等
第二点这里,根据topic查询路由信息
这个其实是在内存中维护了一个map集合,根据topic查询到对应的路由信息,那内存中的这个map集合是在哪里维护的呢?有两个地方
- 在producer启动的时候,会启动一个异步线程,每30S执行一次,执行时,是会根据当前producer中所有的topic,依次去nameSrv查询路由信息,然后更新到内存中;参考前面一篇博客
- 在这里查找路由的时候,如果内存的map集合中,没有该topic的路由信息,会主动触发更新当前topic的路由信息
第三点这里,根据topicPublishInfo选择一个messageQueue,里面的细节逻辑暂时没有看的太明白,因为不同的场景有不同的处理方法,但是整体上看,是根据当前在内存中维护的一个计数的integer数值,来判断要使用哪个messageQueue,这个integer数值,每发送一次消息,会+1;这里看起来是轮询的策略
第四点:构建netty请求,发送消息
在发送请求的时候,有多层逻辑需要处理
- 根据brokerName获取broker的地址信息
- 设置messageId
- 判断消息长度是否超过了4K,超过,就进行压缩
- 判断是否是事务消息
- 对请求信息进行封装
- 判断是同步发送(sync)、异步发送(async)、还是oneWay模式
最后,在发送请求前,会判断code码
前面有说过,rocketmq内部在通过netty发送请求的时候,通过封装了一个code码,来判断业务类型
最终会根据不同的方式,来发送netty请求
里面的代码,就不截图看了,有兴趣的可以去看下,里面就是netty的相关逻辑了,基本上底层就是调用了channel.writeAndFlush()方法
总结
所以,总结来看,在发送请求的时候,大致的步骤:
- 校验消息的合法性
- 根据消息中的topic,找到对应的路由信息
- 根据路由信息 以及轮询策略,找到要发送的messageQueue
- 构建netty请求,发送到broker
这里随带插一句,如果我们要发送顺序消息,只需要在发送消息的时候,指定messageQueue或者是指定messageQueueSelector即可(在producer的send()方法,提供了多个api),无论是发送顺序消息,还是普通消息,在代码底层,都是调用的
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
只是在上层的逻辑中,如果是普通消息,是mq帮我们去轮询选择一个messageQueue,发送消息,而顺序消息要求我们在发送的时候,执行messageQueue,或者是指定messageQueueSelector,这样就能保证同一笔订单或者是同一个业务的消息是顺序的
其实所谓的顺序,我理解就是把业务先后消息放到了同一个messageQueue中