RocketMQ-源码架构

news2024/12/28 3:49:15

源码环境搭建

1、主要功能模块

RocketMQ官方Git仓库地址:GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

RocketMQ的官方网站下载:下载 | RocketMQ

重要模块:

  • broker: Broker 模块(broke 启动进程)

  • client :消息客户端,包含消息生产者、消息消费者相关类

  • example: RocketMQ 示例代码

  • namesrv:NameServer模块

  • store:消息存储模块

  • remoting:远程访问模块

2、源码启动服务

将源码导入IDEA后,需要先对源码进行编译。编译指令clean install -Dmaven.test.skip=true

编译完成后就可以开始调试代码:

调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conflogback_broker.xmllogback_namesrv.xml

window10执行上面的编译指令一直报错A required class was missing while executing org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process: org/apache/commons/collections/ExtendedProperties,最后在Linux执行才成功

unzip rocketmq-rocketmq-all-4.9.5.zip
cd rocketmq-rocketmq-all-4.9.5/
mvn clean install -Dmaven.test.skip=true
启动nameServer

虽然编译指令在windows执行报错,但是nameSrv还是可以正常启动的

如果启动时报错,并提示需要配置ROCKETMQ_HOME环境变量,可以在工程里面添加:

启动Broker

启动Broker之前,需要先修改之前复制的broker.conf文件

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort

启动Broker时,还需要配置一个-c 参数,指向broker.conf配置文件

发送消息

在源码的example模块下,提供了非常详细的测试代码。

启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息

在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。

// 源码中指定
producer.setNamesrvAddr("127.0.0.1:9876");
消费消息

启动example模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息

consumer.setNamesrvAddr("192.168.232.128:9876");

3、读源码的方法

1、带着问题读源码。一定要自己思考!

2、小步快走。不要觉得一两遍就能读懂源码

3、分步总结。带上自己的理解,及时总结。对各种扩展功能,尝试验证

对于RocketMQ,试着去理解源码中的各种单元测试。

源码热身阶段

NameServer的启动过程

关注重点

RocketMQ集群中,实际上消息存储、推送等核心功能点是Broker。NameServer的作用,和微服务中的注册中心非常类似,只是提供了Broker端的服务注册与发现功能。

源码重点

NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Controller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。

另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。

从NameServer启动和关闭这两个关键步骤,我们可以总结出NameServer的组件其实并不是很多,整个NameServer的结构是这样的:

这两个配置类就可以用来指导如何优化Nameserver的配置。比如,如何调整nameserver的端口?

可以看出,RocketMQ的整体源码风格就是典型的MVC思想。Controller响应请求,Service处理业务,各种Table保存消息

部分源码示例:

// NamesrvStartup#main
public static void main(String[] args) {
	main0(args);
}

public static NamesrvController main0(String[] args) {

	try {
		NamesrvController controller = createNamesrvController(args);
		start(controller);
		String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
		log.info(tip);
		System.out.printf("%s%n", tip);
		return controller;
	} catch (Throwable e) {
		e.printStackTrace();
		System.exit(-1);
	}

	return null;
}

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
	//PackageConflictDetect.detectFastjson();

	Options options = ServerUtil.buildCommandlineOptions(new Options());
	commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
	if (null == commandLine) {
		System.exit(-1);
		return null;
	}

	// 两个配置类在这里
	final NamesrvConfig namesrvConfig = new NamesrvConfig();
	final NettyServerConfig nettyServerConfig = new NettyServerConfig();
	// 端口号简单粗暴
	nettyServerConfig.setListenPort(9876);
	// -c指令指定配置文件,可以替换端口号。配置文件内容:listenPort=9876
	if (commandLine.hasOption('c')) {
		String file = commandLine.getOptionValue('c');
		if (file != null) {
			InputStream in = new BufferedInputStream(new FileInputStream(file));
			properties = new Properties();
			properties.load(in);
			MixAll.properties2Object(properties, namesrvConfig);
			MixAll.properties2Object(properties, nettyServerConfig);

			namesrvConfig.setConfigStorePath(file);

			System.out.printf("load config properties file OK, %s%n", file);
			in.close();
		}
	}

	if (commandLine.hasOption('p')) {
		InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
		MixAll.printObjectProperties(console, namesrvConfig);
		MixAll.printObjectProperties(console, nettyServerConfig);
		System.exit(0);
	}
    
	... ...
        
	final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

	// remember all configs to prevent discard
	controller.getConfiguration().registerConfig(properties);

	return controller;
}

// NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
	this.namesrvConfig = namesrvConfig;
	this.nettyServerConfig = nettyServerConfig;
	this.kvConfigManager = new KVConfigManager(this);
	this.routeInfoManager = new RouteInfoManager();
	this.brokerHousekeepingService = new BrokerHousekeepingService(this);
	this.configuration = new Configuration(
		log,
		this.namesrvConfig, this.nettyServerConfig
	);
	this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

// NamesrvController#initialize
public boolean initialize() {

	this.kvConfigManager.load();

	this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

	... ...
}

Broker服务启动过程

关注重点

Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。

重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点

源码重点

Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试

启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动

// BrokerStartup#main
public static void main(String[] args) {
	start(createBrokerController(args));
}

public static BrokerController createBrokerController(String[] args) {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

	try {
		//PackageConflictDetect.detectFastjson();
		Options options = ServerUtil.buildCommandlineOptions(new Options());
		commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
			new PosixParser());
		if (null == commandLine) {
			System.exit(-1);
		}
		
		// Broker服务配置
		final BrokerConfig brokerConfig = new BrokerConfig();
		// Netty服务端占用了10911端口。同样也可以在配置文件中覆盖
		final NettyServerConfig nettyServerConfig = new NettyServerConfig();
		// Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳
		final NettyClientConfig nettyClientConfig = new NettyClientConfig();

		nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
			String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
		nettyServerConfig.setListenPort(10911);
		// 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
		final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
		
		... ...
	} catch (Throwable e) {
		e.printStackTrace();
		System.exit(-1);
	}

	return null;
}

BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig 这几个配置是了解如何优化 RocketMQ 使用的关键

在BrokerController.start方法启动了一大堆Broker的核心服务,部分源码:

// BrokerController#start
public void start() throws Exception {
	if (this.messageStore != null) {
		//启动核心的消息存储组件
		this.messageStore.start();
	}

	if (this.remotingServer != null) {
		this.remotingServer.start();
	}

	if (this.fastRemotingServer != null) {
		//启动两个Netty服务
		this.fastRemotingServer.start();
	}

	if (this.brokerOuterAPI != null) {
		//启动客户端,往外发请求
		this.brokerOuterAPI.start();
	}

	... ...

	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

		@Override
		public void run() {
			try {
				//向NameServer注册心跳
				BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
			} catch (Throwable e) {
				log.error("registerBrokerAll Exception", e);
			}
		}
	}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

	if (this.brokerStatsManager != null) {
		this.brokerStatsManager.start();
	}

	if (this.brokerFastFailure != null) {
		//负责具体业务的功能组件
		this.brokerFastFailure.start();
	}
}

抽象出Broker的一个整体结构:

实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。

小试牛刀阶段

开始理解一些比较简单的业务逻辑

Netty服务注册框架

关注重点

网络通信服务是构建分布式应用的基础,也是理解RocketMQ底层业务的基础。

重点梳理RocketMQ的这个服务注册框架,理解各个业务进程之间是如何进行RPC远程通信的

Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,NameServer主要是RPC的服务端RemotingServer,Broker对于客户端来说,是RPC的服务端RemotingServer,而对于NameServer来说,又是RPC的客户端。各种Client是RPC的客户端RemotingClient。

RocketMQ基于Netty保持客户端与服务端的长连接Channel。RemotingServer和RemotingClient都需要注册自己的服务。

源码重点

1、NameServer需要NettyServer。客户端,Producer和Consumer,需要NettyClient。

2、所有的RPC请求数据都封装成RemotingCommand对象。每个处理消息的服务逻辑,都会封装成一个NettyRequestProcessor对象。

3、服务端和客户端都维护一个processorTable,这是个HashMap。key是服务码requestCode,value是对应的运行单元 Pair<NettyRequestProcessor,ExecutorService>类型,包含了处理Processor和执行线程的线程池。具体的Processor,由业务系统自行注册。Broker服务注册:BrokerController.registerProcessor(),客户端的服务注册:MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor,统一处理所有服务。

