Kafka 消费者位置提交方式及适用场景

news2024/11/27 20:39:38

在使用 Kafka 进行消息处理时,消费者的位置提交是一个非常重要的环节。它决定了消费者在下次启动时从哪里开始读取消息。今天,我们就来深入探讨一下 Kafka 消费者位置提交方式有哪些,以及在什么场景下使用。

一、Kafka 消费者位置提交的重要性

在 Kafka 中,消费者会不断地从主题(Topic)的分区(Partition)中读取消息。为了保证在消费者崩溃或重新启动后能够继续从上次停止的位置读取消息,消费者需要定期提交自己的位置信息。如果不进行位置提交,消费者在重新启动后可能会从头开始读取消息,导致重复处理已经处理过的消息,或者错过一些新的消息。

二、Kafka 消费者位置提交方式

  1. 自动提交

    • Kafka 消费者可以配置为自动提交位置信息。当消费者拉取一批消息后,经过一定的时间间隔或者消息数量达到一定阈值时,消费者会自动提交当前的位置信息。
    • 自动提交的优点是简单方便,不需要开发者手动干预。但是,它也存在一些缺点。例如,如果在自动提交之前消费者崩溃了,那么可能会导致一些消息被重复处理。
  2. 手动提交

    • 手动提交位置信息需要开发者在代码中显式地调用提交方法。手动提交可以分为同步提交和异步提交两种方式。
    • 同步提交:消费者会等待提交操作完成后才继续处理下一批消息。这种方式可以确保位置信息被正确提交,但是可能会影响消费者的性能,特别是在提交操作比较耗时的情况下。
    • 异步提交:消费者会在后台异步地提交位置信息,不会阻塞当前的消息处理。这种方式可以提高消费者的性能,但是如果在提交操作完成之前消费者崩溃了,那么可能会导致位置信息丢失。

三、不同提交方式的适用场景

  1. 自动提交

    • 适用于对消息处理的准确性要求不高,但是对性能要求较高的场景。例如,一些实时数据分析系统,可能更关注处理的速度,而对消息的重复处理不太敏感。
    • 自动提交也适用于一些简单的应用场景,开发者不想花费太多时间在位置提交的管理上。
  2. 手动提交(同步)

    • 适用于对消息处理的准确性要求非常高的场景。例如,在金融交易系统中,每一笔交易都必须被准确处理,不能出现重复处理或漏处理的情况。
    • 当消费者需要在提交位置信息之前进行一些额外的处理,如数据验证、事务处理等,同步提交可以确保这些处理完成后再提交位置信息。
  3. 手动提交(异步)

    • 适用于对性能要求较高,同时又希望在一定程度上保证消息处理的准确性的场景。例如,一些高并发的 Web 应用,需要快速处理大量的用户请求,同时又要确保消息不会被重复处理。
    • 异步提交可以在不影响消息处理性能的情况下,尽可能地保证位置信息的正确提交。

四、Java 代码示例

自动提交示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class AutoCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

手动同步提交示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ManualSyncCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 同步提交
                Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(consumer.assignment());
                for (TopicPartition partition : consumer.assignment()) {
                    long lastProcessedOffset = records.endOffsets(Collections.singleton(partition)).get(partition) - 1;
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset)));
                }
            }
        } finally {
            consumer.close();
        }
    }
}

手动异步提交示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ManualAsyncCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 异步提交
                Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(consumer.assignment());
                for (TopicPartition partition : consumer.assignment()) {
                    long lastProcessedOffset = records.endOffsets(Collections.singleton(partition)).get(partition) - 1;
                    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset)), (offsets1, exception) -> {
                        if (exception!= null) {
                            System.err.println("Error committing offsets: " + exception.getMessage());
                        }
                    });
                }
            }
        } finally {
            consumer.close();
        }
    }
}

五、总结

Kafka 消费者的位置提交方式有自动提交和手动提交两种,手动提交又分为同步提交和异步提交。不同的提交方式适用于不同的场景,开发者需要根据实际需求选择合适的提交方式。在选择提交方式时,需要考虑消息处理的准确性、性能要求以及应用场景的特点等因素。

文章(专栏)将持续更新,欢迎关注公众号:服务端技术精选。欢迎点赞、关注、转发

