Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

news2025/1/12 23:03:58

手动提交offset

在这里插入图片描述
虽然offset十分遍历,但是由于其是基于时间提交的,开发人员难以把握offset提交的实际。因此Kafka还提供了手动提交offset的API
手动提交offset的方法有两种:分别commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交:不同点是,同步提交阻塞当前线程,一致到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)而异步提交则没有重试机制,故有可能提交失败。
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了

同步提交

是否自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
同步提交offset kafkaConsumer.commitSync();

由于同步提交offset有失败重试机制,故更加可靠,但是由于一致等待提交结果,提交的效率比较低。以下为同步提交offset的示例

package com.longer.handsync;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerByHandSync {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
//        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
        //是否自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
            //同步提交offset
             kafkaConsumer.commitSync();
        }
    }
}

异步提交

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响,因此更多情况下会选择异步offset的方式
kafkaConsumer.commitAsync();

package com.longer.handasync;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * 同步提交
 */
public class CustomConsumerByHandAsync {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
//        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
        //是否自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
            //同步提交offset
            kafkaConsumer.commitAsync();
        }
    }
}

指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是latest
当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
2) latest(默认值):自动将偏移量重置为最新偏移量
3)如果未找到消费者组的先前偏移量,则向消费者抛出异常。
在这里插入图片描述
主要代码

		Set<TopicPartition> assigment=new HashSet<>();

        while (assigment.size()==0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            //获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assigment= kafkaConsumer.assignment();
        }
        //遍历所有分区,并指定从100得位置开始消费
        for (TopicPartition tp : assigment) {
            kafkaConsumer.seek(tp,100);
        }
package com.longer.seek;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
//        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");

        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);

        Set<TopicPartition> assigment=new HashSet<>();

        while (assigment.size()==0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            //获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assigment= kafkaConsumer.assignment();
        }
        //遍历所有分区,并指定从100得位置开始消费
        for (TopicPartition tp : assigment) {
            kafkaConsumer.seek(tp,100);
        }


        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739774.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

conda创建环境等相关知识

1、首先下载Anaconda&#xff0c;官网下载即可&#xff0c;打开如下选项&#xff0c; 2、创建一个环境&#xff1a;命令如下 conda create -n 虚拟环境名称 python?实例&#xff1a;创建一个pytorch环境&#xff0c;指定python版本为3.9版本 conda create -n pytorch pytho…

【七天入门数据库】第一天 MySQL的安装部署

系列文章传送门&#xff1a; 【七天入门数据库】第一天 MySQL的安装部署 【七天入门数据库】第二天 数据库理论基础 【七天入门数据库】第三天 MySQL的库表操作 MySQL数据库存在多种版本&#xff0c;不同的版本在不同的平台上&#xff08;OS&#xff0c;也就是操作系统上&a…

文件资源管理器卡住,使用任务管理器结束任务后桌面图标和任务栏消失的解决方案

事情的起因是这样的&#xff0c;我想删除压缩包里的一张照片&#xff0c;结果文件资源管理器就卡住了&#xff0c;删除进度一直是0%&#xff0c;等了好久也没反应。没办法&#xff0c;只能掏出秘密武器任务管理器了&#xff0c;找到文件资源管理器&#xff0c;右键选择结束任务…

游戏渲染技术:前向渲染 vs 延迟渲染 vs Forward+渲染 (一)

在这篇文章中&#xff0c;会分析和对比三种渲染算法&#xff1a; 前向渲染(Forward Rendering)延迟着色(Deferred Shading)Forward(基于Tile的前后渲染) 介绍 前向渲染 前向渲染是通过在场景中光栅化每个几何对象来工作的&#xff0c;在着色过程中&#xff0c;通过迭代每个灯…

ModaHub魔搭社区:向量数据库Zilliz Cloud删除 Entity和删除 Collection教程

目录 删除单个 Entity 批量删除 Entity 开始前 操作步骤 使用限制 Entity 是指存储在 Zilliz Cloud 集群中的数据实体,包含用于处理、搜索和查询的数据。如果您不再需要某个 Entity,可以执行相关操作将其删除。 本文介绍如何从 Collection 中删除单个或多个 Entity。 …

RocketMQ5.0消息消费<三> _ 消息消费

RocketMQ5.0.0消息消费&#xff1c;三&#xff1e; _ 消息消费 一、消息消费 1. 消费UML图 PUSH模式消息拉取机制参考《RocketMQ5.0.0消息消费&#xff1c;一&#xff1e; _ PUSH模式的消息拉取​》&#xff0c;PullMessageService负责对消息队列进行消息拉取&#xff0c;从B…

Cocktail mac版-Cocktail 苹果版(清理维护优化工具)安装教程

Cocktail for Mac是一款Mac OS X系统清理、修复和优化的常规实用工具。它不仅可以一键清理系统中的残余垃圾。还可以帮助用户修改系统的隐藏属性、隐藏文件&#xff0c;对优化mac系统有很大的帮助。CocktailV11.4破解版增加了清除macOS内容缓存的功能。 内容缓存保留了各种Appl…

如何查看自己windows电脑的ip地址

