【消息队列】聊一下Kafka多线程消费实例

news2025/1/11 2:43:38

Kafka Java Consumer设计原理

目前市面上大多数计算机都采用多核CPU来提升系统的处理性能,但是如果在程序开发层面使用单线程的话,那么必定不能完全发挥出系统的真实性能,而kafka Consumer就是单线程的。而这个只是针对于消费消息这个层面来说。
内部包含的是用户主线程和心跳线程,用户主线程说白了就是消费consumer的main线程,而心跳线程是在Consumer API中,会自动和Broker进行心跳检测的线程,两个线程指责不同,进行拆分出来也是非常合理的。可以解耦真实的消息处理和心跳检测管理机制
为什么Kafka不涉及成支持并发Consumer
从语言层面来说,并不是每个语言都可以很好的支持并发机制。并且从Kakfa推广层面来说,想要更好的打造上下游生态,那么必须要具备较好的移植性。

多线程方案

KakfaConsumer不是线程安全的。
1.消费者启动多个线程,每个线程维护专属的KakfaConsumer实例,进行消息的获取、消费流程
在这里插入图片描述
2.消费者使用单或者多线程获取消息,然后启动多个线程消费消息。获取消息可以是单个线程,或者是多个线程,但是每个线程都有专属的KafkaConsumer,将消息的获取和消息处理进行解耦合。
在这里插入图片描述
好了,我们来对比一个上述的两个方案。
如果说我们获取消息和处理消息分为1,2,3,4个过程。方案1会为创建多个线程,每个线程完整的执行完1,2,3,4过程,整个任务并不会被进行拆分,而方案2,用单线程处理1,2,然后多个线程处理3,4两个过程。
在这里插入图片描述

Code

方案1

   public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
 
 
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
			ConsumerRecords records = 
				consumer.poll(Duration.ofMillis(10000));
                 //  执行消息处理逻辑
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
 
 
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }

方案2

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
 
 
private int workerNum = ...;
executors = new ThreadPoolExecutor(
	workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
	new ArrayBlockingQueue<>(1000), 
	new ThreadPoolExecutor.CallerRunsPolicy());
 
 
...
while (true)  {
	ConsumerRecords<String, String> records = 
		consumer.poll(Duration.ofSeconds(1));
	for (final ConsumerRecord record : records) {
		executors.submit(new Worker(record));
	}
}
..

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431747.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AI热点技术】ChatGPT开源替代品——LLaMA系列之「羊驼家族」

ChatGPT开源替代品——LLaMA系列之「羊驼家族」 1. Alpaca2. Vicuna3. Koala4. ChatLLaMA5. FreedomGPT6. ColossalChat完整的 ChatGPT 克隆解决方案中英双语训练数据集完整的RLHF管线 相关链接 现在如果问什么最火&#xff0c;很多人第一反应肯定就是ChatGPT。的确&#xff0c…

Redis集群模式下使用config set 命令所有节点都会生效吗?

Redis集群模式下使用config set 命令所有节点都会生效吗? 问题: Redis集群模式下使用config set 命令所有节点都会生效吗? 实践检验真理: 前置准备 Redis版本:5.0.5版本 Redis集群模式:三主三从 操作步骤: 分别连接7001节点与7002节点,准备在7001节点使用”config get”…

交友项目【查询好友动态,查询推荐动态】实现

目录 1&#xff1a;圈子 1.1&#xff1a;查询好友动态 1.1.1&#xff1a;接口分析 1.1.2&#xff1a;流程分析 1.1.2&#xff1a;代码实现 1.2&#xff1a;查询推荐动态 1.2.1&#xff1a;接口分析 1.2.2&#xff1a;流程分析 1.2.3&#xff1a;代码实现 1&#xff1a…

Python教程:如何用PIL将图片转换为ASCII艺术

Python教程&#xff1a;如何用PIL将图片转换为ASCII艺术 ASCII 艺术是一种将图像转换为由字符组成的艺术形式。Python 是一种灵活而强大的编程语言&#xff0c;可以使用它来将图片转换为 ASCII 艺术。本文将介绍如何使用 Python 和 PIL 库来实现这一功能。 文末有完整代码 效…

数学建模第一天:数学建模先导课之MATLAB的入门

目录 一、MATLAB的简介 二、Matlab基础知识 1. 变量 ①命名规则 ②特殊变量名 2、数学符号与函数调用 ①符号 ②数学函数 ③自定义函数 三、数组与矩阵 1、数组 ①创建数组 ②访问数组元素 ③数组运算 2、矩阵 ①定义 ②特殊矩阵 ③矩阵运算 四、控制流 …

若依项目springcloud启动

若依项目springcloud启动 参考&#xff1a;http://doc.ruoyi.vip/ruoyi-cloud/document/hjbs.html 1、概述 1.1、学习前提 熟练使用springboot相关技术了解springcloud相关技术电脑配置可以支持 1.2、需要的配置 JDK > 1.8 (推荐1.8版本) Mysql > 5.7.0 (推荐5.7版…

若依数据隔离 ${params.dataScope} 替换 优化为sql 替换

若依数据隔离 ${params.dataScope} 替换 优化为sql 替换 安全问题:有风险的SQL查询&#xff1a;MyBatis解决 若依框架的数据隔离是通过 ${params.dataScope} 实现的 但是在代码安全扫描的时候$ 符会提示有风险的SQL查询&#xff1a;MyBatis 所以我们这里需要进行优化参考: M…

