kafka之java客户端实战

news2024/11/13 16:30:48

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.13</artifactId>
   <version>3.4.0</version>
  </dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //单向发送:不关心服务端的应答。
            producer.send(record);
            System.out.println("message "+i+" sended");
        }
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //同步发送:获取服务端应答消息前,会阻塞当前线程。
            RecordMetadata recordMetadata = producer.send(record).get();
            String topic = recordMetadata.topic();
            int partition = recordMetadata.partition();
            long offset = recordMetadata.offset();
            String message = recordMetadata.toString();
            System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);

        }
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        CountDownLatch latch = new CountDownLatch(5);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(null != e){
                        System.out.println("消息发送失败,"+e.getMessage());
                        e.printStackTrace();
                    }else{
                        String topic = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        String message = recordMetadata.toString();
                        System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
                    }
                    latch.countDown();
                }
            });
        }
        //消息处理完才停止发送者。
        latch.await();
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        //kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true) {
            //PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            //PART3:处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
            }


            //提交offset,消息就不会重复推送。
            consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }
    }
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 从客户端一些属性来认识kafka客户端工作机制

内容更新中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1381545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UniRepLKNet实战:使用 UniRepLKNet实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…

【数模百科】距离美赛还有20天,不要忘了阅读往年获奖论文(附04-23年美赛获奖论文)

之前发了很多数模相关的知识&#xff0c;受到了一些人的关注&#xff0c;也有很多人私下问我&#xff0c;距离美赛还有20几天了&#xff0c;还来不来得及。 对此我想说&#xff0c; 来不来得及重要吗&#xff1f; 你名都报了&#xff0c;钱也交了&#xff0c;还是笔不小的钱…

Vue.js设计与实现阅读-3

Vue设计与实现阅读-3 1、声明式描述UI2、渲染器3、组件4、模板的工作原理5、Vue.js 是各个模块组成的有机整体 前言 前面一章我们了解了&#xff0c;开发体验是衡量一个框架的重要指标之一。提供友好的警告信息至关重要&#xff0c;但是越详细的警告信息&#xff0c;意味着框架…

前端 TS 语法继承 多态 修饰符 readonly 抽象类 ts 基本写法 可选 剩余参数 函数重载 接口 类(3)

继承 继承之间的叫法 A类继承了B类&#xff0c;那么A类叫做子类&#xff0c;B类叫成基类 子类 ---》派生类 基类 ---》超类&#xff08;父类&#xff09; // 继承之间的叫法 // A类继承了B类&#xff0c;那么A类叫做子类&#xff0c;B类叫成基类 // 子类 ---》派生类 // 基类 …

性能瓶颈分析定位

用vmstat、sar、iostat检测是否是CPU瓶颈 用free、vmstat检测是否是内存瓶颈 用iostat、dmesg 检测是否是磁盘I/O瓶颈 用netstat检测是否是网络带宽瓶颈 1 首先进行OS层面的检查确认 首先要确认当前到底是哪些进程引起的负载高&#xff0c;以及这些进程卡在什么地方&#x…

sectigo ip证书种类买一年送一月

Sectigo旗下的IP证书是专为只有公网IP地址的网站准备的。Sectigo旗下的数字证书大多是域名证书&#xff0c;例如&#xff0c;单域名SSL证书、多域名SSL证书、通配符SSL证书等。这些证书申请时必须验证域名所有权&#xff0c;申请者需要有一个拥有管理全的域名网站&#xff0c;那…

SpringIOC之support模块GenericGroovyApplicationContext

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

RAG代码实操之斗气强者萧炎

&#x1f4d1;前言 本文主要是【RAG】——RAG代码实操的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304;每日一句&#x…

京东宣布启动鸿蒙原生应用开发,全力支持鸿蒙生态 | 百能云芯

华为常务董事、终端BG CEO、智能汽车解决方案BU董事长余承东于1月10日在微博上发布了一条令人振奋的消息&#xff1a;京东即将启动鸿蒙原生应用的开发。这一消息在科技圈掀起了不小的波澜&#xff0c;也为鸿蒙系统的发展注入了新的动力。 京东集团首席执行官兼执行董事许冉和余…

10年果粉拯救老掉牙Mac心得(没错我是标题党)

