SpringBoot 处理 @KafkaListener 消息

news2024/12/24 10:21:08

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {
		if (isRunning()) {
			return;
		}
		ContainerProperties containerProperties = getContainerProperties();
		if (!this.consumerFactory.isAutoCommit()) {
			AckMode ackMode = containerProperties.getAckMode();
			if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
				Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
			}
			if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
					&& containerProperties.getAckTime() == 0) {
				containerProperties.setAckTime(5000);
			}
		}

		Object messageListener = containerProperties.getMessageListener();
		Assert.state(messageListener != null, "A MessageListener is required");
		if (containerProperties.getConsumerTaskExecutor() == null) {
			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
		this.listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
		if (this.listener instanceof DelegatingMessageListener) {
			Object delegating = this.listener;
			while (delegating instanceof DelegatingMessageListener) {
				delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
			}
			listenerType = ListenerUtils.determineListenerType(delegating);
		}
        // 这里创建了监听消费者对象
		this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
		setRunning(true);
        // 将消费者对象放入到线程池中执行
		this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
	}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {
			this.consumerThread = Thread.currentThread();
			if (this.genericListener instanceof ConsumerSeekAware) {
				((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
			}
			if (this.transactionManager != null) {
				ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
			}
			this.count = 0;
			this.last = System.currentTimeMillis();
			if (isRunning() && this.definedPartitions != null) {
				try {
					initPartitionsIfNeeded();
				}
				catch (Exception e) {
					this.logger.error("Failed to set initial offsets", e);
				}
			}
			long lastReceive = System.currentTimeMillis();
			long lastAlertAt = lastReceive;
			while (isRunning()) {
				try {
					if (!this.autoCommit && !this.isRecordAck) {
						processCommits();
					}
					processSeeks();
					if (!this.consumerPaused && isPaused()) {
						this.consumer.pause(this.consumer.assignment());
						this.consumerPaused = true;
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Paused consumption from: " + this.consumer.paused());
						}
						publishConsumerPausedEvent(this.consumer.assignment());
					}
                    // 拉取信息
					ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
					this.lastPoll = System.currentTimeMillis();
					if (this.consumerPaused && !isPaused()) {
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Resuming consumption from: " + this.consumer.paused());
						}
						Set<TopicPartition> paused = this.consumer.paused();
						this.consumer.resume(paused);
						this.consumerPaused = false;
						publishConsumerResumedEvent(paused);
					}
					if (records != null && this.logger.isDebugEnabled()) {
						this.logger.debug("Received: " + records.count() + " records");
						if (records.count() > 0 && this.logger.isTraceEnabled()) {
							this.logger.trace(records.partitions().stream()
								.flatMap(p -> records.records(p).stream())
								// map to same format as send metadata toString()
								.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
								.collect(Collectors.toList()));
						}
					}
					if (records != null && records.count() > 0) {
						if (this.containerProperties.getIdleEventInterval() != null) {
							lastReceive = System.currentTimeMillis();
						}
						invokeListener(records);
					}
					else {
						if (this.containerProperties.getIdleEventInterval() != null) {
							long now = System.currentTimeMillis();
							if (now > lastReceive + this.containerProperties.getIdleEventInterval()
									&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
								publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
										? this.consumer : null, this.consumerPaused);
								lastAlertAt = now;
								if (this.genericListener instanceof ConsumerSeekAware) {
									seekPartitions(getAssignedPartitions(), true);
								}
							}
						}
					}
				}
				catch (WakeupException e) {
					// Ignore, we're stopping
				}
				catch (NoOffsetForPartitionException nofpe) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
					break;
				}
				catch (Exception e) {
					handleConsumerException(e);
				}
			}
			ProducerFactoryUtils.clearConsumerGroupId();
			if (!this.fatalError) {
				if (this.kafkaTxManager == null) {
					commitPendingAcks();
					try {
						this.consumer.unsubscribe();
					}
					catch (WakeupException e) {
						// No-op. Continue process
					}
				}
			}
			else {
				ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
				KafkaMessageListenerContainer.this.stop();
			}
			this.monitorTask.cancel(true);
			if (!this.taskSchedulerExplicitlySet) {
				((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
			}
			this.consumer.close();
			this.logger.info("Consumer stopped");
		}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {
		if (!isRunning()) {
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
			if (topicPartitions != null
					&& this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);

            // 创建多个消费者
			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container;
				if (topicPartitions == null) {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties);
				}
				else {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties, partitionSubset(containerProperties, i));
				}
				String beanName = getBeanName();
				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix("-" + i);
				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
				container.start();
				this.containers.add(container);
			}
		}
	}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {

  

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);
        // poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);
        return props;
    }

}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 增加开启批量处理
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
 
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2137182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024-2025年最全的计算机软件毕业设计选题大全

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

