消费者api编写教程

news2025/1/23 13:41:31

1.基本属性配置

输入new Properties().var 回车

//创建属性

        Properties properties = new Properties();

       //连接集群

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

        //反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //指定消费者组id

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
    //创建属性
        Properties properties = new Properties();
       //连接集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //指定消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

    //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    /*//订阅主题
        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);*/

    //订阅分区
        //创建一个数组列表变量接收主题分区值
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        //指定要订阅的分区
        topicPartitions.add(new TopicPartition("customers",2));
        //订阅分区
        kafkaConsumer.assign(topicPartitions);

    //消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1789199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI-PDF 摘要器推荐10个爆款:效率翻倍,省时省力的秘密武器

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

Excel中高级筛选多个条件怎么做?

高级筛选关键点就在条件设置&#xff0c;筛选条件可以设置多行多列&#xff0c;同一行之间的条件是“并且”的关系&#xff0c;同一列之间的条件是“或者”的关系。 我们以筛选厂家通用、大众&#xff0c;在北京、上海、成都&#xff0c;1月的数据为例来演示条件设置 一、按字…

Java项目:100 springboot共享汽车管理系统

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本共享汽车管理系统有管理员和用户。 管理员功能有个人中心&#xff0c;用户管理&#xff0c;投放地区管理&#xff0c;汽车信息管理&#xff0c;汽车…

Qwen-VL论文阅读

论文地址 其他同学的详细讲解 模型结构和参数大小 &#xff08;1&#xff09;LLM&#xff1a;Qwen-7B &#xff08;2&#xff09;Vision Encoder&#xff1a;ViT架构&#xff0c;初始化参数是 Openclip’s ViT-bigG。 在训练和推理过程中&#xff0c;输入的图像都被调整到…

elasticsearch安装与使用(3)-索引库可视化

把新建的index_test倒排索引库可视化 Stack Management->Index Management&#xff0c;查看新建的倒排索引库index_test Data views->Create data view Create data view Discover 参考 无需重新学习&#xff0c;使用 Kibana 查询/可视化 SLS 数据

Xxl-Job二开踩坑记录

Xxl-Job踩坑记录 将xxl-job二次开发了&#xff0c;然后在对接于拓展功能的时候发现了一些xxl-job在使用或性能上隐藏的坑&#xff1b; 接口请求超时 起初是设定业务方通过http接口调用xxl-job的增删改接口完成对任务的数据操作&#xff1b; 因此直接使用了内置提供的 XxlJo…

建筑技工精细木工工种模拟试题

是非题(对的划“√”&#xff0c;错的划“X”&#xff0c;答案写在每题括号内) 1.重结构中的受弯构件&#xff0c;可以使用部分有木节的木材。(√) 2.有部分木节的木材作受弯构件时&#xff0c;应把木节部分安置在受压区域。(√) 3.温度越高&#xff0c;干燥的速度越快&…

软件测试——Java单元测试(常用注解+断言)待续

1.软件及环境 软件&#xff1a;IDEA 环境&#xff1a;JDK1.8&#xff0c;Junit 4.13 2.环境配置 这里我们采用IDEA编辑器&#xff0c;利用Maven对项目进行构建&#xff0c;如下&#xff1a; 然后项目构建完之后&#xff0c;首先第一步是进入pom文件&#xff0c;添加Junit4.13依…

深度学习Day-19:DenseNet算法实战与解析

&#x1f368; 本文为&#xff1a;[&#x1f517;365天深度学习训练营] 中的学习记录博客 &#x1f356; 原作者&#xff1a;[K同学啊 | 接辅导、项目定制] 要求&#xff1a; 根据 Pytorch 代码&#xff0c;编写出 TensorFlow 代码研究 DenseNet 与 ResNetV 的区别改进思路是…

概率论与数理统计,重要知识点——全部公式总结

二、一维随机变量及其分布 五个分布参考另外一篇文章 四、随机变量的数字特征 大数定理以及中心极限定理 六、数理统计

Python量化交易学习——Part4:基于基本面的单因子选股策略

技术分析与基本面分析是股票价格分析最基础也是最经典的两个部分。技术分析是针对交易曲线及成交量等指标进行分析,基本面分析是基于公司的基本素质进行分析。 一般来说选股要先选行业,在选个股,之后根据技术分析选择买卖节点,因此针对行业及个股的基本面分析是选股的基础。…

python数据分析案例-研究生成绩分析

一、简介 在本次研究中&#xff0c;我们对2018年硕士生考试成绩数据进行了深入的统计分析。这项分析旨在探索不同因素如性别、生源背景、基因型以及出生月份等对学生成绩的潜在影响。我们使用了一系列的统计方法&#xff0c;包括描述性统计分析、相关性分析、分组分析以及方差…

【Java数据结构】二叉树详解(二)

&#x1f512;文章目录&#xff1a; 1.❤️❤️前言~&#x1f973;&#x1f389;&#x1f389;&#x1f389; 2. 二叉树的模拟——正文 2.1获取树中节点的个数 2.2获取叶子节点的个数 2.3获取第K层节点的个数 2.4获取二叉树的高度 2.5 检测值为value的元素是否存在 …

WPF Treeview控件开虚拟化后定位节点

不开虚拟化&#xff0c;可以用下面的方法直接定位 <Window x:Class"WpfApplication2.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"Title"Main…

Qt OPC UA通信

介绍 OPC UA全称Open Platform Unified Architecture&#xff0c;开放平台统一架构&#xff0c;是工业自动化领域通用的数据交换协议&#xff0c;它有两套主要的通信机制&#xff1a;1.客户端-服务器通信&#xff1b;2.发布订阅。Qt对OPC UA通信标准也提供了支持&#xff0c;目…

JDBC学习笔记(三)高级篇

一、JDBC 优化及工具类封装 1.1 现有问题 1.2 JDBC 工具类封装 V1.0 resources/db.properties配置文件&#xff1a; driverClassNamecom.mysql.cj.jdbc.Driver urljdbc:mysql:///atguigu usernameroot password123456 initialSize10 maxActive20 工具类代码&#xff1a; p…

代码随想录算法训练营第二十八天|93.复原IP地址 ,78.子集 ,90.子集II

93. 复原 IP 地址 - 力扣&#xff08;LeetCode&#xff09; class Solution {ArrayList<String> results new ArrayList<>();public List<String> restoreIpAddresses(String s) {if(s.length() > 12){return new ArrayList<>();}char[] ipChars …

f4pga环境搭建教程

f4pga环境搭建教程 背景介绍 FOSS Flows For FPGA (F4PGA) project&#xff0c;是一套开源的FPGA工具链&#xff0c;号称the GCC of FPGAs&#xff0c;作用是将写的硬件描述语言&#xff08;verilog或VHDL&#xff09;转化为可以在FPGA上运行的可执行文件&#xff08;bit文件…

Python实现PPT表格的编写包含新建修改插图(收藏备用)

自动创建一个ppt文件并创建好表格 代码要用到pptx库 pip install python-pptx 创建含有表格的ppt文件代码&#xff1a; from pptx import Presentation from pptx.util import Inches# 创建一个PPT对象 ppt Presentation()# 添加一个幻灯片 slide ppt.slides.add_slide(p…

原美团项目管理专业通道执行主席边国华受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 峰项标&#xff08;北京&#xff09;管理咨询有限公司常务副总裁、原美团项目管理专业通道执行主席边国华先生受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“从组织级项目管理能力的评价角度看企业实践”。大会将于6月2…