05、Kafka 操作命令

news2025/1/12 18:35:15

05、Kafka 操作命令

1、主题命令

(1)创建主题

kafka-topics.sh --create --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 4 --replication-factor 3

–bootstrap-server:设置kafka执行节点

–topic:主题名称

–partitions:设置分区数,可以用于并发消费。

–replication-factor:设置副本因子,数量不能大于kafka节点数。

(2)查看主题

kafka-topics.sh --list --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

image-20240509122153192

(3)在我们配置的文件夹下,

/usr/local/kafka_2.12-3.7.0/data

就可以看到topic对应的文件夹,其中 0,1,2,3是因为我们指定的partitions为4,创建了4个分区。

image-20240509160712720

在其他两台机器上,也同样有这三个文件夹,是因为我们的replication-factor 为3,表示会有三个副本,刚好对应三台机器。

(3)查询主题描述信息

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

image-20240509161337423

ReplicationFactor 表示副本数为3

Partition:表示当前的分区号

Replicas:表示副本存放的机器

Leader:表示三个副本中的leader

Isr:表示当前可用的机器id

(4)修改主题

分区数partitions可以修改,只能比原来的大。

replication-factor 一旦确定就不能修改了。

kafka-topics.sh --alter --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 6

image-20240509162034581

(5)删除主题

kafka-topics.sh --delete --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test

image-20240509165305344

2、生产消息

给test1主题,发送消息

[root@localhost bin]# kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

如果给一个不存在的主题,发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2
>message1

image-20240509171549191

第一个消息发送后会有警告,后续发送消息没有警告,因为会自动创建topic,并且指定

partitions 和 replication-factor 都为1。

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2

image-20240509171756305

但是,建议 topic 还是手动创建,应该 partitions 是可以修改的,但是 replication-factor

是不允许修改的。

3、消费消息

(1)消费最新数据

消费 test1 主题下的消息:

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

我们再启动一个生产者来发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

image-20240509172354876

此时查看我们的消费者,就会打印接受到的消息:

image-20240509172430840

上面消费者,只能消费最新的数据,无法消费历史数据。

(2)消费历史数据

消费 test1 主题下的历史消息:

  • –from-beginning 表示从头开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --from-beginning

image-20240509173256632

  • –offset earliest --partition 1 表示从1号分区的头部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset earliest --partition 1

image-20240509173418365

  • –offset latest --partition 1 表示从1号分区的尾部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset latest --partition 1

image-20240509173610825

  • –offset 2 --partition 1 从1号分区的offset 为2的位置开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset 2 --partition 1

image-20240509173747666

4、消费者组

消费者组其实就是一个容器,可以容纳若干个消费者,每一个消费者必须被包含在一个消费者组里面。

(1)通过 --group 来指定消费者组。

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --group my-group1

如果分组存在,则会将消费者添加到对应分组下,如果不存在,则会创建消费者组。

(2)查看已有的消费者组

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --list

image-20240509174505505

(3)删除消费者组

删除消费者组,必须要先保证消费者组下没有消费者:

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --delete --group my-group1

image-20240509174700845

(4)查看消费的位置

kafka-consumer-groups.sh --describe  --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --group my-group1

image-20240509175128763

PARTITION:表示分区号

CURRENT-OFFSET:当前消费到的offset下标

LOG-END-OFFSET:当前分区最新的offset下标

LAG:是偏移量,未消费的数量。

消费详情:

kafka消费者在消费数据的时候,都是分组消费的,不同的消费者组之间没有影响,都会去消费。在同一个组内消费,一条消息只能被一个消费者消费,同时一个分区数据只能被一个消费者消费,如果消费者数量大于分区数,则多余出来的消费者永远不会消费消息。如果分区数大于消费者,则会均匀的将分区分配给多个消费者。

image-20240510105221038

因此,kafka 的 topic 的 partition 个数代表是 kafka 的 topic 的并行度,同一时间最多可以有多个线程来消费 topic 的数据,所以如果要提高 kafka 的 topic 的消费能力,应该增大 partition 的个数。

5、手动平衡 leader:

# 使用的脚本是:kafka-leader-election.sh
# 必要的参数:
# --bootstrap-server:指定服务器列表
# --election-type:选举的类型,默认选择preferred即可
# 下面的三个参数三选一:
# --all-topic-partitions:平衡所有的主题、所有的分区
# --topic:平衡指定的主题,如果选择这个参数,则必须使用--partition指定分区
# --path-to-json-file:将需要平衡的主题、分区信息写入一个json文件,指定这个文件
#     json的格式: {"partitions": [{"topic": "test", "partition": 1}, {"topic": "test", "partition": 2}]}

[root@qianfeng01 data]$ kafka-leader-election.sh \
--bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 \
--election-type preferred \
--all-topic-partitions

6、kafka 自带的压力测试工具

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。

一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

