kafka学习-消费者

news2025/1/19 8:03:44

目录

1、消费者、消费组

2、心跳机制

3、消费者常见参数配置

4、订阅

5、反序列化

基本概念

自定义反序列化器

6、位移提交

6.1、自动提交

6.2、手动提交

同步提交

异步提交

7、再均衡

7.1、定义与基本概念

7.2、缺陷

7.3、如何避免再均衡

7.4、如何进行组内分区分配

7.5、谁来执行再均衡和消费组管理

8、消费者拦截器

作用

自定义消费者拦截器


1、消费者、消费组

  • 消费者从订阅的主题消费消息,消费消息的偏移量保存在kafka中的__consumer_offsets的主题中。
  • 多个消费同一个主题的消费者,可以通过group.id配置,加入到同一个消费组中。消费组均衡地给消费者分配分区,每个分区只由消费组中的一个消费者消费,防止重复消费。
  • 同一个消费组里:一个分区只会对应一个消费者,但一个消费者可以消费多个分区。
  • group_id一半设置为应用或者业务的逻辑名称。

2、心跳机制

消费者4宕机,重新分配分区3的消费者
分区3所在broker宕机,重选分区3的leader分区

  • 消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区;
  • broker宕机,分区3重选leader副本,出发再平衡,重新分配分区3消息。

        心跳机制,就是consumer和broker之间的健康检查。consumer和broker之间保持长连接,通过心跳机制检测对方是否健康。心跳检测相关参数如下所示:

        在broker端,可配置sessionTimeoutMs参数,如果consumer心跳超期,broker会把消费者从消费组中移除,并触发再平衡,重新分配分区;

        在consumer端,可配置sessionTimeoutMs和rebalanceTimeoutMs参数,如果broker心跳超期,consumer则会告知broker主动退出消费组,并触发再平衡。

3、消费者常见参数配置

4、订阅

主题、分区(leader和follower分区)、消费者、消费组、订阅。

  • 主题:topic,用于分类管理消息的逻辑单元,可以用于区分业务类型;
  • 分区:partition,同一个topic的消息,会被分散到多个分区中,不同分区通常在不同broker上,方便水平扩展。分区可分为leader分区和follower分区,leader分区用于与生产者/消费者通信,follower分区用于备份leader分区的数据;
  • 消费者:与分区长连接,用于消费分区中的消息;
  • 消费组:消费组中可能会有多个消费者,保证一个消费组获取到特定主题的全部消息。消费组可以保证一个主题的分区只会被消费组中的一个消费者消费;
  • 订阅:消费者订阅主题,并将消费者加入到消费组中,采用pull模式,从broker分区中读取消息。kafka的消费者只有pull模式,该模式下消费者可以自主控制消费消息的速率。

5、反序列化

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息者接收消息后,需要将消息反序列化为指定的数据格式进行处理。
  • 消费者通过key.deserializer和value.deserializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Deserializer<T>接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArrayDeserializer、ByteBufferDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、StringDeserializer、LongDeserializer、ShortDeserializer。

自定义反序列化器

        实现org.apache.kafka.common.serialization.Deserializer<T>接口,并实现其中的deserializer方法。

public class UserDeserializer implements Deserializer<User> {
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
	}
	
	@Override
	public User deserialize(String topic, byte[] data) {
		ByteBuffer allocate = ByteBuffer.allocate(data.length);
		allocate.put(data);
		allocate.flip();
		int userId = allocate.getInt();
		int length = allocate.getInt();
		System.out.println(length);
		String username = new String(data, 8, length);
		return new User(userId, username);
	}
	@Override
	public void close() {
	}
}

6、位移提交

  • 位移 = kafka分区消息的偏移量。
  • kafka中有一个主题,专门用于保存消费者的偏移量。
  • 消费者与分区一一对应,消费者在消费分区消息时,需要向kafka提交自己的位移(偏移量)信息,kafka只记录该消费者在对应分区的偏移量信息。
  • 消费者向kafka提交偏移量的过程,叫做位移提交。
  • 位移提交,分为自动提交和手动提交;也分为同步提交和异步提交。

