【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析

news2025/1/8 11:39:27

前提介绍

在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。

DefaultMQPullConsumer

DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。

总体流程执行

DefaultMQPullConsumer使用例子

public class MQPullConsumer {
	private static final Map OFFSE_TABLE = new HashMap();
	public static void main(String[] args) throws MQClientException {
		DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
		consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
		consumer.start();

		Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");
		for(MessageQueue mq:mqs){
			try {

			   long offset = consumer.fetchConsumeOffset(mq,true);
			    while(true){
				 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                                 putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
					switch(pullResult.getPullStatus()){
					case FOUND:
						List messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.println(new String(m.getBody()));
                        }
						break;
					case NO_MATCHED_MSG:
						break;
					case NO_NEW_MSG:
						break;
					case OFFSET_ILLEGAL:
						break;
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		consumer.shutdown();
	}

	private static void putMessageQueueOffset(MessageQueue mq,
			long nextBeginOffset) {
	    OFFSE_TABLE.put(mq, nextBeginOffset);
	}

	private static Long getMessageQueueOffset(MessageQueue mq) {
		Long offset = OFFSE_TABLE.get(mq);
		if(offset != null){
			return offset;
		}
		return 0l;
	}
}
  • 消费者启动:consumer.start();
  • 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set mqs = consumer.fetchSubscribeMessageQueues("topicTest");

	for(MessageQueue mq:mqs){
		try {

			long offset = consumer.fetchConsumeOffset(mq,true);
			while(true){

				PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
}

DefaultMQPullConsumer的总体流程

启动DefaultMQPullConsumer是通过调用start()方法完成的

DefaultMQPullConsumer拉取源码分析

分析下DefaultMQPullConsumer拉取消息的流程

consumer.fetchSubscribeMessageQueues("order-topic")

从指定topic中拉取所有消息队列

Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");

核心源码分析

fetchSubscribeMessageQueues()
  • 通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
  • 每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
  • 向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
 public Set fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {

                Set mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }

遍历过程TopicRouteData

遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限, 若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合; 最后返回给MessageQueue集合

public static Set topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set mqList = new HashSet();
        List qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }
        return mqList;
    }
consumer.fetchConsumeOffset

通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。

DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)

fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。 这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。

pullBlockIfNotFound拉取信息

rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。

不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }

获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。

  • 参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
  • 参数2:需要过滤用的表达式 ;
  • 参数3:偏移量即消费队列的进度 ;
  • 参数4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的实现过程
      private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.isRunning();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums 0) {
            throw new MQClientException("maxNums , null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());

        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);

        this.resetTopic(pullResult.getMsgFoundList());
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }

检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap

    SubscriptionData subscriptionData;
    try {

        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscriptionData.getSubString(),
        0L,
        offset,
        maxNums,
        sysFlag,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        timeoutMillis,
        CommunicationMode.SYNC,
        null
    );

    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}

Push和Pull的操作对比

  • push-优点:及时性、服务端统一处理实现方便
  • push-缺点:容易造成堆积、负载性能不可控
  • pull-优点:获得消息状态方便、负载均衡性能可控
  • pull-缺点:及时性差

使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/899560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LRU 算法

LRU 缓存淘汰算法就是一种常用策略。LRU 的全称是 Least Recently Used&#xff0c;也就是说我们认为最近使用过的数据应该是是「有用的」&#xff0c;很久都没用过的数据应该是无用的&#xff0c;内存满了就优先删那些很久没用过的数据。 力扣&#xff08;LeetCode&#xff09…

【最全】Python连接数据库取数与写入数据

不管是做数据分析还是风控建模&#xff0c;都避免不了从数据库中取数&#xff0c;和把数据写入数据库。本文整理连接数据库的不同方法&#xff0c;以及单条写入数据和批量写入数据。所有代码都实测可用&#xff0c;并实际应用于生产&#xff0c;分享给更多在这方面遇到困难的朋…

零基础自学:2023 年的今天,请谨慎进入网络安全行业

前言 2023 年的今天&#xff0c;慎重进入网安行业吧&#xff0c;目前来说信息安全方向的就业对于学历的容忍度比软件开发要大得多&#xff0c;还有很多高中被挖过来的大佬。 理由很简单&#xff0c;目前来说&#xff0c;信息安全的圈子人少&#xff0c;985、211 院校很多都才…

AI 绘画Stable Diffusion 研究(十二)SD数字人制作工具SadTlaker插件安装教程

免责声明: 本案例所用安装包免费提供&#xff0c;无任何盈利目的。 大家好&#xff0c;我是风雨无阻。 想必大家经常看到&#xff0c;无论是在产品营销还是品牌推广时&#xff0c;很多人经常以数字人的方式来为自己创造财富。而市面上的数字人收费都比较昂贵&#xff0c;少则几…

​网安板块是真风口还是炒作?

看需求&#xff0c;官方明确要求政企等在网安上投入的比重不低于10%&#xff0c;而当前&#xff0c;信息安占IT的投入比重不到3%&#xff01;即使是政府对网安和IT合规的要求高&#xff0c;占比高达23.7%&#xff0c;但是全国平均下来也不过5%&#xff0c;距离10%的投入要求&am…

中断之MSI和MSI-X的区别详细总结附图文快速掌握

目录 一、整体介绍二、MSI和MSI-X对比2.1 中断向量连续2.2 映射区域区别2.3 MSI-X配置空间2.3.1 MSI-X Capbility介绍2.3.2 Capbility ID介绍2.3.3 Message Control介绍2.3.4 MSI-X Table介绍2.3.5 MSI-X Pending Table&#xff08;PBA&#xff09;介绍 三、MSI 处理过程3.1 Me…