凌恩生物文献分享|IF31.316→一网打尽与婴儿疾病相关的病毒组研究

期刊&#xff1a;Cell Host & Microbe 影响因子&#xff1a;31.316 发表时间&#xff1a;2022年4月 研究团队&#xff1a;清华大学医学院梁冠翔课题组与宾夕法尼亚大学医学院Frederic Bushman课题组 一、研究背景 已知微生物为人类提供营养物质和代谢物&…

AD、PADS、Cadence各有什么优势?

读者中有很大一部分是电子工程师&#xff0c;先想问下大家&#xff1a;你们画PCB常用什么软件&#xff1f; **函第一的AD? 还是最贵Cadence&#xff08;Allegro&#xff09;&#xff1f; 看到有读者在问&#xff1a;AD、PADS、Cadence各有什么优势&#xff1f; 这里就简单分…

一文吃透Java线程池——实现机制篇

前言 本篇博客是《一文吃透Java线程池》系列博客的下半部分。 上半部分链接&#xff1a;一文吃透Java线程池——基础篇 实现机制&#xff08;源码解析&#xff09; 根据前面的学习&#xff0c;我们知道&#xff0c;线程池是如下的运作机制 解析&#xff1a; 一开始&#…

Flutter插件开发-(进阶篇)

一、概述 Flutter也有自己的Dart Packages仓库。插件的开发和复用能够提高开发效率&#xff0c;降低工程的耦合度&#xff0c;像网络请求(http)、用户授权(permission_handler)等客户端开发常用的功能模块&#xff0c;我们只需要引入对应插件就可以为项目快速集成相关能力&…

2023-04-15 学习记录--C/C++-mac vscode配置并运行C/C++

mac vscode配置并运行C/C 一、vscode安装 ⭐️ 去官网下载安装mac版的vscode。 二、vscode配置 ⭐️ &#xff08;一&#xff09;、安装C/C扩展插件及必装好用插件 1、点击左边的 图标(扩展: 商店)&#xff0c;如下图&#xff1a; 2、先安装 C/C、C/CExtension Pack插件&…

大话数据结构-C(2)

二&#xff1a;算法 解决特定问题求解步骤的描述&#xff0c;在计算机中表现为指令的有限序列&#xff0c;并且每条指令表示一个或多个操作。 2.1 算法的特性 算法具有五个基本特性&#xff1a;输入、输出、有穷性、确定性、可行性。 1&#xff09;输入输出&#xff1a; 算法具…

Python --- 文件操作

目录 前言 一、open()函数 1.只读模式 r 2.只写模式 w 3.追加模式 a 二、操作其他文件 1.Python 操作二进制 2.Python 操作 json 文件 三、关闭文件 四、上下文管理器 五、文件指针位置 前言 在实际操作中&#xff0c;通常需要将数据写入到本地文件或者从本地文件中…

南方猛将加盟西方手机完全是臆测,他不会希望落得兔死狗烹的结局

早前南方某科技企业因为命名的问题闹得沸沸扬扬&#xff0c;于是一些业界人士就猜测该猛将会加盟西方手机&#xff0c;对于这种猜测可以嗤之以鼻&#xff0c;从西方手机以往的作风就可以看出来它向来缺乏容纳猛将的气量。一、没有猛将的西方手机迅速沉沦曾几何时&#xff0c;西…

【项目】bxg基于SaaS的餐掌柜项目实战(2023)

基于SaaS的餐掌柜项目实战 餐掌柜是一款基于SaaS思想打造的餐饮系统&#xff0c;采用分布式系统架构进行多服务研发&#xff0c;共包含4个子系统&#xff0c;分别为平台运营端、管家端&#xff08;门店&#xff09;、收银端、小程序端&#xff0c;为餐饮商家打造一站式餐饮服务…

如何用ChatGPT翻译?ChatGPT提升翻译速度,亲测有效

作为翻译新手&#xff0c;你是否为翻译不准确不地道而烦恼&#xff1f; 随着ChatGPT的大火&#xff0c;很多聪明的翻译已经开始使用ChatGPT辅助自己提升翻译能力和速度了。 想用ChatGPT翻译&#xff0c;首先要知道在哪里可以使用ChatGPT&#xff01;在国内选择不用注册不用登录…

python实现批量生成带内容的文件夹

我工作的时候经常遇到这个问题&#xff1a;需要批量生成带内容的文件夹来辅助工作。 我有8种不同名字的文件夹 每个文件夹下面都有以日期命名的文件夹 日期文件夹里面会记录我当天需要记录的东西。 我需要实现的功能是&#xff1a; 1.输入一个天数N&#xff0c;生成N天以前…

机器学习 day05(多元线性回归,向量化)

单个特征&#xff08;变量&#xff09;的线性回归模型 房子的价格仅由房子的大小决定&#xff0c;如图&#xff1a; 多个特征&#xff08;变量&#xff09;的线性回归模型 房子的价格由房子的大小&#xff0c;房子有多少个卧室&#xff0c;房子有几层&#xff0c;房子住了多…

代码随想录|day44|动态规划part06● 完全背包● 518. 零钱兑换 II ● 377. 组合总和 Ⅳ

完全背包 理论基础 视频&#xff1a;带你学透完全背包问题&#xff01; 和 01背包有什么差别&#xff1f;遍历顺序上有什么讲究&#xff1f;_哔哩哔哩_bilibili 链接&#xff1a;代码随想录 //先遍历背包还是先遍历物品是没有影响的。可以和01背包保持一致&#xff0c;都先遍历…