kafka详解(三)

news2025/1/13 2:23:02

2.2 Kafka命令行操作

在这里插入图片描述

2.2.1 主题命令行操作

1)查看操作主题命令参数

[aa kafka]$ bin/kafka-topics.sh

在这里插入图片描述
2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/)

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3)创建first topic
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 3 --replication-factor 3
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

4)查看first主题的详情

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first	TopicId: 3pIfoppvRmq84FjACWzAgw	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102
	Topic: first	Partition: 1	Leader: 103	Replicas: 103,102,104	Isr: 103,102,104
	Topic: first	Partition: 2	Leader: 102	Replicas: 102,104,103	Isr: 102,104,103
[aa ~]$

5)修改分区数( 注意:分区数只能增加,不能减少,如果减少会报错!

[a kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 4
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first	TopicId: 3pIfoppvRmq84FjACWzAgw	PartitionCount: 4	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102
	Topic: first	Partition: 1	Leader: 103	Replicas: 103,102,104	Isr: 103,102,104
	Topic: first	Partition: 2	Leader: 102	Replicas: 102,104,103	Isr: 102,104,103
	Topic: first	Partition: 3	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102 

[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 2
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-09-13 19:22:16,891] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
 (kafka.admin.TopicCommand$)
[aa ~]$

6)再次查看first主题的详情

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

7)删除topic

[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list

[aa ~]$

2.2.2 生产者命令行操作

1)查看操作生产者命令参数
[aa kafka]$ bin/kafka-console-producer.sh

在这里插入图片描述
2)发送消息