4、请求类型分为REQUEST和RESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后,可以先缓存到responseTable中,等NettyClient下次来获取,这样就不用阻塞Channel了,可以提升请求吞吐量。

5、重点理解remoting包中是如何实现全流程异步化。

整体RPC框架流程:

RocketMQ使用Netty框架提供了一套基于服务码的服务注册机制,让各种不同的组件都可以按照自己的需求,注册自己的服务方法。RocketMQ的这一套服务注册机制,是非常简洁实用的。要开发一个大型的IM项目,要加减好友、发送文本,图片,甚至红包、维护群聊信息等等各种各样的请求,这些请求如何封装,就可以很好的参考这个框架。

关于RocketMQ的同步结果推送与异步结果推送

RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture>

处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

//NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
	final long timeoutMillis)
	throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
	final int opaque = request.getOpaque();

	try {
		final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
		this.responseTable.put(opaque, responseFuture);
		final SocketAddress addr = channel.remoteAddress();
		channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture f) throws Exception {
				if (f.isSuccess()) {
					responseFuture.setSendRequestOK(true);
					return;
				} else {
					responseFuture.setSendRequestOK(false);
				}

				responseTable.remove(opaque);
				responseFuture.setCause(f.cause());
				responseFuture.putResponse(null);
				log.warn("send a request command to channel <" + addr + "> failed.");
			}
		});

		RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
		if (null == responseCommand) {
			if (responseFuture.isSendRequestOK()) {
				throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
					responseFuture.getCause());
			} else {
				throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
			}
		}

		return responseCommand;
	} finally {
		this.responseTable.remove(opaque);
	}
}

//ResponseFuture#waitResponse
//实际上,同步也是通过异步实现的
//发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
	this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
	return this.responseCommand;
}

//ResponseFuture#putResponse
//等待异步获取到消息后,再通过countDownLatch释放当前线程
public void putResponse(final RemotingCommand responseCommand) {
	this.responseCommand = responseCommand;
	this.countDownLatch.countDown();
}

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

//NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
	final InvokeCallback invokeCallback)
	throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
	long beginStartTime = System.currentTimeMillis();
	final int opaque = request.getOpaque();
	boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
	if (acquired) {
		final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
		long costTime = System.currentTimeMillis() - beginStartTime;
		if (timeoutMillis < costTime) {
			once.release();
			throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
		}

		final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
		this.responseTable.put(opaque, responseFuture);
		try {
			channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture f) throws Exception {
					if (f.isSuccess()) {
						responseFuture.setSendRequestOK(true);
						return;
					}
					requestFail(opaque);
					log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
				}
			});
		} catch (Exception e) {
			responseFuture.release();
			log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
			throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
		}
	} else {
		if (timeoutMillis <= 0) {
			throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
		} else {
			String info =
				String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
					timeoutMillis,
					this.semaphoreAsync.getQueueLength(),
					this.semaphoreAsync.availablePermits()
				);
			log.warn(info);
			throw new RemotingTimeoutException(info);
		}
	}
}

//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {

	@Override
	public void run() {
		try {
			NettyRemotingServer.this.scanResponseTable();
		} catch (Throwable e) {
			log.error("scanResponseTable exception", e);
		}
	}
}, 1000 * 3, 1000);

Broker心跳注册管理

关注重点

Broker会在启动时向所有NameServer注册自己的服务信息,并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由表进行实时更新。

源码重点

Broker启动后会立即发起向NameServer注册心跳。方法入口:BrokerController.this.registerBrokerAll。 然后启动一个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送心跳。

NameServer内部会通过RouteInfoManager组件及时维护Broker信息。同时在NameServer启动时,会启动定时任务,扫描不活动的Broker。方法入口:NamesrvController.initialize方法。

极简化的服务注册发现流程

为什么RocketMQ要自己实现一个NameServer,而不用Zookeeper、Nacos这样现成的注册中心?

首先,依赖外部组件会对产品的独立性形成侵入,不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。

另外,其实更重要的还是对业务的合理设计。NameServer之间不进行信息同步,而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。

