Kafka 多线程开发消费者实例

news2025/4/2 1:27:41

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

Kafka Java Consumer 设计原理

在开始探究之前,我先简单阐述下 Kafka Java Consumer 为什么采用单线程的设计。了解了这一点,对我们后面制定多线程方案大有裨益。

谈到 Java Consumer API,最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程

所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

不过,虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。因此,在消费消息的这个层面上,我们依然可以安全地认为 KafkaConsumer 是单线程的设计。

其实,在社区推出 Java Consumer API 之前,Kafka 中存在着一组统称为 Scala Consumer 的 API。这组 API,或者说这个 Consumer,也被称为老版本 Consumer,目前在新版的 Kafka 代码中已经被完全移除了。

我之所以重提旧事,是想告诉你,老版本 Consumer 是多线程的架构,每个 Consumer 实例在内部为所有订阅的主题分区创建对应的消息获取线程,也称 Fetcher 线程。老版本 Consumer 同时也是阻塞式的(blocking),Consumer 实例启动后,内部会创建很多阻塞式的消息获取迭代器。但在很多场景下,Consumer 端是有非阻塞需求的,比如在流处理应用中执行过滤(filter)、连接(join)、分组(group by)等操作时就不能是阻塞式的。基于这个原因,社区为新版本 Consumer 设计了单线程 + 轮询的机制。这种设计能够较好地实现非阻塞式的消息获取。

除此之外,单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

另外,不论使用哪种编程语言,单线程的设计都比较容易实现。相反,并不是所有的编程语言都能够很好地支持多线程。从这一点上来说,单线程设计的 Consumer 更容易移植到其他语言上。毕竟,Kafka 社区想要打造上下游生态的话,肯定是希望出现越来越多的客户端的。

多线程方案

了解了单线程的设计原理之后,我们来具体分析一下 KafkaConsumer 这个类的使用方法,以及如何推演出对应的多线程方案。

首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

当然了,这也不是绝对的。KafkaConsumer 中有个方法是例外的,它就是wakeup(),你可以在其他线程中安全地调用KafkaConsumer.wakeup()来唤醒 Consumer。

鉴于 KafkaConsumer 不是线程安全的事实,我们能够制定两套多线程方案。

  1. 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

总体来说,这两种方案都会创建多个线程,这些线程都会参与到消息的消费过程中,但各自的思路是不一样的。

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。

这两种方案孰优孰劣呢?应该说是各有千秋。我总结了一下这两种方案的优缺点,我们先来看看下面这张表格。


推荐阅读

结合案例深入理解DDD聚合与聚合根

技术架构:作为开发,你真的了解系统吗-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2325302.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux线程池实现

1.线程池实现 全部代码&#xff1a;whb-helloworld/113 1.唤醒线程 一个是唤醒全部线程&#xff0c;一个是唤醒一个线程。 void WakeUpAllThread(){LockGuard lockguard(_mutex);if (_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程&q…

Linux《进程概念(上)》

在之前的Linux学习当中我们已经了解了基本的Linux指令以及基础的开发工具的使用&#xff0c;那么接下来我们就要开始Linux当中一个非常重要的部分的学习——进程&#xff0c;在此进程是我们之后Linux学习的基础&#xff0c;并且通过进程的学习会让我们了解更多的操作系统的相关…

【算法】并查集基础讲解

一、定义 一种树型的数据结构&#xff0c;用于处理一些不相交集合的合并及查询问题。思想是用一个数组表示了整片森林&#xff08;parent&#xff09;&#xff0c;树的根节点唯一标识了一个集合&#xff0c;只要找到了某个元素的的树根&#xff0c;就能确定它在哪个集合里。 …

C++ STL常用算法之常用集合算法

常用集合算法 学习目标: 掌握常用的集合算法 算法简介: set_intersection // 求两个容器的交集 set_union // 求两个容器的并集 set_difference // 求两个容器的差集 set_intersection 功能描述: 求两个容器的交集 函数原型: set_intersection(iterator beg1, iterat…

日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习(3号通知)

日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习&#xff08;3号通知&#xff09; 日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习&#xff08;3号通知&#xff09;

Linux C语言调用第三方库,第三方库如何编译安装

在 Linux 环境下使用 C 语言调用第三方库时&#xff0c;通常需要先对第三方库进行编译和安装。以下为你详细介绍一般的编译安装步骤&#xff0c;并给出不同类型第三方库&#xff08;如使用 Makefile、CMake 构建系统&#xff09;的具体示例。 一般步骤 1. 获取第三方库源码 …

leetcode -编辑距离

为了求解将 word1 转换成 word2 所需的最少操作数&#xff0c;可以使用动态规划。以下是详细的解决方案&#xff1a; ### 方法思路 1. **定义状态** dp[i][j] 表示将 word1 的前 i 个字符转换成 word2 的前 j 个字符所需的最少操作数。 2. **状态转移方程** - 如果 word1[…

字节开源版Manus来袭

字节开源版Manus来袭 项目地址&#xff1a;https://github.com/langmanus/langmanus/blob/main/README_zh.md 在人工智能领域&#xff0c;Manus的出现无疑是一颗重磅炸弹&#xff0c;它凭借强大的通用Agent能力&#xff0c;迅速吸引了全球开发者和AI爱好者的目光。然而&#…