可能有些小伙伴对查看自己电脑的ip地址不太熟悉&#xff0c;今天这里介绍几种方式&#xff1a; 我自己的电脑是Win11&#xff0c;就直接展示截图了。 一、命令行方式&#xff1a; windowsR打开 CMD(命令行窗口)或者windows PowerShell窗口&#xff0c;输入以下命令&#xff1…

企业信息化可以为企业带来什么效益?

一、什么是信息化 在具体谈信息化前我们先来谈一谈信息化和数字化 信息化&#xff1a;信息化是指培养、发展以计算机为主的智能化工具为代表的新生产力&#xff0c;并使之造福于社会的历史过程。与智能化工具相适应的生产力&#xff0c;称为信息化生产力。 数字化&#xff1a;…

MySQL基础篇第6章(多表查询)

文章目录 1、一个案例引发的多表连接1.1 案例说明1.2 笛卡儿积 2、多表查询分类讲解2.1 等值连接2.2 非等值连接2.3 自连接2.4 内连接2.5 外连接(OUTER JOIN)2.5.1 左外连接(LEFT OUTER JOIN)2.5.2 右外连接2.5.3 满外连接(FULL OUTER JOIN) 3、UNION的使用3.1 UNION操作符3.2 …

Linux 命令大全(下)

Linux 命令大全&#xff08;上&#xff09; 本文目录 6. 网络通讯 常用命令6.1 ssh 命令 – 安全的远程连接服务器6.1.1 含义6.1.2 语法格式6.1.3 常用参数6.1.4 参考示例 6.2 netstat 命令 – 显示网络状态6.2.1 含义6.2.2 语法格式6.2.3 常用参数6.2.4 参考示例 6.3 dhclient…

IP协议【图解TCP/IP(笔记九)】

文章目录 IP即网际协议IP相当于OSI参考模型的第3层网络层与数据链路层的关系 IP基础知识IP地址属于网络层地址路由控制■ 发送数据至最终目标地址■ 路由控制表 数据链路的抽象化IP属于面向无连接型 IP即网际协议 TCP/IP的心脏是互联网层。这一层主要由IP&#xff08;Internet…

【netty】TCP 粘包和拆包及解决方案

TCP 粘包和拆包基本介绍 TCP是面向连接的&#xff0c;面向流的&#xff0c;提供高可靠性服务。收发两端&#xff08;客户端和服务器端&#xff09;都要有一一成对的socket&#xff0c;因此&#xff0c;发送端为了将多个发给接收端的包&#xff0c;更有效的发给对方&#xff0c…

电商API接口商品页面数据(详情数据,销量数据,sku数据,视频数据,优惠券数据)接口代码示例

有探讨稳定采集电商等多平台整站实时商品详情历史价格数据接口&#xff0c;通过该接口开发者可以更好地了解商品的情况&#xff0c;商品详情数据详细信息查询&#xff0c;数据参数包括&#xff1a;商品链接&#xff0c;商品列表主图、价格、标题&#xff0c;sku&#xff0c;库存…

技术流 | 使用eBPF增强kubernetes可观测性的实践分享

本文作者&#xff1a;擎创科技某大拿 01 背景与问题 当前&#xff0c;云原生技术主要是以容器技术为基础围绕着 Kubernetes的标准化技术生态&#xff0c;通过标准可扩展的调度、网络、存储、容器运行时接口来提供基础设施&#xff0c;同时通过标准可扩展的声明式资源和控制器来…

narak靶机详解

narak靶机复盘 首先对靶机进行扫描&#xff0c;找到靶机的真实ip地址。 然后dirb进行目录扫描&#xff0c;扫描到一个目录&#xff0c;我们打开发现是一个登陆界面。 并没有用户名和密码&#xff0c;我们就用cewl扫描这个网站&#xff0c;扫出一个字典&#xff0c;用来暴力破…

2、JDk、JRE、JVM三者区别和联系

JDK JRE JVM 含义 JDK: Java Develpment Kit java 开发工具 JRE: Java Runtime Environment java 运行时环境 JVM: java Virtual Machine java 虚拟机 一张图来解释&#xff1a; 联系&#xff1a; JVM不能单独搞定class的执行&#xff0c;解释class的时候JVM需要调用解…

Openlayers实战:非4326,3857的投影

Proj4js 是一个 JavaScript 库,用于将点坐标从一个坐标系转换到另一个坐标系,包括基准转换。Openlayers地图上,除了默认的4326和3857投影方式外,可以通过Proj4js的拓展,可以显示其他的投影。 本实战中,将ESRI:53009投射到Openlayers地图上。 安装依赖 npm install proj4…

java高级语法笔记

Java ArrayList java泛型语法介绍 https://www.runoob.com/java/java-generics.html 匿名函数->&#xff08;Lambda 表达式 &#xff0c;java8的新特性&#xff09;

使用openKylin操作系统下载VMware Tools教程(超详细图文教程)

目录 前言操作步骤验证使用 前言 VMware Tools作为一个VMware十分有用的工具&#xff0c;下载它也经常作为配置VMware的一个常有环节。本篇文章&#xff0c;我将用国产操作系统openKylin为大家演示如何下载安装VMware Tools。 操作步骤 1.点击 虚拟机----安装VMware Tools…