但是,要知道,这种极简的设计,其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册,有可能部分NameServer注册成功,而部分NameServer注册失败了。这样,多个NameServer之间的数据是不一致的。作为注册中心,这是不可接受的。但是对于RocketMQ,这又变得可以接受了。因为客户端从NameServer上获得的,只要有一个正常运行的Broker就可以了,并不需要完整的Broker列表。

Producer发送消息过程

关注重点

回顾Producer使用案例

Producer有两种:

  • 一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。

  • 另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。

事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。这里只关注DefaultMQProducer的消息发送过程。

整个Producer的使用流程,大致分为两个步骤:一是调用start方法,进行一大堆的准备工作。 二是各种send方法,进行消息发送。

重点关注以下几个问题:

1、Producer启动过程中启动了哪些服务

2、Producer如何管理broker路由信息。 可以设想一下,如果Producer启动了之后,NameServer挂了,那么Producer还能不能发送消息?

3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。

源码重点
  • Producer的核心启动流程

所有Producer的启动过程,最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。

这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由MQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable

  • 发送消息的核心流程

1、发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。

2、生产者如何找MessageQueue: 默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。

//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
//如果进到这里lastBrokerName不为空,那么表示上一次向这个Broker发送消息是失败的,这时就尽量不要再往这个Broker发送消息了。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
	if (lastBrokerName == null) {
		return selectOneMessageQueue();
	} else {
		for (int i = 0; i < this.messageQueueList.size(); i++) {
			int index = this.sendWhichQueue.incrementAndGet();
			int pos = Math.abs(index) % this.messageQueueList.size();
			if (pos < 0)
				pos = 0;
			MessageQueue mq = this.messageQueueList.get(pos);
			if (!mq.getBrokerName().equals(lastBrokerName)) {
				return mq;
			}
		}
		return selectOneMessageQueue();
	}
}

3、如果在发送消息时传了Selector,那么Producer就不会走这个负载均衡的逻辑,而是会使用Selector去寻找一个队列。 具体参见org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法

//DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(
	Message msg,
	MessageQueueSelector selector,
	Object arg,
	final CommunicationMode communicationMode,
	final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	long beginStartTime = System.currentTimeMillis();
	this.makeSureStateOK();
	Validators.checkMessage(msg, this.defaultMQProducer);

	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
	if (topicPublishInfo != null && topicPublishInfo.ok()) {
		MessageQueue mq = null;
		try {
			List<MessageQueue> messageQueueList =
				mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
			Message userMessage = MessageAccessor.cloneMessage(msg);
			String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
			userMessage.setTopic(userTopic);

			mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
		} catch (Throwable e) {
			throw new MQClientException("select message queue threw exception.", e);
		}

		long costTime = System.currentTimeMillis() - beginStartTime;
		if (timeout < costTime) {
			throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
		}
		if (mq != null) {
			return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
		} else {
			throw new MQClientException("select message queue return null.", null);
		}
	}

	validateNameServerSetting();
	throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

Consumer拉取消息过程

关注重点

回顾消费者的几个重点问题:

  • 消费者也是有两种,推模式消费者和拉模式消费者。优秀的MQ产品都会有一个高级的目标,就是要提升整个消息处理的性能。而要提升性能,服务端的优化手段往往不够直接,最为直接的优化手段就是对消费者进行优化。所以在RocketMQ中,整个消费者的业务逻辑是非常复杂的,甚至某种程度上来说,比服务端更复杂,所以,在这里重点关注用得最多的推模式的消费者。

  • 消费者组之间有集群模式和广播模式两种消费模式。就要了解下这两种集群模式是如何做的逻辑封装。

  • 然后关注消费者端的负载均衡的原理。即消费者是如何绑定消费队列的,那些消费策略到底是如何落地的。

  • 最后关注在推模式的消费者中,MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同,为什么后者能保持消息顺序。

源码重点

Consumer的核心启动过程和Producer是一样的, 最终都是通过MQClientFactory对象启动。不过之间添加了一些注册信息。整体启动过程:

广播模式与集群模式的Offset处理

