当参数调优无法解决kafka消息积压时可以这么做

news2024/9/27 17:35:32

今天的议题是:如何快速处理kafka的消息积压

通常的做法有以下几种:

  1. 增加消费者数
  2. 增加 topic 的分区数,从而进一步增加消费者数
  3. 调整消费者参数,如max.poll.records
  4. 增加硬件资源

常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办?本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率,本文的重点就是通过提高消费速率来解决消息积压。

经验判断,消费速率低下的主要原因往往都是数据处理时间长,业务逻辑复杂最终导致一次 poll 的时间被无限拉长,如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点:

  1. 业务逻辑对乱序数据不敏感,因为并行一定会导致乱序问题
  2. kafka 的消费者是线程不安全的
  3. 如何提交 offset

基于上述几点,思路就是消费者 poll 下来一批数据,交给多个线程去并行处理,消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下:

第一步:创建一个MultipleConsumer类用于封装消费者和线程池

public class MultipleConsumer {
    private final KafkaConsumer<String, String> consumer;

    private final int threadNum;

    private final ExecutorService threadPool;

    private boolean isRunning = true;

    public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {
        // 实例化消费者
        consumer = new KafkaConsumer<>(properties);
        // 订阅主题
        consumer.subscribe(topics);
        this.threadNum = threadNum;
        this.threadPool = Executors.newFixedThreadPool(threadNum);
    }
}

理论上相较于传统的消费速率可以提升 threadNum 倍。

第二步:因为需要并行处理一批 poll 数据,因此需要对数据进行切分,切分逻辑如下

private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {
      HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();
      for (int i = 0; i < threadNum; i++) {
          tasks.put(i, new ArrayList<>());
      }

      int recordIndex = 0;
      for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
          tasks.get(recordIndex % threadNum).add(consumerRecord);
          recordIndex++;
      }

      return tasks;

  }

这里采用轮训的方式且切分的个数与 threadNum 一致,尽可能保证每个线程处理的数据数量相差不大

第三步:定义一个静态内部类用来处理数据,并处理同步逻辑(因为需要等待所有线程执行完再提交 offset)

private static class InnerProcess implements Runnable {
      private final List<ConsumerRecord<String, String>> records;

      private final CountDownLatch countDownLatch;

      public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {
          this.records = records;
          this.countDownLatch = countDownLatch;
      }

      @Override
      public void run() {
          try {
              // 处理消息
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());
                  TimeUnit.SECONDS.sleep(1);
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              countDownLatch.countDown();
          }
      }
  }

使用 CountDownLatch 实现线程同步逻辑,假设每条数据的业务处理时间为 1 s

第四步:消费者 poll 逻辑

public void start() {
      while (isRunning) {
          ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));
          if (!consumerRecords.isEmpty()) {
              // 分割任务
              Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);
              CountDownLatch countDownLatch = new CountDownLatch(threadNum);
              // 提交任务
              for (int i = 0; i < threadNum; i++) {
                  threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));
              }
              // 等待任务执行结束
              try {
                  countDownLatch.await();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              // 提交偏移量
              consumer.commitAsync((map, e) -> {
                  if (e != null) {
                      System.out.println("提交偏移量失败");
                  }
              });
          }


      }
  }

完整代码如下:

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;

/**
 * @author wjun
 * @date 2023/3/1 14:50
 * @email wjunjobs@outlook.com
 * @describe
 */
public class MultipleConsumer {
    private final KafkaConsumer<String, String> consumer;

    private final int threadNum;

    private final ExecutorService threadPool;

    private boolean isRunning = true;

    public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {
        // 实例化消费者
        consumer = new KafkaConsumer<>(properties);
        // 订阅主题
        consumer.subscribe(topics);
        this.threadNum = threadNum;
        this.threadPool = Executors.newFixedThreadPool(threadNum);
    }

