Kafka-02 @KafkaListener学习

news2025/2/27 20:46:44

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;

  • ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息;
    • 收到新的消息就执行,执行完了就继续自旋;
    • 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ----------------------------
protected void finishRefresh() {
   clearResourceCaches();

   initLifecycleProcessor();

   // 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessor
   getLifecycleProcessor().onRefresh();

   publishEvent(new ContextRefreshedEvent(this));

   if (!NativeDetector.inNativeImage()) {
      LiveBeansView.registerApplicationContext(this);
   }
}



// ------------------------------ DefaultLifecycleProcessor ----------------------------
public void onRefresh() {
    startBeans(true);
    this.running = true;
}



// ------------------------------ DefaultLifecycleProcessor ----------------------------
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
	Lifecycle bean = lifecycleBeans.remove(beanName);
	if (bean != null && bean != this) {
		String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
		for (String dependency : dependenciesForBean) {
			doStart(lifecycleBeans, dependency, autoStartupOnly);
		}
		if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) || 
             ((SmartLifecycle) bean).isAutoStartup())) {
			try {
                // 获取容器中的 LifeCycle bean 对象,调用它的 start()
                // SpringKafka 中对应的是 KafkaListenerEndpointRegistry
                // 我们重点看一下 KafkaListenerEndpointRegistry.start()
				bean.start();
			}
			catch (Throwable ex) {
				throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
			}
		}
	}
}

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
public void start() {
    // 轮询所有的 ConcurrentMessageListenerContainer 对象
    // 执行 ConcurrentMessageListenerContainer.start()
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
    this.running = true;
}



// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if ((this.contextRefreshed && this.alwaysStartAfterRefresh) 
        || listenerContainer.isAutoStartup()) {
        // 执行 ConcurrentMessageListenerContainer.start()
        listenerContainer.start();
    }
}



