Kafka3.0.0版本——消费者(消费者组案例)

news2025/1/16 4:02:18

目录

    • 一、消费者组案例
      • 1.1、案例需求
      • 1.2、案例代码
        • 1.2.1、消费者1代码
        • 1.2.2、消费者2代码
        • 1.2.3、消费者3代码
        • 1.2.4、生产者代码
      • 1.3、测试

一、消费者组案例

1.1、案例需求

  • 测试同一个主题的分区数据,只能由一个消费者组中的一个消费。如下图所示:
    在这里插入图片描述

1.2、案例代码

复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。

1.2.1、消费者1代码

  • 代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
    
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("firstTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
                //每一秒拉取一次数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                //输出数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
    
                kafkaConsumer.commitAsync();
            }
        }
    }
    

1.2.2、消费者2代码

  • 代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer2 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("firstTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
    

1.2.3、消费者3代码

  • 代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer3 {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
    
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("firstTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
                //每一秒拉取一次数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                //输出数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
    
                kafkaConsumer.commitAsync();
            }
        }
    }
    

1.2.4、生产者代码

  • 代码

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("firstTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    

1.3、测试

  • 在 Kafka 集群控制台,创建firstTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 3 --replication-factor 1 --topic firstTopic
    

    在这里插入图片描述

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:
    由下图可知:3个消费者在消费不同分区的数据
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/985863.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[CISCN 2019华北Day1]Web1

文章目录 涉及知识点解题过程 涉及知识点 phar反序列化文件读取 解题过程 打开题目&#xff0c;注册用户为admin 进去发现有文件上传的功能&#xff0c;我们随便上传个图片 然后就有下载和删除两个功能 我们尝试抓包下载文件的功能 发现参数可控&#xff0c;我们尝试读取一下…

C++如何查看栈的变量

在如下行下断点&#xff0c;然后运行&#xff0c;中断&#xff1b; 右击中断的代码行&#xff0c;选择 转到反汇编&#xff1b; 红线标示的行是调用AddNum()函数处&#xff1b;看一下之前使用了push把a和b压入栈&#xff1b;使用push压入栈的变量就是放入栈的变量&#xff1b; …

Net跨平台UI框架Avalonia入门-安装和使用(v11版本)

介绍Avalonia v11版本 avalonia v11版本发布了&#xff0c;增加了很多新的功能&#xff0c;Avalonia的扩展也同步升级了。 主要更新内容&#xff1a; 辅助功能&#xff1a;增加了对各种辅助工具的支持&#xff0c;提高了Avalonia应用程序的可用性。输入法编辑器&#xff08;I…

模型压缩-对模型结构进行优化

模型压缩-对模型结构进行优化 概述 模型压缩通常都是对推断过程而言&#xff0c;训练过程的计算代价通常不考虑&#xff0c;因为GPU可以快速完成任意复杂度模型的训练对于推断过程来说&#xff0c;模型应用才是对于速度敏感的场景多数情况下 希望使用尽可能少的能耗完成京可能…

通俗讲解傅里叶变换

参考:六一礼物:给孩子解释什么是傅里叶变换 牛!不看任何数学公式来讲解傅里叶变换 如何直观形象、生动有趣地给文科学生介绍傅里叶变换? - 知乎 从基说起…… 从数学的角度,提供一个形象有趣的解释。理解傅里叶变换的钥匙是理解基♂,它能让你重新认识世界。 1. 什么是…

概率有向图模型(一)

文章目录 前言概率有向图模型验证回到书中隐马尔可夫模型信念网络朴素贝耶斯 总结 前言 经过前面的复习&#xff0c;我们把李航老师的《统计学习方法》中的监督学习部分回顾了一遍&#xff0c;接下来我们在此基础上&#xff0c;开始学习邱锡鹏老师的《神经网络与深度学习》&am…

02_常见网络层协议的头结构

1.ARP报文的报文结构 ARP首部的5个字段的含义&#xff1a; 硬件类型&#xff1a;值为1表示以太网MAC地址。 协议类型&#xff1a;表示要映射的协议地址类型&#xff0c;0x0800 表示映射为IP地址。 硬件地址长度&#xff1a;在以太网ARP的请求和应答中都是6&#xff0c;表示M…

【LeetCode题目详解】第九章 动态规划part06 完全背的讲解 518. 零钱兑换 II 377. 组合总和 Ⅳ (day44补)

本文章代码以c为例&#xff01; 动态规划&#xff1a;完全背包理论基础 思路 # 完全背包 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09;&#…

QT QScrollArea控件 使用详解

本文详细的介绍了QScrollArea控件的各种操作&#xff0c;例如&#xff1a;新建界面、源文件、布局、进度条宽、进度条高、水平滚动条值、垂直滚动条值、移入事件、移出事件、效果图、其它文章等等操作。 实际开发中&#xff0c;一个界面上可能包含十几个控件&#xff0c;手动调…

Qt creator中项目的构建配置和运行设置

使用 Qt Creator 集成开发环境构建和运行程序是一件非常简单的事情&#xff0c;一个按钮或者一个快捷键搞定全部&#xff0c;大家已经都了解了。但是&#xff0c;这些看起来简单的过程&#xff0c;背后到底发生了什么呢&#xff1f; 点击 Qt Creator 项目模式&#xff0c;可以…

elasticsearch的DSL查询文档

DSL查询分类 查询所有&#xff1a;查询出所有数据&#xff0c;一般测试用。例如&#xff1a;match_all 全文检索&#xff08;full text&#xff09;查询&#xff1a;利用分词器对用户输入内容分词&#xff0c;然后去倒排索引库中匹配。例如&#xff1a; match_query multi_ma…

(源码版)2023 年高教社杯全国大学生数学建模竞赛-E 题 黄河水沙监测题一数据分析详解+Python代码

十分激动啊啊啊题目终于出来了&#xff01;&#xff01;官网6点就进去了结果直接卡死现在才拿到题目&#xff0c;我是打算A-E题全部做一遍。简单介绍一下我自己&#xff1a;博主专注建模四年&#xff0c;参与过大大小小数十来次数学建模&#xff0c;理解各类模型原理以及每种模…

50etf期权最多能开仓多少手?

50etf期权限仓限额的操作&#xff0c;是为了能更好防范和控制期权交易的风险&#xff0c;无论是期货还是期权&#xff0c;在交易中都有规定的持仓限额&#xff0c;不能超过某个额度&#xff0c;那么50etf期权最多能开仓多少手&#xff1f;下文为你们全面介绍&#xff01;本文来…

2023高教社杯数学建模E题思路模型 - 黄河水沙监测数据分析

# 1 赛题 E 题 黄河水沙监测数据分析 黄河是中华民族的母亲河。研究黄河水沙通量的变化规律对沿黄流域的环境治理、气候变 化和人民生活的影响&#xff0c; 以及对优化黄河流域水资源分配、协调人地关系、调水调沙、防洪减灾 等方面都具有重要的理论指导意义。 附件 1 给出了位…

项目01—基于nignx+keepalived双vip的负载均衡高可用Web集群

文章目录 一.项目介绍1.拓扑图2.详细介绍 二.前期准备1.项目环境2.IP划分 三. 项目步骤1.ansible部署软件环境1.1 安装ansible环境1.2 建立免密通道1.3 批量部署nginx 2.配置NFS服务器和负载均衡器搭建keepalived2.1 修改nginx的index.html界面2.2 nginx实现七层负载均衡2.4 使…

[LeetCode]栈,队列相关题目(C语言实现)

文章目录 LeetCode20. 有效的括号LeetCode225. 用队列实现栈LeetCode232. 用栈实现队列LeetCode622. 设计循环队列 LeetCode20. 有效的括号 题目 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有…

【数据结构与算法系列4】长度最小的子数组

给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度**。**如果不存在符合条件的子数组&#xff0c;返回 0 。 示例 1&#xff1a; 输入&…

读书笔记-《ON JAVA 中文版》-摘要24[第二十一章 数组]

文章目录 第二十一章 数组1. 数组特性2. 一等对象3. 返回数组4. 多维数组5. 泛型数组6. Arrays的fill方法7. Arrays的setAll方法8. 数组并行9. Arrays工具类10. 数组拷贝11. 数组比较12. 流和数组13. 数组排序14. binarySearch二分查找15. 本章小结 第二十一章 数组 1. 数组特…

【Spring面试】一、SpringBoot启动优化与Spring IoC

文章目录 Q1、SpringBoot可以同时处理多少请求Q2、SpringBoot如何优化启动速度Q3、谈谈对Spring的理解Q4、Spring的优缺点Q5、Spring IoC容器是什么&#xff1f;作用与优点&#xff1f;Q6、Spring IoC的实现机制是什么Q7、IoC和DI的区别是什么Q8、紧耦合与松耦合的区别&#xf…

算法训练day38|动态规划 part01(理论基础、LeetCode509. 斐波那契数、70. 爬楼梯、746. 使用最小花费爬楼梯)

文章目录 理论基础什么是动态规划动态规划的解题步骤 509. 斐波那契数思路分析代码实现思考总结 70. 爬楼梯思路分析代码实现思考总结 746. 使用最小花费爬楼梯思路分析代码实现 理论基础 什么是动态规划 动态规划&#xff0c;英文&#xff1a;Dynamic Programming&#xff0c…