Kafka相关API开发

news2024/11/7 13:51:24

(一)引入依赖

        用API直接去操作kafka(读写数据)在实际开发中用的并不多,学习它主要还是为了加深对Kafka功能的理解。kafka的读写操作,实际开发中,是通过各类更上层的组件去实现。而这些组件在读写kafka数据时,用的当然是kafka的java api,比如flink、spark streaming和flume等。

<properties> 
   <kafka.version>2.4.1</kafka.version>
</properties>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>${kafka.version}</version>       
</dependency>

(二)API 开发——producer 生产者

1.构造一个生产者,可以持续发送大量数据

2.构造一个生产者,有必须设置的参数:

bootstrap.server

key.seralizer

value.seralizer

其他的,可选

3.使用特定接口

kafka的生产者发送用户的业务数据时,必须使用org.apache.kafka.common.serialization.Serializer接口的实现类这一序列化框架来序列化用户的数据。

4.发往指定topic

构造一个Kafka生产者后,并没有固定数据要发往的topic,因此,可以将不同的数据发往不同的topic

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * kafka生产者API代码示例
 */
public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        // 泛型K:要发送的数据中的key
        // 泛型V:要发送的数据中的value
        // 隐含之意:kafka中的message,是Key-Value结果的(可以没有Key)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node141:9092,node142:9092");

        // 因为,kafka底层存储没有类型维护机制,用户所发的所有数据类型,都必须变成 序列化后的byte[]
        // 所以,kafka的producer需要一个针对用户要发送的数据类型的序列化工具类
        // 且这个序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializer
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /**
         * 代码中进行客户端参数配置的另一种写法
         */
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");// 消息发送应答级别

        // 构造一个生产者客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            // 将业务数据封装成客户端所能发送的封装格式
            // 0->abc0
            // 1->abc1

            // TODO:奇数发往abcx,偶数发往abcy
            ProducerRecord<String, String> message = null;
           if (i % 2 == 0) {
                message = new ProducerRecord<>("abcy", "user_id" + i, "doit_edu" + i);
            } else {
                message = new ProducerRecord<>("abcx", "user_id" + i, "doit_edu" + i);
            }
            // 消费时只会打印value的值,key并没有读到
            // 调用客户端去发送
            // 数据的发送动作在producer的底层是异步线程去异步发送的,即调用send方法立即执行完毕,直接走之后的代码,不代表数据发送成功
            producer.send(message);
            Thread.sleep(100);
        }

        // 关闭客户端
//        producer.flush();
        producer.close();
    }
}

5.消费消息

(三)API开发——consumer消费者

kafka消费者的起始消费位置有两种决定机制:

1.手动指定了起始位置,它肯定从指定的位置开始

2.如果没有手动指定起始位置,它去找消费者组之前所记录的偏移量开始

3.如果之前的位置也获取不到,就看参数:auto.offset.reset所指定的重置策略

4.如果指定的offset>原有分区内的最大offset,就自动重置到最大的offset

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * kafka消费者API代码示例
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // kafka的消费者,默认是从所属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略来确定消费起始偏移量:
        // 1.earliest:自动重置到每个分区的最前一条消息
        // 2.latest:自动重置到每个分区的最新一条消息
        // 3.none:如果没有为使用者的组找到以前的偏移,则向使用者抛出异常
        // 如果输入除了上述三种之外的,会向使用者抛出异常
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 如果latest消息找不到,consumer.seek就起作用了

        // 设置消费者所属的组id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "d30-1");

        // 设置消费者自动提交最新的的消费位移——默认是开启的
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        // 设置自动提交位移的时间间隔——默认是5000ms
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        // 构造一个消费者客户端
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题(可以是多个)
//        consumer.subscribe(Collections.singletonList("abcx"));
        consumer.subscribe(Arrays.asList("abcx","abcy"));
        // 正则订阅主题