(1)生产者Producer压测

kafka-producer-perf-test.sh \
--topic test \
--record-size 100 \
--num-records 100000 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

record-size:一条信息有多大,单位字节

num-records:总共发送多少条信息

throughput:每秒多少条信息,设置成-1,表示不限流,可测生产者最大吞吐量

producer-props:发送端端消息配置

结果:

100000 records sent, 27495.188342 records/sec (2.62 MB/sec), 1461.75 ms avg latency, 2183.00 ms max latency, 1696 ms 50th, 2103 ms 95th, 2177 ms 99th, 2181 ms 99.9th.

解析:

一共写入10万条消息

吞吐量为2.62 MB/sec

每次写入的平均延迟为1461.75ms

最大延迟2183.00 ms

(2)消费者 Consumer 压力测试

consumer测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能

kafka-consumer-perf-test.sh \
--broker-list hadoop100:9092 \
--topic test \
--fetch-size 10000 \
--messages 10000000 \
--threads 1

broker-list:节点地址

topic:指定topic名称

fetch-size:指定每个fetch的数据大小

messages:总共要消费的消息个数

结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2020-06-27 13:17:57:490, 2020-06-27 13:18:11:751, 20.0272, 1.4043, 210000, 14725.4751, 1593235077858, -1593235063597, -0.0000, -0.0001

解释:

开始时间

结束时间

共消费数据:20.0272M

吞吐量:1.4043MB/s

共消费数据:210000条

平均每秒消费:14725.4751条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1659664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WordPress插件:链接自动识别转为超链接

WordPress插件&#xff1a;链接自动识别转为超链接 <?phpfunction open_links_in_new_tab() {add_filter(the_content, make_clickable);function autoblank($text) {$return str_replace(<a, <a target"_blank", $text);return $return;}add_filter(th…

轻松管理文件夹批量重命名:学会用关键词批量替换文件夹名称技巧

随着计算机中存储的文件和文件夹数量不断增加&#xff0c;如何有效地管理和组织它们变得尤为重要。批量重命名文件夹是提升文件管理效率的关键步骤之一。而利用云炫文件管理器中关键词批量替换文件夹名称的技巧&#xff0c;则可以帮助我们更快速地完成这一任务。 关键词批量替…

H5 云商城 file.php 文件上传致RCE漏洞复现

0x01 产品简介 H5 云商城是一个基于 H5 技术的电子商务平台,旨在为用户提供方便快捷的在线购物体验。多平台适配:H5 云商城采用 H5 技术开发,具有良好的跨平台适配性。无论是在电脑、手机还是平板等设备上,用户都可以通过网页浏览器访问和使用云商城,无需安装额外的应用程…

静电防护:企业生产过程中不可忽视的重要环节

静电对企业生产代加工过程的影响是严重的&#xff0c;它可能导致关键部件损坏、产品质量问题以及巨大的经济损失。 以一家生产空调的企业为例&#xff0c;由于未能有效预防静电&#xff0c;导致质量问题频发&#xff0c;损失惨重。这引发了对静电防护意识的反思与加强&#xf…

基于 C# 开源的 EF Core 查询计划可视化神器

介绍 EFCore.Visualizer 是 Entity Framework Core 查询计划调试器&#xff0c;一个开源的 EF Core 查询计划可视化工具, 您可以直接在 Visual Studio 中查看查询的查询计划&#xff0c;开箱即用&#xff0c;非常方便。目前&#xff0c;可视化工具支持 SQL Server 和 PostgreS…

付费课程系统怎么搭建_教会你制作知识付费网课

在信息爆炸的时代&#xff0c;知识如同繁星点点&#xff0c;但如何找到那颗指引我们前行的明星&#xff1f;付费课程系统&#xff0c;正是这样一个平台&#xff0c;它让知识的光芒汇聚&#xff0c;为你我照亮前行的道路。今天&#xff0c;就让我们一起探讨如何搭建一个引人入胜…

【C++】CentOS环境搭建-编译安装Boost库(附CMAKE编译文件)

【C】环境搭建-编译安装Boost库 Boost库简介Boost库安装通过YUM安装&#xff08;版本较低 V1.53.0&#xff09;通过编译安装&#xff08;官网最新版本1.85.0&#xff09;1.安装相关依赖2.查询官网下载最新安装包并解压3.编译Boost4.安装Boost库到系统路径 Boost库验证 Boost库简…

7-4 是否同一棵二叉搜索树

7-4 是否同一棵二叉搜索树&#xff08;25分&#xff09; 题目描述 给定一个插入序列就可以唯一确定一棵二叉搜索树。然而&#xff0c;一棵给定的二叉搜索树却可以由多种不同的插入序列得到。例如分别按照序列 {2, 1, 3} 和 {2, 3, 1} 插入初始为空的二叉搜索树&#xff0c;都得…