    public void start() {
        while (isRunning) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));
            if (!consumerRecords.isEmpty()) {
                // 分割任务
                Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);
                CountDownLatch countDownLatch = new CountDownLatch(threadNum);
                // 提交任务
                for (int i = 0; i < threadNum; i++) {
                    threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));
                }
                // 等待任务执行结束
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // 提交偏移量
                consumer.commitAsync((map, e) -> {
                    if (e != null) {
                        System.out.println("提交偏移量失败");
                    }
                });
            }


        }
    }

    private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {
        HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();
        for (int i = 0; i < threadNum; i++) {
            tasks.put(i, new ArrayList<>());
        }

        int recordIndex = 0;
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            tasks.get(recordIndex % threadNum).add(consumerRecord);
            recordIndex++;
        }

        return tasks;

    }

    public void stop() {
        isRunning = false;
        threadPool.shutdown();
    }

    private static class InnerProcess implements Runnable {
        private final List<ConsumerRecord<String, String>> records;

        private final CountDownLatch countDownLatch;

        public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {
            this.records = records;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                // 处理消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }
}

测试一下:

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author wjun
 * @date 2023/3/1 16:03
 * @email wjunjobs@outlook.com
 * @describe
 */
public class MultipleConsumerTest {

    private static final Properties properties = new Properties();

    private static final List<String> topics = new ArrayList<>();

    public static void before() {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "false");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        topics.add("multiple_demo");
    }

    public static void main(String[] args) {
        new MultipleConsumer(properties, topics, 5).start();
    }
}

20 条数据的处理事件只需要 4s(threadNume = 5,即缩短 5 倍)

image-20230301172739782

但是此方法的缺点:

  1. 只适用于业务逻辑复杂导致的处理时间长的场景
  2. 对数据乱序不敏感的业务场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue 在install时候node-sass@4.14.1 postinstall: node scripts/build.js错误

今天重装了node和Vue脚手架&#xff0c;在install的时候报了下面的错误 报错如下&#xff1a; Build failed with error code: 1 [npminstall:runscript:error] node-sass^4.14.1 run postinstall node scripts/build.js error: Error: Command failed with exit code 1: node…

Allegro如何输出钻孔表操作指导

Allegro如何输出钻孔表操作指导 用Allegro做PCB设计的时候,需要输出钻孔表格,用于生产加工,如下图 如何输出钻孔表,具体操作如下 点击Manufacture点击NC

面试问题【集合】

集合常见的集合有哪些List、Set、Map 的区别ArrayList 和 Vector 的扩容机制Collection 和 Collections 有什么区别ArrayList 和 LinkedList 的区别是什么ArrayList 和 Vector 的区别是什么ArrayList 和 Array 有何区别ArrayList 集合加入1万条数据&#xff0c;应该怎么提高效率…

全面了解 B 端产品设计 — 基础扫盲篇

在今天,互联网的影响力与作用与日俱增,除了我们日常生活领域的改变以外,对于商业领域的渗透也见效颇丰。 越来越多的企业开始使用数字化的解决方案来助力企业发展,包括日常管理、运营、统计等等。或者通过互联网的方式开发出新的业务形态,进行产业升级,如这几年风头正劲的…

WMS相关知识点

目录一、WMS简介二、窗口的分类三、添加Window一、WMS简介 Window&#xff1a;在Android视图体系中Window就是一个窗口的概念。Android中所有的视图都是依赖于Window显示的。 Window是一个抽象的概念&#xff0c;它对应屏幕上的一块显示区域&#xff0c;它不是实实在在的内容&…

大学生实践| 微软ATP“师徒制”AI实战项目收获满满!

ChatGPT在极短时间内掀起了一轮AI狂潮&#xff0c;AI数据、AI大模型、AIGC……对我们AI实践项目感兴趣的同学也越来越多&#xff01;微软(亚洲)互联网工程院下属的微软ATP为大学生们提供了丰富的企业级实践项目。2个月内&#xff01;本期优秀的Chen同学在微软AI工程师团队带领下…

研报精选230301

目录 【行业230301天风证券】家用电器23W9周度研究&#xff1a;一图解读立达信招股说明书【行业230301财信证券】风电设备行业深度&#xff1a;受益大兆瓦、国产替代和技术进步&#xff0c;风电轴承or滚子有望迎来高景气度【行业230301中泰证券】有色金属行业周报&#xff1a;静…

composer安装thinkphp

人家的官方文档上步骤都有了&#xff0c;按照步骤走就行 安装composer 不作赘述 附一个文档链接&#xff1a; 安装 ThinkPHP5.1完全开发手册 看云 主要说一个自己踩过的坑吧 composer create-project topthink/think5.1.* tp5 在输入这个命令以后提示 [Composer\Downl…

git repack多包使用及相关性能测试

1、git数据结构 git 中存在四种数据结构&#xff0c;即object包含四种&#xff0c;分别是tree对象、blob对象、commit对象、tag对象 1.1 blob对象 存储文件内容&#xff0c;内容是二进制的形式&#xff0c;通过SHA-1算法对文件内容和头信息进行计算得到key(文件名)。 如果一…

