使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费

news2024/11/29 21:19:52

文章目录

    • 1、指定 Offset 消费
    • 2、指定时间消费


1、指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是 latest
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

这个参数的力度太大了,不是从头,就是从尾
kafka提供了seek方法,可以让我们从分区的固定位置开始消费
seek(TopicPartition topicPartition,offset offset)

示例代码:

package com.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 关闭自动提交offset
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 2 订阅一个主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 执行计划
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }
        // 获取所有分区的offset =5 以后的数据
        /*for (TopicPartition tp:assignment) {
            kafkaConsumer.seek(tp,5);
        }*/
        // 获取分区0的offset =5 以后的数据
        //kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
        for (TopicPartition tp:assignment) {
            if(tp.partition() == 0){
                kafkaConsumer.seek(tp,5);
            }
        }
        
        while(true){
            //1 秒中向kafka拉取一批数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> record :records) {
                // 打印一条数据
                System.out.println(record);
                // 可以打印记录中的很多内容,比如 key  value  offset topic 等信息
                System.out.println(record.value());
            }

        }

    }
}

2、指定时间消费

示例代码:

package com.bigdata.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 *  从某个特定的时间开始进行消费
 */
public class Customer05 {

    public static void main(String[] args) {

        // 其实就是map
        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
        // 字段反序列化   key 和  value

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());


        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");

        // 指定分区的分配方案  为轮询策略
        //properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        
        // 指定分区的分配策略为:Sticky(粘性)
        ArrayList<String> startegys = new ArrayList<>();
        startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

        // 创建一个kafka消费者的对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
        List<String> topics = new ArrayList<>();
        topics.add("five");// list总可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);

        // 因为消费者是不停的消费,所以是while true

        // 指定了获取分区数据的起始位置。
        // 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }

        Map<TopicPartition, Long> hashMap = new HashMap<>();
        for (TopicPartition partition:assignment) {
            hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);
        }
        Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);

        for (TopicPartition partition:assignment) {

            OffsetAndTimestamp offsetAndTimestamp = map.get(partition);
            kafkaConsumer.seek(partition,offsetAndTimestamp.offset());
        }

        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {
                // 打印数据中的值
                System.out.println(record.value());
                System.out.println(record.offset());
                // 打印一条数据
                System.out.println(record);
            }

        }


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

现代化水库可视化管理平台:提升水库运行效率与安全保障

随着科技的飞速发展&#xff0c;现代化水利管理逐渐依赖于数字化、智能化手段。作为水利基础设施的重要组成部分&#xff0c;水库的管理不仅关乎水资源的合理利用&#xff0c;还关系到防洪、灌溉、供水等多项社会功能的实现。为了提升水库的管理水平&#xff0c;确保其运行安全…

【05】Selenium+Python 两种文件上传方式(AutoIt)

上传文件的两种方式 一、input标签上传文件 可以用send_keys方法直接上传文件 示例代码 input标签上传文件import time from selenium import webdriver from chromedriver_py import binary_path # this will get you the path variable from selenium.webdriver.common.by i…

【论文笔记】Number it: Temporal Grounding Videos like Flipping Manga

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Number it: Temporal Grou…

软件/游戏提示:mfc42u.dll没有被指定在windows上运行如何解决?多种有效解决方法汇总分享

遇到“mfc42u.dll 没有被指定在 Windows 上运行”的错误提示&#xff0c;通常是因为系统缺少必要的运行库文件或文件损坏。以下是多种有效的解决方法&#xff0c;可以帮助你解决这个问题&#xff1a; 原因分析 出现这个错误的原因是Windows无法找到或加载MFC42u.dll文件。这可…

网络地址转换

NAT概述 解决公有地址不足&#xff0c;并且分配不均匀的问题 公有地址&#xff1a;由专门的机构管理、分配&#xff0c;可以在因特网上直接通信 私有地址&#xff1a;组织和个人可以任意使用&#xff0c;只能在内网使用的IP地址 A、B、C类地址中各预留了一些私有IP地址 A&…

机器学习-神经网络(BP神经网络前向和反向传播推导)

1.1 神经元模型 神经网络(neural networks)方面的研究很早就已出现,今天“神经网络”已是一个相当大的、多学科交叉的学科领域.各相关学科对神经网络的定义多种多样,本书采用目前使用得最广泛的一种,即“神经网络是由具有适应性的简单单元组成的广泛并行互连的网络,它的组织能够…

uniapp组建scroll-view初始化页面设置scrollTop无效解决办法

官方文档&#xff1a;scroll-view | uni-app官网 一 . scroll-view的基本用法 使用竖向滚动时&#xff0c;需要给 <scroll-view> 一个固定高度&#xff0c;通过 css 设置 height&#xff1b; <scroll-view :scroll-top"scrollTop" scroll-y"true&quo…

Web day02 Js Vue Ajax

