前言
PullRequestHoldService 继承了ServiceThread类,它本身是一个线程,以后台方式无线循环运行,支持长轮询(默认5秒)和短轮询(默认1秒)两种方式(CountDownlatch 方式控制)控制线程执行间隔。5秒钟后,线程内部会检查被挂起的请求(消费者建立连接后,会立即执行一次消息拉取服务,如果能拉到消息直接返回相应,如果拉不到,就会被挂起到 PullMessHoldService 服务),通知消息到达,调用 PullMessageProcessor 消息拉取处理组件,再进行一次拉取处理,如果能拉到消息,就直接返回,如果拉取不到再次挂起。
除了以上方式唤醒被挂起的消息拉取请求,还有一个 NotifyMessageArrivingListener 消息到达监听器,可以监听 topic-queueId-tagCode 维度的消息,进行通知唤醒被挂起的线程。
本次涉及三个核心组件:
- PullRequestHoldService 长轮询消息拉取组件;
- PullMessageProcessor 拉取消息处理组件;
- NotifyMessageArrivingListener 消息到达监听组件;
源码版本:4.9.3
源码架构图
核心数据结构
长轮询消息拉取组件数据结构,维护了一个拉取请求处理列表:Map<topic@queueId, 挂起的请求列表> pullRequestTable 。
public class PullRequestHoldService extends ServiceThread {
// 系统时钟
private final SystemClock systemClock = new SystemClock();
// 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合
protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
}
pullRequestTable 拉取请求处理列表的 value对象(ManyPullRequest)维护的核心数据结构是被挂起的请求列表。
public class ManyPullRequest {
// 拉取请求列表
private final ArrayList<PullRequest> pullRequestList = new ArrayList<>();
}
核心行为流程
唤醒流程一
PullMessageProcessor 接受网络通信请求,处理消息拉取请求。从messageStore 消息存储组件拉取消息,如果拉不到就将请求,挂起到pullRequestHoldService长轮询消息拉取组件。
// pull方式拉取消息处理器组件
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
....
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// 一堆校验...
// 关键点:从消息存储组件拉取消息MessageStore
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
....
// 处理查询结果,没有拉到消息时,将请求挂起到hold服务
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
...
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
}
...
}
PullRequestHoldService 长轮询消息拉取组件,接受挂起的请求,维护在内存中。且以后台线程方式,轮询pullRequestTable,唤醒所有请求,尝试拉取消息。
// 长轮询消息拉取组件
public class PullRequestHoldService extends ServiceThread {
// 长轮询消息拉取请求表,key为topic@queueId,value为PullRequest集合
protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
// 如果没有拉取到请求,就挂起拉取请求,等待当前线程长轮询处理
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
@Override
public void run() {
log.info("{} service started", this.getServiceName());
// 循环,处理被挂起的拉取请求
while (!this.isStopped()) {
try {
// 长轮询,默认5秒
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
// 短轮询,默认1秒
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 处理被挂起的拉取请求
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
protected void checkHoldRequest() {
// 遍历被挂起的拉取请求列表
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 从消息存储组件获取当前队列消费的最大偏移量
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
// 通知消息消费者
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
// 获取指定topic-queueId被挂起的拉取请求列表
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 克隆被挂起的拉取请求列表
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
// 遍历所有请求
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
// 如果拉取的最大偏移量小于当前拉取的起始偏移量,则从消息存储组件获取最大的偏移量开始拉取
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
// 匹配消息过滤器
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
// 重点:如果消息到达filter匹配,则执行挂起请求唤醒服务
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 重点:超时处理,如果超时,则执行挂起请求唤醒服务
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
// 没有执行的请求,重新放回被挂起的拉取请求列表
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
}
唤醒流程二
NotifyMessageArrivingListener 通知消息到达监听器,会监听指定topic-指定queueId的消息变化,通知 pullRequestHoldService 长轮询请求处理服务,实时唤醒挂起的对应请求,进行消息拉取。
// 通知消息到达监听器
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
this.pullRequestHoldService = pullRequestHoldService;
}
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
// 通知长轮询请求处理服务,消息到达,唤醒挂起的请求
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
}
}
完整源码文件
见资源包