Kafka 多线程消费者

news2025/1/17 3:02:11

Kafka 多线程消费者

  • 多线程方案

Kafka 0.10.1.0 后,Kafka Consumer 变为双线程的设计 :

  • 用户主线程 : 启动 Consumer 的 main
  • 心跳线程 (Heartbeat Thread) : 定期对 Broker 发送心跳请求,探测消费者的存活性 (liveness)
  • 将心跳频率与主线程处理频率分开,对俩者进行解耦

老 Consumer 是多线程的架构 :

  • Fetcher 线程 : 每个 Consumer 给所有订阅的主题分区创建对应的消息获取线程
  • Consumer 是阻塞式的(blocking),Consumer 启动后,内部会创建很多阻塞式的消息获取迭代器

新 Consumer 设计了单线程 + 轮询的机制 , 实现非阻塞式的消息获取

多线程方案

KafkaConsumer 不是线程安全的 (thread-safe)

  • 所有的网络 I/O 处理都在用户主线程中,使用时要确保线程安全
  • 不能在多个线程中共享同个 Kafka Consumer,不然会 ConcurrentModificationException 异常

两套多线程方案 :

  1. 消费者启动多个线程,每个线程维护个 KafkaConsumer ,负责完整的消息获取、消息处理流程

在这里插入图片描述

  1. 消费者用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑

在这里插入图片描述

例子 : 消费者要做 1、2、3、4、5

  • 方法 1 : 工作划分为粗粒度,会创建多个线程,每个线程要执行 1、2、3、4、5,来实现并行处理,不能分割具体的子任务
  • 方法 2 : 更细粒度化,将 1、2 分割,用单线程(也可多线程),将 3、4、5,用别的多个线程
方案优点缺点
多线程 + 多 KafkaConsumer方便实现占更多系统资源
速度块, 无线程间交互开销线程数受限于主题分区数, 扩展性差
易于维护分区内的消费顺序线程处理消息易超时 , 会 Rebalance
单线程 + 单 KafkaConsumer + 消息处理 Worker 线程池可独立扩展消费获取线程数和 Worker 线程数实现难度高
可扩展性好难维护分区的消息消费顺序
处理链路长, 不易于位移提交管理

方案 1 优势 :

  • 实现简单,用多个线程在每个线程中创建各自 KafkaConsumer
  • 多个线程之间没有任何交互,能避开线程安全
  • 每个线程用各自 KafkaConsumer 来执行消息获取和消息处理逻辑,所以主题中的每个分区都能保证只被一个线程处理,容易实现分区内的消息消费顺序

方案 1 缺点 :

  • 每个线程都要维护自己的 KafkaConsumer ,会占用更多的系统资源,如 : 内存、TCP 连接
  • 线程数受限于 Consumer 订阅主题的总分区数 : 一个消费者组中,每个分区只能被组内的一个消费者所消费
  • 当一个消费者组订阅 100 个分区,那最多只能扩展到 100 个线程
  • 每个线程要执行消息获取和消息处理逻辑。一旦消息处理逻辑重,就造成消息处理速度慢,导致 Rebalance,从而引发整个消费者组的消费停滞

方案 2 优势 :

  • 将任务分为消息获取和消息处理的俩个线程
  • 高伸缩性 : 能独立调节消息获取的线程数,消息处理的线程数,不用考虑两者是否相互影响
  • 消费获取速度慢,就增加消费获取的线程数
  • 消息的处理速度慢,就增加 Worker 线程池线程数

方案 2 缺点 :

  • 实现难度大,要管理两组线程
  • 将消息获取和消息处理分开,无法保证分区内的消费顺序。如 : 在某个分区中,消息 1 在消息 2 前被保存,但Worker 线程可能先处理消息 2,再处理消息 1,会破坏消息在分区的顺序
  • 多组线程,让整个消息消费链路被拉长,位移提交困难,可能出现重复消费

方案 1 的代码 :

  • 创建 Runnable 类 : 执行消费获取和消费处理的逻辑
  • 每个 KafkaConsumerRunner 类都会创建各种 KafkaConsumer