//        consumer.subscribe(Pattern.compile ("abc.*" ));

        // 显式指定消费起始偏移量
        /*TopicPartition abcxP0 = new TopicPartition("abcx", 0);
        TopicPartition abcxP1 = new TopicPartition("abcx", 1);
        consumer.seek(abcxP0,10);
        consumer.seek(abcxP1,15);*/

        // 循环往复拉取数据
        boolean condition = true;
        while (condition) {
            // 客户端去拉取数据的时候,如果服务端没有数据响应,会保持连接等待服务端响应
            // poll中传入的超时时长参数,是指等待的最大时长
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            // Iterator:迭代器
            // Iterable:可迭代的,是迭代器的再封装
            // 实现了Iterable的对象,可以用增强for循环去遍历迭代,也可以从对象上取到iterator,来用iterator.hasNext来迭代
            // Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            // 直接用for循环来迭代本次取到的一批数据
            for (ConsumerRecord<String, String> record : records) {
                // ConsumerRecord中,不光有用户的业务数据,还有Kafka注入的元数据
                String key = record.key();
                String value = record.value();

                // 本条消息所属的topic:拉取的时候可能不止一个topic,所以会有这个方法
                String topic = record.topic();
                // 本条数据所属的分区
                int partition = record.partition();

                // 本条数据的偏移量
                long offset = record.offset();

                //key的长度
                int keySize = record.serializedKeySize();
                //value的长度
                int valueSize = record.serializedValueSize();

                // 当前这条数据所在分区的leader的朝代纪年
                Optional<Integer> leaderEpoch = record.leaderEpoch();

                // kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据
                // timestamp就是其中之一:记录本条数据的时间戳
                // 时间戳有两种类型:一个是CreateTime(这条数据创建的时间——生产者), LogAppendTime(broker往log里面追加的时间)
                TimestampType timestampType = record.timestampType();
                long timestamp = record.timestamp();

                // 数据头:是生产者在写入数据时附加进去的,相当于用户自定义的元数据
                // 在生产者写入消息时,可以自定义元数据,所以record.headers()方法就能够消费到
                // public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
                // 如果生产者写入消息时,没有定义元数据,record.headers()方法就不会消费到
                Headers headers = record.headers();

                //            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }
                System.out.println(String.format(
                        "key = %s, value = %s,topic = %s , partition = %s, offset = %s," +
                                "leader的纪元 = %s, timestampType = %s ,timestamp = %s," +
                                " key序列化的长度 = %s, value 序列化的长度 = %s",
                        key, value, topic, partition, offset,
                        leaderEpoch.get(), timestampType.name, timestamp,
                        keySize, valueSize));
            }
        }

        // 对数据进行业务逻辑处理

        // 关闭客户端
        // consumer.close();
    }
}

有了上面两个API,先开启消费者,然后开启生产者,消费者控制就会输出消息。

 // 当前这条数据所在分区的leader的朝代纪年
Optional<Integer> leaderEpoch = record.leaderEpoch();

当leader有变化,leaderEpoch.get()的值就会+1,初始值为0

(四)API开发——指定偏移量订阅消息

1.subscribe与assign订阅

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * 指定偏移量
 */
public class ConsumerDemo2 {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // 从配置文件中加载
        props.load(ConsumerDemo2.class.getClassLoader().getResourceAsStream("consumer.properties"));
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "doit30-5");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    /*  // subscribe订阅,会参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区的
        consumer.subscribe(Collections.singletonList("ddd"));
        // 这里无意义地去拉一次数据,主要就是为了确保:分区分配动作已完成
        consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        // 然后再定义到指定的偏移量,开始正式消费
        consumer.seek(new TopicPartition("ddd",0),2);*/

        // 既然要自己指定一个确定的起始消费位置,那通常隐含之意是不需要去参与消费者组自动再均衡机制,该方法比较常用
        // 那么,就不要使用subscribe来订阅主题
        consumer.assign(Arrays.asList(new TopicPartition("ddd", 0)));
        consumer.seek(new TopicPartition("ddd", 0), 4);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                int keySize = record.serializedKeySize();
                int valueSize = record.serializedValueSize();

                System.out.println(String.format(
                        "key = %s, value = %s,topic = %s , partition = %s, offset = %s," +
                                "leader的纪元 = %s, timestampType = %s ,timestamp = %s," +
                                " key序列化的长度 = %s, value 序列化的长度 = %s",
                        record.key(), record.value(), record.topic(), record.partition(), record.offset(),
                        record.leaderEpoch().get(), record.timestampType().name, record.timestamp(),
                        keySize, valueSize));
            }
        }
    }
}

2.subscribe与assign订阅具体区别

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能:

        在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign()方法订阅分区时,是不具备消费者自动均衡的功能的:

        其实这一点从assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

