【kafka03】消息队列与微服务之Kafka 读写数据

news2024/12/27 10:36:14

Kafka 读写数据

参考文档

Apache Kafka

常见命令

kafka-topics.sh            #消息的管理命令 
kafka-console-producer.sh  #生产者的模拟命令 
kafka-console-consumer.sh  #消费者的模拟命令   

创建 Topic

创建topic名为 chen,partitions(分区)为3,replication(每个分区的副本数/每个分区的分区因子)为 2

#新版命令
[root@node1 bin]#/usr/local/kafka/bin/kafka-topics.sh --create --topic chen --bootstrap-server 10.0.0.187:9092 --partitions 3 --replication-factor 2

#在各节点上观察生成的相关数据
[root@node1 ~]#ls /usr/local/kafka/data/
[root@node2 ~]#ls /usr/local/kafka/data/
[root@node3 ~]#ls /usr/local/kafka/data/
#旧版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --chen --zookeeper 10.0.0.187:2181,10.0.0.188:2181,10.0.0.189:2181 --partitions 3 --replication-factor 2 --topic wang
Created topic wang.

获取所有 Topic

#新版命令
[root@node1 bin]#/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.187:9092 
chen

#旧版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 
chen

验证 Topic 详情

状态说明:wang 有三个分区分别为0、1、2,分区0的leader是3 (broker.id),分区 0 有2 个副本,并且状态都为 lsr(ln-sync,表示可 以参加选举成为 leader)。

[root@node1 bin]#/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 10.0.0.187:9092  --topic chen
Topic: chen	TopicId: OHzFQnjYTYS_t-PyomxkSQ	PartitionCount: 3	ReplicationFactor: 2	Configs: 
	Topic: chen	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1	Elr: N/A	LastKnownElr: N/A
	Topic: chen	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2	Elr: N/A	LastKnownElr: N/A
	Topic: chen	Partition: 2	Leader: 2	Replicas: 2,3	Isr: 2,3	Elr: N/A	LastKnownElr: N/A

[root@node1 bin]#ls -1  /usr/local/kafka/data/
chen-0
chen-1
[root@node2 ~]#ls -1  /usr/local/kafka/data/
chen-1
chen-2
[root@node3 ~]#ls -1  /usr/local/kafka/data/
chen-0
chen-2

node3 leadernode1 leadernode2 leader
node1 follwernode2 follwernode3 follwer
p0p1p2

生产 Topic

kafka-console-producer.sh 格式

#发送消息命令格式:
 kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2>:<端口> --topic <topic名称>
 
#/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.187:9092,10.0.0.102:9092,10.0.0.103:9092 --topic chen

范例:

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.187:9092 --topic chen
>message1
>message2
>message3
>
#或者下面方式
[root@node1 ~]#/usr/local/kafka/bin/kafka-console-producer.sh --topic wang --bootstrap-server 10.0.0.101:9092

消费 Topic

kafka-console-consumer.sh 格式

#接收消息命令格式:
kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumer
property group.id=<组名称>

注意:

  • 消息者先生产消息,消费都后续才启动,也能收到之前生产的消息

  • 同一个消息在同一个group内的消费者只有被一个消费者消费,比如:共100条消息,在一个group内有A,B两个消费者,其中A消费 50条,B消费另外的50条消息。从而实现负载均衡,不同group内的消费者则可以同时消费同一个消息--from-beginning

  • 表示消费发布的消息也能收到,默认只能收到消费后发布的新消息

范例:

#交互式持续接收消息,按Ctrl+C退出
/usr/local/kafka/bin/kafka-console-consumer.sh --topic chen --bootstrap-server 10.0.0.187:9092 --from-beginning

 #一个消息同时只能被同一个组内一个消费者消费(单播机制),实现负载均衡,而不能组可以同时消费同一个消息(多播机制)