在DefaultMQPushConsumerImpl的start方法中,启动了非常多的核心服务。 比如,对于广播模式与集群模式的Offset处理

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
	this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
	switch (this.defaultMQPushConsumer.getMessageModel()) {
		case BROADCASTING:
			this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
			break;
		case CLUSTERING:
			this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
			break;
		default:
			break;
	}
	this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

广播模式是使用LocalFileOffsetStore,在Consumer本地保存Offset,而集群模式是使用RemoteBrokerOffsetStore,在Broker端远程保存offset。而这两种Offset的存储方式,最终都是通过维护本地的offsetTable缓存来管理Offset。

Consumer与MessageQueue建立绑定关系

start方法中还一个比较重要的东西是给rebalanceImpl设定了一个AllocateMessageQueueStrategy,用来给Consumer分配MessageQueue的。

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//Consumer负载均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

AllocateMessageQueueStrategy就是用来给Consumer和MessageQueue之间建立一种对应关系的。也就是说,只要Topic当中的MessageQueue以及同一个ConsumerGroup中的Consumer实例都没有变动,那么某一个Consumer实例只是消费固定的一个或多个MessageQueue上的消息,其他Consumer不会来抢这个Consumer对应的MessageQueue。

为什么要让一个MessageQueue只能由同一个ConsumerGroup中的一个Consumer实例来消费?

因为Broker需要按照ConsumerGroup管理每个MessageQueue上的Offset,如果一个MessageQueue上有多个同属一个ConsumerGroup的Consumer实例,他们的处理进度就会不一样。这样的话,Offset就乱套了。

顺序消费与并发消费

在start方法中,启动了consumerMessageService线程,进行消息拉取。

//Consumer中自行指定的回调函数
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
	this.consumeOrderly = true;
	this.consumeMessageService =
		new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
	this.consumeOrderly = false;
	this.consumeMessageService =
		new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

this.consumeMessageService.start();

Consumer通过registerMessageListener方法指定的回调函数,都被封装成了ConsumerMessageService的子实现类。

而对于这两个服务实现类的调用,会延续到DefaultMQPushConsumerImpl的pullCallback对象中。也就是Consumer每拉过来一批消息后,就向Broker提交下一个拉取消息的的请求。

这里也可以印证一个点,就是顺序消息,只对异步消费也就是推模式有效。同步消费的拉模式是无法进行顺序消费的。因为这个pullCallback对象,在拉模式的同步消费时,根本就没有往下传。

当然,这并不是说拉模式不能锁定队列进行顺序消费,拉模式在Consumer端应用就可以指定从哪个队列上拿消息。

