DefaultMQProducer
根据上文:RocketMQ学习笔记:消息Message - 掘金 (juejin.cn),我们定位到Producer
中的这一行代码:
java
复制代码
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start();
- 通过
new DefaultMQProducer("ProducerGroupName")
实例化一个生产者对象。这里的ProducerGroupName
在5.x的版本已经移除了,他主要说的是生产者分组,主要是保证Producer发送同一类消息且发送逻辑一致。. - 通过
setNamesrvAddr("127.0.0.1:9876")
,是指明设置设置NameServer地址。(记得这句话吗:Producer、Consumer、Broker在启动时,会自行将数据注册到NameServer中)。 - 启动这个生产者实例。
DefaultMQProducer
是继承ClientConfig
的,ClientConfig
主要是做RocketMQ客户端公共配置的,setNamesrvAddr
就是其方法,后续我们再介绍这个ClientConfig
类。
DefaultMQProducer
的其他推荐资料:RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer_小虚竹的博客-CSDN博客,该文章介绍的很详细。
干活的是DefaultMQProducerImpl
构造它需要初始化一些基本属性,才能方便后面干活。
在构造方法中,我们继续往下定位发现,在DefaultMQProducer
中其实有一个成员属性是defaultMQProducerImpl
,我们记住:它是真正做消息传输的事:
java
复制代码
public class DefaultMQProducer extends ClientConfig implements MQProducer { protected final transient DefaultMQProducerImpl defaultMQProducerImpl; ...
在DefaultMQProducer
的构造方法中,实例化了DefaultMQProducerImpl
同时在上面第3步中,执行的
java
复制代码
producer.start();
也是在执行DefaultMQProducerImpl
的start()
。
了解一下它
当在DefaultMQProducer调用以下代码时,会去创建DefaultMQProducerImpl
,它是 RocketMQ 生产者的实现类。该类的主要作用是提供一个异步发送线程池,用于处理生产者发送消息的异步任务。
ini
复制代码
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
调用以下的代码,主要是构造DefaultMQProducerImpl
并且初始化一些属性,主要是线程池和队列。
asyncSenderThreadPoolQueue
:容量为 50000。该队列用于存储异步发送任务,当队列已满时,新的任务将被阻塞,直到队列中有空间可用。defaultAsyncSenderExecutor
:线程池中的线程数量与 CPU 核心数量相同;线程存活时间为 60 秒,即如果一个线程在 60 秒内没有执行任务,则会被回收。
java
复制代码
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { // 保存传入的 DefaultMQProducer 对象和 RPCHook 对象 this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; // 创建一个有界阻塞队列,容量为 50000,用于存储异步发送任务 this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); // 创建一个异步发送线程池 this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( // 设置核心线程数为当前可用的处理器数量 Runtime.getRuntime().availableProcessors(), // 设置最大线程数为当前可用的处理器数量 Runtime.getRuntime().availableProcessors(), // 设置线程存活时间为 60 秒 1000 * 60, TimeUnit.MILLISECONDS, // 使用上面创建的有界阻塞队列作为任务队列 this.asyncSenderThreadPoolQueue, // 创建一个 ThreadFactory,用于创建新的线程 new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { // 创建一个新的线程,名称为 AsyncSenderExecutor_1、AsyncSenderExecutor_2、... return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } }); }
启动生产者实例
java
复制代码
producer.start();
通过producer.start();
其实是调用this.defaultMQProducerImpl.start();
这也就证明了干活的是DefaultMQProducerImpl
。
最后看一下发送消息
回到Producer
类中,看最后Producer
做了什么事情。
java
复制代码
SendResult sendResult = producer.send(msg);
根据代码一步步深入,主要是以下这样的逻辑,我们直接看到最后的地方:
send
最后执行的是sendDefaltImpl
这个方法。 它主要做这些事情:
- 校验消息
java
复制代码
Validators.checkMessage(msg, this.defaultMQProducer);
根据checkMessage
源码,他主要校验:
- 消息对象是否为空
- topic是否合法
- 是否为空的校验
- 是不是系统主题的校验
- 检查消息体(实际放的内容)是否为空
- 检查消息体的长度是否为0
- 检查消息的大小有没有超过默认值,默认值是4M
- 拿 Topic 名称去 NameServer 换取详情(挖坑)
- 消息重投,同步传输默认重传3次(包含传输的1次),否则1次。
最后,消息传递的方式有:同步、异步(回调函数)、还有一个是发就得了。