3.取消订阅

        如果将subscribe(Collection)或 assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

组协调器就是x组写消费位移的leader副本所在的broker。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2228870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【K8S系列】Kubernetes 中 NodePort 类型的 Service 无法访问的问题【已解决】

在 Kubernetes 中&#xff0c;NodePort 类型的 Service 允许用户通过每个节点的 IP 地址和指定的端口访问应用程序。如果 NodePort 类型的 Service 无法通过节点的 IP 地址和指定端口进行访问&#xff0c;可能会导致用户无法访问应用。本文将详细分析该问题的常见原因及其解决方…

如何使用AdsPower指纹浏览器克服爬虫技术限制,安全高效进行爬虫!

随着中国开发者日益成熟&#xff0c;应用质量明显提升&#xff0c;越来越多的开发者选择出海寻找机会扩大市场。但“应用出海”说起来容易&#xff0c;做起来难。其中&#xff0c;最大的困恼就是对海外市场缺乏了解。 很多开发者会选择使用网络爬虫&#xff08;Web Crawling&a…

centos7之LVS-DR模式传统部署

介绍 优缺点以及适用场景 优点&#xff1a;能负载更多的Realserver减轻LB的压力,性能高于tun模式。 缺点&#xff1a;不支持端口转发(VIP:80必须代理RIP:80),Realserver和LVS需要在同一网段下。 适用&#xff1a;适用于大多数公司&#xff0c;也是大多数公司用的最多的模式。…

爬虫+数据保存2

爬取数据保存到MySQL数据库 这篇文章, 我们来讲解如何将我们爬虫爬取到的数据, 进行保存, 而且是把数据保存到MySQL数据库的方式去保存。 目录 1.使用pymysql连接数据库并执行插入数据sql代码(insert) 2.优化pymysql数据库连接以及插入功能代码 3.爬取双色球网站的数据并保…

什么样的工程项目管理软件适合中小施工企业?

工程行业是典型的传统行业&#xff0c;劳动密集&#xff0c;协作频繁&#xff0c;依赖经验传承。在工程项目施工过程中&#xff0c;常见的难题纷繁复杂&#xff0c;其中包括效率低下、材料浪费、数据不实、原材料成本上涨、工期延误、质量缺陷和安全风险等。这些问题不仅阻碍了…

机器学习中的嵌入是什么?

一、说明 嵌入是真实世界对象的数字表示&#xff0c;机器学习&#xff08;ML&#xff09;和人工智能&#xff08;AI&#xff09;系统利用它来像人类一样理解复杂的知识领域。例如&#xff0c;计算算法了解 2 和 3 之间的差为 1&#xff0c;这表明与 2 和 100 相比&#xff0c;2…

NVR设备ONVIF接入平台EasyCVR视频融合平台智慧小区视频监控系统建设方案

一、方案背景 智慧小区构成了“平安城市”建设的基石。随着社会的进步&#xff0c;社区安全问题逐渐成为公众关注的热点。诸如高空抛物、乱丢垃圾、破坏车辆、入室盗窃等不文明行为和违法行为频繁出现。目前&#xff0c;许多小区的物业管理和安全防护系统仍然较为简单和陈旧&a…

Typora一款极简Markdown文档编辑器和阅读器,实时预览,序列号生成!免费!最新可用!

文章目录 一、Typora下载和安装二、Typora序列号生成 Typora是一款Markdown编辑器和阅读器&#xff0c;风格极简&#xff0c;实时预览&#xff0c;所见即所得&#xff0c;支持MacOS、Windows、Linux操作系统&#xff0c;有图片和文字、代码块、数学公式、图表、目录大纲、文件管…

uniapp的video视频属性打包app后层级过高

问题&#xff1a;在使用uniapp开发APP时&#xff0c;使用video标签显示视频发现H5可以正常展示&#xff0c;但是打包到APP后&#xff0c;它的层级过高&#xff0c;把底部导航都盖住了。 官网说明&#xff1a;uni-app官网 官网给了cover-view组件或plus.nativeObj.view、subNVue…

人工智能原理实验一:知识的表示与推理实验

一、实验目的 本实验课程是计算机、智能、物联网等专业学生的一门专业课程&#xff0c;通过实验&#xff0c;帮助学生更好地掌握人工智能相关概念、技术、原理、应用等&#xff1b;通过实验提高学生编写实验报告、总结实验结果的能力&#xff1b;使学生对智能程序、智能算法等有…