一款优秀的低代码开发平台是什么样的?

目录 一、一款优秀的低代码平台应该是什么样的&#xff1f; 二、低代码核心能力 01、全栈可视化编程&#xff1a; 02、全生命周期管理&#xff1a; 03、低代码扩展能力&#xff1a; 三、小结 一、一款优秀的低代码平台应该是什么样的&#xff1f; 从企业角度来说&#x…

模拟QQ登录-课后程序(JAVA基础案例教程-黑马程序员编著-第十一章-课后作业)

【案例11-3】 模拟QQ登录 【案例介绍】 1.案例描述 QQ是现实生活中常用的聊天工具&#xff0c;QQ登录界面看似小巧、简单&#xff0c;但其中涉及的内容却很多&#xff0c;对于初学者练习Java Swing工具的使用非常合适。本案例要求使用所学的Java Swing知识&#xff0c;模拟实…

计算机组成原理(2.2)--系统总线

目录 一、总线结构 1.单总线结构 1.1单总线结构框图 ​编辑1.2单总线性能下降的原因 2.多总线结构 2.1双总线结构 2.2三总线结构 2.3四总线结构 ​编辑 二、总线结构举例 1. 传统微型机总线结构 2. VL-BUS局部总线结构 3. PCI 总线结构 4. 多层 PCI 总线结构 …

手撕八大排序(下)

目录 交换排序 冒泡排序&#xff1a; 快速排序 Hoare法 挖坑法 前后指针法【了解即可】 优化 再次优化&#xff08;插入排序&#xff09; 迭代法 其他排序 归并排序 计数排序 排序总结 结束了上半章四个较为简单的排序&#xff0c;接下来的难度将会大幅度上升&…

安卓逆向学习及APK抓包(二)--Google Pixel一代手机的ROOT刷入面具

注意:本文仅作参考勿跟操作&#xff0c;root需谨慎&#xff0c;本次测试用的N手Pixel&#xff0c;因参考本文将真机刷成板砖造成的损失与本人无关 1 Google Pixel介绍 1.1手机 google Pixel 在手机选择上&#xff0c;优先选择谷歌系列手机&#xff0c;Nexus和Pixel系列&…

mac系统上hdfs java api的简单使用

文章目录1、背景2、环境准备3、环境搭建3.1 引入jar包3.2 引入log4j.properties配置文件3.3 初始化Hadoop Api4、java api操作4.1 创建目录4.2 上传文件4.3 列出目录下有哪些文件4.4 下载文件4.5 删除文件4.6 检测文件是否存在5、完整代码1、背景 在上一节中&#xff0c;我们简…

五分钟进步系列之nginx(一)

学习方式&#xff1a;先读英文的原版&#xff0c;如果你能看懂就可以到此为止的了。如果你看不懂&#xff0c;可以再看一下我给的较高难度的英文单词的翻译。如果还是看不懂可以去最下面看我翻译的汉语。下面是我在nginx官网中找到的一段话&#xff0c;它给我们描述了nginx的负…

JAVA中加密与解密

BASE64加密/解密 Base64 编码会将字符串编码得到一个含有 A-Za-z0-9/ 的字符串。标准的 Base64 并不适合直接放在URL里传输&#xff0c;因为URL编码器会把标准 Base64 中的“/”和“”字符变为形如 “%XX” 的形式&#xff0c;而这些 “%” 号在存入数据库时还需要再进行转换&…

软件自动化测试工程师面试题集锦

以下是部分面试题目和我的个人回答&#xff0c;回答比较简略&#xff0c;仅供参考。不对之处请指出 1.自我介绍 答&#xff1a;姓名&#xff0c;学历专业&#xff0c;技能&#xff0c;近期工作经历等&#xff0c;可以引导到最擅长的点&#xff0c;比如说代码或者项目 参考&a…

Qt音视频开发19-vlc内核各种事件通知

一、前言 对于使用第三方的sdk库做开发&#xff0c;除了基本的操作函数接口外&#xff0c;还希望通过事件机制拿到消息通知&#xff0c;比如当前播放进度、音量值变化、静音变化、文件长度、播放结束等&#xff0c;有了这些才是完整的播放功能&#xff0c;在vlc中要拿到各种事…

ImportError: Can not find the shared library: libhdfs3.so解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。喜欢通过博客创作的方式对所学的知识进行总结与归纳,不仅形成深入且独到的理…