wsl,字体乱码问题

配置wsl&#xff0c;字体乱码问题 一、前言 用zsh配置好wsl&#xff0c;每次打开还是会出现乱码&#xff0c;只有再新打开一个终端才会显示字体 如下图&#xff1a;第一次打开&#xff0c;出现乱码 如图&#xff1a;按加号&#xff0c;再开一个新终端才会显示字体。 二、解…

IDEA:Error running,Command line is too long. 解决方法

报错如下&#xff1a; Error running SendSmsUtil. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun.原因是启动命令过长。 解决方法&#xff1a; 1、打开Edit Configurations 2、点击Modify options设置&#x…

React(6)

1.React插槽 import React, { Component } from react import Child from ./compoent/Childexport default class App extends Component {render() {return (<div><Child><div>App下的div</div></Child></div>)} }import React, { Compon…

remove、remove_if、remove_copy、remove_copy_if

remove(b,e,v) //[b,e) 删value remove_if(b,e,p) //[b,e) 删p条件 remove_copy(b,e,r,v) //[b,e) 删v&#xff0c;结果存入r remove_copy_if(b,e,r,p) //[b,e) 删p条件&#xff0c;结果存入r remove和remove_if结果相同&#xff0c;只是传入的条件不一样。示例图如下&#xf…

如何用输入函数为数组赋值

在编写程序时我们经常使用数组&#xff0c;而数组的大小可能是很大的但是我们并不需要为每个元素都自己赋值&#xff0c;我们可能会自定义输入数组元素个数&#xff0c;我们应该如何实现通过输入函数为数组赋值呢&#xff1f; 目录 第一种&#xff1a; 第二种&#xff1a; 第一…

信号量与管程

前言 我们知道&#xff0c;在并发领域内&#xff0c;需要关注分工、同步与互斥&#xff0c;针对分工问题&#xff0c;就是将任务拆解&#xff0c;分配给多个线程执行&#xff0c;而在多线程执行的过程中&#xff0c;需要解决线程之间的协作与互斥问题进而保证并发安全。那么解…

day-25 代码随想录算法训练营(19)回溯part02

216.组合总和||| 思路&#xff1a;和上题一样&#xff0c;差别在于多了总和&#xff0c;但是数字局限在1-9 17.电话号码的字母组合 思路&#xff1a;先纵向遍历第i位电话号码对于的字符串&#xff0c;再横向递归遍历下一位电话号码 93.复原IP地址 画图分析&#xff1a; 思…

OpenLayers实战,OpenLayers实现地图鼠标经过点要素时显示名称标注提示框,移出后隐藏

专栏目录: OpenLayers实战进阶专栏目录 前言 本章讲解OpenLayers实现地图鼠标经过点要素时显示名称标注提示框,移出后隐藏的功能。 二、依赖和使用 "ol": "^6.15.1"使用npm安装依赖npm install ol@6.15.1使用Yarn安装依赖yarn add olvue中如何使用:…

【Java基础】深入理解String、StringBuffer和StringBuilder的异同

文章目录 一、结论&#xff1a;二、可变性String&#xff08;不可变&#xff09;StringBuffer和StringBuilder&#xff08;可变&#xff09; 三、线程安全性String&#xff08;线程安全&#xff09;StringBuffer&#xff08;线程安全&#xff09;和StringBuilder&#xff08;线…

58.C++ STL标准模板库 STL概述 STL三大组件

一、初识STL 1.1 STL概述 长久以来&#xff0c;软件界⼀直希望建⽴⼀种可重复利⽤的东⻄&#xff0c;以及⼀种得以制造出”可重复运⽤的东⻄”的⽅法,让程序员的⼼⾎不⽌于随时间的迁移&#xff0c;⼈事异动⽽烟消云散&#xff0c;从函(functions)&#xff0c;类别(classes),函…

nginx 配置反向代理的逻辑原则案例(值得一看)

一 实操步骤 1.1 架构图 1.2 配置原则 匹配准则&#xff1a; 当proxy_pass代理地址端口后有目录(包括 / 和/xxx),相当于是绝对根路径&#xff0c;则 nginx 不会把 location 中匹配的路径部分代理走; 当proxy_pass代理地址端口后无任何内容&#xff0c;可以理解为相对路径…

matlab使用教程(19)—曲线拟合与一元方程求根

1.多项式曲线拟合 此示例说明如何使用 polyfit 函数将多项式曲线与一组数据点拟合。您可以按照以下语法&#xff0c;使用 polyfit 求出以最小二乘方式与一组数据拟合的多项式的系数 p polyfit(x,y,n), 其中&#xff1a; • x 和 y 是包含数据点的 x 和 y 坐标的向量 …

DP读书:鲲鹏处理器 架构与编程(七)ARMv8-A 体系结构

一小时速通ARMv8-A体系结构 一、ARMv8-A处理单元核心架构1. ARMv8-A架构的处理器运行模式a. ARMv8-A的执行架构A. AArch64 执行状态B. AArch32 执行状态 b. ARMv8-A架构支持的指令集c. ARMv8-A 支持的数据类型d. ARMv8-A 的异常等级与安全模型e. ARMv8-A的虚拟化架构f. ARMv8-A…

Vue项目商品购物车前端本地缓存逻辑(适用H5/ipad/PC端)——前端实现购物车删除商品、购物车增减数量,清空购物车功能

一、需求 1、用户选择商品&#xff0c;自动回显在购物车列表中&#xff1b; 2、同个商品追加&#xff0c;购物车列表数量叠加&#xff1b; 3、开启赠送&#xff0c;选中的商品&#xff0c;在购物车中另增一条数据&#xff0c;且购物车列表价格显示为0&#xff1b;其实际价格在…