Kafka - 13 Java 客户端实现消费者消费消息

news2025/1/13 13:27:05

文章目录

    • 1. 独立消费者案例(订阅主题)
    • 2. 独立消费者案例(订阅分区)
    • 3. 消费者组案例

1. 独立消费者案例(订阅主题)

需求:创建一个独立消费者,消费主题中数据:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh    PartitionCount:3    ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,1,0
Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 3,0,2
Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,1,2

在这里插入图片描述

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}

Springboot 自定义日志配置关闭Kafka消费者debug日志打印:在resource目录下添加文件 logback.xml 即可。

<?xml version="1.0" encoding="UTF-8" ?>
<configuration debug="false">
    <!-- 定义日志文件的存储地址 不要在logback的配置中使用相对路径 -->
    <property name="LOG_HOME" value="logs/"></property>

    <!-- 配置控制台输出 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 按照每天生成日志文件 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <!-- 日志文件输出的文件名 -->
            <FileNamePattern>${LOG_HOME}/%d{yyyy-MM-dd}.%i.log</FileNamePattern>
            <maxFileSize>50MB</maxFileSize>
            <!-- 日志文件保留天数 -->
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 日志输出级别 -->
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="FILE"/>
    </root>

    <!-- 定制化某些包的日志输出级别 -->
    <logger name="org.apache.kafka" level="info" additivity="false"/>
    <!-- 屏蔽kafka debug,,可以指定为info或者error,使用off可以直接关闭 -->
    <logger name="org.apache.kafka.clients" level="info"  additivity="false"/>
</configuration>

测试生产者发送消息:

在这里插入图片描述

2. 独立消费者案例(订阅分区)

需求:创建一个独立消费者,消费主题 0 号分区的数据。

在这里插入图片描述

① kafka 消费者消费主题0号分区的数据:

public class CustomConsumerPartition {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题对应的分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("hh",0));
        consumer.assign(topicPartitions);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}

