Kafka消费者api编写教程

news2024/11/19 19:29:58

1.基本属性配置

输入new Properties().var 回车

//创建属性

        Properties properties = new Properties();

       //连接集群

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

        //反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //指定消费者组id

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
    //创建属性
        Properties properties = new Properties();
       //连接集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //指定消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

    //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    /*//订阅主题
        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);*/

    //订阅分区
        //创建一个数组列表变量接收主题分区值
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        //指定要订阅的分区
        topicPartitions.add(new TopicPartition("customers",2));
        //订阅分区
        kafkaConsumer.assign(topicPartitions);

    //消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1791705.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云端狂飙:Django项目部署与性能优化的极速之旅

Hello&#xff0c;我是阿佑&#xff0c;这次阿佑将手把手带你亲自踏上Django项目从单机到云端的全过程&#xff0c;以及如何通过Docker实现项目的无缝迁移和扩展。不仅详细介绍了Docker的基本概念和操作&#xff0c;还深入探讨Docker Compose、Swarm和Kubernetes等高级工具的使…

车辆路径规划之Dubins曲线与RS曲线简述

描述 Dubins和RS曲线都是路径规划的经典算法&#xff0c;其中车辆运动学利用RS曲线居多&#xff0c;因此简单介绍Dubins并引出RS曲线。 花了点时间看了二者的论文&#xff0c;并阅读了一个开源的代码。 Dubins曲线 Dubins曲线是在满足曲率约束和规定的始端和末端的切线&#…

【python】IndexError: Replacement index 1 out of range for positional args tuple

成功解决“IndexError: Replacement index 1 out of range for positional args tuple”错误的全面指南 一、引言 在Python编程中&#xff0c;IndexError: Replacement index 1 out of range for positional args tuple这个错误通常发生在使用str.format()方法或者f-string&am…

网络编程 —— Http使用httpClient实现页面爬虫

先去找类型的a标签 取出图片所在网址 取出https://desk.3gbizhi.com/deskMV/438.html 搭建Form界面 Http类 public static HttpClient Client { get; } static Http() {HttpClientHandler handler new HttpClientHandler();//处理消息对象//ServerCertificateCustomValidat…

NeuralForecast TokenEmbedding 一维卷积 (Conv1d) 与矩阵乘法

NeuralForecast TokenEmbedding 一维卷积 (Conv1d) 与矩阵乘法 flyfish TokenEmbedding中使用了一维卷积 (Conv1d) TokenEmbedding 源码分析 在源码的基础上增加调用示例 下面会分析这段代码 import torch import torch.nn as nn class TokenEmbedding(nn.Module):def __i…

刷机 iPhone 进入恢复模式

文章目录 第 1 步&#xff1a;确保你有一台电脑&#xff08;Mac 或 PC&#xff09;第 2 步&#xff1a;将 iPhone 关机第 3 步&#xff1a;将 iPhone 置于恢复模式第 4 步&#xff1a;使用 Mac 或 PC 恢复 iPhone需要更多协助&#xff1f; 本文转载自&#xff1a;如果你忘记了 …

手写mybatis-预编译sql语句