[aa kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>111
>222
>333
>

2.2.3 消费者命令行操作

[aa kafka]$ bin/kafka-console-consumer.sh

在这里插入图片描述

2)消费消息
[aa kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test --from-beginning
111
222
333
还可以动态的生产和消费,比如102机器上输入
>444
103机器就会自动在结尾弹出
111
222
333
444

Kafka生产者

生产者消息发送流程

3.1.1 发送原理

Kafka的producer发送消息采用的是异步发送的方式
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程(两个线程是异步!),以及一个线程共享变量:RecordAccumulator。

  1. 在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator。
  2. Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
    在这里插入图片描述
    main线程将外部数据包装成kafka要求的格式ProducerRecord,类似于Flume中的Event.网络中进行数据传输都会序列化(kryo框架)。
    分区策略:涉及到生产者和消费者。生产者分区针对的是数据,消费者分区针对是分区怎么消费数据
    。RecordAccumulator(一种堆内存缓冲区)达到两种标准之后就唤醒Sender进行发送!bachsize(同一个队列中,两个时间非常紧密的数据可以形成一个bachsize)一般就是数据洪峰的时候;linger.ms就是在数据量非常小的时候;默认值0代表来一条发一条;
    sender发送也是异步发送,sender将RecordAccumulator中的数据包装成Request(一个批次包装成一个Request),sender发送Request1之后不等待响应就发送Request2,然后不等待响应就发送Request2,…Request5,Request6必须排队了。
    sender发送过去的数据在Leader中应该是先存在线程对应的内存中,还没等到磁盘中存储数据落盘的一个时间点决定是不是回复ack为0,此时就是不安全,时延最低!。为1的时候就是数据落盘之后再发送ack,此时数据安全性有所提高,稍慢!注意此时的fllower还没有数据!完全保证数据安全,Leader和follwer都罗盘,回复-1
    发送成功:清理网络客户端请求Request
    线程共享变量中RecordAccumulator清理数据,因为只有32M。
    发送失败:重试次数----int的最大值
    Selector是负责决定将数据发送到集群的哪个分区!
    注意:
    中间涉及到数据的发送和拉取都是异步的!main线程放数据和sender拉取数据并发送两个过程异步!
    一个队列只能发送到最右边的集群中的一个分区,假如有两个toptic,5个分区,就需要创建5个双端队列,队列内部才能形成批次(bachsize),所以只能发到一个分区!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1081870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3组件的通信方式

一、vue3组件通信方式 通信仓库地址:vue3_communication: 当前仓库为贾成豪老师使用组件通信案例 不管是vue2还是vue3,组件通信方式很重要,不管是项目还是面试都是经常用到的知识点。 比如:vue2组件通信方式 props:可以实现父子组件、子父组件、甚至兄弟组件通信 自定义事件:可…

【visionOS】从零开始创建第一个visionOS程序

前言:本來是看BonjourWeb的,但不自觉被apple visionOS吸引,因为这个概念的产品真的太前沿新颖了。 说不定到时候我会冲一冲~~~先简单学习下嘿嘿 为Apple Vision Pro创建一个新的应用程序和游戏世界。 介绍visionOS visionOS是苹果Vision Pr…

Linux基本指令(1)

Linux基本指令(1) 1.ls指令1.1ls的用法 2. pwd指令3.cd指令3.1 cd3.2补充内容3.3 cd - 指令3.4 cd ~ 指令 4. touch指令4.1stat指令 5.mkdir 指令6.rmdir/rm指令6.1补充内容 7.man指令8.nano 指令9.cat指令10 cp指令11 mv指令12 echo指令12.1 > 输出重…

二叉搜索树--验证二叉搜索树

验证二叉搜索树-力扣 98 题 解题思路:利用二叉树中序遍历的特性:遍历出来的结果是升序的即符合二叉搜索树 对于二叉树中序遍历不是太理解的,作者推荐的小白书:二叉树的初步认识_加瓦不加班的博客-CSDN博客 中序非递归实现 // 解…

抖音小店创业攻略,快速了解这些适合新手经营的类目

抖音小店是抖音平台上的一种新型电商形态,它允许用户在抖音上开设自己的小店,销售自己的商品。抖音小店的开设门槛低,成本也不高,因此很受新手创业者的青睐。那么,下面不若与众将介绍抖音小店中有哪些适合新手创业者经…

卫星影像-航拍影像-数据叠加到AutoCAD

卫星影像-航拍影像-数据叠加到AutoCAD 发布时间:2018-01-17 版权: 同步视频教程:卫星地图_高清卫星地图_卫星地图视频_卫星图像应用到AutoCAD工程设计(套合、叠加、配准) 视频教程:如何选择中央子午线或者…

.NET 8 中的调试增强功能

作者:James Newton-King 排版:Alan Wang 开发人员喜欢 .NET 强大且用户友好的调试体验。您可以在您选择的 IDE 中设置断点,启动已经附加上调试器的程序,逐步执行代码并查看 .NET 应用程序的状态。 在 .NET 8 中,我们致…

cdsn目录处理:```,```# 目录校正

原标题 <small> cdsn目录处理&#xff1a; &#xff0c;中间添加 # 空格 空行后 遇到的底部空行出错&#xff0c;书接上回&#xff0c;处理空行【python查找替换&#xff1a;查找空行&#xff0c;空行前后添加&#xff0c;中间添加 # 空格 空行后遇到的第1行文字&am…

React 组件传 children 的各种方案

自定义组件的时候往往需要传 children&#xff0c;由于写法比较多样&#xff0c;我就总结了一下。 方案列表 1. 类组件1.1 类组件&#xff0c;不使用解构1.2 类组件&#xff0c;使用解构 2. 函数组件2.1 函数组件&#xff0c;不使用解构2.2 函数组件&#xff0c;外部解构2.3 函…

根据前序遍历结果构造二叉搜索树

根据前序遍历结果构造二叉搜索树-力扣 1008 题 题目说明&#xff1a; 1.preorder 长度>1 2.preorder 没有重复值 直接插入 解题思路&#xff1a; 数组索引[0]的位置为根节点&#xff0c;与根节点开始比较&#xff0c;比根节点小的就往左边插&#xff0c;比根节点大的就往右…

WPF 窗口白屏问题分析与初步解决

环境描述 开发环境&#xff1a; Windows 11 Visual Studio 2022 .NET Framework 4.8 目标电脑环境 Windows10 默认包含了 .NET Framework 4.8 现象 编译好的WPF应用程序&#xff0c;是基于 .NET Framework 4.8开发的&#xff0c;在大部分电脑上可以正常使用。在某个客…

Python算法练习 10.11

leetcode 394 字符串解码 给定一个经过编码的字符串&#xff0c;返回它解码后的字符串。 编码规则为: k[encoded_string]&#xff0c;表示其中方括号内部的 encoded_string 正好重复 k 次。注意 k 保证为正整数。 你可以认为输入字符串总是有效的&#xff1b;输入字符串中没…

湖南首个,万应低代码软件技术专业校企共建基地落成!

导语 9月开学季&#xff0c;湖南省民族职业学院教育技术学院迎来了近5000名新生&#xff0c;而其中软件技术专业的205名新生尤为引人注目——他们是这个校企共建专业的第一批学生&#xff0c;也是学院软件技术专业新型校企合作的第一批受益者。 湖南首个 万应低代码软件技术…

10_11C++

思维导图 #include <iostream>using namespace std; class Person { private:string name; protected:int age; public:char sex; public:Person() {cout << "父类无参构造函数" << endl;}Person(string n,int a,char s):name(n),age(a),sex(s){co…

[译]Sentry:如何从数据存储中获得更强的一致性

翻译自&#xff1a;How to Get Stronger Consistency Out of a Datastore 地址&#xff1a;https://blog.sentry.io/2019/09/17/how-to-get-stronger-consistency-out-of-a-datastore Sentry的首要工作是接收、解析用户的异常信息。当用户异常信息大量上报时&#xff0c;Sentry…

Linux部署kubeedge 1.4

文章目录 一、机器信息二、环境准备&#xff08;所有节点操作&#xff09;2.1. 修改主机名2.2. 开启路由转发2.3.安装Docker&#xff08;所有节点&#xff09;2.4.部署K8S集群(单机版&#xff0c;云端节点) 2.5.安装Mosquitto&#xff08;只在边缘节点安装)三、安装kubeedge 1.…

数据库基础篇二

函数 约束 概述 概念&#xff1a;约束是作用于表字段上的规则&#xff0c;用于限制存储在表中的数据。目的&#xff1a;保证数据库中数据的正确、有效性和完整性。分类&#xff1a; 外键约束 外键用来让两张表的数据之间建立连接&#xff0c;从而保证数据的一致性和完整性…

2023全国大学生软件测试大赛开发者测试练习题满分答案(PairingHeap2023)

2023全国大学生软件测试大赛开发者测试练习题满分答案&#xff08;PairingHeap2023&#xff09; 题目详情题解代码&#xff08;直接全部复制到test类中即可&#xff09; 提示&#xff1a;该题只需要分支覆盖得分即可&#xff0c;不需要变异得分 题目详情 题解代码&#xff08;…

Kubernetes使用OkHttp客户端进行网络负载均衡

在一次内部Java服务审计中&#xff0c;我们发现一些请求没有在Kubernetes&#xff08;K8s&#xff09;网络上正确地实现负载均衡。导致我们深入研究的问题是HTTP 5xx错误率的急剧上升&#xff0c;由于CPU使用率非常高&#xff0c;垃圾收集事件的数量很多以及超时&#xff0c;但…

linux开发环境下出现Segmentation fault问题排查一

一、检测代码中是否有数组越界情况 更改以上数组为128*60后&#xff0c;正常。确认是数组溢出导致越界。 二、分析&#xff1a;一般情况下打印的字符刚好在50以内&#xff0c;但是在其它状态下测试&#xff0c;数据字符数据增加从而导致溢出 打印命令如下&#xff1a; sprin…