前言
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
创建、启动producer的逻辑,是写在业务系统中的,根据rocketmq源码中的demo,只需要这几行代码,就可以启动producer
这几行代码的逻辑,也比较简单,就是创建一个producer对象,创建对象时,需要指定producerGroup’然后设置nameSrv地址;最后调用其start()方法即可
所以,启动producer最为核心的代码,是在start()方法中
在start()方法中,最重要的是mqClientFactory.start()方法
在mqClientFactory.start()方法中,有几个关键的点
- 如果内存中没有nameSrv的地址信息,手动去查询一次,更新内存
- 启动nettyClient
- 启动了一些定时任务
- 启动拉取消息和负载均衡的service,但是这两个service理论上,对于producer是不生效的,因为这两个service是consumer在启动的时候会用到,这个start()方法,producer和consumer在启动的时候,共用的
我们来看启动的定时任务有哪些?
private void startScheduledTask() {
/**
* 定时拉取nameServer的地址信息到内存中
* 每2分钟执行一次
*/
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
/**
* 从nameServer更新topic的路由信息
* 更新topic以及对应的broker信息到内存中
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
/**
* 清理已经下线的broker
* 定时发送心跳到所有的broker
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
这里的第二个定时任务,更新topic路由信息,在producer发送消息的时候,会用到,可以先关注下
总结
总结来看,producer启动的流程是比较简单的
- 启动nettyClient服务
- 启动了一些定时任务,定时更新内存中的一些数据;这些数据在producer发送消息的时候都会用到