Kafka-代码示例

news2024/11/23 13:35:19

一、构建开发环境

File > New > Project

选择一个最简单的模板

项目和坐标命名

配置maven路径

添加maven依赖

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>

 加载刚刚添加的依赖

此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了

二、创建一个新的topic

kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092

三、编写生产者

kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了

package org.example.kafkaStudy;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        try{
            //topic名称
            String topicName = "kafka-study";
            //broker列表
            String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";
            //向topic打多少数据
            int numRecords = 10000;
            //是否异步推送数据
            boolean isAsync = true;
            int key = 0;
            int sentRecords = 0;
            //创建生产者
            KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);
            //判断是否达到生产要求
            while (sentRecords < numRecords) {
                if (isAsync) {
                    //异步推送
                    asyncSend(producer,topicName, key, "test" + key,sentRecords);
                } else {
                    //同步推送
                    syncSend(producer,topicName, key, "test" + key,sentRecords);
                }
                key++;
                sentRecords++;
            }
            producer.close();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)
            throws ExecutionException, InterruptedException {
        try {
            // 发送记录,然后调用get,这会阻止等待来自broker的ack
            RecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();
            Utils.maybePrintRecord(sentRecords, key, value, metadata);
            return metadata;
        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
                | OutOfOrderSequenceException | SerializationException e) {
            Utils.printErr(e.getMessage());
        } catch (KafkaException e) {
            Utils.printErr(e.getMessage());
        }
        return null;
    }

    private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {
        //异步发送记录,设置一个回调以通知结果。
        //请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止
        producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));
    }

    private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,
              int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {
        Properties props = new Properties();
        // 生产者连接到broker需要引导服务器配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/port
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        // 设置序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (transactionTimeoutMs > 0) {
            // 事务协调器主动中止正在进行的事务之前的最长时间
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
        }
        if (transactionalId != null) {
            // 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        }
        // 在分区级别启用重复保护
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
        return new KafkaProducer<>(props);
    }

    static class ProducerCallback implements Callback {
        private final int key;
        private final int sentRecords;
        private final String value;

        public ProducerCallback(int key, String value,int sentRecords) {
            this.key = key;
            this.sentRecords = sentRecords;
            this.value = value;
        }

        /**
         * 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,
         * 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。
         *
         * @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。
         * @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                Utils.printErr(exception.getMessage());
                if (!(exception instanceof RetriableException)) {
                    // 我们无法从这些异常中恢复过来
                }
            } else {
                Utils.maybePrintRecord(sentRecords, key, value, metadata);
            }
        }
    }
}

四、编写消费者

package org.example.kafkaStudy;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        //topic名称
        String topicName = "kafka-study";
        //组名称
        String groupName = "my-group-1";
        //broker列表
        String bootstrapServers =  "cdh1:9092,cdh2:9092,cdh3:9092";
        //向topci打多少数据
        int numRecords = 10000;
        int remainingRecords = 10000;
        // 消费来自 topic = kafka-study 的数据
        KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);
        //订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知
        consumer.subscribe(singleton(topicName));
        Utils.printOut("Subscribed to %s", topicName);
        while (remainingRecords > 0) {
            try {
                // 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。
                // 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<Integer, String> record : records) {
                    Utils.maybePrintRecord(numRecords, record);
                }
                remainingRecords -= records.count();
            } catch (AuthorizationException | UnsupportedVersionException
                    e) {
                // 我们无法从这些异常中恢复过来
                Utils.printErr(e.getMessage());
            } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
                // 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效
                Utils.printOut("Invalid or no offset found, using latest");
                consumer.seekToEnd(e.partitions());
                consumer.commitSync();
            } catch (KafkaException e) {
                // 记录异常并尝试继续
                Utils.printErr(e.getMessage());
            }
        }
        consumer.close();
        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
    }

    private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,
                     String groupId , boolean readCommitted) {
        Properties props = new Properties();
        // 消费者连接到broker需要引导服务器配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/port
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        // 当我们使用订阅(topic)进行组管理时,需要消费者groupId
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //设置静态成员资格以提高可用性(例如滚动重启)
//        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
        //启用EOS时禁用自动提交,因为偏移量与事务一起提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
        //读取数据用到的反序列化器,需要和生产者对应
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (readCommitted) {
            // 跳过正在进行和已中止的事务
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        }
        // 在偏移无效或没有偏移的情况下设置重置偏移策略
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }
}

五、运行程序

生产者日志打印

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)

......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2)

.......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)

我们再次用命令看下每个分区的offset

消费者日志打印

main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records

六、问题说明

从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?

在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。

消费者也是从brokers一批一批的拉取数据来消费的

我们也可以看下broker的日志中数据的索引情况

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10

从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2224429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最长子序列模型二(二分优化版)

文章目录 提高课题解一、拦截导弹二、导弹防御系统三、最长公共上升子序列四、二分函数速写 基础课题解五、最长上升子序列 II 提高课题解 一、拦截导弹 题目链接 第一问非常简单&#xff0c;直接用之前最长上身子序列模板就行 第二问就有难度了&#xff0c;我们要用最少的递…

基于SSM“毛毛宠物店”宠物信息交流平台的设计与实现

开发说明 开发语言&#xff1a;Java 框架&#xff1a;ssm 技术&#xff1a;JSP JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myec…

DEV-C++如何调试

1、先编译&#xff0c;再点击“调试”按钮 2、使用调试按钮&#xff0c;可以输入输出数据 第21次发博客 以后会慢慢更新

【计网】从零开始认识IP协议 --- 认识网络层,认识IP报头结构

从零开始认识IP协议 1 网络层协议1.1 初步认识IP协议1.2 初步理解IP地址 2 IP协议报头3 初步理解网段划分 1 网络层协议 1.1 初步认识IP协议 我们已经熟悉了传输层中的UDP和TCP协议&#xff0c;接下来我们来接触网络层的协议&#xff1a; 网络层在计算机网络中的意义主要体现…

EXCELL中如何两条线画入一张图中,标记坐标轴标题?

1&#xff0c;打开excel&#xff0c;左击选中两列&#xff0c; 2&#xff0c;菜单栏>“插入”>”二维折线图”选中一个 3&#xff0c;选中出现的两条线中的一条右击>最下一行&#xff0c;“设置数据系列格式” 4&#xff0c;右测“系列选项中”>点击“次坐标轴” 5…

Java 开发——(上篇)从零开始搭建后端基础项目 Spring Boot 3 + MybatisPlus

一、概述 记录时间 [2024-10-23] 本文是一个基于 Spring Boot 3 MybatisPlus 的项目实战开发&#xff0c;主要涵盖以下几个方面&#xff1a; 从零开始的项目创建IDEA 中开发环境的热部署Maven、Swagger3、MybatisPlus 等的配置路由映射知识静态资源访问文件上传功能实现拦截器…

颐驰06持续交付,明日科技赋能出行生活

在全球智能出行领域&#xff0c;自动驾驶技术的发展一直是行业关注的焦点。不久前&#xff0c;特斯拉发布的自动驾驶出租车引发了全球关注&#xff0c;但由于缺乏具体的技术细节&#xff0c;导致投资者信心受挫&#xff0c;特斯拉股票一度下跌近10%。与此同时&#xff0c;中国车…

智能台灯设计(一)原理图设计

1. 前言 作者最近突发奇想&#xff0c;想自己做一个小台灯&#xff0c;设想的功能有&#xff1a;带锂电池可充电、可以调节亮度&#xff0c;后续通过增加WIFI模块实现手机控制开关功能。目前先实现最简单的功能&#xff0c;有时间再一步步完善吧。 2. 原理图设计 充电芯片使用…

常用的三角公式

目录 1. 基本公式​ 2. 倍角公式​ 3. 半角公式​ 4. 和差公式​ 5. 和差化积​ 6. 积化和差​ 7. 万能公式​ 1. 基本公式 2. 倍角公式 3. 半角公式 4. 和差公式 5. 和差化积 6. 积化和差 7. 万能公式

self-supervised learning(BERT和GPT)

1芝麻街与NLP模型 我們接下來要講的主題呢叫做Self-Supervised Learning&#xff0c;在講self-supervised learning之前呢&#xff0c;就不能不介紹一下芝麻街&#xff0c;為什麼呢因為不知道為什麼self-supervised learning的模型都是以芝麻街的人物命名。 因為Bert是一個非常…

第九部分 Java API

第九部分 Java API 9.1 Java Number & Math 9.1.1 Java Number类 一般地&#xff0c;当需要使用数字的时候&#xff0c;我们通常使用内置数据类型&#xff0c;如&#xff1a;byte、int、long、double 等。 实例 int a 5000; float b 13.65f; byte c 0x4a;然而&…

《云原生安全攻防》-- K8s攻击案例:权限维持的攻击手法

在本节课程中&#xff0c;我们将一起深入了解K8s权限维持的攻击手法&#xff0c;通过研究这些攻击手法的技术细节&#xff0c;来更好地认识K8s权限维持所带来的安全风险。 在这个课程中&#xff0c;我们将学习以下内容&#xff1a; K8s权限维持&#xff1a;简单介绍K8s权限维持…

VUE中文本域默认展示最底部内容

文本域内容 <textarea ref"textareaRef" style"width: 100%; resize: none;" readonly v-model"errorLog" rows"15"></textarea> 样式展示 this.$nextTick(() > { // 使用$refs获取文本域的DOM元素 const textareaInfo…

SwiftUI:单个App支持设置多语言

SwiftUI 全新多语言方案 简化本地化的字符串- WWDC21 - 视频 本地化您的SwiftUI app - WWDC21 - 视频 构建全球化App&#xff1a;本地化的示例- WWDC22 - 视频 构建支持多语言的App - WWDC24 - 视频 单个App支持设置多语言 工程 Info.plist里添加 键值UIPrefersShowingLangua…

解析三相220V与三相380V变频器的关键差异

在工业自动化的广阔舞台上&#xff0c;作为电力电子技术的杰出代表&#xff0c;扮演着调节电机速度、优化能源利用的重要角色。本文将深入探讨三相220V与三相380V变频器之间的九大核心区别&#xff0c;帮助读者更全面地理解这两种设备的应用特性。 一、电压等级&#xff1a;基础…

【C#】调用本机AI大模型流式返回

【python】AI Navigator的使用及搭建本机大模型_anaconda ai navigator-CSDN博客 【Python】AI Navigator对话流式输出_python ai流式返回-CSDN博客 前两章节我们讲解了使用AI Navigator软件搭建本机大模型&#xff0c;并使用python对大模型api进行调用&#xff0c;使其流式返…

【线下培训】龙信科技应邀参与了由教育部网络安全与执法虚拟教研室(中国刑事警察学院)举办的学术讲座

文章关键词&#xff1a;电子数据取证培训、产学研推进、手机取证、介质取证 2024年10月23日&#xff0c;龙信科技应邀参与了由教育部网络安全与执法虚拟教研室&#xff08;中国刑事警察学院&#xff09;举办的学术讲座。在这次学术交流中&#xff0c;我们公司的技术专家陈杰以…

使用.NET MAUI开发第一个安卓APP

它是.NET 多平台应用 UI (.NET MAUI) 是一个跨平台框架&#xff0c;用于使用 C# 和 XAML 创建本机移动和桌面应用。 .NET MAUI可从单个共享代码库开发可在 Android、iOS、macOS 和 Windows 上运行的应用。 使用 .NET MAUI 开发第一个 Android 应用是一个直观的过程&#xff0c;…

【C++初阶】一文讲通C++内存管理

文章目录 1. C/C内存分布2. C语言中动态内存管理方式3. C内存管理方式3. 1 new/delete操作内置类型3. 2 new和delete操作自定义类型 4. new与delete的原理4. 1 operator new与operator delete函数4. 2 内置类型4. 3 自定义类型 5. 定位new表达式(placement-new)6. malloc/free和…

苍穹外卖--开发记录day09-10

目录 苍穹外卖day09-10一&#xff1a;springtask二&#xff1a;订单状态定时处理三&#xff1a;websocket四&#xff1a;来单提醒五&#xff1a;客户催单 总结 苍穹外卖day09-10 首先第九天是实战日&#xff0c;要完成以下内容的开发&#xff1a; 用户端历史订单模块&#xf…