Spring kafka源码分析——消息是如何消费的

news2025/2/24 2:25:41

文章目录

    • 概要
    • 端点注册
    • 创建监听容器
    • 启动监听容器
    • 消息拉取与消费
    • 小结

概要

本文主要从Spring Kafka的源码来分析,消费端消费流程;从spring容器启动到消息被拉取下来,再到执行客户端自定义的消费逻辑,大致概括为以下4个部分:

在这里插入图片描述

源码分析主要也是从以上4个部分进行分析;

环境准备

maven依赖如下:

 	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
 	<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

消费端代码:

 @KafkaListener(topics = KafkaController.TOPIC_TEST_ERROR, groupId = "${spring.application.name}")
    public void replicatedTopicConsumer2(ConsumerRecord<String, String> recordInfo) {
        int partition = recordInfo.partition();
        System.out.println("partition:" + partition);
        String value = recordInfo.value();
        System.out.println(value);

    }

参数配置使用默认配置

端点注册

KafkaAutoConfiguration

与其他组件相同,spring-kafka的入口加载入口类也是以AutoConfiguration结尾,即:KafkaAutoConfiguration,由于本文重点分析消费者流程,自动类这里主要关注以下几个地方:
在这里插入图片描述
在这里插入图片描述
kafka启动后,会自动将ConcurrentKafkaListenerContainerFactory加载到容器中。

一般来说,消费端会使用到@KafkaListener注解或者@KafkaListeners注解,所以,我们的重点就是只要是关注,这两个注解是如何被识别,并且起到监听作用的,以下是类的加载流程:

在这里插入图片描述
Bean在执行init方法后会调用,初始化后置处理方法,而KafkaListenerAnnotationBeanPostProcessor实现了BeanPostProcessor,KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization就会被触发执行,在该方法中,会读取该bean中标注了@KafkaListener@KafkaListeners的方法
在这里插入图片描述

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

从上面看出,每个标注了KafkaListener注解的方法都会创建一个MethodKafkaListenerEndpoint,接着调用KafkaListenerEndpointRegistrar#registerEndpoint(KafkaListenerEndpoint,KafkaListenerContainerFactory<?>)进行注册

在这里插入图片描述
MethodKafkaListenerEndpoint又得到KafkaListenerEndpointDescriptor,最后将有的KafkaListenerEndpointDescriptor放到endpointDescriptors集合中

这里需要注意的是,KafkaListenerAnnotationBeanPostProcessor中的KafkaListenerEndpointRegistrar registrar属性是new出来的,并没有在spring容器中,而后面的创建监听器时还会再用到。
在这里插入图片描述
以上就是kafka端点注册流程。

创建监听容器

spring kafka把每个标注了KafkaListener注解的方法称为Endpoint,为每个方法生成了一个MethodKafkaListenerEndpoint对象,同时又为每个端点生成了一个MessageListenerContainer;以下是具体的生成流程

在这里插入图片描述
KafkaListenerAnnotationBeanPostProcessor实现了SmartInitializingSingleton,其中的方法afterSingletonsInstantiated会在bean初始化后进行执行