6.1、自动提交

  • 开启⾃动提交: enable.auto.commit=true,kafka默认为自动提交。
  • 配置⾃动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s。
        自动提交模式下,Kafka会保证在开始调⽤ poll ⽅法时,提交上次 poll 返回的所有消息,因此⾃动提交不会出现消息丢失,但会重复消费,比如:
  1. Consumer 5s 提交一次offset
  2. 假设提交 offset 后的 3s 发⽣了 Rebalance
  3. Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费
  4. 因此 Rebalance 发⽣前 3s 的消息会被重复消费

6.2、手动提交

同步提交

while (true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
	process(records); // 处理消息
	try {
		consumer.commitSync();
	} catch (CommitFailedException e) {
		handle(e); // 处理提交失败异常
	}
}
  • 使⽤ KafkaConsumer#commitSync(),会提交 KafkaConsumer#poll() 返回的最新 offset
  • ⼿动同步提交可以控制offset提交的时机和频率
  • 调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
  • 会影响 TPS
  • 如果提交间隔过长,consumer重启后,会有更多的消息被重复消费。

异步提交

while (true) {
	ConsumerRecords<String, String> records = consumer.poll(3_000);
	process(records); // 处理消息
	consumer.commitAsync((offsets, exception) -> {
		if (exception != null) {
			handle(exception);
		}
	});
}
  • 使⽤ KafkaConsumer#commitAsync():会提交 KafkaConsumer#poll() 返回的最新 offset
  • commitAsync出现问题不会⾃动重试,可通过异步提交与同步提交相结合的方式解决。

7、再均衡

7.1、定义与基本概念

        也叫做重平衡,主要是为了让消费组下的消费者来重新分配主题下的每一个分区。再均衡的触发条件有如下三个:

  1. 消费组内成员变更(增加和减少消费者),⽐如消费者宕机退出消费组,或者新增一个消费者。
  2. 主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发再均衡。
  3. 订阅的主题发⽣变化,比如消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发再均衡。

7.2、缺陷

        再均衡过程中,消费者无法从kafka消费消息。如果kafka节点过多,再均衡过程会及其耗时(数分钟甚至小时),过程中kafka基本处于不可用状态。

7.3、如何避免再均衡

        完全避免,那不可能,因为你无法保证消费者不会故障。但是我们可以通过避免增加分区、增加订阅的主题、增加消费者这几种情况,减少再均衡的触发。
        但有时候,kafka会错误地认为一个正常的消费者已经挂掉,从而触发再均衡。我们要做的,就是避免这种情况。
        消费者和kafka之间通过心跳机制来做健康检查。当消费者宕机、网络阻塞或是消费者因负载过重没来得及发送心跳时,kafka都会认为消费者挂掉了。所以,设置合理的健康检查参数可以有效减少再均衡的发生。比较重要的参数如下:
  1. session.timout.ms:控制⼼跳超时时间,推荐设置为6s
  2. heartbeat.interval.ms:控制⼼跳发送频率,频率越高越不容易误判,但也会消耗更多资源,推荐设置为2s
  3. max.poll.interval.ms:控制poll的间隔,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。推荐为消费者处理消息最长耗时 + 1分钟。

7.4、如何进行组内分区分配

有三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。

7.5、谁来执行再均衡和消费组管理

        kafka里有一个角色,叫做Group Coordinator,用于执行消费组的管理。
        Group Coordinator——每个消费组分配一个消费组协调器⽤于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

8、消费者拦截器

作用

  1. 消费者在拉取了分区消息后,会先通过反序列化对key和value进行处理;
  2. 然后可通过设置消费者拦截器对消息进行处理,允许更改消费者接收到的消息,或者做一些监控、日志处理
  3. 应用程序处理消费者拉取的分区消息;

自定义消费者拦截器

        ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。

        自定义消费者拦截器需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口,并实现其中的configure()、onConsume()、onCommit()、close()方法,其中:

  • onConsume():该方法在poll方法返回之前调用,调用结束后poll方法就返回消息了。可通过该方法修改消费者消息,返回新的消息。
  • onCommit():当消费者提交偏移量时,调用该方法。
  • close():用于关闭该拦截器用到的资源,如打开的文件、连接的数据库等。
  • configure():用于获取消费者的参数配置。
