kafka消费者多线程开发

news2024/9/28 5:32:09

目录

前言

kafka consumer 设计原理

多线程的方案 

参考资料


前言

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

kafka consumer 设计原理

 从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

 所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

 单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

多线程的方案 

 我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

 2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。 

这两种方案的比较如下:

实现代码示例如下:

方案一的代码:

public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;


     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
      ConsumerRecords records = 
        consumer.poll(Duration.ofMillis(10000));
                 //  执行消息处理逻辑
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }


     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构

方案2 的代码:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
  workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(1000), 
  new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
  ConsumerRecords<String, String> records = 
    consumer.poll(Duration.ofSeconds(1));
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
..

参考资料

20 | 多线程开发消费者实例-极客时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1026133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java框架-Spring-容器创建过程

java框架-Spring-容器创建源码

pip pip3安装库时都指向python2的库

当在python3的环境下使用pip3安装库时&#xff0c;发现居然都指向了python2的库 pip -V pip3 -V安装命令更改为&#xff1a; python3 -m pip install <package>

CCC数字钥匙设计【BLE】--URSK管理

1、URSK创建流程 URSK的英文全称为&#xff1a;UWB Ranging Secret Key&#xff0c;即UWB安全测距密钥。 在车主配对时会生成URSK&#xff0c;且在车主配对期间&#xff0c;车辆不得尝试生成第二个URSK。 URSK示例: ed07a80d2beb00f785af2627c96ae7c118504243cb2c3226b3679da…

面向面试知识--MySQL数据库与索引

面向面试知识–MySQL数据库与索引 优化难点与面试点 什么是MySQL索引&#xff1f; 索引的MySQL官方定义&#xff1a;索引是帮助MySQL快速获取数据的数据结构。 动力节点原文&#xff1a; MysQL官方对于索引的定义:索引是帮助MySQL高效获取数据的数据结构。 MysQL在存储数据之…

问题usr/bin/env: “python‘: Too many levels of symbolic links太多层链接的bug pycharm

问题描述 解决&#xff1a;建议不要用过去的conda环境了&#xff0c;直接新建一个环境&#xff0c;然后在图片这个步骤的时候务必选择现有的解释器 。&#xff08;产生问题的原因可能就是新建的解释器太多了&#xff09;

Mermaid画流程图可以实现从一条线中间引出另外一条线吗

这张图中开始和操作1之间引出的一条线要怎么表示啊&#xff01;&#xff01;&#xff01; Mermaid是不能实现这样的画法的吗&#xff1f;可是为什么老师就可以画出来&#xff1f;&#xff1f;&#xff1f; 求大佬指教&#xff01;&#xff01;&#xff01;&#xff01;

现场总线学习

文章目录 1.现场总线现状2.数据编码2.1 数字数据的数字编码2.2 数字数据的模拟编码 3.通信方式&#xff01;&#xff01;&#xff01;4.局域网及其拓扑结构5.工业总线协议6.为什么要在can协议的控制器和bus总线之间&#xff0c;连接一个can收发器&#xff1f;7.那其他协议也需要…

vue修改node_modules打补丁步骤和注意事项

当我们使用 npm 上的第三方依赖包&#xff0c;如果发现 bug 时&#xff0c;怎么办呢&#xff1f; 想想我们在使用第三方依赖包时如果遇到了bug&#xff0c;通常解决的方式都是绕过这个问题&#xff0c;使用其他方式解决&#xff0c;较为麻烦。或者给作者提个issue&#xff0c;然…

dev board sig技术文章:轻量系统适配ARM架构芯片平台

摘要&#xff1a;本文简单介绍OpenHarmony轻量系统移植&#xff0c;会分多篇 适合群体&#xff1a;想自己动手移植OpenHarmony轻量系统的朋友 开始尝试讲解一下系统的移植&#xff0c;主要是轻量系统&#xff0c;也可能会顺便讲下L1移植。 1.1移植类型 OpenHarmony轻量系统的…

腾讯云服务器收费价格表(腾讯云服务器租用价格表)

作为国内领先的云计算服务提供商&#xff0c;腾讯云凭借其稳定、安全、高效的特点&#xff0c;备受用户青睐。本文将详细介绍腾讯云服务器的收费价格表及使用场景&#xff0c;帮助大家更好地了解并选择合适的云服务器方案。 一、轻量应用服务器 轻量应用服务器是一款开箱即用的…