PullCallback pullCallback = new PullCallback() {
	@Override
	public void onSuccess(PullResult pullResult) {
		if (pullResult != null) {
			pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
				subscriptionData);

			switch (pullResult.getPullStatus()) {
				case FOUND:
					... ...
					DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
						pullResult.getMsgFoundList(),
						processQueue,
						pullRequest.getMessageQueue(),
						dispatchToConsume);

这里提交的,实际上是一个ConsumeRequest线程。而提交的这个ConsumeRequest线程,在两个不同的ConsumerService中有不同的实现。

这其中,两者最为核心的区别在于ConsumerMessageOrderlyService是锁定了一个队列,处理完了之后,再消费下一个队列。

@Override
public void run() {
	... ...

	final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
	synchronized (objLock) {
		... ...
	}
}

为什么给队列加个锁,就能保证顺序消费呢?

从源码中可以看到,Consumer提交请求时,都是往线程池里异步提交的请求。如果不加队列锁,那么就算Consumer提交针对同一个MessageQueue的拉取消息请求,这些请求都是异步执行,他们的返回顺序是乱的,无法进行控制。给队列加个锁之后,就保证了针对同一个队列的第二个请求,必须等第一个请求处理完了之后,释放了锁,才可以提交。这也是在异步情况下保证顺序的基础思路。

实际拉取消息还是通过PullMessageService完成的

start方法中,相当于对很多消费者的服务进行初始化,包括指定一些服务的实现类,以及启动一些定时的任务线程,比如清理过期的请求缓存等。最后,会随着mQClientFactory组件的启动,启动一个PullMessageService。实际的消息拉取都交由PullMesasgeService进行。

所谓消息推模式,其实还是通过Consumer拉消息实现的。

//org.apache.rocketmq.client.impl.consumer.PullMessageService
private void pullMessage(final PullRequest pullRequest) {
	final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
	if (consumer != null) {
		DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
		impl.pullMessage(pullRequest);
	} else {
		log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
	}
}

客户端负载均衡管理总结

Producer负载均衡

Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker。

Producer轮训时,如果发现往某一个Broker上发送消息失败了,那么下一次会尽量避免再往同一个Broker上发送消息。但是,如果你的应用场景允许发送消息长延迟,也可以给Producer设定setSendLatencyFaultEnable(true)。这样对于某些Broker集群的网络不是很好的环境,可以提高消息发送成功的几率。

同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡

Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。

1、集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。

  • AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。

这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。

源码中有测试代码AllocateMachineRoomNearByTest。

  • AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者

  • AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。

  • AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。

  • AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。

  • AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在环上分布更为均匀。

最常用的就是平均分配和轮训分配了。

2、广播模式

广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1302781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对比三种认证方式:传统token认证,jwt认证,oauth认证

1. Token基本原理 1、客户端使用用户名跟密码请求登录&#xff1b; 2、服务端收到请求&#xff0c;去验证用户名与密码&#xff1b; 3、验证成功&#xff0c;服务端会签发一个Token&#xff08;也就是随机生成一个字符串&#xff09;保存到(Session,redis,mysql…)中&#x…

计算机毕业设计 SpringBoot的企业内管信息化系统 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

SSD在AI发展中的关键作用:从高速缓存到数据湖-1

随着人工智能技术的飞速发展&#xff0c;存储在其中发挥着至关重要的作用。特别是在AI训练过程中&#xff0c;存储SSD&#xff08;固态硬盘&#xff09;的高性能和可靠性对于提升训练效率和保证数据安全具有不可替代的作用。 存储SSD在AI发展中的作用和趋势&#xff0c;存储将…

<url-pattern>/</url-pattern>与<url-pattern>/*</url-pattern>的区别

<url-pattern>/</url-pattern> servlet的url-pattern设置为/时&#xff0c; 它仅替换servlet容器的默认内置servlet&#xff0c;用于处理所有与其他注册的servlet不匹配的请求。直白点说就是&#xff0c;所有静态资源&#xff08;js&#xff0c;css&#xff0c;ima…

人工智能数据集可视化统计分析工具:快速了解你的数据集

人工智能数据集可视化统计分析工具&#xff1a;快速了解你的数据集 简介特征示例报告安装用法 简介 Lightly Insights&#xff1a;可以轻松获取关于机器学习数据集基本洞察的工具&#xff0c;可以可视化图像数据集的基本统计信息&#xff0c;仅需提供一个包含图像和对象检测标…

自编码器 AutoEncoder

自编码器&#xff08;AutoEncoder&#xff09;&#xff0c;也称自编码模型&#xff0c;是一种基于无监督学习的数据维度压缩和特征表示方法&#xff0c;目的是对一组数据学习出一种表示。1986年 Rumelhart 提出自编码模型用于高维复杂数据的降维。由于自动编码器通常应用于无监…

建筑学VR虚拟仿真情景实训教学

首先&#xff0c;建筑学VR虚拟仿真情景实训教学为建筑学专业的学生提供了一个身临其境的学习环境。通过使用VR仿真技术&#xff0c;学生可以在虚拟环境中观察和理解建筑结构、材料、设计以及施工等方面的知识。这种教学方法不仅能帮助学生更直观地理解复杂的建筑理论&#xff0…

SpringData JPA 搭建 xml的 配置方式

1.导入版本管理依赖 到父项目里 <dependencyManagement><dependencies><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-bom</artifactId><version>2021.1.10</version><scope>…

软文开头怎么写才能拿捏用户?媒介盒子为您解答

软文标题是吸引用户点击的关键因素&#xff0c;那软文开头就是决定用户能否读下去的主要因素&#xff0c;很多运营er在写文案时经常会面临的情况之一就是好不容易想到一个标题&#xff0c;点击率不错&#xff0c;但是开头不行用户一看开头&#xff0c;跑了&#xff01;如果不知…

Git篇---第三篇

系列文章目录 文章目录 系列文章目录前言一、git pull 和 git fetch 有什么区别?二、git中的“staging area”或“index”是什么?三、什么是 git stash?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章…

c#读取XML文件实现晶圆wafermapping显示demo计算电机坐标控制电机移动

c#读取XML文件实现晶圆wafermapping显示 功能&#xff1a; 1.读取XML文件&#xff0c;显示mapping图 2.在mapping视图图标移动&#xff0c;实时查看bincode,x,y索引与计算的电机坐标 3.通过设置wafer放在平台的位置x,y轴电机编码值&#xff0c;相机在wafer的中心位置&#…

C# 任务的异常和延续处理

写在前面 当Task在执行过程中出现异常或被取消等例外的情况时&#xff0c;为了让执行流程能够继续进行&#xff0c;可以使用延续方法实现这种链式处理&#xff1b;还可以针对前置任务不同的执行结果&#xff0c;选择执行不同的延续分支方法。子任务执行过程中的任何异常都会被…

Centos7云服务器上安装cobalt_strike_4.7。附cobalt_strike_4.7安装包

环境这里是阿里的一台Centos7系统。 开始安装之前首先要确保自己安装了java11及以上环境。 安装java11步骤&#xff1a; sudo yum update sudo yum install java-11-openjdk-devel把服务器端&#xff08;CS工具分服务器端和客户端&#xff09;的CS安装到服务器上后给目录下的…

OpenEuler_22.03升级mongdb到7.0.4

使用命令&#xff1a;lscpu&#xff0c;查看cpu架构为aarch64为arm架构的一种执行状态。 所以我们直接下载arm的包安装即可。无需自己编译源码。 下载地址&#xff1a;https://www.mongodb.com/try/download/community 下载解压 wget https://fastdl.mongodb.org/linux/mong…

【LVGL】STM32F429IGT6(在野火官网的LCD例程上)移植LVGL官方的例程(还没写完,有问题 排查中)

这里写目录标题 前言一、本次实验准备1、硬件2、软件 二、移植LVGL代码1、获取LVGL官方源码2、整理一下&#xff0c;下载后的源码文件3、开始移植 三、移植显示驱动1、enable LVGL2、修改报错部分3、修改lv_config4、修改lv_port_disp.c文件到此步遇到的问题 Undefined symbol …

C语言中的一维数组与二维数组

目录 一维数组数组的创建初始化使用在内存中的存储 二维数组创建初始化使用在内存中的存储 数组越界 一维数组 数组的创建 数组是一组相同类型元素的集合。 int arr1[10]; char arr3[10]; float arr4[10]; double arr5[10];下面这个数组能否成功创建&#xff1f; int count…

python简易学生管理 + MySQL

数据库表 Python代码部分 import pymysqlclass StMgmt(object):def tips(self):"""提示用户选择的操作"""print("""学生管理系统 1.01.查看所有信息2.查看学生信息3.修改学生信息4.增加学生信息5.退出学生系统"""…

SPI 通信-stm32入门

本节我们将继续学习下一个通信协议 SPI&#xff0c;SPI 通信和我们刚学完的 I2C 通信差不多。两个协议的设计目的都一样&#xff0c;都是实现主控芯片和各种外挂芯片之间的数据交流&#xff0c;有了数据交流的能力&#xff0c;我们主控芯片就可以挂载并操纵各式各样的外部芯片&…

C语言-枚举

常量符号化 用符号而不是具体的数字来表示程序中的数字 枚举 用枚举而不是定义独立的const int变量 枚举是一种用户定义的数据类型&#xff0c;他用关键词enum以如下语法来声明&#xff1a; enum枚举类型名字{名字0&#xff0c;…&#xff0c;名字n}&#xff1b; 枚举类型名…

Q_GDW1819-2013电压监测装置协议结构解析

目录 一 专业术语二 基本功能2.1 基础功能2.2 数据存储2.3 显示功能&#xff08;设备能够看到的&#xff09;2.4 参数设置与查询2.5 事件检测与告警功能 三 其他内容3.1 通信方式3.2 通信串口 四 帧结构解析4.1 传输方式4.2 数据帧格式4.2.1 报文头&#xff08;2字节&#xff0…