Kafka3.0.0版本——消费者(手动提交offset)

news2025/1/18 17:11:35

目录

    • 一、消费者(手动提交 offset)的概述
      • 1.1、手动提交offset的两种方式
      • 1.2、手动提交offset两种方式的区别
      • 1.3、手动提交offset的图解
    • 二、消费者(手动提交 offset)的代码示例
      • 2.1、手动提交 offset(采用同步提交的方式)代码
      • 2.1、手动提交 offset(采用异步提交的方式)代码

一、消费者(手动提交 offset)的概述

1.1、手动提交offset的两种方式

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1.2、手动提交offset两种方式的区别

  • 相同点:都会将本次提交的一批数据最高的偏移量提交。
  • 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

1.3、手动提交offset的图解

在这里插入图片描述

二、消费者(手动提交 offset)的代码示例

2.1、手动提交 offset(采用同步提交的方式)代码

  • 同步提交代码
    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(同步提交)
    kafkaConsumer.commitSync();
    
  • 同步提交完整代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerByHandSync {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 手动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 sevenTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
    
                // 手动提交offset(同步提交)
                kafkaConsumer.commitSync();
            }
        }
    }
    

2.1、手动提交 offset(采用异步提交的方式)代码

  • 异步提交代码
    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(异步提交)
    kafkaConsumer.commitAsync();
    
  • 异步提交完整代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerByHandSync {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 手动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 sevenTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
                // 手动提交offset(异步提交)
                kafkaConsumer.commitAsync();
            }
        }
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997624.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaScript-----jQuery

目录 前言&#xff1a; 1. jQuery介绍 2. 工厂函数 - $() jQuery通过选择器获取元素&#xff0c;$("选择器") 过滤选择器&#xff0c;需要结合其他选择器使用。 3.操作元素内容 4. 操作标签属性 5. 操作标签样式 6. 元素的创建,添加,删除 7.数据与对象遍历…

torch.cuda.is_available() 解决方

本人使用的显卡如下&#xff0c;打开任务管理器查看 Anaconda下载哪个版本都可以 使用命令conda create -n pytorch python3.6创建一个名为pytorch的环境&#xff0c;解释器使用3.6的 使用命令conda activate pytorch进入该环境 进入pytorch官网&#xff0c;选择下列选项 复…

九 动手学深度学习v2 ——卷积神经网络之AlexNet

文章目录 AlexNetVGG AlexNet AlexNet新引入dropout、ReLU、maxpooling和数据增强。 VGG VGG神经网络连接 图7.2.1的几个VGG块&#xff08;在vgg_block函数中定义&#xff09;。其中有超参数变量conv_arch。该变量指定了每个VGG块里卷积层个数和输出通道数。全连接模块则与Ale…

超详细最新PyCharm+Python环境安装,多图,逐步骤

PyCharmPython环境安装 前言一、pycharm下载安装1. 安装地址2. 安装详细步骤 二、Python下载安装1. 安装地址2. 安装详细步骤3. 环境变量忘记添加4. python安装成功测试 三. PyCharm上配置Python总结推荐文章 前言 文章会详细介绍PyCharmPython详细安装步骤&#xff0c;接下来…

【数据分享】2012-2023年全国范围的逐月NPP/VIIRS夜间灯光数据

在之前的文章中我们分享了2012-2022年全球范围逐年NPP/VIIRS夜间灯光数据&#xff01;以及2012-2022年中国范围的逐年NPP/VIIRS夜间灯光数据&#xff08;均可查看之前的文章获悉详情&#xff09;很多小伙伴询问有没有逐月的夜间灯光数据。 本次我们分享的是2012-2023年中国范围…

Flask 快速上手教程 — 了解与基本使用

Flask 快速上手教程 — 了解与基本使用 这篇博客是我刚接触 flask&#xff0c;研究文档时的一些记录与体会&#xff0c;希望对各位刚接触 flask 的朋友有所帮助。 且在此篇后&#xff0c;我还会另写一篇关于纯后端的 flask 教程&#xff0c;介绍一下如何使用 flask 创建一个较…

Oracle for Windows安装和配置——2.1.Oracle for Windows安装

​2.1.1. 准备Oracle软件 1)下载或拷贝安装软件 下载地址:otn.oracle.com或my oracle support。下载文件列表。具体如图2.1.1-1所示。图2.1.1-1 下载文件列表 --说明: 1)通过otn.oracle.com站点,可以免费下载用于安装的Oracle软件,但通常只能下载到Oracle各大版本的base…

切面(增强)的优先级

Component Aspect Order(value 10)//为增强类指定一个优先级的值,值越小,优先级越高,优先级越高的前置先执行,后置后执行,类似洋葱 为增强类指定一个优先级的值,值越小,优先级越高,优先级越高的前置先执行,后置后执行,类似洋葱 首先会执行前置通知,再执行目标方法,按照顺序和优…