论文阅读笔记——PointVLA: Injecting the 3D World into Vision-Language-Action Models

PointVLA 论文 现有的 VLA 基于 2D 视觉-语言数据表现良好但缺乏 3D 几何先验导致空间推理缺陷。传统方案&#xff1a;1&#xff09;3D->2D 投影&#xff0c;造成几何信息损失&#xff1b;2&#xff09;3D 数据集少。PointVLA 保留原有 VLA&#xff0c;提取点云特征&#xf…

在win11 环境下 新安装 WSL ubuntu + 换国内镜像源 + ssh + 桌面环境 + Pyhton 环境 + vim 设置插件安装

在win11 环境下 新安装 WSL ubuntu ssh gnome 桌面环境 Pyhton 环境 vim 设置插件安装 简单介绍详细流程换国内镜像源安装 ssh 桌面环境python 环境vim 设置插件安装 简单介绍 内容有点长&#xff0c;这里就先简单描述内容了。主要是快速在 Win11 搭建一个 wsl 的 linux 环…

基于springboot课程学习与互动平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 随着我国经济的高速发展与人们生活水平的日益提高&#xff0c;人们对生活质量的追求也多种多样。尤其在人们生活节奏不断加快的当下&#xff0c;人们更趋向于足不出户解决生活上的问题&#xff0c;线上管理系统展现了其蓬勃生命力和广阔的前景。与此同时&#xff0c;在此…

通俗易懂的大模型原理

十分钟揭秘DeepSeek原理&#xff0c;通俗易懂的大语言模型科普&#xff01;_哔哩哔哩_bilibili 最基础原理&#xff0c;x是输入&#xff0c;y是输出。上百万和上百亿的参数 将一句话转化为数字向量 一句话就是向量矩阵 输入矩阵和参数矩阵进行计算得出输出矩阵&#xff0c;因为…

热门索尼S-Log3电影感氛围旅拍LUTS调色预设 Christian Mate Grab - Sony S-Log3 Cinematic LUTs

热门索尼S-Log3电影感氛围旅拍LUTS调色预设 Christian Mate Grab – Sony S-Log3 Cinematic LUTs 我们最好的 Film Look S-Log3 LUT 的集合&#xff0c;适用于索尼无反光镜相机。无论您是在户外、室内、风景还是旅行电影中拍摄&#xff0c;这些 LUT 都经过优化&#xff0c;可为…

【jQuery】插件

目录 一、 jQuery插件 1. 瀑布流插件&#xff1a; jQuery 之家 http://www.htmleaf.com/ 2. 图片懒加载&#xff1a; jQuery 插件库 http://www.jq22.com/ 3. 全屏滚动 总结不易~ 本章节对我有很大收获&#xff0c;希望对你也是~~~ 一、 jQuery插件 jQuery 功能…

MATLAB导入Excel数据

假如Excel中存在三列数据需要导入Matlab中。 保证该Excel文件与Matlab程序在同一目录下。 function [time, voltage, current] test(filename)% 读取Excel文件并提取时间、电压、电流数据% 输入参数:% filename: Excel文件名&#xff08;需包含路径&#xff0c;如C:\data\…

孤码长征:破译PCL自定义点云注册机制源码迷局——踩坑实录与架构解构

在之前一个博客《一文搞懂PCL中自定义点云类型的构建与函数使用》中&#xff0c;清晰地介绍了在PCL中点云的定义与注册方法。我的一个读者很好奇其内部注册的原理以及机制&#xff0c;再加上最近工作中跟猛男开发自定义点云存储的工作&#xff0c;借着这些需求&#xff0c;我也…

Centos 7 搭建 jumpserver 堡垒机

jumpserver 的介绍 1、JumpServer 是完全开源的堡垒机, 使用 GNU GPL v2.0 开源协议, 是符合4A 的专业运维审计系统 1)身份验证 / Authentication 2)授权控制 / Authorization 3)账号管理 / Accounting 4)安全审计 / Auditing 2、JumpServer 使用 Python / Django 进行开…

封装了一个优雅的iOS全屏侧滑返回工具

思路 添加一个全屏返回手势&#xff0c;UIPangesturerecognizer, 1 手势开始 在手势开始响应的时候&#xff0c;将navigationController的delegate代理设置为工具类&#xff0c;在工具类中执行代理方法&#xff0c;- (nullable id )navigationController:(UINavigationControll…

HCIP-6 DHCP

HCIP-6 DHCP DHCP&#xff08;Dynamic Host Configuration Protocol&#xff0c;动态主机配置协议&#xff09; 手工配置网络参数存在的问题 灵活性差 容易出错 IP地址资源利用率低 工作量大 人员素质要求高 DHCP服务器按照如下次序为客户端选择IP地址: ①DHCP服务器的数…

opencv图像处理之指纹验证

一、简介 在当今数字化时代&#xff0c;生物识别技术作为一种安全、便捷的身份验证方式&#xff0c;正广泛应用于各个领域。指纹识别作为生物识别技术中的佼佼者&#xff0c;因其独特性和稳定性&#xff0c;成为了众多应用场景的首选。今天&#xff0c;我们就来深入探讨如何利…