kafka多线程消费

news2024/11/16 5:47:27

Kafka consumer

多线程消费

kafka 消费者对象 - KafkaConsumer是非线程安全的。与KafkaProducer不同,KafkaProducer是线程安全的,因为开发者可以在多个线程中放心地使用同一个KafkaProducer实例。

但是对于消费者而言,由于它是非线程安全的,因此用户无法直接在多个线程中直接共享同一个KafkaConsumer实例。对应kafka 多线程消费给出两种解决方案:

  • 每个线程维护一个KafkaConsumer,每个KafkaConsumer消费一个topic分区

    在这里插入图片描述

  • 单个KafkaConsumer实例统一拉取数据,交给多个worker线程进行处理

    在这里插入图片描述

多Consumer

  • 程序代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MultiThreadConsumer {

    public static void main(String[] args) {
        String brokers = "localhost:9092";
        String topic   = "topic_t40";
        String groupId = "app_q";
        int consumers  = 2;

        for(int i = 0;i < consumers;i++){
            final ConsumerRunnable consumer = new ConsumerRunnable(brokers, groupId, topic, "thread" + i);
            new Thread(consumer).start();
        }
    }

    static class ConsumerRunnable implements Runnable{

        private final KafkaConsumer<String,String> consumer;

        private volatile boolean isRunning = true;

        private String threadName ;

        public ConsumerRunnable(String brokers,String groupId,String topic,String threadName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));

            this.threadName = threadName;
        }

        @Override
        public void run() {
            try {
                while (isRunning) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(record -> {
                        System.out.println(this.threadName + " : Message received " + record.value() + ", partition " + record.partition());
                    });
                }
            }finally {
                consumer.close();
            }
        }
    }
}
  • 测试结果

在这里插入图片描述

多Work线程

  • 程序代码

    public class WorkerConsumer {
    
        private static ExecutorService executor = Executors.newFixedThreadPool(100);
    
        public static void main(String[] args){
    
            String topicName = "topic_t40";
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_w");
            props.put("client.id", "client_02");
            props.put("enable.auto.commit", true);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if(!records.isEmpty()){
                        executor.execute(new MessageHandler(records));
                    }
                }
            }finally {
                consumer.close();
            }
        }
    
        static class MessageHandler implements Runnable{
    
            private ConsumerRecords<String, String> records;
    
            public MessageHandler(ConsumerRecords<String, String> records) {
                this.records = records;
            }
    
            @Override
            public void run() {
                records.forEach(record -> {
                    System.out.println(" 开始处理消息: " + record.value() + ", partition " + record.partition());
                });
            }
        }
    }
    
  • 测试结果

在这里插入图片描述

方法对比

多Consumer

  • 优势 - 实现简单;速度较快 无线程切换,方便位移管理,易于维护分区间消息消费顺序
  • 缺点 - socket连接大;consumer的数量受限于topic的分区数,扩展性差;

多Work线程

  • 优势 - 消息获取与消息处理解耦;可独立扩展消费者数量和工作线程数量,伸缩性好
  • 缺点 - 难以维护分区消息处理的有序性,位移管理困难

__consumer_offsets

之前提到过,消费者通过拉取模式从broker中拉取数据,每次消费成功后,消费者记录自身消费位移,当服务重启后,默认从最后的位移位置开始拉取最新的数据。那么消费者是如何记录自身的位移的呢?

__consumer_offset是kafka自行创建的一个内部topic,因此开发者不可以删除该topic,它的目的是存储Kafka 消费者的偏移量。consumer_offset是管理所有消费者的偏移量的一个主题。

# 查看kafka配置文件日志路径
more config/server.properties | grep log.dirs
log.dirs=/tmp/node0/kafka-logs

在这里插入图片描述

在kafka的日志目录中,可以看到由**__consumer_offsets**开头的带数字序号的50个文件夹,表示该topic有50个分区。进入任意文件夹,发现他跟正常的topic文件差不多,里面至少包含了2个index索引文件,一个日志文件