@Override
public void afterSingletonsInstantiated() {
   // 这个registrar没有放入到spring 容器中
	this.registrar.setBeanFactory(this.beanFactory);

	if (this.beanFactory instanceof ListableBeanFactory) {
		Map<String, KafkaListenerConfigurer> instances =
				((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
		for (KafkaListenerConfigurer configurer : instances.values()) {
			configurer.configureKafkaListeners(this.registrar);
		}
	}

	if (this.registrar.getEndpointRegistry() == null) {
		if (this.endpointRegistry == null) {
			Assert.state(this.beanFactory != null,
					"BeanFactory must be set to find endpoint registry by bean name");
			this.endpointRegistry = this.beanFactory.getBean(
					KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					KafkaListenerEndpointRegistry.class);
		}
		this.registrar.setEndpointRegistry(this.endpointRegistry);
	}

	if (this.defaultContainerFactoryBeanName != null) {
		this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
	}

	// Set the custom handler method factory once resolved by the configurer
	MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
	if (handlerMethodFactory != null) {
		this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
	}
	else {
		addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
	}

	// 主要方法,注册端点并创建容器
	this.registrar.afterPropertiesSet();
}
···
**KafkaListenerEndpointRegistrar**

```java
@Override
public void afterPropertiesSet() {
	registerAllEndpoints();
}

protected void registerAllEndpoints() {
	synchronized (this.endpointDescriptors) {
		// 上一个阶段已经把所有的端点放入了endpointDescriptors集合中
		for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
			this.endpointRegistry.registerListenerContainer(
					// 注意这个resolveContainerFactory
					descriptor.endpoint, resolveContainerFactory(descriptor));
		}
		this.startImmediately = true;  // trigger immediate startup
	}
}

// 如果在KafkaListener注解中的属性containerFactory没有配置容器工厂的名字,就会默认获取ConcurrentKafkaListenerContainerFactory实现类作为容器工厂
private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {
		if (descriptor.containerFactory != null) {
			return descriptor.containerFactory;
		}
		else if (this.containerFactory != null) {
			return this.containerFactory;
		}
		else if (this.containerFactoryBeanName != null) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			this.containerFactory = this.beanFactory.getBean(
					this.containerFactoryBeanName, KafkaListenerContainerFactory.class);
			return this.containerFactory;  // Consider changing this if live change of the factory is required
		}
		else {
			throw new IllegalStateException("Could not resolve the " +
					KafkaListenerContainerFactory.class.getSimpleName() + " to use for [" +
					descriptor.endpoint + "] no factory was given and no default is set.");
		}
	}

在这里插入图片描述
以上截图是真正创建容器的地方,并把创建好的容器添加到Map<String, MessageListenerContainer> listenerContainers,后面起动时会用到。
至此,kafka监听容器创建完成,整理下主要类之间的关系,如下

在这里插入图片描述

一个@KafkaListener注解标注的方法,就可以得到一个MethodKafkaListenerEndpoint,再使用默认的ConcurrentKafkaListenerContainerFactory就会创建出一个MessageListenerContainer监听容器,有几个方法标注了@KafkaListener 就可以得到几个ConcurrentMessageListenerContainer

启动监听容器

上面的流程知道,所有创建的容器放到了`Map<String, MessageListenerContainer> listenerContainers``

在这里插入图片描述
KafkaListenerEndpointRegistry实现了Lifecycle,其中的start()方法会在bean加载的最后一个阶段中被执行到

在这里插入图片描述
以下是执行流程
在这里插入图片描述
其中org.springframework.kafka.listener.KafkaMessageListenerContainer#doStart如下

@Override
protected void doStart() {
	if (isRunning()) {
		return;
	}
	if (this.clientIdSuffix == null) { // stand-alone container
		checkTopics();
	}
	ContainerProperties containerProperties = getContainerProperties();
	checkAckMode(containerProperties);

	Object messageListener = containerProperties.getMessageListener();
	if (containerProperties.getConsumerTaskExecutor() == null) {
		SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
				(getBeanName() == null ? "" : getBeanName()) + "-C-");
		containerProperties.setConsumerTaskExecutor(consumerExecutor);
	}
	GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
	ListenerType listenerType = determineListenerType(listener);
	// ListenerConsumer的构造函数中创建了真正的Consumer<K, V> consumer
	this.listenerConsumer = new ListenerConsumer(listener, listenerType);
	setRunning(true);
	this.startLatch = new CountDownLatch(1);
	// ListenerConsumer 实现了Runnable,调用submitListenable是就会开启新的线程执行其中的run方法
	this.listenerConsumerFuture = containerProperties
			.getConsumerTaskExecutor()
			.submitListenable(this.listenerConsumer);
	try {
		if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
			this.logger.error("Consumer thread failed to start - does the configured task executor "
					+ "have enough threads to support all containers and concurrency?");
			publishConsumerFailedToStart();
		}
	}
	catch (@SuppressWarnings(UNUSED) InterruptedException e) {
		Thread.currentThread().interrupt();
	}
}

每个监听容器ConcurrentMessageListenerContainer中都会创建一个出一个ListenerConsumer或多个(跟concurrency参数配置有关)ListenerConsumer,真正从kafka服务端拉去消息的逻辑在ListenerConsumerrun方法中。
到这里,主要类跟参数之间的对应关系如下
在这里插入图片描述

消息拉取与消费

这一阶段只要关注,消息的拉取到触发用户自定义方法流程与自动位移提交

不断循环拉去消息,并反射调用用户自定义方法:
在这里插入图片描述

protected void pollAndInvoke() {
	if (!this.autoCommit && !this.isRecordAck) {
		processCommits();
	}
	idleBetweenPollIfNecessary();
	if (this.seeks.size() > 0) {
		processSeeks();
	}
	pauseConsumerIfNecessary();
	this.lastPoll = System.currentTimeMillis();
	this.polling.set(true);
	// 调用kafka原生api进行拉取
	ConsumerRecords<K, V> records = doPoll();
	if (!this.polling.compareAndSet(true, false) && records != null) {
		/*
		 * There is a small race condition where wakeIfNecessary was called between
		 * exiting the poll and before we reset the boolean.
		 */
		if (records.count() > 0) {
			this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
		}
		return;
	}
	resumeConsumerIfNeccessary();
	debugRecords(records);
	if (records != null && records.count() > 0) {
		if (this.containerProperties.getIdleEventInterval() != null) {
			this.lastReceive = System.currentTimeMillis();
		}
		// 获取消息后,触发@KafkaListener标注地方法
		invokeListener(records);
	}
	else {
		checkIdle();
	}
}

