kafka消费数据太慢了,给优化下

news2024/11/13 17:48:58

原代码

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        int numConsumers = 5; // 增加消费者的数量
        for (int i = 0; i < numConsumers; i++) {
            new Thread(new KafkaConsumerThread()).start();
        }
    }

    static class KafkaConsumerThread implements Runnable {
        @Override
        public void run() {
            // 配置消费者属性
            Properties props = new Properties();
            props.put("bootstrap.servers", "114.15.78.14:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 调整消费者配置
            props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据
            props.put("fetch.max.wait.ms", "500"); // 最大等待500ms

            // 创建消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // 订阅主题
            consumer.subscribe(Collections.singletonList("test-topic"));

            // 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                    // 批量提交偏移量
                    consumer.commitSync();
                }
            }
        }
    }
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        int numConsumers = 5; // 增加消费者的数量
        for (int i = 0; i < numConsumers; i++) {
            new Thread(new KafkaConsumerThread()).start();
        }
    }

    static class KafkaConsumerThread implements Runnable {
        @Override
        public void run() {
            // 配置消费者属性
            Properties props = new Properties();
            props.put("bootstrap.servers", "114.15.78.14:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 调整消费者配置
            props.put("fetch.min.bytes", "1"); // 减少最小获取字节数
            props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间
            props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数

            // 创建消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // 订阅主题
            consumer.subscribe(Collections.singletonList("test-topic"));

            // 消费消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    processRecords(records); // 异步处理消息
                    consumer.commitAsync(); // 异步提交偏移量
                }
            }
        }

        private void processRecords(ConsumerRecords<String, String> records) {
            // 异步处理消息的逻辑
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // 这里可以添加消息处理逻辑,例如使用线程池并行处理
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2239552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

检测敏感词功能

今天策划给我一个任务 —— 检测昵称中是否含有敏感词功能&#xff0c;然后丢给我两个压缩包&#xff0c;我解压一看&#xff1a; 有的txt文件是一行一个词&#xff1a; 有的txt文件是按逗号分隔开&#xff1a; 不管是什么格式的总之量非常多&#xff0c;把我这辈子脏话都囊括…

【OpenGL】OpenGL简介

文章目录 OpenGL概述OpenGL的本质OpenGL相关库核心库窗口管理glutfreeglutglfw 函数加载glewGLAD OpenGL概述 OpenGL(Open Graphics Library) 严格来说&#xff0c;本身并不是一个API&#xff0c;它是一个由Khronos组织制定并维护的规范(Specification)。OpenGL规范严格规定了…

python-24-一篇文章彻底掌握Python HTTP库Requests

python-24-一篇文章彻底掌握Python HTTP库Requests 一.简介 在 Python 中&#xff0c;Requests 是一个非常流行且易于使用的 Python HTTP 库&#xff0c;专门用于发送 HTTP/HTTPS 请求&#xff0c;获取请求响应&#xff1b; 可能觉得HTTP请求不是应该前端去做么&#xff1f;…

SpringMVC案例学习(一)--计算器设计登录页面设计

文章目录 1.计算器1.1.html代码1.2接口设计1.3前端测试1.4接口测试 2.登录页面设计2.1接口实现2.2查看前端页面效果2.3未进行前后端交互时候的代码2.4前后端交互设计2.5个人实践遇到的问题 3.lombok介绍3.1插件安装3.2导入依赖 1.计算器 1.1.html代码 下面的这个就是我们的前…

【Linux 麒麟系统 qt 程序通过root启动 桌面程序】

通过.desktop pkexec 启动程序 关键字方案一方案二第一步 修改.desktop第二步 xxx.sh实现 注意 关键字 pkexec .desktop qt 原始需求&#xff1a; 用户在麒麟系统上通过快捷方式(.desktop)启动程序后绑定系统的26端口&#xff0c;但是因为系统权限问题&#xff0c;26端口普通…

DevOps-Gitlab-私有代码仓库

1. 概述 1. 私有代码仓库 2. 精细化权限配置,让系统更安全 3. 控制用户/用户组是否可以提交到主分支 (PR Push Request) 4. 它使用Ruby语言写成。后来&#xff0c;一些部分用Go语言重写 2. Gitlab vs Github/Gitee GitlabGithub/Gitee共同点存放代码,git访问存放代码,git访问…

【AI声音克隆整合包及教程】第二代GPT-SoVITS V2:技术、应用与伦理思考

一、引言 在当今科技迅速发展的时代&#xff0c;声音克隆技术成为人工智能领域的一个备受瞩目的分支。GPT-SoVITS V2作为一种声音克隆工具&#xff0c;正逐渐进入人们的视野&#xff0c;它在多个领域展现出巨大的潜力&#xff0c;同时也引发了一系列值得深入探讨的问题。本文旨…

重新认识HTTPS

一. 什么是 HTTPS HTTP 由于是明文传输&#xff0c;所谓的明文&#xff0c;就是说客户端与服务端通信的信息都是肉眼可见的&#xff0c;随意使用一个抓包工具都可以截获通信的内容。 所以安全上存在以下三个风险&#xff1a; 窃听风险&#xff0c;比如通信链路上可以获取通信…

Vite初始化Vue3+Typescrpt项目

初始化项目 安装 Vite 首先&#xff0c;确保你的 Node.js 版本 > 12.0.0。然后在命令行中运行以下命令来创建一个 Vite Vue 3 TypeScript 的项目模板&#xff1a; npm init vitelatest进入项目目录 创建完成后&#xff0c;进入项目目录&#xff1a; cd vue3-demo启动…

Three.js 搭建3D隧道监测

Three.js 搭建3D隧道监测 Three.js 基础元素场景scene相机carema网络模型Mesh光源light渲染器renderer控制器controls 实现3d隧道监测基础实现道路实现隧道实现多个摄像头点击模型进行属性操作实现点击模型发光效果 性能监视器stats引入使用 总结完整代码 我们将通过three.js技…

【学术会议介绍,SPIE 出版】第四届计算机图形学、人工智能与数据处理国际学术会议 (ICCAID 2024,12月13-15日)

第四届计算机图形学、人工智能与数据处理国际学术会议 2024 4th International Conference on Computer Graphics, Artificial Intelligence and Data Processing (ICCAID 2024) 重要信息 大会官网&#xff1a;www.iccaid.net 大会时间&#xff1a;2024年12月13-15日 大会地…

VScode C++配置opencv4.5.3——先赞后看,配置成功实属不易-(镜像加速环境w版和配置文件版)

前置准备&#xff1a;配置MinGW和CMake 手把手教——class1_VScode配置C环境_linux vscode cpp配置-CSDN博客文章浏览阅读398次&#xff0c;点赞4次&#xff0c;收藏6次。点击Windows x64 Installer:下载msi文件 安装完成后验证。配置gcc文件下bin环境。最后ctrlF5运行尝试。W…

机器学习 笔记

特征值提取 字典 from sklearn.extaction import DictVectorizer mDictVectorizer(sparseFalse)#sparse是否转换成三元组形式 data[], #传入字典数据 data1model.fit_transform(data) #使用API 英文特征值提取 from sklearn.feature_extraction.text import CountVe…

推荐一款好用的postman替代工具2024

Apifox 是国内团队自主研发的 API 文档、API 调试、API Mock、API 自动化测试一体化协作平台&#xff0c;是非常好的一款 postman 替代工具。 它通过一套系统、一份数据&#xff0c;解决多个系统之间的数据同步问题。只要定义好接口文档&#xff0c;接口调试、数据 Mock、接口…

项目模块十七:HttpServer模块

一、项目模块设计思路 目的&#xff1a;实现HTTP服务器搭建 思想&#xff1a;设计请求路由表&#xff0c;记录请求方法与对应业务的处理函数映射关系。用户实现请求方法和处理函数添加到路由表&#xff0c;服务器只接受请求并调用用户的处理函数即可。 处理流程&#xff1a; …

Android音视频直播低延迟探究之:WLAN低延迟模式

Android WLAN低延迟模式 Android WLAN低延迟模式是 Android 10 引入的一种功能&#xff0c;允许对延迟敏感的应用将 Wi-Fi 配置为低延迟模式&#xff0c;以减少网络延迟&#xff0c;启动条件如下&#xff1a; Wi-Fi 已启用且设备可以访问互联网。应用已创建并获得 Wi-Fi 锁&a…

requests库如何处理 - POST请求常见的两种请求体格式:表单格式JSON格式

目录&#xff1a; 每篇前言&#xff1a;一、POST请求的两种常见请求体格式详解1. 表单格式&#xff08;form-encoded&#xff09; - 举例&#xff1a;福州搜索示例代码&#xff08;表单数据&#xff09;&#xff1a; 2. JSON格式 - 举例&#xff1a;CSDN搜索示例代码&#xff0…

HCIP-HarmonyOS Application Developer 习题(二十二)

1、用户将手机导航迁移至智能手表之后&#xff0c;智能手表如果需要获取手机传过来的数据&#xff0c;从下列哪个方法中获取? A、onCompleteContinuation() B、onStartContinuation() C、onRestoreData() D、onSaveData() 答案&#xff1a;C 分析&#xff1a;FA发起迁移后&am…

LLMs之Code:Github Spark的简介、安装和使用方法、案例应用之详细攻略

LLMs之Code&#xff1a;Github Spark的简介、安装和使用方法、案例应用之详细攻略 目录 Github Spark的简介 Github Spark的安装和使用方法 1、安装 2、使用方法 Github Spark的案例应用 Github Spark的简介 2024年10月30日&#xff0c;GitHub 重磅发布GitHub Spark 是一…

会议直击|美格智能受邀出席第三届无锡智能网联汽车生态大会,共筑汽车产业新质生产力

11月10日&#xff0c;2024世界物联网博览会分论坛——第三届无锡智能网联汽车生态大会在无锡举行&#xff0c;美格智能CEO杜国彬受邀出席&#xff0c;并参与“中央域控&#xff1a;重塑汽车智能架构的未来”主题圆桌论坛讨论&#xff0c;与行业伙伴共同探讨智能网联汽车产业领域…