ls -ll /tmp/node0/kafka-logs/__consumer_offsets-1
total 16
-rw-r--r--  1 andy  wheel  10485760 12 27 21:07 00000000000000000000.index
-rw-r--r--  1 andy  wheel         0 12 26 19:49 00000000000000000000.log
-rw-r--r--  1 andy  wheel  10485756 12 27 21:07 00000000000000000000.timeindex
-rw-r--r--  1 andy  wheel         8 12 27 21:07 leader-epoch-checkpoint
-rw-r--r--  1 andy  wheel        43 12 26 19:49 partition.metadata00000000000000000000.log	00000000000000000000.timeindex	leader-epoch-checkpoint		partition.metadata

当多个consumer 或 consumer group需要同时提交位移信息时,kafka会根据每个消费者的group.id 做hash取模运算,从而将位移数据负载到不同的分区上。

订阅主题

kafka 消费者处理支持常规的topic列表进行订阅之外,还支持基于正则表达式订阅topic,代码实现分别如下:

  • Topic列表订阅
consumer.subscribe(Arrays.asList("hello","world"));
  • 基于正则表达是订阅
consumer.subscribe(Pattern.compile("topic_*"));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/126106.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FreeFileSync 11.29 发布

导读FreeFileSync 是一款开源软件&#xff0c;适用于 Windows、macOS 和 Linux。FreeFileSync 本质是一个用于文件夹对比和同步的软件&#xff0c;它可以创建和管理所有重要文件的备份副本。FreeFileSync 不是每次都复制每个文件&#xff0c;而是确定源文件夹和目标文件夹之间的…

使用 ClusterResourceSet 为 Cluster API 集群自动安装 CNI 插件

1 什么是 Cluster API Cluster API[1] 是一个 Kubernetes 子项目&#xff0c;它将声明式、Kubernetes 风格的 API 引入到集群的创建、配置和管理中。Cluster API 支持在 AWS, Azure, GCP, vSphere, KubeVirt 等多种环境中创建和管理 Kuberenetes 集群&#xff0c;并负责提供部…

单例(Singleton)设计模式

一、单例(Singleton)设计模式说明 设计模式是在大量的实践中总结和理论化之后优选的代码结构、编程风格、以及解决问题的思考方式。设计模式免去我们自己再思考和摸索。就像是经典的棋谱&#xff0c;不同的棋局&#xff0c;我们用不同的棋谱&#xff0c;"套路"所谓类…

WebView以及使用HTTP访问访问网络

文章目录使用网络技术WebView的用法使用HTTP访问网络使用HttpURLConnection使用OkHttp使用网络技术 在Android开发当中,我们应该合理的使用网络编写出更加出色的应用程序,下面学习以下如何在手机端使用HTTP和服务器进行网络交互,并对服务器返回的数据进行解析,这也是在Android…

爽啊,这么多有趣好玩强大的 Python 库

Python语言简洁、易读以及可扩展&#xff0c;在国内外用 Python 做研究的非常多。 Python 语言向来以丰富的第三方库而闻名。这么多有趣好玩且强大&#xff0c;靠一个人去寻找太难了。 最近粉丝群小伙伴们又罗列了一些&#xff0c;分享给大家。喜欢记得点个赞&#xff0c;加入…

OpenHarmony#深入浅出学习eTs#(三)UI布局

本项目Gitee仓地址&#xff1a;深入浅出eTs学习: 带大家深入浅出学习eTs (gitee.com) 一、ArkUI介绍 框架介绍 方舟开发框架&#xff08;简称&#xff1a;ArkUI&#xff09;&#xff0c;是一套UI开发框架&#xff0c;提供开发者进行应用UI开发时所必需的能力。 基本概念 组…

力扣sql入门篇(二)

力扣sql入门篇(二) 1 计算特殊奖金 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 1.2 示例sql语句 SELECT employee_id, case when employee_id%21 AND name not like "M%" then salary else 0 end bonus FROM Employees ORDER BY employee_id;1.3 运行…

【软件测试】测试人的一份“漂亮“的年终总结报告......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 不管这一年&#xf…

举一反三-自建zabbix监控mysql

之前写过2篇zabbix监控redis的文章。 本篇针对的是mysql。除了描述如何创建mysql监控的步骤,本篇另一个目的是描述创建任意一个监控对象的基本原理,未来面对其它监控对象时,可以举一反三。 zabbix监控的最基本的部件时zabbix server和zabbix agent. zabbix server负责汇总…

c++primer 第4章 表达式