[root@node2 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic chen  --bootstrap-server 10.0.0.102:9092 --from-beginning --consumer-property group.id=group1
[root@node2 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic chen --bootstrap-server 10.0.0.102:9092 --from-beginning --consumer-property group.id=group1

删除 Topic

范例:

#注意:需要修改每个节点配置文件server.properties中的delete.topic.enable=true并重启
#新版本
[root@node3 ~]#/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic chen
#旧版本
[root@node3 ~]#/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 
10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 --topic chen
Topic wang is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

范例:删除zk下面 topic test

#无需修改配置文件server.properties,此方法很危险
[root@zookeeper-node1 ~]#zkCli.sh -server 10.0.0.103:2181
[zk: 10.0.0.103:2181(CONNECTED) 0] ls /brokers/topics
[zk: 10.0.0.103:2181(CONNECTED) 0] deleteall /brokers/topics/test
[zk: 10.0.0.103:2181(CONNECTED) 0] ls /brokers/topics

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2251429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LuaForWindows_v5.1.5-52.exe

Releases rjpcomputing/luaforwindows GitHub #lua C:\Users\Administrator\Desktop\test.lua print("Hello lua&#xff01;") print("ZengWenFeng 13805029595")

软件无线电(SDR)的架构及相关术语

今天简要介绍实现无线电系统调制和解调的主要方法&#xff0c;这在软件定义无线电(SDR)的背景下很重要。 外差和超外差 无线电发射机有两种主要架构——一种是从基带频率直接调制到射频频率&#xff08;称为外差&#xff09;&#xff0c;而第二种超外差是通过两个调制阶段来实…

【Electron学习笔记(四)】进程通信(IPC)

进程通信&#xff08;IPC&#xff09; 进程通信&#xff08;IPC&#xff09;前言正文1、渲染进程→主进程&#xff08;单向&#xff09;2、渲染进程⇌主进程&#xff08;双向&#xff09;3、主进程→渲染进程 进程通信&#xff08;IPC&#xff09; 前言 在Electron框架中&…

Power BI - Connect to SharePoint online list with Image column

1.简单介绍 当前SharePoint online list有modern和classic两种模式&#xff0c;现在使用modern模式的比较多。list中有Image类型的列&#xff0c;Power BI如何连接到SharePoint list并显示image呢 note, SharePoint list中的Image列&#xff0c;Lookup列&#xff0c;People列…

电机控制理论基础及其应用

电机控制理论是电气工程和自动化领域中的一个重要分支&#xff0c;它主要研究如何有效地控制电机的运行状态&#xff0c;包括速度、位置、扭矩等&#xff0c;以满足各种应用需求。电机控制理论的基础知识涵盖了电机的工作原理、数学模型、控制策略以及实现技术等方面。下面是一…

二十一、QT C++

1.1QT介绍 1.1.1 QT简介 Qt 是一个跨平台的应用程序和用户界面框架&#xff0c;用于开发图形用户界面&#xff08;GUI&#xff09;应用程序以及命令行工具。它最初由挪威的 Trolltech &#xff08;奇趣科技&#xff09;公司开发&#xff0c;现在由 Qt Company 维护&#xff…

基于Java Springboot蛋糕订购小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 微信…

Kubernetes 01

MESOS&#xff1a;APACHE 分布式资源管理框架 2019-5 Twitter退出&#xff0c;转向使用Kubernetes Docker Swarm 与Docker绑定&#xff0c;只对Docker的资源管理框架&#xff0c;阿里云默认Kubernetes Kubernetes&#xff1a;Google 10年的容器化基础框架&#xff0c;borg…

7. 现代卷积神经网络

文章目录 7.1. 深度卷积神经网络&#xff08;AlexNet&#xff09;7.2. 使用块的网络&#xff08;VGG&#xff09;7.3. 网络中的网络&#xff08;NiN&#xff09;7.4. 含并行连结的网络&#xff08;GoogLeNet&#xff09;7.5. 批量规范化7.5.1. 训练深层网络7.5.2. 批量规范化层…

芯片测试-射频中的单位

射频中的单位 &#x1f4a2;dB&#xff0c;dBc&#x1f4a2;&#x1f4a2;dB&#x1f4a2;&#x1f4a2;dBc&#x1f4a2;&#x1f4a2;3dB和0dB&#x1f4a2; &#x1f4a2;dBm和dBw&#x1f4a2;&#x1f4a2;dBuV&#xff0c;dBmV和dBV&#x1f4a2;&#x1f4a2;dBuV&#…

【C++】数字位数提取:从个位到十位的深入分析与理论拓展

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;第一题&#xff1a;提取个位数解法代码解法分析代码优化拓展思考&#xff1a;取模运算的普适性 &#x1f4af;第二题&#xff1a;提取十位数题目解读与思路分析方法一&…

opengl 三角形

最后效果&#xff1a; OpenGL version: 4.1 Metal 不知道为啥必须使用VAO 才行。 #include <glad/glad.h> #include <GLFW/glfw3.h>#include <iostream> #include <vector>void framebuffer_size_callback(GLFWwindow *window, int width, int heigh…

如何使用Postman优雅地进行接口自动加密与解密

引言 在上一篇文章中&#xff0c;分享了 Requests 自动加解密的方法&#xff0c;本篇文章分享一下更加方便的调试某个服务端接口。 Postman Postman 这个工具后端小伙伴应该相当熟悉了&#xff0c;一般情况下我们会在开发和逆向过程中使用它来快速向接口发送请求&#xff0c;…

PDF view | Chrome PDF Viewer |Chromium PDF Viewer等指纹修改

1、打开https://www.browserscan.net/zh/ 2、将internal-pdf-viewer改为 internal-pdf-viewer-jdtest看下效果&#xff1a; 3、源码修改&#xff1a; third_party\blink\renderer\modules\plugins\dom_plugin_array.cc namespace { DOMPlugin* MakeFakePlugin(String plugin_…

2024143读书笔记|《遇见》——立在城市的飞尘里,我们是一列忧愁而又快乐的树

2024143读书笔记|《遇见》——立在城市的飞尘里&#xff0c;我们是一列忧愁而又快乐的树 第1章 年年岁岁岁岁年年第2章 遇见第3章 有个叫“时间”的家伙走过第4章 初雪第6章 回首风烟 《华语散文温柔的一支笔&#xff1a;张晓风作品集&#xff08;共5册&#xff09;》作者张晓风…

python基础(五)

正则表达式 在编写处理字符串的程序或网页时&#xff0c;经常会有查找符合某些复杂规则的字符串的需要。正则表达式就是用于描述这些规则的工具。换句话说&#xff0c;正则表达式就是记录文本规则的代码。 符号解释示例说明.匹配任意字符b.t可以匹配bat / but / b#t / b1t等\…

【连接池】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…

《业务流程--穿越从概念到实践的丛林》读后感一:什么是业务流程

1.4 流程建模 画流程图常见的方法派别如下: 1)流图。美国国家标准协会(ANSI)制定的规范,利用VISIO中的流程符号进行画图。 2)事件驱动流程链(EPC)。ARIS(集成信息系统架构)。认为流程是由一系列事件触发,并且 针对事件的行为又将引发新的事件,流程的表现为“事件--功…

Sybase数据恢复—Sybase数据库无法启动,Sybase Central连接报错的处理案例

Sybase数据库数据恢复环境&#xff1a; Sybase数据库版本&#xff1a;SQL Anywhere 8.0。 Sybase数据库故障&分析&#xff1a; Sybase数据库无法启动。 错误提示&#xff1a; 使用Sybase Central连接报错。 数据库数据恢复工程师经过检测&#xff0c;发现Sybase数据库出现…

redis基础spark操作redis

Redis内存淘汰策略 将Redis用作缓存时&#xff0c;如果内存空间用满&#xff0c;就会自动驱逐老的数据。 为什么要使用内存淘汰策略呢&#xff1f; 当海量数据涌入redis&#xff0c;导致redis装不下了咋办&#xff0c;我们需要根据redis的内存淘汰策略&#xff0c;淘汰一些不那…