sql表 mybatis数据库中的gxa_user表 /*Navicat Premium Data TransferSource Server : rootSource Server Type : MySQLSource Server Version : 80028Source Host : localhost:3306Source Schema : mybatisTarget Server Type : MySQLTarget…

图解DSPy:Prompt的时代终结者?!

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调重新阅读。而最新科技&#xff08;Mamba&#xff0c;xLSTM,KAN&#xff09;则提供了大模…

Windows Linux下查看静态库,动态库各种命令的总结

Windows环境下查看库文件 静态库(.lib) 使用lib.exe查看库内容 命令示例:lib /list C.lib使用dumpbin.exe查看库的详细信息 命令示例:dumpbin /headers C.lib动态链接库(.dll) 使用dumpbin.exe查看DLL的导出信息 命令示例:dumpbin /exports B.dll

选择富唯智能的可重构装配系统,就是选择了一个可靠的合作伙伴

在数字化、智能化的浪潮中&#xff0c;制造业正迎来一场前所未有的变革。而在这场变革中&#xff0c;富唯智能凭借其卓越的技术实力和创新能力&#xff0c;成为引领行业发展的领军企业。选择富唯智能的可重构装配系统&#xff0c;就是选择了一个可靠的合作伙伴&#xff0c;共同…

注册用户超6亿,哈啰发布年度可持续发展暨ESG报告

6月5日&#xff0c;哈啰发布《2023年度可持续发展暨ESG报告》&#xff0c;深入全面地展示2023年哈啰在可持续发展领域的举措和阶段性成果。 报告显示&#xff0c;哈啰始终遵循健康可持续的商业模式&#xff0c;以科技创新推动出行进化&#xff0c;在促进行业发展、环境友好、社…

Spring Boot 应用打 WAR 包后无法注册到 Nacos怎么办

你好&#xff0c;我是柳岸花开。 在微服务架构中&#xff0c;服务注册与发现是至关重要的一环。Nacos 作为阿里巴巴开源的注册中心&#xff0c;能够很好地满足这一需求。然而&#xff0c;在将 Spring Boot 应用打包成 WAR 部署到外部服务器时&#xff0c;可能会遇到服务无法注册…

【C++奇妙冒险】日期类Date的实现

文章目录 前言日期类Date的接口设计构造函数和打印函数获取日期并判断日期是否合法日期类的大小比较关系<运算符重载 判断小于运算符重载 判断相等<运算符重载 判断小于等于>运算符重载 判断大于> 运算符重载 判断大于等于! 运算符重载 不等于 日期类计算日期天数日…

WordPress 插件推荐:菜单缓存插件——Menu Caching

今天在缙哥哥博客上发现了一个 WordPress 速度优化插件的优化感觉很不错&#xff0c;明月自己装上也体验了一番&#xff0c; WordPress 菜单的载入速度无论是 PC 端和移动端都非常不错&#xff0c;并且这个叫 Menu Caching 的菜单缓存插件还完美的兼容 WPRocket&#xff0c;W3 …

现代园区管理工具:“园区运营管理平台”全景解析!

当下&#xff0c;我国各地区产业园区、工业园区、经济开发区、科技园区、商务园区如雨后春笋般迅速崛起&#xff0c;成为推动区域经济增长、促进产业升级的重要载体。然而&#xff0c;如何高效、智能地管理这些园区&#xff0c;提高这些园区的运营效率、服务质量和综合竞争力&a…

AI办公自动化:用kimi批量提取音频中的标题并重命名

很多音频文件&#xff0c;文件名很乱&#xff0c;需要根据音频信息中的标题聪明吗 在kimi中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;一步步的思考&#xff0c;完成以下脚本的撰写&#xff1a; 打开文件夹&#xff1a;E:\有声\a16z播客 读取里面所有的mp3格…

文件夹如何加密码?这4个文件夹加密方法值得一试!

文件夹如何加密码&#xff1f;在与朋友、家人和同事共享同一电脑计算机时&#xff0c;您可能有一些不希望他们查看的重要或机密文件。那么如何避免这种情况呢&#xff1f;使用密码保护锁定文件和文件夹可以提高你的数字隐私和安全性&#xff0c;因为这意味着你需要输入密码才能…

【React篇 】React项目中常用的工具库

我们可以从项目初始化、开发、构建、检查及发布的顺序总结react项目开发常用的工具库。 首先是初始化。 初始化工程项目一般用官方维护的 create-react-app&#xff0c;这个工具使用起来简单便捷&#xff0c;但 create-react-app 的配置隐藏比较深&#xff0c;修改配置时搭配…

重学java 64.IO流 字符流

Action speak louder than words —— 24.6.5 字符输入流 一、字节流读取中文的问题 1.注意&#xff1a; 字节流是万能流&#xff0c;这个万能更侧重于文件复制&#xff0c;但是尽量不要边读边看 2.原因&#xff1a; UTF-8&#xff1a;一个汉字占三个字节 GBK&#xff1a;一…

Tomcat相关概述和部署

目录 一、Tomcat知识 1.Tomcat概述 2.Tomcat组件构成 3.Tomcat 功能组件结构 4.Tomcat的请求过程 二、tomcat服务部署 1.老样子准备工作——关闭防火墙和selinux&#xff0c;防止其对安装过程的干扰 2.将准备好的软件包拖入/opt目录下&#xff0c;进行安装JDK 3.设置J…