下面先关注消费者位移在dopoll方法中什么时候触发提交地
在这里插入图片描述

在这里插入图片描述

// 每个消费组都有一个消费者协调器coordinator,在coordinator.poll方法中会判断是否需要自动提交位移
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
      if (coordinator != null && !coordinator.poll(timer)) {
          return false;
      }

      return updateFetchPositions(timer);
  }


public boolean poll(Timer timer) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned()) {
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        // 唤醒心跳检测线程,触发一次心跳检测
        pollHeartbeat(timer.currentTimeMs());
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            if (!ensureActiveGroup(timer)) {
                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
        // When group management is used, metadata wait is already performed for this scenario as
        // coordinator is unknown, hence this check is not required.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }
	// 判断是否需要自动提交位移
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

在这里插入图片描述
以下就是消息拉去义位移自动提交地处理流程
在这里插入图片描述
记录返回后,会调用用户自定义地处理逻辑
在这里插入图片描述
以下时具体地调用流程

在这里插入图片描述

小结

1、kafka spring消费者端点注册、创建监听容器、启动监听容器阶段,有两个重要的类KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry,他们对应的方法postProcessAfterInitializationstart在spring容器启动时会被执行,从而实现了kafka的监听容器的创建与启动

在这里插入图片描述
2、kafka自动提交位移时在poll方法中进行的,也就是每次获取新消息时,会先提交上次消费完成的消息;
3、拉取消息跟用户标注了@KafkaListener注解方法的处理逻辑用的是同一个线程,自动提交时间auto.commit.interval.ms默认是5s,假如用户的方法逻辑处理时长是10s,那么位移自动提交是在10s后再次调用poll方法时才会提交,而不是5s后就准时提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/863487.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

直播|深入解析 StarRocks 存算分离—云原生湖仓 Meetup#2

StarRocks 3.0 正式开启极速统一的湖仓新范式&#xff0c;借助云原生存算分离构架、极速数据湖分析、物化视图等重量级特性实现湖仓架构升级&#xff0c;兼具数据仓库查询高性能与数据湖低成本可扩展的优势&#xff0c;让用户更简单地实现极速统一的湖仓分析。 作为 StarRocks…

05-bean工厂的准备工作

入口方法 prepareBeanFactory(beanFactory);protected void prepareBeanFactory(ConfigurableListableBeanFactory beanFactory) {// Tell the internal bean factory to use the contexts class loader etc.// 设置beanFactory的classloader为当前context的classloaderbeanF…

计算机组成原理-笔记-第六章

目录 六、第六章——总线 1、总线&#xff08;基本概念&#xff09; &#xff08;1&#xff09;总线的定义 & 特性 &#xff08;2&#xff09;串行 &并行 &#xff08;3&#xff09;总线的分类 &#xff08;4&#xff09;总线分类——功能 &#xff08;4.1&…

分布式数据库视角下的存储过程

存储过程很好呀&#xff0c;那些用不好的人就是自己水平烂&#xff0c;不接受反驳&#xff01;我就有过这样念头&#xff0c;但分布式数据库&#xff0c;更倾向少用或不用存储过程。 1 我从C/S时代走来 C/S架构时代的末期最流行开发套件是PowerBuilder和Sybase数据库&#xf…

性能测试最佳实践的思考,7个要点缺一不可!

性能测试是软件开发和应用过程中至关重要的环节。它是评估系统性能、稳定性和可扩展性的有效手段&#xff0c;可以确保软件在真实环境中高效运行。在现代技术快速发展的时代&#xff0c;性能测试的重要性愈发显著。 性能测试在软件开发和应用过程中的重要性不可低估。它是保障…

程序员怎么利用ChatGPT解放双手=摸鱼?

目录 1. 当你遇到问题时为你生成代码ChatGPT 最明显的用途是根据查询编写代码。我们都会遇到不知道如何完成任务的情况&#xff0c;而这正是人工智能可以派上用场的时候。例如&#xff0c;假设我不知道如何使用 Python 编写 IP 修改器&#xff0c;只需查询 AI&#xff0c;它就…

模拟实现string类

string类的接口有很多&#xff0c;这里我来梳理一下自己觉得有意思的几个&#xff0c;并模拟实现一下可以凑合用的my_string&#xff0c;话不多说直接开干&#xff1a; 注意事项 为了和库里的string冲突&#xff0c;所以就将自己实现的my_string放在一个命名空间里 namespace …

采用pycharm在虚拟环境使用pyinstaller打包python程序

一年多以前&#xff0c;我写过一篇博客描述了如何虚拟环境打包&#xff0c;这一次有所不同&#xff0c;直接用IDE pycharm构成虚拟环境并运行pyinstaller打包 之前的博文&#xff1a; 虚拟环境venu使用pyinstaller打包python程序_伊玛目的门徒的博客-CSDN博客 第一步&#xf…

明天就要去面试软件测试岗了,现在我能怎么做呢?

首先&#xff0c;时间已经不允许你进行大面积的专业复习&#xff0c;所以你应该做好能够立竿见影的准备工作&#xff1a; 1、整理好自己的仪表 先去理个发&#xff0c;让自己看起来精神一点&#xff0c;尤其是男生&#xff0c;整理头发&#xff0c;修修鬓角能够快速让人对自己…

安卓13不再支持PPTP怎么办?新的连接解决方案分享

随着Android 13的发布&#xff0c;我们迎来了一个令人兴奋的新品时刻。然而&#xff0c;对于一些用户而言&#xff0c;这也意味着必须面对一个重要的问题&#xff1a;Android 13不再支持PPTP协议。如果你是一个习惯使用PPTP协议来连接换地址的用户&#xff0c;那么你可能需要重…

机器学习实战——波士顿房价预测

波士顿房价预测 波士顿房地产市场竞争激烈&#xff0c;而你想成为该地区最好的房地产经纪人。为了更好地与同行竞争&#xff0c;你决定运用机器学习的一些基本概念&#xff0c;帮助客户为自己的房产定下最佳售价。幸运的是&#xff0c;你找到了波士顿房价的数据集&#xff0c;…

tabBar的使用

参考Api&#xff1a;全局配置 | 微信开放文档 (qq.com) 1.使用说明 2.使用详情 3.使用案例 在全局配置的app.json中 "tabBar": {"color": "#333","selectedColor": "#d43c33","backgroundColor": "#fff&qu…

【云原生】Kubernetes控制器中StatefulSet的使用

目录 1 什么是 StatefulSet 2 StatefulSet 特点 3 限制 4 使用 StatefulSet 1 搭建 NFS 服务 2 客户端测试 3 使用 statefulset 1 什么是 StatefulSet 官方地址&#xff1a; StatefulSet | Kubernetes StatefulSet 是用来管理有状态应用的工作负载 API 对象。 无状态应…

半年报增幅超预期,但汤臣倍健还是喂不饱年轻人?

文|琥珀消研社 作者|朱古力 谁也想不到&#xff0c;进入三伏天以后首个运动潮流会是“晒背”。 小红书上近5万条“晒背”相关的笔记里&#xff0c;多数都是对“什么时候晒背”、“晒多久合适”等入门级问题的答疑解惑&#xff0c;微博热搜词条更是针对性地给出了“晒背最佳时…

小程序如何自定义分享内容

小程序项目中遇到门票转增功能&#xff0c;用户可将自己购买的门票分享给好友&#xff0c;好友成功领取即得门票一张 1.自定义分享按钮 通过button里的open-type属性里的share参数即自可定义分享按钮 <button open-type"share">分享</button>2.配置分…

使用 Torchvision 探测器进行实时深度排序

在实际应用中,跟踪是对象检测中最重要的组成部分之一。如果没有跟踪,实时监控和自动驾驶系统等应用就无法充分发挥其潜力。无论是人还是车辆,物体跟踪都起着重要作用。然而,测试大量的检测模型和重新识别模型是很麻烦的。为此,我们将使用 Deep SORT 和 Torchvision 检测器…

电路累积(放过压防反接、IIC、锂电池保护板)

一.防过压防反接电路 简单的过压保护电路一般加个TVS可以实现&#xff0c;当外部有瞬间高能量冲击时候它能够把这股能量抑制下来&#xff0c;虽然功率高&#xff0c;上千W都可以&#xff0c;但是维持抑制的时间很短很短&#xff0c;万一器件损坏或者长时间工作电压高于正常工作…

python入门篇03 基础案例 python版与java版 语法不同之处

目录 1. 前言: -> 上篇传送门: python入门篇02 python的语法基础 2. 案例: pzy超市的收银系统(控制台输入版) -> 2.0 需求摘要: -> 2.1 python代码答案: <直接可以运行> -> 2.2 java代码答案: <必须有main方法> 2.3 两种代码运行的结果: (一样…

424. 替换后的最长重复字符

424. 替换后的最长重复字符 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 424. 替换后的最长重复字符 https://leetcode.cn/problems/longest-repeating-character-replacement/description/ 完成情况&#xff…

集群安装OpenFOAM

在个人电脑上安装比较简单&#xff0c;在服务器上安装多出几个步骤&#xff0c;需要首先安装boost库以及openmpi库 下面步骤主要参考这篇博文&#xff0c;这里我自己重复一遍。 1、安装包准备 1.1、首先是下载OF源码和第三方库&#xff0c;这里采用wget的方式下载 wget -O …