并查集介绍和常用模板

并查集介绍和常用模板 前言&#xff1a; 并查集&#xff08;Union-find set 也叫Disjoint Sets&#xff09;是图论里面一种用来判断节点之间是否连通的数据结构&#xff0c;学会使用它可以处理一些跟节点连通性的问题。它有两个很重要的方法&#xff1a; Find(x)&#xff1a;…

2023软件设计师上半年真题解析(上午+下午)

上午试题 1.系统总线 计算机中&#xff0c;系统总线用于&#xff08;&#xff09;连接。 A&#xff0e;接口和外设 B&#xff0e;运算器,控制器和寄存器C&#xff0e;主存及外设部件 D&#xff0e;DMA控制器和中断控制器 总线可以划分为数据总线、地址总线和控制总线。系统总…

自然语言处理应用(一):情感分析

情感分析 随着在线社交媒体和评论平台的快速发展&#xff0c;大量评论的数据被记录下来。这些数据具有支持决策过程的巨大潜力。 情感分析&#xff08;sentiment analysis&#xff09;研究人们在文本中 &#xff08;如产品评论、博客评论和论坛讨论等&#xff09;“隐藏”的情…

面向OLAP的列式存储DBMS-16-[ClickHouse]python操作ClickHouse

clickhouse查询表容量方法 1 clickhouse常用命令 #clickhouse-client进入客户端 pda1:)show databases; pda1:)create database test; pda1:)use system; pda1:)show tables; pda1:) exit; 其余的就是常规的一些sql语句。 2 python操作clickhouse 2.1 clickhouse-driver(9…

Windows下python,psycopg2使用sm3连接HGDB

瀚高数据库 目录 环境 文档用途 详细信息 环境 系统平台&#xff1a;N/A 版本&#xff1a;4.5 文档用途 本文介绍在HGDB使用sm3认证时&#xff0c;python使用psycopg2连接HGDB的方法。 详细信息 Python连接HGDB可以使用psycopg2、Django&#xff0c;Django是依赖psycopg2的&a…

微信小程序登录问题(思路简略笔记)

配置问题 这是小程序登录问题&#xff0c;必要的两个配置。 流程思路 1. 微信小程序端&#xff0c;会返回一个code。 2. 查看需要返回给微信小程序端的数据。 3. 既然需要返回三个数据&#xff0c;先看openid如何拿到 WX-Login https://api.weixin.qq.com/sns/jscode2ses…

C高级 Shell脚本

一、作业&#xff1a;写一个shell脚本&#xff0c;将以下内容放到脚本中 在家目录下创建目录文件&#xff0c;dir在dir下创建dir1和dir2把当前目录下的所有文件拷贝到dir1中&#xff0c;把当前目录下的所有脚本文件拷贝到dir2中把dir2打包并压缩为dir2.tar.xz再把dir2.tar.xz移…

2023-9-10 集合-Nim游戏

题目链接&#xff1a;集合-Nim游戏 #include <iostream> #include <cstring> #include <algorithm> #include <unordered_set>using namespace std;const int N 110, M 10010;int n, m; int s[N], f[M];int sg(int x) {if(f[x] ! -1) return f[x];//…

QT实战之翻金币游戏【未完待续】

文章目录 目录 文章目录 前言 二、创建项目 三、添加资源 四、主界面实现 1、设置游戏主场景配置 2、设置背景图片 3、创建开始按钮 总结 前言 对QT的相关知识与控件进行简单的学习之后&#xff0c;通过实现“翻金币游戏”来巩固与实践所学的QT知识。在制作过程中是根据以下视…

打开游戏显示xinput1_3.dll丢失怎么办?xinput1_3.dll最全修复方法分享

一、xinput1_3.dll 文件总体介绍 xinput1_3.dll 是 Microsoft DirectX 中的一个动态链接库文件&#xff0c;它主要负责处理游戏手柄、鼠标、键盘等输入设备的操作。当运行支持 DirectX 的游戏或程序时&#xff0c;xinput1_3.dll 文件会被操作系统加载到内存中&#xff0c;以提…

想要精通算法和SQL的成长之路 - 课程表II

想要精通算法和SQL的成长之路 - 课程表 前言一. 课程表II &#xff08;拓扑排序&#xff09;1.1 拓扑排序1.2 题解 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 课程表II &#xff08;拓扑排序&#xff09; 原题链接 1.1 拓扑排序 核心知识&#xff1a; 拓扑排序是专…

Sharding-JDBC分库分表-自定义分片算法-4

默认分片算法 Sharding JDBC通过org.apache.shardingsphere.sharding.spi.ShardingAlgorithm接口定义了数据分片算法&#xff0c;5.2.1版本默认提供了如下的分片算法 配置标识自动分片算法详细说明类名MODY基于取模的分片算法ModShardingAlgorithmHASH_MODY基于哈希取模的分片…