public class MyInterceptor implements ConsumerInterceptor<String, String> {
	@Override
	public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
		// poll方法返回结果之前最后要调用的方法
		System.out.println("MyInterceptor -- 开始");
		// 消息不做处理,直接返回
		return records;
	}
	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
		// 消费者提交偏移量的时候,经过该方法
		System.out.println("MyInterceptor -- 结束");
	}
	@Override
	public void close() {
		// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
	}
	@Override
	public void configure(Map<String, ?> configs) {
		// 用于获取消费者的设置参数
		configs.forEach((k, v) -> {
			System.out.println(k + "\t" + v);
		});
	}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997472.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React+Typescript+react-router 6 创建路由操作

本文我们来看看路由的安装 其实路由的操作没有什么变化 但是还是给大家讲一下 那么我们打开项目 在项目终端输入 npm install --save react-router react-router-dom安装 一下 react-router 和 react-router-dom 这都是react开发很基本的插件了 不过大家安装前先注意好我的版…

C++ 进制转化入门知识(1)

一、什么是进制 进制是一种用来表示数值的系统或方法&#xff0c;它是基于一个特定的基数来工作的。在我们常见的几种进制中&#xff0c;有&#xff1a; 1. **二进制&#xff08;基数 2&#xff09;**&#xff1a; 二进制只用两个数字&#xff1a;0和1。这是计算机内部使用…

GB28181学习(三)——心跳保活

心跳保活 要求&#xff1a; 1. 当原设备发现工作异常时&#xff0c;应立即向本SIP监控域的SIP服务器发送状态信息&#xff1b; 2. 无异常时&#xff0c;定时向本SIP监控域的SIP服务器发送状态信息&#xff1b; 3. 状态信息报送采用**MESSGAE**方法&#xff1b; 4. SIP设备宜在…

不同温度与工况的放电曲线与内阻曲线

在电动汽车中&#xff0c;机器学习被广泛应用于许多领域&#xff0c;包括电池状态估计。电池的状态 of charge (SOC) 是电池中可用能量的百分比。准确估计SOC对于优化电池性能、延长电池寿命和维护安全性至关重要。然而&#xff0c;SOC估计是一个复杂的任务&#xff0c;因为电池…

线性规划对偶问题:理论推导和实际应用

文章目录 对偶问题实例对偶问题定义和性质定义性质 对偶问题应用影子价格理论应用 参考文献 对偶问题实例 之前在很多地方&#xff0c;都看到过“对偶”这两个字眼&#xff0c;总觉得这个词很高大上。对偶理论的百度百科中甚至写到&#xff1a;“在线性规划早期发展中最重要的…

Unity之创建第一个2D游戏项目

一 Unity环境配置 1.1 Untity资源官网下载&#xff1a;https://unity.cn/releases 1.2 Unity Hub集成环境&#xff0c;包含工具和项目的管理 1.3 Unity Editor编辑器 1.4 Visual Studio 2022脚本编辑器 1.5 AndroidSKD&#xff0c;JDK&#xff0c;NDK工具&#xff0c;用于and…

tcp连接+套接字编程

tcp头部 tcp端口号 TCP的连接是需要四个要素确定唯一一个连接&#xff1a;&#xff08;源IP&#xff0c;源端口号&#xff09; &#xff08;目地IP&#xff0c;目的端口号&#xff09; 所以TCP首部预留了两个16位作为端口号的存储&#xff0c;而IP地址由上一层IP协议负责传递 源…

autoware.ai感知随笔--地面滤波

autwoware.ai中点云预处理–points_preprocessor points_preprocessor cloud_transformer: 点云坐标转换,将输入的点云转化为velodyne坐标系下的点云。 compare_map_filter: 对比激光雷达点云和点云地图&#xff0c;然后提取&#xff08;或去除&#xff09;一致的点。 |input_…

联通面试题

一、GC 1.1、目标 GC的主要作用是自动识别和释放不再使用的对象&#xff0c;回收其所占用的内存&#xff0c;以防止内存泄漏和内存溢出的问题。 1.2、如何实现 1.2.1、标记阶段 GC从根对象&#xff08;如线程栈中的引用、静态变量等&#xff09;开始&#xff0c;通过可达性…

CnosDB 签约京清能源,助力分布式光伏发电解决监测系统难题。

