Kafka-消费者-KafkaConsumer分析

news2025/1/14 18:36:10

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1392870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

葡萄酒术语“干”是什么意思呢?

一个初学品酒的人常常会感到力不从心,有如此多的术语,如甜、干、单宁、酒体等等,很容易让人迷失。嗯,就像情人眼里出西施一样,“好酒”因人而异。虽然品尝各种不同的葡萄酒是了解你喜欢什么的最好方法,但我…

springboot开启HTTPS

目录 一、前言 HTTP和HTTPS的含义以及区别 二、域名映射 三、添加SSL证书 四、Http转Https 五、内网穿透 一、前言 我们平常写完一个接口,其访问一般都是使用http协议 我们最终想要的结果是使用安全的HTTPS来访问 在我们开始实现之前,我们要先搞明…

前端——框架——Vue

提示: 本文只是从宏观角度简要地梳理一遍vue3,不至于说学得乱七八糟、一头雾水、不知南北,如果要上手写代码、撸细节,可以根据文中的关键词去查找资料 简问简答: vue.js是指vue3还是vue2? Vue.js通常指的是…

软件测试|sqlalchemy relationship

简介 SQLAlchemy是一个流行的Python ORM(对象关系映射)库,它允许我们以面向对象的方式管理数据库。在SQLAlchemy中,relationship是一个重要的功能,用于建立表之间的关系。在本文中,我们将详细探讨relation…

阿里云国外云服务器地域、收费标准及活动报价2024新版

阿里云国外服务器优惠活动「全球云服务器精选特惠」,国外服务器租用价格24元一个月起,免备案适合搭建网站,部署独立站等业务场景,阿里云服务器网aliyunfuwuqi.com分享阿里云国外服务器优惠活动: 全球云服务器精选特惠…

【前后端的那些事】评论功能实现

文章目录 聊天模块1. 数据库表2. 后端初始化2.1 controller2.2 service2.3 dao2.4 mapper 3. 前端初始化3.1 路由创建3.2 目录创建3.3 tailwindCSS安装 4. tailwindUI5. 前端代码编写 前言:最近写项目,发现了一些很有意思的功能,想写文章&…

Power Designer 连接 PostgreSQL 逆向工程生成pd表结构操作步骤以及过程中出现的问题解决

一、使用PowerDesigner16.5 链接pg数据库 1.1、启动PD.选择Create Model…。 1.2、选择Model types / Physical Data Model Physical Diagram:选择pgsql直接【ok】 1.3、选择connect 在工具栏选择Database-Connect… 快捷键:ctrlshiftN.如下图&#xff…

第八站:C++面向对象(继承和派生)

继承和派生 派生:由父类派生出子类 继承:子类继承父类(继承不会继承析构函数和构造函数:父类的所有成员函数,以及数据成员,都会被子类继承!) "子类派生出的类"会指向"父类被继承的类",父类就是基类 实例1: 先创建一个父…

Flask框架小程序后端分离开发学习笔记《2》构建基础的HTTP服务器

Flask框架小程序后端分离开发学习笔记《2》构建基础的HTTP服务器 Flask是使用python的后端,由于小程序需要后端开发,遂学习一下后端开发。本节提供一个构建简单的本地服务器的代码,仔细看注释,学习每一步的流程,理解服…

react-app框架——使用monaco editor实现online编辑html代码编辑器

文章目录 ⭐前言💖react系列文章 ⭐配置monaco-editor💖引入react-monaco-editor💖引入react-app-rewired💖通过config-overrides.js添加monaco插件配置 ⭐编辑代码的react页面配置💖扩展 可自定义配置语言 ⭐效果⭐总…

使用 mybatis-plus 的mybaits的一对多时, total和record的不匹配问题

应该是框架的问题,去官方仓库提了个issues,等回复 https://github.com/baomidou/mybatis-plus/issues/5923 背景 发现 record是两条,但是total显示3 使用resultMap一对多时,三条数据会变成两条,但是total确是3条 下…

新能源汽车智慧充电桩方案:基于视频监控的可视化智能监管平台

一、方案概述 TSINGSEE青犀&触角云新能源汽车智慧充电桩方案围绕互联网、物联网、车联网、人工智能、视频技术、大数据、4G/5G等技术,结合云计算、移动支付等,实现充电停车一体化、充电桩与站点管理等功能,达到充电设备与站点的有效监控…

[足式机器人]Part2 Dr. CAN学习笔记-Ch04 Advanced控制理论

本文仅供学习使用 本文参考: B站:DR_CAN Dr. CAN学习笔记 - Ch04 Advanced控制理论 1. 绪论2. 状态空间表达State-Space Representation3. Phase Portrait相图,相轨迹3 1. 1-D3 2. 2-D3 3. General Form3 4. Summary3.5. 爱情中的数学-Phase …

Luckysheet类似excel的在线表格(vue)

参考文档&#xff1a;快速上手 | Luckysheet文档 一、引入 在vue项目的public文件夹下的index.html的<head>标签里面引入 <link relstylesheet hrefhttps://cdn.jsdelivr.net/npm/luckysheetlatest/dist/plugins/css/pluginsCss.css /><link relstylesheet hre…

使用 Neo4j 和 LangChain 集成非结构化知识图增强 QA

目前基于大模型的信息检索有两种方法&#xff0c;一种是基于微调的方法&#xff0c;一种是基于 RAG 的方法。 信息检索和知识提取是一个不断发展的领域&#xff0c;随着大型语言模型&#xff08;LLM&#xff09;和知识图的出现&#xff0c;这一领域发生了显着的变化&#xff0…

河南选调生报名照片上传成功,不能大于50kb

河南选调生报名照片要求&#xff1a; 1、上传近期正面免冠证件照 2、照片背景&#xff1a;要求为蓝底 3、照片格式&#xff1a;jpg格式 4、照片宽高比例约为1.3:1.6&#xff0c;大小为130160像素 5、照片大小&#xff1a;50kb以下&#xff0c;最终效果以输出后的大小为准

pytest学习和使用-pytest如何进行分布式测试?(pytest-xdist)

1 什么是分布式测试&#xff1f; 在进行本文之前&#xff0c;先了解些基础知识&#xff0c;什么是分布式测试&#xff1f;分布式测试&#xff1a;是指通过局域网和Internet&#xff0c;把分布于不同地点、独立完成特定功能的测试计算机连接起来&#xff0c;以达到测试资源共享…

SpringMVC JSON数据处理见解6

6.JSON数据处理 6.1.添加json依赖 springmvc 默认使用jackson作为json类库,不需要修改applicationContext-servlet.xml任何配置&#xff0c;只需引入以下类库springmvc就可以处理json数据&#xff1a; <!--spring-json依赖--> <dependency><groupId>com.f…

群晖搭建LDAP服务器实现一个账号登录DSM、Gitea、jellyfin

文章目录 前言安装LDAP Server新建群组新增用户 DSM加入LDAPDSM使用LDAP登录 Gitea配置登录取消其登录权限 Jellyfin配置登录 总结 前言 LDAP&#xff08;轻量级目录访问协议&#xff09;是一种用于访问和管理分布式目录服务的协议&#xff0c;它具有以下好处&#xff1a; 集…

flutter开发windows桌面软件,使用Inno Setup打包成安装程序,支持中文

最近使用flutter开发windows桌面软件的时候&#xff0c;想要将软件打包成安装程序&#xff0c;使用了flutter官方推荐的msix打包&#xff0c;但是打包出来的软件生成的桌面快捷方式有蓝色背景&#xff1a; 这个蓝色背景应该是没有设置为动态导致的&#xff0c;windows系统的屏幕…