通过API接口获取数据:高效、灵活的数据交互之道

在数字化时代&#xff0c;数据已成为企业最宝贵的资产之一。企业和开发者对于数据的获取、处理和分析的需求日益增长。API&#xff08;应用程序编程接口&#xff09;接口作为连接不同系统和应用程序的桥梁&#xff0c;提供了一种高效、灵活的方式来获取和交换数据。本文将探讨为…

rust + bevy 实现小游戏 打包成wasm放在浏览器环境运行

游戏界面 代码地址 github WASM运行 rustup target install wasm32-unknown-unknown cargo install wasm-server-runner cargo run --target wasm32-unknown-unknowncargo install wasm-bindgen-cli cargo build --release --target wasm32-unknown-unknown wasm-bindgen --…

工厂模式(二):工厂方法模式

一、概念 工厂方法模式&#xff08;Factory Method&#xff09;&#xff0c;定义一个用于创建对象的接口&#xff0c;让子类决定实例化哪一个类。工厂方法使一个类的实例化延迟到其子类。从而使得系统更加灵活。客户端可以通过调用工厂方法来创建所需的产品&#xff0c;而不必…

Linux进程间通信——管道实现实战;深度学习,探索管道接口、特性、情况

前言&#xff1a;本节内容仍是管道&#xff0c; 上节内容我们学习了管道的原理。 这节内容将在原理的基础上&#xff0c; 讲解管道的编程&#xff0c; 特性&#xff0c;应用等等。 下面开始我们的学习吧。 ps&#xff1a;本节内容需要了解一些管道的原理&#xff0c; 希望友友们…

AIGC-初体验

线性分类 提问&#xff0c;目的试图让AI自动线性分类 A类&#xff1a;(10,21),&#xff08;3,7&#xff09;,(9,20&#xff09;(121,242) B类&#xff1a;(3,9),(5,11),(70,212),(11,34) 根据线性关系分类 请问 (100,300)&#xff0c;&#xff08;100&#xff0c;201&#xff…

nacos和eureka的区别详细讲解

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; Nacos 和 Eureka 是两种服务注册与发现的组件&#xff0c;它们在微服务架构中扮演重要角色。两者虽然都是为了解决服务发现的问题&#xff0c;但在功能特性、架构、设计理念等方面有很多不同。以下是详细的…

【期末复习】软件项目管理

前言&#xff1a; 关于软件项目管理这一科目的重要期末考点&#xff0c;希望对你有帮助。 目录 质量管理可能遇到的问题 软件项目质量管理 软件项目风险管理 进度 题1 题2 题3 成本 题1 题2 题3 质量管理可能遇到的问题 (1)没有制定质量管理计划&#xff1a; (2)…

JMeter测试工具的简单了解

Apache JMeter 是一款开源的测试工具&#xff0c;主要用于对软件的性能进行测试。它最初被设计用于测试Web应用&#xff0c;但随着时间的推移&#xff0c;它的功能已经扩展到了其他测试领域。 可以应用到的场景 性能测试&#xff1a;评估应用程序在不同负载下的表现。负载测试…

初学代码指南(软2耶)

首先&#xff0c;很高兴又和大家见面了&#xff0c;本文章仅是作者的自我总结&#xff0c;是给笔者看的&#xff0c;所以读者在阅读时请抱着参考的心态&#xff0c;如果觉得可以借鉴的可以稍微借鉴一下&#xff0c;如果觉得笔者写了一坨shi&#xff0c;可以随便喷俺。 一.IDE …

