Kafka基础入门-代码实操

news2025/1/12 21:01:17

   Kafka是基于发布/订阅模式的消息队列,消息的生产和消费都需要指定主题,因此,我们想要实现消息的传递,第一步必选是创建一个主题(Topic)。下面我们看下在命令行和代码中都是如何创建主题和实现消息的传递的。

使用命令行操作Kafka

使用命令行操作主题

  • 使用kafka-topics.sh脚本来实现对Topic的操作
sh kafka-topics.sh

   执行命令之后,我们可以找到到下面这行提示,REQUIRED代表必须的,就是说我们想要实现对Kafak的操作必须要带有这个参数,表示我们要连接的Kafka具体服务。

--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
  connect to>                              to.   

   接下来,就让我们创建一个主题吧。

# --bootstrap-server  用于指定我们连接的Kafka服务地址,9092是默认端口号  
# --topic  指定要操作的Topic名称  
# --create 表示本次是要创建一个主题  
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --create
# 执行结果
Created topic test.

   查看下我们的主题是否创建成功

sh kafka-topics.sh --bootstrap-server localhost:9092 --list    
# 执行结果
test

   查看某一个主题的详细信息

sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --describe  
# 执行结果
Topic: test	TopicId: ehyjS3R3Saq8Cx2V1x0p7g	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: test	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

使用命令行消费数据

  • 我们通过kafka-console-consumer.sh来生产消息。
 sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
 #输出
hello kafka

使用命令行生产数据

  • 我们通过kafka-console-consumer.sh来生产消息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test  
# 输入
>hello kafka

   想了解如何启动Kafka,可以看这篇文章《Kafka基础入门》。

使用代码操作Kafka

   添加依赖包

 <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.1</version>
        </dependency>
    </dependencies>

生产者代码

        // 创建配置对象
        Map<String,Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
        // 对生产的数据的K,V 进行序列化的操作
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());

        // 创建生产者对象
        //      生产者需要设定泛型:数据类型的约束
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(configMap);
        // 创建数据
        //      构建数据时,需要传递三个参数
        //          第一个参数表示主题名称,主题不存在时会自动创建
        //          第二个参数表示数据的Key
        //          第二个参数表示数据的Value
        ProducerRecord<String,String> record = new ProducerRecord<String,String>("test","key","value");
        // 通过生产者对象,将数据发送到Kafka
        producer.send(record);
        //关闭生产者对象
        producer.close();

消费者代码

        // 创建配置对象
        Map<String,Object> configMap = new HashMap<>();
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
        // 对生产的数据的K,V 进行反序列化的操作
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"com.kafka.test");
        // 创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(configMap);
        //订阅主题
        kafkaConsumer.subscribe(Collections.singleton("test"));
        // 从Kafka中获取数据
        //      消费者从Kafka中拉取数据
        ConsumerRecords<String, String> datas = kafkaConsumer.poll(1000);
        datas.forEach(data ->{
            System.out.println(data);
        });
        // 关闭消费者对象
        kafkaConsumer.close();

在这里插入图片描述
点击下方名片,关注『编程青衫客』
随时随地获取最新好文章!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1926305.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TDesign组件库日常应用的一些注意事项

【前言】Element&#xff08;饿了么开源组件库&#xff09;在国内使用的普及率和覆盖率高于TDesign-vue&#xff08;腾讯开源组件库&#xff09;&#xff0c;这也导致日常开发遇到组件使用上的疑惑时&#xff0c;网上几乎搜索不到其文章解决方案&#xff0c;只能深挖官方文档或…

Python编程工具PyCharm和Jupyter Notebook的使用差异

在编写Python程序时需要用到相应的编程工具&#xff0c;PyCharm和Jupyter Notebook是最常用2款软件。 PyCharm是很强大的综合编程软件&#xff0c;代码提示、代码自动补全、语法检验、文本彩色显示等对于新手来说实在太方便了&#xff0c;但在做数据分析时发现不太方便&#xf…

UGUI优化篇(更新中)

UGUI优化篇 1. 基础概念2. 重要的类1. MaskableGraphic类继承了IMaskable类2. 两种遮罩的实现区别RectMask2DMask 3. 渲染部分知识深度测试深度测试的工作原理 渲染队列透明物体在渲染时怎么处理为什么透明效果会造成性能问题 1. 基础概念 所有UI都由网格绘制的如image由两个三…

成为CMake砖家(2): macOS创建CMake本地文档的app

大家好&#xff0c;我是白鱼。 使用 CMake 的小伙伴&#xff0c; 有的是在 Windows 上&#xff0c; 还有的是在 macOS 上。之前咱们讲了 windows 上查看 cmake 本地 html 文档的方式&#xff0c; 这篇讲讲 macOS 上查看 cmake 本地 html 文档的方法。 1. 问题描述 当使用 CMa…

数模·图论

matlab中图的表示 顶点集权值集的形式 s是源点&#xff0c;t是终点&#xff0c;w是对应的权值 调用graph(s,t,w)作为参数创建图 调用plot函数绘图plot(G,EdgeLabel,G.Edges.Weight,LineWidth,2) 设置x和y的坐标范围set(gca,XTick,[],YTick,[]) s[1 2 3]; t[4 1 2]; w[5 2 6]; …