// ---------------------------- AbstractMessageListenerContainer ---------------------------
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            // 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()
            doStart();
        }
    }
}

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer ---------------------------
protected void doStart() {
    if (!isRunning()) {
        checkTopics();
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.concurrency = topicPartitions.length;
        }
        setRunning(true);

        // 1. 根据 @KafkaListener 中配置的 concurrency 轮询
        for (int i = 0; i < this.concurrency; i++) {
            // 2. 创建 KafkaMessageListenerContainer
            KafkaMessageListenerContainer<K, V> container =
                constructContainer(containerProperties, topicPartitions, i);
            
            // 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            
            // 4. 启动 KafkaMessageListenerContainer
            container.start();
            
            // 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中
            this.containers.add(container);
        }
    }
}

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
    String beanName = getBeanName();
    beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
    container.setBeanName(beanName);
    ApplicationContext applicationContext = getApplicationContext();
    if (applicationContext != null) {
        container.setApplicationContext(applicationContext);
    }
    ApplicationEventPublisher publisher = getApplicationEventPublisher();
    if (publisher != null) {
        container.setApplicationEventPublisher(publisher);
    }

    // 设置 clinetIdSuffix,clientId 前缀
    container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
    container.setGenericErrorHandler(getGenericErrorHandler());
    container.setCommonErrorHandler(getCommonErrorHandler());
    container.setAfterRollbackProcessor(getAfterRollbackProcessor());
    container.setRecordInterceptor(getRecordInterceptor());
    container.setBatchInterceptor(getBatchInterceptor());
    container.setInterceptBeforeTx(isInterceptBeforeTx());
    container.setListenerInfo(getListenerInfo());
    AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
    if (exec == null) {
        // 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors
        exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
        this.executors.add(exec);
        
        // 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer
        container.getContainerProperties().setConsumerTaskExecutor(exec);
    }
}

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protected void doStart() {
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);

    Object messageListener = containerProperties.getMessageListener();
    AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
    if (consumerExecutor == null) {
        consumerExecutor = new SimpleAsyncTaskExecutor(
            (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    ListenerType listenerType = determineListenerType(listener);
    
    // 1. 创建 ListenerConsumer
    // ListenerConsumer 是一个 Runnable 对象
    // new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员
    // 它的 run() 比较重要
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    
    // 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用
    this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

public void run() {
    ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
    publishConsumerStartingEvent();
    this.consumerThread = Thread.currentThread();
    setupSeeks();
    KafkaUtils.setConsumerGroupId(this.consumerGroupId);
    this.count = 0;
    this.last = System.currentTimeMillis();
    initAssignedPartitions();
    publishConsumerStartedEvent();
    Throwable exitThrowable = null;
    
    // 开启自旋
    while (isRunning()) {
        // 通过 KafkaConsumer 向 kafka-server 发起 poll 请求
        pollAndInvoke();
    }
    wrapUp(exitThrowable);
}

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
   if (this.invokerHandlerMethod != null) {
       // 最终的执行入口
       // 最后会通过反射调用我们的 @KafkaListener 声明的方法
      return this.invokerHandlerMethod.invoke(message, providedArgs);
   } else if (this.delegatingHandler.hasDefaultHandler()) {
      Object[] args = new Object[providedArgs.length + 1];
      args[0] = message.getPayload();
      System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
      return this.delegatingHandler.invoke(message, args);
   } else {
      return this.delegatingHandler.invoke(message, providedArgs);
   }
}

至此,SpringKafka 分析完毕;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1918145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据交换系列-DWG数据交换方案选型

1.背景介绍 1.1 什么是块 块相当于一个“标准件”&#xff0c;当你要用的时候可以随意插入&#xff0c;插入的时候可以旋转角度&#xff0c;还可以定义比例。块 &#xff0c;在CAD中&#xff0c;就是一个、或一组图形实体的总称&#xff0c;可以包含任意对象&#xff0c;简单…

Linux(一)线程——何为线程???Linux线程控制

文章目录 什么是线程&#xff1f;&#xff1f;&#xff1f;线程和进程的区别和联系Linux线程控制POSIX线程库创建线程线程等待线程终止线程分离 什么是线程&#xff1f;&#xff1f;&#xff1f; 在一个程序里的一个执行路线就叫做线程&#xff08;thread&#xff09;。更准确的…

Airtest成功案例分享:KLab连续2年携Airtest私有云产品参加CEDEC大会!

一、KLab株式会社介绍 KLab株式会社是一家位于日本的移动游戏开发公司&#xff0c;成立于2000年。公司以开发和运营基于动漫和漫画IP的手机游戏而闻名&#xff0c;尤其是在音乐节奏游戏领域。KLab的一些知名作品包括《LoveLive!学园偶像祭》、《排球少年&#xff1a;新的征程》…

【Redis】Redis十大类型

文章目录 前言一、string字符串类型二、List列表类型三、 Hash表四、 Set集合五、 ZSet有序集合六、 GEO地理空间七、 HyperLogLog基数统计八、Bitmap位图九、bitfield位域十、 Stream流10.1 队列指令10.2 消费组指令10.3 ACK机制 前言 redis是k-v键值对进行存储&#xff0c;k…

c#中的设计模式

1、设计模式 MVVM、MVC、GOF23种设计模式 2、GOF23种设计模式分类 创建型&#xff1a;对类的现实化进行了抽象&#xff0c;能够使软件模块做到与对象的创建和组织无关。 包括&#xff1a;单例模式、抽象工厂模式、建造者模式、工厂模式、原型模式 结构型&#xff1a;描述类…

PHP充电桩小程序系统源码

绿色出行新伴侣&#xff01;充电桩小程序&#xff0c;让充电不再烦恼✨ &#x1f50b; 开篇&#xff1a;告别电量焦虑&#xff0c;充电桩小程序来救场&#xff01; 在这个电动车日益普及的时代&#xff0c;电量不足成了不少车主的“心头大患”。但别担心&#xff0c;充电桩小…

基于IDEA的Lombok插件安装及简单使用

lombok介绍 Lombok能以注解形式来简化java代码&#xff0c;提高开发效率。开发中经常需要写的javabean&#xff0c;都需要花时间去添加相应的getter/setter&#xff0c;也许还要去写构造器、equals等方法&#xff0c;而且需要维护。而Lombok能通过注解的方式&#xff0c;在编译…

Redis 主从复制,、哨兵与集群

目录 一.redis主从复制 1.redis 主从复制架构 2.主从复制特点 3.主从复制的基本原理 4.命令行配置 5.实现主从复制 6.删除主从复制 7.主从复制故障恢复 8.主从复制完整过程 9.主从同步优化配置 二.哨兵模式&#xff08;Sentinel&#xff09; 1.主要组件和概念 2.哨…

利用Python的sympy包求解一元多次方程

一元1次方程 import sympy as sp # 导入sympy包 x sp.Symbol(x) # 定义符号变量 f 2*x -8 # 定义要求解的一元1次方程 x sp.solve(f) # 调用solve函数求解方程 x[4]一元2次方程 import sympy as sp # 导入sympy包 x sp.Symbol(x) # 定义符号变量 f …

无人直播赚钱的底层逻辑是什么?一文揭晓!

当前&#xff0c;网络直播已经成为各类商家提高曝光和引流获客的主要渠道之一&#xff0c;这在为商家带来新机遇的同时&#xff0c;也让他们因人手不足或资金匮乏等原因而陷入无人问津窘境之中。在此背景下&#xff0c;无人直播软件一经出现&#xff0c;便引起了众多商家的关注…

【解密】记一次辽宁省某综合实践教学管理平台加解密算法分析

最近接到需求&#xff0c;于是准备弄一下&#xff0c;发现对方整个流程是&#xff1a;先加密在请求&#xff0c;请求得到的数据再进行拼接加密&#xff0c;不过花了2个小时还是完成了解密 哈哈 找到请求发现请求数据加密 在启动器里面发现登录方法 打印出各个关键变量数据 …

docs | 使用 sphinx 转化rst文件为html文档

1. 效果图 book 风格。 优点&#xff1a; 极简风格右边有标题导航左侧是文件导航&#xff0c;可隐藏 2. 使用方式 reST 格式&#xff0c;比markdown格式更复杂。 推荐使用 book 风格。 文档构建工具是 sphinx&#xff0c;是一个python包。 $ pip3 list | grep -i Sphinx …

嵌入式ARM控制器在AGV里的应用

随着ARM技术以及芯片加工工艺的迅猛发展&#xff0c; ARM工业计算机得到了越来越广泛的应用&#xff0c;尤其在工业智慧城市、智能设备以及工业自动化控制等领域。本文将为大家详细介绍ARM控制器在AGV控制系统中的应用&#xff0c;来供大家学习和参考&#xff0c;欢迎大家一起来…

开源公司网站源码系统,降低成本,提升效率 附带完整的安装代码包以及搭建教程

系统概述 开源公司网站源码系统是一个基于开源技术的网站建设解决方案。它提供了完整的网站框架和功能模块&#xff0c;允许企业快速搭建起一个功能齐全、设计美观的企业网站。该系统不仅降低了网站开发的成本&#xff0c;还大大提高了建设效率&#xff0c;使企业能够更快地将…

便携式气象参数检测仪:智能气象监测

随着科技的飞速发展&#xff0c;气象监测已不再是传统意义上的固定站点观测&#xff0c;而是逐渐向智能化、便携化、高精度化方向演进。在这一背景下&#xff0c;便携式气象参数检测仪应运而生&#xff0c;以其轻便、高效、多功能的特性&#xff0c;成为气象监测领域的得力助手…

css实现图片渐变切换效果

一、效果 使用csskeyframes&#xff0c;实现5个图片渐变切换的效果。如下图&#xff1a; 二、代码 1.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"w…

【启明智显分享】乐鑫HMI方案4.3寸触摸串口屏:水质检测仪应用解决方案

水是万物的源泉&#xff0c;了解水的酸碱度对于保障我们的健康、生产和生活环境至关重要。水质检测仪应运而生&#xff0c;它让我们能够洞察水的酸碱奥秘。 水是万物的源泉&#xff0c;了解水的酸碱度对于保障我们的健康、生产和生活环境至关重要。水质检测仪应运而生&#xff…

成都云飞浩容文化传媒有限公司电商服务的行业翘楚

在数字经济的浪潮中&#xff0c;电商行业正以前所未有的速度发展&#xff0c;各大企业纷纷寻求突破&#xff0c;以在激烈的市场竞争中站稳脚跟。而在这个大背景下&#xff0c;成都云飞浩容文化传媒有限公司&#xff08;以下简称“云飞浩容”&#xff09;凭借其专业的电商服务&a…

网页设计零基础入门:前端技术全攻略

在当今互联网飞速发展的时代&#xff0c;前端网页设计已经成为一个备受关注的领域。随着其重要性的不断提高&#xff0c;越来越多的专业人士和爱好者开始对前端设计感兴趣&#xff0c;希望通过掌握这项技术开辟自己的职业道路。然而&#xff0c;对于新手设计师来说&#xff0c;…

Web安全:SQL注入

一、SQL注入三要素 1、用户可以对输入的参数值进行修改。 2、后端不对用户输入的参数值进行严格过滤。 3、用户修改后的参数值可以被带入后端中成功执行&#xff0c;并返回一定结果。 二、SQL注入原理 简单来说&#xff0c;用户输入的值会被插入到SQL语句中&#xff0c;然后…