② kafka 生产者向主题的0号分区发送数据:

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       StringSerializer.class.getName());

        // kafka生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for(int i=0;i<5;i++){
            kafkaProducer.send(new ProducerRecord<>("hh" ,0,"","hello,kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if(exception==null){
                        // 消息发送成功
                        System.out.println("主题"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());
                    }else{
                        // 消息发送失败
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

③ 测试:先启动消费者程序,再启动生产者程序

在这里插入图片描述

3. 消费者组案例

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

在这里插入图片描述

① 创建3个消费者:复制2份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的3个消费者。

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 创建消费者组,组名任意起名都可以
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        // 创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}

② 生产者发送消息:

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 添加自定义分区器
        // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hh.producer.MyPartitioner");

        // kafka生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for(int i=0;i<50;i++){
            kafkaProducer.send(new ProducerRecord<>("hh" ,"hello,kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if(exception==null){
                        // 消息发送成功
                        System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
                    }else{
                        // 消息发送失败
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

③ 测试:先启动3个消费者程序,再启动生产者程序

在这里插入图片描述

可以看到发送的50条消息分别被消费者组中的不同消费者消费,他们消费的是不同分区的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/60058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux系统移植三:移植Kernel生成zImage和dtb文件

Linux系统移植系列 Linux系统移植一&#xff1a;移植U-BOOT 添加自己的板子并编译&#xff08;非petalinux版&#xff09; Linux系统移植二&#xff1a;生成fsbl引导文件并制作BOOT.bin 下载源码包 Xilinx官方linux源码包下载地址&#xff1a;https://github.com/Xilinx/lin…

linux+window+macos下的JDK安装

1. Linux中安装JDK &#xff08;1&#xff09;下载Linux版本的jdk压缩包 &#xff08;2&#xff09;解压 tar -zxvf 压缩包名 例如&#xff1a; tar -zxvf jdk-8u251-linux-x64.tar.gz&#xff08;3&#xff09;在系统配置文件配置java 编辑profile配置文件 vim /etc/prof…

JVM Metaspace内存溢出问题

更多内容&#xff0c;前往 IT-BLOG 一、现象 x项目线上环境因为jvm报OOM的异常而报警,导致整个服务不可用并被拉出集群,现象如下: 当时的解决方案是增加metaspace的容量: -XX:MaxMetaspaceSize512m, 从原来默认的256m改为512m, 虽然没有再出现oom,但这个只是临时解决方案,通过…

MyBatis ---- 动态SQL

MyBatis ---- 动态SQL1. if2. where3. trim4. choose、when、otherwise5. foreach6. SQL片段MyBatis 框架的动态 SQL 技术是一种根据特定条件动态拼接 SQL 语句的功能&#xff0c;它存在的意义是为了解决拼接 SQL 语句字符串时的痛点问题。 1. if /*** 根据条件查询员工信息if…

eBPF书籍和教程良心推荐

中文 BPF 性能工具&#xff08;书籍&#xff09;&#xff0c;作者 Brendan Gregg。本书的GitHub 回购。系统性能&#xff1a;企业与云&#xff0c;第 2 版 (2020)&#xff0c;作者&#xff1a;Brendan GreggJed Salazar 和 Natalia Reka Ivanko 的 eBPF 安全可观察性什么是 eB…

Metabase学习教程:系统管理-5

仪表板优化 如何使您的仪表板加载更快。 说到仪表板性能方面&#xff0c;基本上有四种方法可以让仪表板更快地加载&#xff1a; 要求更少的数据.缓存问题答案.组织数据以预测常见问题.提出有效的问题。图1。包含三个筛选器小部件的示例仪表板&#xff0c;它们使用Metabase附…

友宝在线在港交所上市申请“失效”:连续两年亏损,王滨为大股东

近日&#xff0c;贝多财经从港交所披露易了解到&#xff0c;Beijing UBOX Online Technology Corp.&#xff08;北京友宝在线科技股份有限公司&#xff0c;下称“友宝”或“友宝在线”&#xff09;的上市申请材料已经失效&#xff0c;目前已经无法查看。 其中&#xff0c;招股书…

期末前端web大作业:餐饮美食网站设计与实现——餐厅响应式网站制作html+css+javascript+jquery+bootstarp

&#x1f380; 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

Android开发的UI设计——Material Design

前言 Material Design 是用于指导用户在各种平台和设备上进行视觉、动作和互动设计的全面指南。如需在您的 Android 应用中使用 Material Design&#xff0c;请遵循 Material Design 规范中定义的准则&#xff0c;并使用 Material Design 支持库中提供的新组件和样式。 正篇 …

【软件安装】Ubuntu18.04及20.04中安装omnet++

注意&#xff1a;安装omnet首先看官方安装指导&#xff0c;不要直接百度。 omnet6.0.1官方安装指导omnet6.0只能在Ubuntu20.04及之后的版本使用&#xff0c;因为glibc版本不适配。 Ubuntu18.04安装omnet5.6.2 安装必要支持 更新apt-get $ sudo apt-get update安装依赖软件 $ s…

2022年四川省职业院校技能大赛网络搭建与应用赛项

2022年四川省职业院校技能大赛 网络搭建与应用赛项 &#xff08;一&#xff09; 技能要求 &#xff08;总分1000分&#xff09; 网络搭建与应用赛项执委会及专家组 2022年06月 竞赛说明 一、竞赛内容分布 “网络搭建与应用”竞赛共分三个部分&#xff0c;其中&#xff1a; 第一…

3个常用的损失函数

1. L2 loss &#xff08;均方损失&#xff09; 除以2就是可以在求导时2和1/2可以相乘抵消。 蓝色的曲线表示&#xff1a;y0时&#xff0c;变化预测值y’的函数。 绿色曲线表示&#xff1a;似然函数。e^-l。 是一个高斯分布。 橙色的线&#xff1a;表示损失函数的梯度 可以看到…

记录Windows下mysql更改my.ini文件中datadir路径后启动不起来的问题

1.mysql默认安装到了C盘&#xff0c;想将数据库存储路径改到别的盘下 将Data文件夹和日志复制到H盘 找到mysl服务&#xff0c;右键停止服务 更改my.ini文件中的路径 保存然后启动发现启动不起来 猜测原因1&#xff1a;文件夹没有权限 将文件夹权限给到所有的用户 右击 ”…

[附源码]Python计算机毕业设计Django青栞系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Linux的进程创建

在Linux下面&#xff0c;对二进制程序有着严格的格式要求&#xff0c;这就是ELF&#xff0c;这个格式可以根据编译的结果不同&#xff0c;分为不同的格式。 ELF的三种类型 一&#xff1a;可重定位文件 在编译的时候&#xff0c;先做预处理工作&#xff0c;例如将头文件嵌入到…

VueX简单又详细的解读,看了就会用

一、VueX是什么 Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式 库。它采用集中式存储管理应用的所有组件的状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。 二、为什么要用VueX “单向数据流”理念的简单示意&#xff1a; 当我们的应用遇到多个组…

Redis缓存

一.简介 缓存就是数据交换的缓冲区&#xff08;称作Cache [ kʃ ] &#xff09;&#xff0c;是存贮数据的临时地方&#xff0c;一般读写性能较高 二.添加Redis缓存 三.缓存更新策略 1.主动更新策略 Cache Aside Pattern(推荐) 需要调用者自己编码&#xff0c;但可控性高 Re…

SimSiam-Exploring Simple Siamese Pepresentation Learning

SimSiam Abstract 模型坍塌&#xff0c;在siamese中主要是输入数据经过卷积激活后收敛到同一个常数上&#xff0c;导致无论输入什么图像&#xff0c;输出结果都能相同。 而He提出的simple Siamese networks在没有采用之前的避免模型坍塌那些方法&#xff1a; 使用负样本lar…

K_A08_003 基于 STM32等单片机驱动L9110模块按键控制直流电机正反转加减速启停

目录 一、资源说明 二、基本参数 1、参数 2、引脚说明 三、驱动说明 L9110模块驱动时序 对应程序: PWM信号 四、部分代码说明 接线说明 1、STC89C52RCL9110模块 2、STM32F103C8T6L9110模块 五、基础知识学习与相关资料下载 六、视频效果展示与程序资料获取 七、项目主要…

【Android工具】群晖安卓客户端基础套件:Drive、video、Photos和DS video安卓TV客户端...

微信关注公众号 “DLGG创客DIY”设为“星标”&#xff0c;重磅干货&#xff0c;第一时间送达。最近终于把all in one搞起来了&#xff0c;all in one就是把一堆功能一堆软件装一台主机里。。all in one&#xff08;以后简称AIO&#xff09;相关内容回头慢慢聊。今天先聊聊群晖&…