计算机毕业设计 | vue+springboot汽车销售管理系统(附源码)

1&#xff0c;项目介绍 本项目基于spring boot以及Vue开发&#xff0c;前端实现基于PanJiaChen所提供的开源后台项目vue-element-admin改造。 针对汽车销售提供客户信息、车辆信息、订单信息、销售人员管理、财务报表等功能&#xff0c;提供经理和销售两种角色进行管理。 2&…

程控负载的功能实现原理

程控负载&#xff0c;顾名思义&#xff0c;就是可以通过程序控制其工作状态的负载设备。它的主要功能是模拟实际负载的工作状态&#xff0c;为电源、电子设备等提供稳定的工作电流或电压。程控负载的功能实现原理主要包括以下几个方面&#xff1a; 1. 电流和电压调节&#xff1…

什么是短信群发上行和下行

短信群发是一种广泛应用于商业和个人通信的技术&#xff0c;通过一次多条的方式&#xff0c;可以快速高效地传递信息。在实际的群发过程中&#xff0c;会涉及到上行和下行的概念。本文将详细介绍什么是短信群发上行和下行&#xff0c;并解释它们的应用。 什么是短信群发上行 群…

JS-拖拽位移、放大缩小

对同一盒子拖拽位移、缩放&#xff0c;这其实是不符合js的逻辑的&#xff0c;位移和拖拽必然会互相影响&#xff0c;所以需要在布局上略加调整 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title…

【python量化交易】qteasy使用教程05——创建第一个自定义交易策略

创建第一个自定义交易策略 使用qteasy创建自定义交易策略开始前的准备工作本节的目标自定义策略的实现方法使用 qteasy 的 Strategy 策略类三种不同的自定义策略基类定义一个双均线择时交易策略定义策略运行时机定义策略需要的数据自定义交易策略的实现&#xff1a;realize()获…

短信清空了!华为手机短信删除了怎么恢复?

“有没有人知道这是怎么回事呀&#xff0c;原先有一千多条未读一直放着没管&#xff0c;昨天根本没打开短信这个软件&#xff0c;今晚突然发现只剩一条了&#xff0c;是华为手机自动清理了吗&#xff01;到底该怎么恢复呀&#xff1f;我真崩溃&#xff01;” 在日常生活中&…

(✌)粤嵌—2024/5/9—寻找两个正序数组的中位数

代码实现&#xff1a; int binary_search(int *arr, int n, int key) {int head 0, tail n - 1, mid;while (head < tail) {mid (head tail) / 2;if (arr[mid] key) {return mid;}if (arr[mid] > key) {tail mid - 1;} else {head mid 1;}}return head; }void in…

Spring Boot | Spring Boot 整合 “异步任务“ 的实现

目录&#xff1a; 一、异步任务1.1 "无返回值" 异步任务调用 :① 创建项目② 编写 "异步调用方法" ( 使用 Async 注解 )③ "主程序启动类"中 开启基于 "注解" 的异步任务支持 ( 使用EnableAsync注解 )④ 编写 "控制层" 相关…

Vue中常用指令

Vue中的常用指令 Vue中的常用指令内容渲染指令条件渲染指令事件绑定指令内联语句事件处理函数给事件处理函数传参 属性绑定指令列表渲染指令v-for中的key 双向绑定指令 Vue中的常用指令 概念&#xff1a;指令 是 Vue 提供的带有 v- 前缀 的 特殊 标签属性。Vue 会根据不同的【…

2024年数维杯高校数学建模竞赛(B题) 建模解析| 生物质和煤共热解问题的研究 |小鹿学长带队指引全代码文章与思路

我是鹿鹿学长&#xff0c;就读于上海交通大学&#xff0c;截至目前已经帮200人完成了建模与思路的构建的处理了&#xff5e; 本篇文章是鹿鹿学长经过深度思考&#xff0c;独辟蹊径&#xff0c;实现综合建模。独创复杂系统视角&#xff0c;帮助你解决数维杯的难关呀。 完整内容可…

Linux-远程登录

远程登录Linux服务器的两款小工具&#xff1a; 1、Xshell &#xff08;可以远程登录到Linux终端控制台&#xff09; 2、 Xftp (可以与Linux服务器互相传递文件) 家庭/学校免费 - NetSarang Website 下载地址 1、傻瓜式安装Xshell6 2、在Linux主机上查看 Linux主机的…

Java Web 学习笔记(一) —— MySQL(3)

目录 1 Mysql 函数1.1 日期函数1.2 判断函数1.3 字符函数1.4 数学函数 2 Mysql 性能2.1 提高操作数据库性能2.2 执行次数比较多的语句2.3 sql语句的执行效率 3 Mysql 优化&#xff08;***&#xff09;3.1 定位慢查询3.2 SQL执行计划3.3 索引3.3.1 索引介绍与分类3.3.2 索引的使…