文章目录第4章 表达式4.1 基础4.1.1 基础概念4.1.2 优先级与结合律4.1.3 求值顺序4.2 算术运算符4.3 逻辑和关系运算符4.4 赋值运算符4.5 递增和递减运算符4.6 成员访问运算符4.7 条件运算符4.8 位运算符4.9 sizeof运算符4.10 逗号运算符4.11 类型转换4.11.1 算术转换4.11.2 其…

ES6-ES11笔记(1)

关于这个视频的笔记 (https://www.bilibili.com/video/BV1uK411H7on?p29&vd_source3cf72bb393b8cc11b96c6d4bfbcbd890) 1.ES6 1.1let的一些注意点 let a; let b,c,d; let e 100; let f"你好",g101;// 变量名不能重复声明 // let testDepulicate 123456 // …

无信息变量消除法研究及实现(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 波长变量筛选的方法主要有相关系数法&#xff0c;逐步回归法&#xff0c;无信息变量消除法(UVE)&#xff0c;遗传算法(genetic …

python圣诞树词云

一、前言 圣诞节虽然是西方节日&#xff0c;但是个人还是比较喜欢的&#xff08;没有崇洋媚外的意思&#xff0c;中国的春节也超级棒&#xff09;&#xff0c;一个是圣诞节的氛围&#xff0c;圣诞节的圣诞老人等象征、雪花麋鹿等元素&#xff0c;都充满了浪漫的氛围。我想这也是…

Linux的文件系统编程(1)

What makes the desert beautiful is that somewhere it hides a well. 沙漠之所以美丽,是因为在它的某个角落隐藏着一口井. Linux的文件系统编程&#xff08;1&#xff09;运行过程框架标准IO和文件IO标准IO文件IO(主要学)open函数两个参数三个参数close函数read函数write函数…

Python基础语法(二)

Python基础语法&#xff08;二&#xff09; 函数 编程中的函数和数学中的函数有一定的相似之处. 数学上的函数, 比如 y sin x , x 取不同的值, y 就会得到不同的结果. 编程中的函数, 是一段 可以被重复使用的代码片段 . 代码示例: 求数列的和, 不使用函数 # 1. 求 1 - 100 …

树Tree【代码笔记】

树【Tree】 树是n&#xff08;n>0&#xff09;个结点的有限集。当n 0时&#xff0c;称为空树。在任意一棵非空树中应满足&#xff1a; 有且仅有一个特定的称为根的结点。当n>1时&#xff0c;其余节点可分为m&#xff08;m>0&#xff09;个互不相交的有限集T1,T2,……

OpenHarmony#深入浅出学习eTs#(六)编写eTs第一个控件

本项目Gitee仓地址&#xff1a;深入浅出eTs学习: 带大家深入浅出学习eTs (gitee.com) 一、控件基本属性 在使用第一个控件前&#xff0c;我们需要了解一些控件都有哪些基础属性&#xff0c;比如说我们在Super Visual中使用过的长宽和字体大小等等&#xff0c;通用属性有以下这…

Retrofit的使用

文章目录Retrofit的使用最好用的网络库: RetrofitRetrofit的基本用法处理复杂接口的地址类型Retrofit构建器的最佳写法Retrofit的使用 最好用的网络库: Retrofit Retrofit是一款由Square公司开发的网络库,但是它和OkHttp定位完全不同,OkHttp的侧重点是底层通信的实现,而Retro…

Java集合类——LinkedList(单链表及双链表)

一&#xff0c;ArrayList的缺陷 1.空间浪费 在之前的博客中&#xff0c;我利用源码详细的讲解了ArrayList这个集合类&#xff08;尤其是扩容机制&#xff09;&#xff0c;可以知道ArrayList的底层主要是一个动态的可变数组&#xff0c;容量满的时候需要进行1.5倍扩容。但是我…

第二十讲:神州路由器静态路由的配置

实验拓扑图如下所示 设备 端口 IP 子网掩码 网关 Router-A G0/0 120.83.200.55 255.255.255.0 无 G0/3 192.168.0.1 255.255.255.0 无 Router-B G0/0 120.83.200.56 255.255.255.0 无 G0/3 192.168.1.1 255.255.255.0 无 PC1 192.168.0.2 255.255.255…