数字人民币如何将支付宝钱包余额转入到微信支付钱包余额?

数字人民币如何将支付宝钱包余额转入到微信支付钱包余额&#xff1f; 第一步&#xff1a;获取微信支付数字人民币钱包编号 1.1、手机上找到并打开数字人民币APP&#xff1b; 1.2、打开后找到微众银行&#xff08;微信支付&#xff09;微信钱包&#xff0c;并点击翻转获取收款…

攻防世界-WEB-fileinclude

访问url&#xff0c;可以看到一些提示&#xff0c;绝对路径/var/www/html/index.php&#xff0c;也提示了flag在flag.php中。 快捷键Ctrlu,查看网页源代码 思路&#xff1a; 源代码中看到 include($lan.".php"); &#xff0c;可知此处存在文件包含。$lan的值是从co…

虚拟线程最佳实践

Virtual Threads: An Adoption Guide 虚拟线程&#xff1a;采用指南 接上篇 Virtual Threads 虚拟线程 原文&#xff1a;https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html 虚拟线程是由 Java 运行时而不是操作系统实现的 Java 线程。虚拟线程和传统线程&…

Datax从mysql同步数据到HDFS

在实际使用Datax的时候&#xff0c;比较常用的是同步业务数据&#xff08;mysql中的数据&#xff09;到HDFS来实现数仓的创建&#xff0c;那么怎么实现呢&#xff1f;我们一步步来实现&#xff08;基于Datax 3.0.0&#xff09; 1、检查环境&#xff0c;需要安装完一个Datax&am…

网络基础面试题

1. ISO/OSI的七层模型 ISO国际标准化组织 OSI开放系统互连 TCP和UDP都会进行差错校验&#xff0c;TCP会告诉A包发错了&#xff0c;但UDP不会告诉A发错了会把包丢弃。 静态路由不需要路由器做任何的计算&#xff0c;对路由器的消耗是最小的&#xff0c;效率最高但是缺点是…

Qt开发 - Qt基础类型

1.基础类型 因为Qt是一个C 框架, 因此C中所有的语法和数据类型在Qt中都是被支持的, 但是Qt中也定义了一些属于自己的数据类型, 下边给大家介绍一下这些基础的数类型。 QT基本数据类型定义在#include <QtGlobal> 中&#xff0c;QT基本数据类型有&#xff1a; 虽然在Qt中…

“智能制造进园区·浙江站和专家行”活动成功举办

为进一步加强央地联动&#xff0c;强化智能制造系统推进格局&#xff0c;促进重点区域行业智能制造供需对接&#xff0c;2023年9月12日-15日&#xff0c;在工业和信息化部装备工业一司指导下&#xff0c;由国家智能制造专家委员会、浙江省经济和信息化厅、智能制造系统解决方案…

专访中欧财富伍春兰:财富管理行业数字化转型升级,数据库如何选型?

以下文章来源于InfoQ数字化经纬。 InfoQ数字化经纬&#xff1a; InfoQ极客传媒旗下官方账号。面向数字化管理者、从业者、洞察者&#xff0c;提供数字化企业案例、政策解读、研究报告&#xff0c;做数字时代的「记录者」。 作者 | 赵钰莹 嘉宾 | 伍春兰 中欧财富技术总监 …

如何在微信上制作自己的小程序卖东西

在当今的数字化时代&#xff0c;微信小程序已成为电商行业的重要平台。本文将详细解析电商微信小程序的制作流程&#xff0c;帮助你了解从零到上线的过程。 一、前期准备 1. 确定商城定位和目标群体&#xff1a;在制作电商微信小程序前&#xff0c;你需要明确商城的定位&#x…

解决连接数据库提示:Public Key Retrieval is not allowed

最近在使用新的用户连接mysql时&#xff0c;总是提示&#xff1a;Public Key Retrieval is not allowed 解决方法一&#xff1a;在&#xff08;连接属性&#xff09;添加allowPublicKeyRetrievaltrue 解决方法二&#xff08;不建议&#xff09;&#xff1a;先在cmd上登录