个人小工具程序上线啦,通过公众号(服务端技术精选)菜单【个人工具】即可体验,欢迎大家体验后提出优化意见!500 个访问欢迎大家踊跃体验哦~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2206819.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux的zookeeper安装部署

1.zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,是hadoop和HBASE的重要组件 2.下载zookeeper安装包zookeeper安装包https://archive.apache.org/dist/zookeeper/zookeeper-3.5.9/ 移动到Linux解压 解压到/export/server文件夹 命令: tar -xvf apache-zooke…

《Linux从小白到高手》综合应用篇:详解Linux系统调优之内存优化

本篇介绍Linux服务器系统内存调优。 内存是影响Linux性能的主要因素之一&#xff0c;内存资源的充足与否直接影响应用系统的使用性能。内存调优的主要目标是合理分配和利用内存资源&#xff0c;减少内存浪费&#xff0c;提高内存利用率&#xff0c;从而提升系统整体性能。 1.内…

选择智能工单系统的理由,功能与效益分析

智能工单管理系统提升企业客户服务效率和质量&#xff0c;具备多渠道接收、智能分配、自动化处理等功能。ZohoDesk等系统通过实时响应、数据分析等优化服务流程&#xff0c;成为企业提升竞争力的关键工具。 一、智能工单管理系统的概念与优势 1. 智能工单管理系统概念 智能工…

【三】【算法】P1007 独木桥,P1012 [NOIP1998 提高组] 拼数,P1019 [NOIP2000 提高组] 单词接龙

P1007 独木桥 独木桥 题目背景 战争已经进入到紧要时间。你是运输小队长&#xff0c;正在率领运输部队向前线运送物资。运输任务像做题一样的无聊。你希望找些刺激&#xff0c;于是命令你的士兵们到前方的一座独木桥上欣赏风景&#xff0c;而你留在桥下欣赏士兵们。士兵们十分愤…

H3C GRE VPN基本配置实验

H3C GRE VPN基本配置实验 实验拓扑 ​​ 实验需求 按照图示配置 IP 地址在 R1 和 R3 上配置默认路由使公网区域互通在 R1 和 R3 上配置 GRE VPN&#xff0c;使两端私网能够互相访问&#xff0c;Tunnel 口 IP 地址如图在 R1 和 R3 上配置动态路由协议来传递两端私网路由 实…

盘点2024年双十一最值得入手的好物,双十一必买清单大汇总

随着科技的飞速发展&#xff0c;数码产品已成为我们生活中不可或缺的伙伴。2024年双十一购物狂欢节即将来临&#xff0c;众多消费者早已摩拳擦掌&#xff0c;准备在这个年度盛事中淘到心仪的数码好物。在这个信息爆炸的时代&#xff0c;如何从琳琅满目的商品中挑选出性价比高、…

项目管理系统介绍,核心概念与操作技巧

项目管理系统通过分解任务、管理工时、规划项目等提升效率&#xff0c;支持多种使用场景&#xff0c;具备高度可定制性&#xff0c;适合不同用户群体&#xff0c;注重数据安全&#xff0c;能与其他软件集成。ZohoProjects因全面功能、低价和友好界面受中小企业青睐。 一、项目管…

视频背景音乐怎么提取出来?音乐爱好者必看:视频音轨提取指南

在数字媒体时代&#xff0c;视频成为了一种非常流行的信息传播方式。有时候&#xff0c;我们在观看视频时会被其中的背景音乐所吸引&#xff0c;想要将其提取出来单独欣赏或用于其他用途。那么&#xff0c;视频背景音乐怎么提取出来呢&#xff1f;本文将为您详细介绍几种提取视…

【Windows】【DevOps】Windows Server 2022平台启用WinRM实现远程powershell登陆 采用自签名证书开启HTTPS方案

快速配置开启WinRM(HTTP) quiciconfig 在目标服务器上&#xff0c;管理员权限启动powershell&#xff0c;执行指令 winrm quickconfig 输入y&#xff0c;完整日志如下 PS C:\Windows\system32> winrm quickconfig 已在此计算机上运行 WinRM 服务。 WinRM 没有设置成为了…

探索SAM:介绍、应用与衍生方向