public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;
    
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = 
                consumer.poll(Duration.ofMillis(10000));
                //  执行消息处理逻辑
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
		} finally {
            consumer.close();
        }
    }
    
    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

方案 2 :

  • 由线程池负责处理具体的消息
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
//...
private int workerNum = ...;

executors = new ThreadPoolExecutor(
    workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000), 
    new ThreadPoolExecutor.CallerRunsPolicy());

//...
while (true)  {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofSeconds(1));
    
    for (final ConsumerRecord record : records) {
        executors.submit(new Worker(record));
    }
}
//..

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/398525.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MQTT协议-取消订阅和取消订阅确认

MQTT协议-取消订阅和取消订阅确认 客户端向服务器取消订阅 取消订阅的前提是客户端已经通过CONNECT报文连接上服务器&#xff0c;并且订阅了一个主题 UNSUBSCRIBE—取消订阅 取消订阅的报文同样是由固定报头可变报头有效载荷组成 固定报头由两个字节组成&#xff0c;第一个…

2023年,当我们谈论架构时,我们要聊什么

架构是一个非常宽泛的话题&#xff0c;从组织结构上来说&#xff0c;涉及到前端、后端、运维&#xff1b;从软件设计上来说&#xff0c;涉及到需求分析、设计、编码、测试&#xff1b;从物理结构上来说&#xff0c;涉及到CDN、负载均衡、网关、服务器、数据库。当前一些架构方面…

奇淫技巧:阅读源码时基于一组快捷键,让我们知道身在何方!

一个十分蛋疼的问题 在我们阅读框架底层源码的时候&#xff0c;我们往往会一个方法一个方法的往下翻&#xff0c;翻了很久很快就会有这样的灵魂拷问&#xff1a;我从那个类&#xff08;方法&#xff09;来&#xff0c;我要到哪个&#xff08;类&#xff09;方法中去。这个时候…

RK3568平台开发系列讲解(显示篇) DRM显示系统组成分析

🚀返回专栏总目录 文章目录 一、DRM Framebuffer二、CRTC三、Planes四、Encoder五、Connector沉淀、分享、成长,让自己和他人都能有所收获!😄 📢让我们分析一下绿框中的五个部件,以及他们的联动。 一、DRM Framebuffer 与 framebuffer一样,是一片存放图像的内存区域,…

敏捷开发还需要PRD吗

一、PRD有什么用 prd提升与RD或者未来接手人的沟通效率 二、为什么会有PRD 首先来说说为什么会有PRD文档。 1、稍微大一点的团队产品经理未必能向每个人传达产品需求&#xff0c;这就需要有一个文档的形式来向项目的所有成员来传达需求&#xff0c;这就是文档的来源。 2、由…

Python读写mdb文件的实战代码

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。喜欢通过博客创作的方式对所学的知识进行总结与归纳,不仅形成深入且独到的理…

MySQL的分库分表?通俗易懂

1- 为什么要分库分表 如果一个网站业务快速发展&#xff0c;那这个网站流量也会增加&#xff0c;数据的压力也会随之而来&#xff0c;比如电商系统来说双十一大促对订单数据压力很大&#xff0c;Tps十几万并发量&#xff0c;如果传统的架构&#xff08;一主多从&#xff09;&a…

【数据结构】解决顺序表题的基本方法

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a;> 初阶数据结构 &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0…

java 4 (面向对象上)

java——面向对象&#xff08;上&#xff09; 目录java——面向对象&#xff08;上&#xff09;面向对象的思想概述类的成员&#xff08;1-2&#xff09;&#xff1a;属性和方法对象的内存解析类中属性的使用类中方法的使用1.举例&#xff1a;2.声明方法&#xff1a;3.说明4.re…

计算机网络基础知识点【1】

文章目录计算机网络第一章 计算机网络参考模型1.计算机网络为什么需要分层&#xff1f;1.1 分层思想1.2 分层好处2.OSI七层模型2.1 OSI七层模型总结2.2 OSI七层工作原理2.3 数据封装与解封装2.4 计算机网络常用协议3.TCP/IP参考模型3.1 什么是TCP/IP协议3.2 TCP/IP协议族的组成…