程序包不存在【java: 程序包org.springframework.boot不存在】

1、问题提示&#xff1a;java: 程序包org.springframework.boot不存在 注意&#xff1a;已经下载好了程序包&#xff0c;就是提示不存在 2、解决办法

一个开源完全免费的无损视频或音频的剪切/裁剪/分割/截取和视频合并工具

大家好&#xff0c;今天给大家分享一款致力于成为顶尖跨平台FFmpeg图形用户界面应用的软件工具LosslessCut。 LosslessCut是一款致力于成为顶尖跨平台FFmpeg图形用户界面应用的软件工具&#xff0c;专为实现对视频、音频、字幕以及其他相关媒体资产的超高速无损编辑而精心打造。…

《后端程序猿 · EasyPOI 导入导出》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

蓝桥杯嵌入式第十五届模拟考试3解析

1 题目 2 程序 /* USER CODE BEGIN PTD */ char buf1[20],buf2[20],buf3[20],buf4[20],buf5[20],buf6[20],buf7[20],buf8[20],buf9[20]; struct keys {int step;int length;int state; }key[5]; int display; double v1,v2; int t; double v1l1.2,v1u2.2,v2l1.4,v2u3.0; dou…

深度学习复盘与论文复现D

文章目录 一、新环境搭建与适应1、easy_install和pip的安装使用2、关于安装包超时的解决方案3、brew安装包安装4、使用新环境运行以前项目5、解决win的pycharm修改内存后无法启动 二、Dataset 数据读取问题1、Lightning Torch 读取数据2、Pytorch的DataLoader数据读取机制3、Py…

Dify中的经济索引模式实现过程

当索引模式为经济时&#xff0c;使用离线的向量引擎、关键词索引等方式&#xff0c;降低了准确度但无需花费 Token。 一.提取函数**_extract** 根据不同文档类型进行内容的提取&#xff1a; def _extract(self, index_processor: BaseIndexProcessor, dataset_document: Data…

力扣经典题目之->移除值为val元素的讲解,的实现与讲解

一&#xff1a;题目 博主本文将用指向来形象的表示下标位的移动。 二&#xff1a;思路 1&#xff1a;两个整形&#xff0c;一个start&#xff0c;一个end&#xff0c;在一开始都 0&#xff0c;即这里都指向第一个元素。 2&#xff1a;在查到val之前&#xff0c;查一个&…

C语言 ——— 将一句英语短句中的单词进行倒置

目录 题目要求 代码实现 题目要求 将一句英语短句中的单词进行倒置&#xff0c;标点符号不倒置 如&#xff1a; 输入&#xff1a;"I like chongqing very much," 输出&#xff1a;"much, very chongqing like I" 代码实现 #include<stdio.h> #i…

c#与欧姆龙PLC通信——如何更改PLC的IP地址

前言 我们有时候需要改变欧姆龙Plc的ip地址,下图有两种更改方式,一种是已知之前Plc设置的Ip地址,还有一种是之前不知道Pl的Ip地址是多少,下面分别做介绍。 1、已知PLC的IP地址的情况下更改地址 假设已知PLC的Ip地址,比如本文中PLC的IP为192.168.1.2,我首先将电脑的IP地…

搭建调用链监控Zipkin和Sleuth

项目环境: win7、jdk8 1、添加依赖&#xff0c;添加了spring-cloud-starter-zipkin会自动导入Sleuth <!--Sleuth&#xff0c;zipkin--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zipkin</…

安卓onNewIntent 什么时候执行

一.详细介绍 onNewIntent 方法 onNewIntent 是 Android 中 Activity 生命周期的一部分。它在特定情况下被调用&#xff0c;主要用于处理新的 Intent&#xff0c;而不是创建新的 Activity 实例。详细介绍如下&#xff1a; 使用场景 singleTop 启动模式&#xff1a; 如果一个 Ac…

python+mysql图书管理系统,谈谈思路及实现代码

&#x1f3c6;本文收录于《CSDN问答解答》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&…

【链表】算法题(一) ---- 力扣 / 牛客

一、移除链表元素 移除链表中值为val的元素&#xff0c;并返回新的头节点 思路&#xff1a; 题目上这样说&#xff0c;我们就可以创建一个新的链表&#xff0c;将值不为val的节点&#xff0c;尾插到新的链表当中&#xff0c;最后返回新链表的头节点。 typedef struct ListNo…

java《字符串进阶篇》--习题逐语句分析及认识链式编程

一、前言 字符串相关的习题分享&#xff0c;随着学习的深入&#xff0c;应该要多做一些习题来巩固知识点&#xff0c;而不是一味的去学习新的东西。这几天尽可能地去给大家分享一些常用的方法及习题的讲解&#xff0c;希望大家认真观看&#xff0c;每一道题都有对应的分析。基…

GAMMA数据处理(八)

新学习了一个命令&#xff1a; SLC_cat_ScanSAR - Concatenate sequential ScanSAR burst SLC images (Sentinel-1, TSX, RCM...)&#xff0c;做数据拼接的。之前一直没有涉及到拼接问题&#xff0c;就一直没管。如果研究区包含两景SLC&#xff0c;可以拼接成一景。但是不知道…