近日&#xff0c;京清能源采购CnosDB&#xff0c;升级其“太阳能光伏电站一体化监控平台”。该平台可以实现电站设备统一运行监控&#xff0c;数据集中管理&#xff0c;为操作人员、维护人员、管理人员提供全面、便捷、差异化的数据和服务。 京清能源集团有限公司&#xff08;…

【LeetCode】35.复杂链表的复制

题目 请实现 copyRandomList 函数&#xff0c;复制一个复杂链表。在复杂链表中&#xff0c;每个节点除了有一个 next 指针指向下一个节点&#xff0c;还有一个 random 指针指向链表中的任意节点或者 null。 示例 1&#xff1a; 输入&#xff1a;head [[7,null],[13,0],[11,4]…

并发-Executor框架笔记

Executor框架 jdk5开始&#xff0c;把工作单元与执行机制分离开来&#xff0c;工作单元包括Runable和Callable&#xff0c;执行机制由Executor框架来提供。 Executor框架简介 Executor框架的两级调度模型 Java线程被一对一映射为本地操作系统线程 java线程启动会创建一个本…

Linux单列模式实现线程池

目录 一、单列模式 1.1 单列模式概念以及实现条件 1.2 饿汉模式 1.1.1 饿汉模式代码实现 1.1.2 饿汉模式特征和优缺点 1.3 懒汉模式 1.3.1 懒汉模式代码实现 1.3.2 懒汉模式特征以及优缺点 二、线程池 2.1 线程池概念 2.2 实现简单线程池逻辑 2.3 模拟实现懒汉模式线程…

【八大经典排序算法】:直接插入排序、希尔排序实现 ---> 性能大比拼!!!

【八大经典排序算法】&#xff1a;直接插入排序、希尔排序实现 ---> 性能大比拼&#xff01;&#xff01;&#xff01; 一、 直接插入排序1.1 插入排序原理1.2 代码实现1.3 直接插入排序特点总结 二、希尔排序 ( 缩小增量排序 )2.1 希尔排序原理2.2 代码实现2.3 希尔排序特点…

UE5、CesiumForUnreal实现瓦片坐标信息图层效果

文章目录 1.实现目标2.实现过程2.1 原理简介2.2 cesium-native改造2.3 CesiumForUnreal改造2.4 运行测试3.参考资料1.实现目标 参考CesiumJs的TileCoordinatesImageryProvider,在CesiumForUnreal中也实现瓦片坐标信息图层的效果,便于后面在调试地形和影像瓦片的加载调度等过…

【C++入门到精通】C++入门 ——搜索二叉树(二叉树进阶)

阅读导航 前言一、搜索二叉树简介1. 概念2. 基本操作⭕搜索操作&#x1f36a;搜索操作基本代码&#xff08;非递归&#xff09; ⭕插入操作&#x1f36a;插入操作基本代码&#xff08;非递归&#xff09; ⭕删除操作&#x1f36a;删除操作基本代码&#xff08;非递归&#xff0…

给老婆写的,每日自动推送暖心消息

文章の目录 一、起因二、环境准备三、创建nestjs项目四、控制器五、service服务层1、获取Access token2、组装模板消息数据3、获取下次发工资还有多少天4、获取距离下次结婚纪念日还有多少天5、获取距离下次生日还有多少天6、获取时间日期7、获取是第几个结婚纪念日8、获取相恋…

前端面试题JS篇(4)

浏览器缓存 浏览器缓存分为强缓存和协商缓存&#xff0c;当客户端请求某个资源时&#xff0c;获取缓存的流程如下&#xff1a; 先根据这个资源的一些 http header 判断它是否命中强缓存&#xff0c;如果命中&#xff0c;则直接从本地获取缓存资源&#xff0c;不会发请求到服务…

vivado xpm 使用和封装

vivado xpm 使用和封装 tools -> language templates

【JavaScript】WebAPI入门到实战

文章目录 一、WebAPI背景知识1. 什么是WebAPI&#xff1f;2. 什么是API&#xff1f; 二、DOM基本概念三、获取元素三、事件初识1. 点击事件2. 键盘事件 四、操作元素1. 获取/修改元素内容2. 获取/修改元素属性3. 获取/修改表单元素属性4. 获取/修改样式属性 五、操作节点1. 新增…