目录 1.javascript: 1.js的引入方式&#xff1a; 2.js变量 & 数据类型 & 输出语句&#xff1a; 模板字符串&#xff1a; 3.函数 & 自定义对象&#xff1a; 4. json 字符串 & DOM操作&#xff1a; 5. js事件监听&#xff1a; 6.js的模块化导入或者导出&a…

【面向对象的程序设计——集合框架】主要接口

文章目录 主要接口集合框架的主要接口Collect接口Set接口实现Set接口的类SortedSet接口 List接口&#xff1a;线性表实现List接口的类&#xff1a; Queue接口实现Queue接口的类 Map接口Map接口的定义Map接口的方法SortedMap接口 主要接口 集合框架的主要接口 声明了对各种集合…

工业物联网网关在设备接入物联网中的核心作用

一、工业物联网网关的定义与功能 工业物联网网关是工业领域中的一种重要设备&#xff0c;它位于工业物联网系统的边缘位置&#xff0c;负责连接、管理和协调工业设备与云平台之间的通信。作为边缘计算的关键组件&#xff0c;工业物联网网关能够实现工业设备、传感器、PLC、DCS…

2024年第十三届”认证杯“数学中国数学建模国际赛(小美赛)

↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓

selinux和防火墙实验

1 、 selinux 的说明 SELinux 是 Security-Enhanced Linux 的缩写&#xff0c;意思是安全强化的 linux 。 SELinux 主要由美国国家安全局&#xff08; NSA &#xff09;开发&#xff0c;当初开发的目的是为了避免资源的误用。 系统资源都是通过程序进行访问的&#xff0c;如…

flink学习(12)——checkPoint

如何设置checkPoint package com.bigdata.day06;/** * 1、需要三句话 * 2、设置完checkPoint后若程序出现异常&#xff0c;会一直重启 * 3、此时是自动进行checkPoint保存 * 4、注意&#xff1a;此时如果有checkpoint ,是不会出现异常的&#xff0c;需要将checkpoint的代码关…

前端面试题-1(详解事件循环)

1.了解浏览器的进程模型 1.什么是进程&#xff1f; 程序运行需要有它自己专属的内存空间&#xff0c;可以把这块内存空间简单的理解为进程 每个应用至少有一个进程&#xff0c;进程之间相互独立&#xff0c;即使要通信&#xff0c;也需要双方同意。 2.什么是线程&#xff1f…

http的文件上传和下载原理

目录 一&#xff1a;上传 1&#xff1a;http请求格式 2&#xff1a;文件上传类型分析 1&#xff1a;md5秒传 2&#xff1a;分片上传 1. 什么是分片上传 2. 分片上传的场景 3&#xff1a;断点续传 1. 什么是断点续传 2. 应用场景 3. 实现断点续传的核心逻辑 4. 实现流…

【计算机视觉】图像基本操作

1. 数字图像表示 一幅尺寸为MN的图像可以用矩阵表示&#xff0c;每个矩阵元素代表一个像素&#xff0c;元素的值代表这个位置图像的亮度&#xff1b;其中&#xff0c;彩色图像使用3维矩阵MN3表示&#xff1b;对于图像显示来说&#xff0c;一般使用无符号8位整数来表示图像亮度&…

VSCode 下载 安装

VSCode【下载】【安装】【汉化】【配置C环境&#xff08;超快&#xff09;】&#xff08;Windows环境&#xff09;-CSDN博客 Download Visual Studio Code - Mac, Linux, Windowshttps://code.visualstudio.com/Downloadhttps://code.visualstudio.com/Download 注意&#xff0…

【Python入门】Python数据类型

文章一览 前言一、变量1.1.如何使用变量1.2.如何定义变量的名字&#xff08;标识符&#xff09; 二、数据类型2.1 整型数据2.2 浮点型数据2.3 布尔型&#xff08;bool&#xff09;数据2.4 复数型数据2.5 字符串类型1 > 字符串相加&#xff08;合并&#xff09;&#xff08;&…

算法基础 - 高斯牛顿法(曲线拟合)

文章目录 1. 高斯牛顿法发展历程2、问题的引出3、高斯牛顿法的前世3.1、一阶&#xff0c;二阶梯度法共有原理3.2、最速下降法&#xff08;一阶梯度法&#xff09;3.3、牛顿法&#xff08;二阶梯度法&#xff09; 4、高斯牛顿法4.1 高斯牛顿法的思想4.2 最小二乘问题4.3 高斯牛顿…

Vue+Element Plus实现自定义表单弹窗

目录 一、基本框架 1.父组件index.vue 2.子组件FormPop.vue 二、细节补充 1&#xff09;input、textarea、select、input number 2&#xff09;daterange、date、monthrange 3&#xff09;数据定义 4&#xff09;没改样式的效果 5&#xff09;最终效果 三、最终代码 …