音视频开发常见的开源项目汇总

FFmpeg 地址&#xff1a;https://ffmpeg.org/介绍&#xff1a;FFmpeg 是一个非常强大的开源多媒体框架&#xff0c;它可以用来处理视频和音频文件。它支持多种格式的转换、编码、解码、转码、流处理等。FFmpeg 包括了 libavformat、libavcodec、libavutil、libswscale、libpos…

✨机器学习笔记(四)—— 逻辑回归、决策边界、过拟合、正则化

Course1-Week3: https://github.com/kaieye/2022-Machine-Learning-Specialization/tree/main/Supervised%20Machine%20Learning%20Regression%20and%20Classification/week3机器学习笔记&#xff08;四&#xff09; 1️⃣逻辑回归&#xff08;logistic regression&#xff09;…

[数据集][目标检测]疟疾恶性疟原虫物种目标检测数据集VOC+YOLO格式948张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;948 标注数量(xml文件个数)&#xff1a;948 标注数量(txt文件个数)&#xff1a;948 标注类别…

数据处理工具(geodataprocess)之哨兵1预处理

在使用 Sentinel-1 数据进行遥感应用时&#xff0c;数据预处理是一个关键步骤&#xff0c;目的是提高数据的质量&#xff0c;消除或减少系统和环境引入的误差&#xff0c;使其更适合后续分析。Sentinel-1 是欧洲空间局&#xff08;ESA&#xff09;的合成孔径雷达&#xff08;SA…

数字高程模型DEM详细应用分析

DEM在各个领域都有广泛应用&#xff0c;它不仅仅是一张“高程地图”&#xff0c;更是地理分析、模拟和预测的重要工具。 一、地形分析 在地形分析中&#xff0c;DEM是不可或缺的工具. 1 坡度分析&#xff08;Slope Analysis&#xff09; 定义&#xff1a;坡度是指地形表面的…

OpenSSH Server 远程代码执行漏洞(CVE-2024-6387)(附代码)

OpenSSH Server 远程代码执行漏洞&#xff08;CVE-2024-6387&#xff09;&#xff08;附代码&#xff09; 前言影响范围验证脚本1.python2.C? 参考链接 前言 2024年7月1日&#xff0c;OpenSSH 官方发布安全通告&#xff0c;披露CVE-2024-6387 OpenSSH Server 远程代码执行漏洞…

Python画笔案例-049 绘制笑脸

1、绘制笑脸 通过 python 的turtle 库绘制 笑脸&#xff0c;如下图&#xff1a; 2、实现代码 绘制 笑脸&#xff0c;以下为实现代码&#xff1a; """笑脸.py """ import turtledef draw_circle(pos,radius):"""以pos为中心点画圆…

加拿大发布的认知战思想与力量发展

文章目录 前言一、心理作战、影响力与欺骗战术1.1 孙子兵法中的认知战思想1.2 虚假信息轰炸1.3 人脑领域的持久胜利二、加拿大及其盟友面临的认知战威胁三、俄罗斯实施的认知战3.1 利用虚假信息加剧社会两级分化并刺激个别激进群体3.2 新一代的虚假信息行动有可能造成严重的认知…

redis基本数据结构-set

文章目录 1. set的基本介绍1.1. set底层结构之hash表的简单介绍1.2. 常用命令 2. 常见的业务场景2.1. 标签系统2.2. 社交网络好友关系 1. set的基本介绍 参考链接&#xff1a;https://mp.weixin.qq.com/s/srkd73bS2n3mjIADLVg72A redis 的 set 数据结构是一个无序的集合&#…

【JavaScript】数据结构之字典 哈希表

字典 键值对存储的&#xff0c;类似于js的对象&#xff0c;但在js对象中键[key]都是字符串类型或者会转换成字符串类型&#xff0c;因此后声明的键值会覆盖之前声明的值。字典以map表示&#xff0c;map的键不会转换类型。 let map new Map() map.set(a, 1) map.set(b, 2) ma…