混凝土裂缝图像分割系统:快速图像识别

混凝土裂缝图像分割系统源码&#xff06;数据集分享 [yolov8-seg-C2f-RFAConv&#xff06;yolov8-seg-C2f-SCConv等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challenge 项目来源AAAI Glo…

不再输入单号查快递,批量查快递单号信息的新方法,智能排序快递时效并查找时效相同的单号,一站式物流查询解决方案

厌倦了逐个输入快递单号查询物流信息的繁琐过程&#xff1f;想要一键就能批量查询快递单号&#xff0c;并且智能排序快递时效&#xff0c;轻松查找时效相同的单号&#xff1f;那么&#xff0c;恭喜你&#xff0c;你即将解锁快递查询的新境界&#xff01;快递批量查询高手软件&a…

国标GB28181设备管理软件EasyGBS国标GB28181公网平台应用到“雪亮工程”

随着信息技术的飞速发展&#xff0c;视频监控领域正经历从传统安防向智能化、网络化安防的深刻转变。在这一变革中&#xff0c;国标GB28181设备管理软件EasyGBS凭借其强大的功能和广泛的应用场景&#xff0c;成为推动这一转变的重要力量。特别是在“雪亮工程”这一重要的群众性…

Redis 哨兵 总结

前言 相关系列 《Redis & 目录》《Redis & 哨兵 & 源码》《Redis & 哨兵 & 总结》《Redis & 哨兵 & 问题》 参考文献 《Redis的主从复制和哨兵机制详解》《Redis中的哨兵&#xff08;Sentinel&#xff09;》《【Redis实现系列】Sentinel自动故…

springboot使用配置类从 application.yml 或 application.properties 文件中读取静态属性

springboot使用配置类从 application.yml 或 application.properties 文件中读取静态属性 1. 配置类定义 通过 ConfigurationProperties(prefix “data-base-check”)&#xff0c;Spring Boot 将带有 data-base-check 前缀的属性从 application.yml 或 application.propertie…

Java 中的微服务架构与 Spring Boot 集成(30/30)

目录 Java 中的微服务架构与 Spring Boot 集成 1. 微服务架构概述 2. Spring Boot 简介 2.1 Spring Boot 的特点 3. 使用 Spring Boot 构建微服务 3.1 构建一个简单的微服务 4. 服务发现与注册中心 4.1 使用 Eureka 实现服务注册和发现 5. 使用 Spring Cloud Gateway …

ssm020基于ssm的人才招聘网站+jsp(论文+源码)_kaic

摘 要 随着科技的发展&#xff0c;人才招聘的方式也发生着改变。本基于ssm的人才招聘网站正是采用计算机技术和网络设计的新型系统&#xff0c;可以有效的把招聘信息与网络相结合&#xff0c;为用户提供工作帮助和管理需求。本系统采用mysql数据库存储数据&#xff0c;兼容性…

Servlet 3.0 新特性全解

文章目录 Servlet3.0新特性全解Servlet 3.0 新增特性Servlet3.0的注解Servlet3.0的Web模块支持servlet3.0提供的异步处理提供异步原因实现异步原理配置servlet类成为异步的servlet类具体实现异步监听器改进的ServletAPI(上传文件) Servlet3.0新特性全解 tomcat 7以上的版本都支…

全球最大开源系统遭“绑架”,华为携国产系统冲出国门,优势尽显

被“绑架”的Linux 在科技飞速发展的今天&#xff0c;开源软件已成为全球技术合作与创新的重要基石。其中大家熟知的开源系统Linux内核项目&#xff0c;自1991年由芬兰学生Linus Torvalds创建以来&#xff0c;一直以其开放性、协作性和透明性著称。它鼓励全球各地的开发者共同…

一体化运维监控管理平台:构建高效、可靠的IT运维体系

在当今数字化转型的浪潮中&#xff0c;企业的IT系统日益复杂&#xff0c;运维工作面临着前所未有的挑战。如何确保IT基础设施的稳定运行&#xff0c;提高运维效率&#xff0c;成为每个企业关注的焦点。为此&#xff0c;我们推出了一体化运维监控管理平台&#xff0c;旨在通过全…