连续两周了&#xff0c;当我不能用Mac,或者说当我闲置了近10年隔三差五的用Mac时&#xff0c;成功发现我的AppleID已经无法登录了。事情是这样的&#xff0c;当我踌躇满志地准备改一篇稿子&#xff08;潜在的稿费啊亲&#xff01;&#xff09;时&#xff0c;发现Pages竟然没有W…

Android Studio导入项目 下载gradle很慢或连接超时

AS最常见的问题之一就是下载gradle非常慢&#xff0c;还经常出现下载失败的情况&#xff0c;没有gradle就无法build项目&#xff0c;所以一定要先解决gradle的下载问题&#xff0c;下面教大家两种常用方法。 因为我的项目绝大多数使用的是gradle-5.6.4-all&#xff0c;下面就以…

谷歌DeepMind最新成果:机器人灵巧操作服务我们日常生活

谷歌DeepMind最新成果&#xff1a;机器人灵巧操作服务我们日常生活 CAAI认知系统与信息处理专委会 2024-01-13 00:00 发表于北京 几乎是和斯坦福“炒虾洗碗”机器人同一时间&#xff0c;谷歌DeepMind也发布了最新具身智能成果。 并且是三连发&#xff1a; 先是一个主打提高决…

FPGA(基于xilinx)中PCIe介绍以及IP核XDMA的使用

Xilinx中PCIe简介以及IP核XDMA的使用 例如&#xff1a;第一章 PCIe简介以及IP核的使用 文章目录 Xilinx中PCIe简介以及IP核XDMA的使用一、PCIe总线概述1.PCIe 总线架构2.PCIe 不同版本的性能指标及带宽计算3.PCIe 接口信号 二、XDMA1.XDMA 与其它 PCIe IP 的区别2.XDMA简介 三…

用Kimi chat识别并整理图片里面的文字

Kimi chat是有OCR功能的&#xff0c;可以识别图片中的文字。 下面这张图片是一本书的注释&#xff0c;里面提到有不少图书&#xff0c;利用Kimi chat就可以轻松完成提取其中图书书名的任务。 先拿一张图片来做实验。Kimichat的回复&#xff1a; 在您提供的文件内容中&#xf…

【重学C语言】二、前期准备和第一个C程序

【重学C语言】二、前期准备和第一个C程序 1. VS 项目1.1 创建项目 2. Clion 项目(本博主主用)2.1 创建项目2.2 Clion 配置 3. 构建类型4. 构建模式5. 注释6. 第一个 C 程序7. 程序闪退8. 新手遇到的问题 1. VS 项目 1.1 创建项目 打开 VS 创建新项目 创建 main.c 书写以下…

如何提高比例阀控制精度效率速度解决电磁阀线圈老化提供电磁阀速度柱塞检测电磁阀过流保护电磁阀菊花链控制MAX22216

越来越多的场合需要对进气量液压量进行精确控制&#xff0c;比例阀被大量采用&#xff0c;如何提高比例阀的控制精度和效率&#xff0c;本文介绍螺线管驱动芯片MAX22216MAX22217的12位PI电流闭环控制以及其他性能&#xff0c;可以提升比例阀的控制精度。提高电磁阀速度精度解决…

oracle—IMU机制

正常的情况下&#xff0c;当事务需要回滚块的时候&#xff0c;是去undo表空间找 现在是在sharepool中分一个IMUbuffer&#xff0c;将所有的回滚信息写入。直接就可以从中取。减少了物理IO 同时这个过程也产生redo&#xff0c;直接就是图中红色的&#xff0c;不防止崩溃 优点 1…

竞赛保研 基于计算机视觉的身份证识别系统

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于机器视觉的身份证识别系统 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-sen…

U盘安装XP纯净版系统教程软件安装教程(附软件下载地址)

软件简介&#xff1a; 软件【下载地址】获取方式见文末。注&#xff1a;推荐使用&#xff0c;更贴合此安装方法&#xff01; U盘安装XP纯净版系统是一种便捷且快速的方式&#xff0c;以实现系统重装或升级的需求。这篇教程将为您详细介绍如何使用U盘来安装XP纯净版系统。XP纯…

JVM基础(7)——ParNew垃圾回收器

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…