Segment Anything Model&#xff08;简称SAM&#xff09;是Facebook Research团队开发的一项先进的图像分割技术。它通过使用深度学习模型&#xff0c;能够识别并分割出图像中的各个物体。SAM的创新之处在于其能够通过不同的交互方式&#xff08;如鼠标悬停、点击、框选和全图分…

pyQT生成界面,更改后不清除自定义代码的方法

基本原理就是作个子类继承生成的界面&#xff0c;在子类里写代码 工程结构 dialog_ui.py 界面子类 from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.QtGui import QStandardItemModel, QStandardItem, QColor, QFont from PyQt5.QtCore import Qt import Ui_dialog i…

带隙基准Bandgap电路学习(一)

一、原理图 Bandgap中的运放&#xff08;折叠式Cascode&#xff09;采用P输入对&#xff0c;是因为运放输入端接的PNP三极管发射极端的电位&#xff0c;电压小&#xff0c;为了确保输入对管能够饱和工作&#xff0c;故采用P输入对管。此外&#xff0c;P管作为输入管&#xff0c…

UE5.3.2查看引擎真正版本

编译好的插件给别人用&#xff0c;发现引擎不一致&#xff0c;而且双方都是5.3.2版本引擎 打开Help->About Unreal Editor可以看到引擎版本 或者直接查看引擎版本文件&#xff1a;XXXXX\Engine\Build\Build.version 里面能看到对应的分支名字

【AIGC】OpenAI Canvas发布,代码能力秒杀Copilot,360°碾压Claude

目录 在Canvas界面中&#xff0c;你可以&#xff1a;Canvas有哪些强大功能&#xff1f;写作助手的升级编程功能的提升 网友反响如何&#xff1f;[如何直接使用ChatGPT4o、o1、OpenAI Canvas](https://www.nezhasoft.cn/) 10月3日凌晨1点、太平洋时间的上午 10 点&#xff0c;Op…

RK3568平台(平台总线篇)IIC光感BH1721模块调试

一.BH1721硬件信息 总上,读取数据全过程为: 通过查看数据手册得知BH1721的设备地址为0x23,光感并不是直接去读取设备地址上某个寄存器的地址,而是通过向0x23直接写入数据0x01 0x10后直接读取光感值。 第一步:通过I2C总线,主机给bh1721设备发送数据:0x01 ——> (Pow…

PyQt5 布局管理、事件、信号以及对话框

布局管理 绝对定位 每个程序都是以像素为单位区分元素的位置&#xff0c;衡量元素的⼤⼩。所以我们完全可以使⽤绝对定位搞定每个元素和窗⼜的位置。 局限性&#xff1a; 元素不会随着我们更改窗⼜的位置和⼤⼩⽽变化不能适⽤于不同的平台和不同分辨率的显⽰器更改应⽤字体…

老板的“神助攻”:公司电脑监控软件

在当今的商业世界中&#xff0c;企业管理者都希望员工能全身心投入工作&#xff0c;为企业创造更多价值。然而&#xff0c;员工上班摸鱼的现象却让许多老板头疼不已。公司电脑监控软件的出现&#xff0c;为解决这一问题提供了可能。接下来&#xff0c;我们将详细介绍几款优质的…

新160个crackme - 079-DueList.5

运行分析 提示需要注册 PE分析 32位程序&#xff0c;PE Diminisher壳 手动脱壳 x32dbg打开程序&#xff0c;按一下F8&#xff0c;根据ESP定律&#xff0c;在此处下断点按一下F9&#xff0c;两下F8&#xff0c;来到OEP处00401000打开Scylla&#xff0c;点击转储保存文件点击IAT自…

深入理解Dubbo源码核心原理-Part3

到此开始讲解Dubbo消费端的源码 在消费一端&#xff0c;需要关注两件事情。第一&#xff0c;接口的proxy如何生成。第二&#xff0c;请求如何发送。 首先看到启动类 接下来看真正inject方法 现在需要思考&#xff0c;待注入的Bean从哪儿来&#xff0c;这个Bean必然注入的是一…

Basic penetration_1靶机渗透

项目地址 plain https://download.vulnhub.com/basicpentesting/basic_pentesting_1.ova 实验过程 开启靶机虚拟机 ![](https://img-blog.csdnimg.cn/img_convert/4135d3c176bdca1f661f756b8321c97a.png) 使用nmap进行主机发现&#xff0c;获取靶机IP地址 plain nmap 192.1…