扬帆优配|引活水 增活力 促转型 创业板助力实体经济高质量发展

立异就是生产力&#xff0c;企业赖之以强&#xff0c;国家赖之以盛。全面注册制变革持续开释立异生机。日前&#xff0c;创业板公司已开端连续公布2022年度年度报告和2023年第一季度成绩预告&#xff0c;从频频传来的“喜报”中可窥见立异驱动开展战略下新兴工业的强劲开展态势…

jvm之堆上的GC和分代思想解读

堆上的GC JVM在进行GC时&#xff0c;并非每次都对上面三个内存区域一起回收的&#xff0c;大部分时候回收的都是指新生代。 性能调优主要就是减少GC&#xff0c;GC线程执行引发STW会让用户线程停止&#xff0c;阻碍了用户线程的执行&#xff0c;并且majorGC和fullGC阻碍的时间…

内卷把同事逼成了“扫地僧”,把Git上所有面试题整理成足足24W字Java八股文

互联网大厂更多的是看重学历还是技术&#xff1f;毫无疑问&#xff0c;是技术&#xff0c;技术水平相近的情况下&#xff0c;肯定学历高/好的会优先一点&#xff0c;这点大家肯定都理解。说实话&#xff0c;学弟学妹们找工作难&#xff0c;作为面试官招人也难呀&#xff01;&am…

TypeScript深度剖析:Vue项目中应用TypeScript?

一、前言 与link类似 在VUE项目中应用typescript&#xff0c;我们需要引入一个库vue-property-decorator&#xff0c; 其是基于vue-class-component库而来&#xff0c;这个库vue官方推出的一个支持使用class方式来开发vue单文件组件的库 主要的功能如下&#xff1a; metho…

【刷题笔记】之滑动窗口(长度最小的子数组、水果成篮、最小的覆盖子串)

滑动窗口模板//滑动窗口模板&#xff1a;注意使用滑动窗口方法&#xff0c;使用一个 for(while) 循环中的变量是用来控制终止位置的//最小滑窗&#xff1a;给定数组 nums&#xff0c;定义滑动窗口的左右边界 i、j&#xff0c;求满足某个条件的滑窗的最小长度 for(j 0; j < …

华为OD机试题,用 Java 解【寻找身高相近的小朋友】问题

华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 参加华为od机试,一定要注意不…

SpringMVC源码:视图解析器

参考资料&#xff1a; 《SpringMVC源码解析系列》 《SpringMVC源码分析》 《Spring MVC源码》 写在开头&#xff1a;本文为个人学习笔记&#xff0c;内容比较随意&#xff0c;夹杂个人理解&#xff0c;如有错误&#xff0c;欢迎指正。 前文&#xff1a; 《SpringMVC源码&a…

APP线上产品的日志埋点方案

运营运维系列文章 APP线上产品的日志埋点方案 APP日志埋点前言什么是埋点&#xff1f;埋点方案设计事件模型埋点事件上报日志存储平台1. 亚马逊云S32. Kibana博客创建时间&#xff1a;2023.03.08 博客更新时间&#xff1a;2023.03.09 以Android studio build7.0.0&#xff0c…

git stash 暂存减少分支误操作的神器

背景 有时不小心在master或者develop分支上开发了代码&#xff0c;正要提交时才发现自己选错分支了&#xff0c;以前的笨方法是把要提交的代码&#xff0c;一个个记录下来&#xff0c;都保存另外一个文件中去&#xff0c;然后再切换到特性分支中&#xff0c;一个个覆盖到具体的…

问到ThreadLocal,看这一篇就够了|金三银四系列

ThreadLocal 原理和常见问题详解&#xff0c;用来复习准没错&#xff01;点击上方“后端开发技术”&#xff0c;选择“设为星标” &#xff0c;优质资源及时送达ThreadLocal 是什么&#xff1f;ThreadLocal 是线程本地变量。当使用 ThreadLocal 维护变量时&#xff0c;ThreadLo…