Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

news2025/1/15 22:42:17

目录

    • 一、独立消费者消费某一个主题中某个分区数据案例
      • 1.1、案例需求
      • 1.2、案例代码
      • 1.3、测试

一、独立消费者消费某一个主题中某个分区数据案例

1.1、案例需求

  • 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示:
    在这里插入图片描述

1.2、案例代码

  • 生产者往firstTopic主题 0 号分区发送数据代码

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
    
  • 消费者消费firstTopic主题 0 分区数据代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerPartition {
    
        public static void main(String[] args) {
            // 配置
            Properties properties = new Properties();
    
            // 连接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            // 1 创建一个消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题对应的分区
            ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
            topicPartitions.add(new TopicPartition("firstTopic",0));
            kafkaConsumer.assign(topicPartitions);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

1.3、测试

  • 在 IDEA 中执行消费者程序,如下图:
    在这里插入图片描述
  • 在 IDEA 中执行生产者程序 ,在控制台观察生成几个 0号分区的数据,如下图:
    在这里插入图片描述
  • 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/985945.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【逗老师的无线电】MMDVM盒子安装高颜值仪表盘

目录 开篇、高颜值仪表盘展示1、实时通联卡片2、精简信息仪表盘3、主仪表盘 一、下载W0CHP-PiStar-Dash二、安装1、解压2、刻录SD卡 三、基础配置1、首次启动2、初始化配置2.1、先配置联网2.2、配置热点参数2.3、配置DMR参数2.4、显示屏配置 3、通联测试 四、进阶操作1、自定义…

Unity——脚本与导航系统(上)

Unity内置了一个比较完善的导航系统&#xff0c;一般称为Nav Mesh&#xff08;导航网格&#xff09;&#xff0c;用它可以满足大多数游戏中角色自动导航的需求。 一、导航系统相关组件 Unity的导航系统由以下几个部分组成&#xff1a; Nav Mesh。Nav Mesh与具体的场景关联&…

Linux目录结构和远程使用

目录名作用根目录 ‘/’文件系统结构的起始点/root系统管理员的工作目录/home普通用户工作目录/bin存放二进制可执行文件&#xff0c;存放最经常使用的命令/sbin系统管理员使用的系统管理程序/boot启动linux时使用的一些核心文件/dev设备文件&#xff0c;包括块设备和字符设备/…

Google Chrome如何同步书签

前提 先确保能科学上网 操作步骤 然后&#xff0c;要在设备之间同步Google Chrome书签&#xff0c;可以用以下步骤&#xff1a; 在您的Mac和Windows设备上安装Google Chrome浏览器。 在您的Google Chrome账户中启用同步功能。如果您还没有Google Chrome账户&#xff0c;请先…

导入jdk源码并进行使用

jdk下载地址 JDK1.8源码下载地址 idea打开jdk项目 打开项目结构&#xff0c;进入SDK这一栏&#xff0c;选择一个使用的jdk&#xff0c;选择jdk的类路径 将类路径进行删除&#xff0c;并且添加我们自己下载的jdk中的src文件夹到类路径

去掉Egde浏览器选择文本弹出的搜索小按钮

去掉Egde浏览器选择文本弹出的搜索小按钮 小按钮 去掉&#xff1a;在设置中找到选择文本时的微型菜单&#xff0c;关闭【选择文本时显示迷你菜单】选项

洛谷 Array 数论

题目&#xff1a; 对于长度为n的数组A&#xff0c;A中只包含从1到n的整数&#xff08;可重复&#xff09;。如果A单调不上升或单调不下降&#xff0c;A就可称为美丽的。 找出在长度为n时&#xff0c;有几个美丽的A。 思路&#xff1a; 这是一道数论题。 我们先找找“单调不递…

计算机网络第三章——数据链路层(下)

提示&#xff1a;任何命运无论多么复杂&#xff0c;都只是反映在一瞬间 文章目录 局域网以太网无线局域网广域网及相关协议HDLC协议&#xff08;408已删&#xff09;链路层设备冲突域和广播域 局域网 局域网也有无线局域网&#xff0c; 我们日常生活中局域网主要是使用总线型这…

Linux 服务器运维管理面板1Panel体验

地址 https://github.com/1Panel-dev/1Panel 安装 根据GitHub提示运行即可 curl -sSL https://resource.fit2cloud.com/1panel/package/quick_start.sh -o quick_start.sh && sudo bash quick_start.sh安装成功&#xff0c;期间会安装docker 、docker-compose

box_iou交并比及assign_anchor_to_bbox个人理解

接上篇文章&#xff0c;李沐沐神的《动手学深度学习》中的show_bboxes还是比较好理解的&#xff0c;于是来看这两个方法 以下内容建议对照源代码理解 def box_iou 首先我们来设置boxes1和boxes2的初始值 boxes1 torch.tensor([[1,2,5,6],[2,1,4,6],[-1,2,7,6],[1,2,5,8]]) …

ubuntu14.04改静态ip

现在可能已经用ubuntu14.04的人已经不多了&#xff0c;这里讲一下Ubuntu14.04怎么改静态ip 第一步&#xff1a;输入ifconfig查看ip和子网掩码 第二步&#xff1a;输入route -n查看网关 上面ip是192.168.88.136&#xff0c;子网掩码是255.255.255.0&#xff0c;网关是192.168.…

中国ui设计师年终工作总结

一、萌芽阶段 记得初次应聘时&#xff0c;我对公司的认识仅仅局限于行业之一&#xff0c;对UI设计师一职的认识也局限于从事相对单纯的界面的设计创意和美术执行工作。除此之外&#xff0c;便一无所知了。所以&#xff0c;试用期中如何去认识、了解并熟悉自己所从事的行业&…

C++,day0907

#include <iostream>using namespace std; struct stu { private:int num; private:double score[32];public:void setNum(){cout <<"请输入学生人数:";cin >>num;}void input(){cout<<"请输入学生的成绩:"<<endl;for(int i…

Spring IOC之ListableBeanFactory

博主介绍&#xff1a;✌全网粉丝3W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

go work 不同包下mod + work实现.go文件的互相调用

一、文件架构 . ├── go.mod ├── go.work ├── main │ └── main.go └── util├── go.mod└── util.go其中go.mod module testgo 1.21.0其中go.work go 1.21.0use (../util )main/main.go 1 package main …

海外风控中这类征信数据与模型实践,实操落地效果最有效

今天的文章我们跟大家详细介绍&#xff0c;海外现金贷征信数据的特征介绍与建模实践。 希望今天的内容可以帮助各位童鞋了解海外现金贷的数据维度及其特征体系&#xff0c;并重点熟悉特征衍生的加工方法&#xff0c;在实际任务场景中加以实践应用。 首先&#xff0c;我们需要明…

从驾考科目二到自动驾驶,聊聊 GPU 为什么对自动驾驶很重要

“下一个项目&#xff0c;坡道起步。” …… “考试不合格&#xff0c;请将车子开到起点&#xff0c;重新验证考试。你的扣分项是&#xff1a;起步时间超30秒&#xff1a;扣100分。行驶过程中车轮轧到边线&#xff1a;扣100分。” 想必经历过驾驶证考试的同学&#xff0c;对…

Unity 之Material 类型和 MeshRenderer 组件中的 Materials 之间有一些重要的区别

文章目录 区别代码例子 区别 在Unity中&#xff0c;Material 类型和 MeshRenderer 组件中的 Materials 之间有一些重要的区别。 Material 类型&#xff1a; Material 是 Unity 中用来定义渲染属性的资源。它包含了一系列定义了如何绘制一个对象的属性&#xff0c;比如颜色、纹…

windows苹果商店上架ipa(基于appuploader)

参考文章&#xff1a; 上传ipa到appstore详细步骤 1、苹果商店地址&#xff1a;https://appstoreconnect.apple.com/apps 2、创建我的app 使用hbuilderx或apicloud云打包后&#xff0c;会生成一个ipa文件&#xff0c;而iphone是无法直接安装这个ipa文件的&#xff0c;需要将这…

【专栏必读】数字图像处理(MATLAB+Python)专栏目录导航及学习说明

文章目录 第一章&#xff1a;绪论第二章&#xff1a;数字图像处理基础第三章&#xff1a;图像基本运算第四章&#xff1a;图像的正交变换第五章&#xff1a;图像增强第六章&#xff1a;图像平滑第七章&#xff1a;图像锐化第八章&#xff1a;图像复原第九章&#xff1a;图像形态…