07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

news2025/1/16 3:01:35

目录

  • Kafka --- 消息生产者
    • ★ 消息
    • ★ 消息的分发机制
    • ★ 分发到哪个分区
    • ★ 轮询策略(round-robin)
    • ★ 使用命令行工具发送消息
      • 演示添加消息
  • Kafka --- 消息消费者
    • ★ 消息消费者命令
    • ▲ 监听 【指定主题】 的所有消息:
    • ▲ 监听 【指定主题、指定分区】的所有消息:
    • ▲ 监听【指定主题、指定分区】的【新】消息:
    • ▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:
    • kafka灵活之处:

Kafka — 消息生产者


★ 消息

简单来说,就是一个数据项。

▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。

从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。

▲ 下面是一个示例事件:
key: “fkjava”
value: “publish a new Book”
timestamp: “Feb. 15, 2021 at 2:06 p.m.”


★ 消息的分发机制

【注意:】当程序向主题发送消息时,该消息会被立即分给该主题下某一个领导者分区。

消息生产者向消息主题发送消息,这些消息将会立即分发给该主题下某一个分区(此处说的都是领导者分区)来保存

主题下的每条消息只会保存在一个领导者分区中,而不会在多个领导者分区中保存多份

消息实际上是存在分区中的,往主题发送消息只是一种逻辑说法。
当生产者发送一条消息到一个主题的时候,实际上这个消息马上就会被直接分发到对应的某一个领导者分区当中。

非领导者分区只是领导者分区的后备,也就是备份而已,当领导者分区挂掉的时候,非领导者分区就有可能成为领导者分区。

但是真正能够直接与客户进行交互的,就是直接接收用户的数据,或者让用户来消费数据的只能是领导者分区。

在这里插入图片描述


★ 分发到哪个分区

当生产者发送一条消息时,它会按如下规则来决定该消息被分发到哪个分区:

优先级:

最优先:(1)如果在发送消息时指定了分区,则消息分发到指定的分区。

(2)如果发送消息时没有指定分区,但消息的key不为空,则基于key的hashCode来选择一个分区。

此处暗示了:在一段时间内:同一个key的多条消息,通常会被分发到同一个分区

最次:(3)如果既没有指定分区,且消息的key也是空,则用轮询策略(round-robin)来选择一个分区。


★ 轮询策略(round-robin)

轮询策略(round-robin)就是按顺序来分发消息,

比如下面一个主题有P0、P1、P2三个分区,

那么第一条消息被分发到P0分区,
第二条消息被分发到P1分区,
第三条消息被分发到P2分区,

以此类推,第四条消息又被分发到P0分区。
在这里插入图片描述


★ 使用命令行工具发送消息

Kafka提供了kafka-console-producer.bat(.sh)工具来发送消息,例如执行如下命令:

下面命令发送带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.key=true

上面命令指定向test2这个主题发送消息,并通过“parse.key=true”指定发送消息时会解析消息的key,

默认解析规则为:制表符(Tab键)之前的是key,制表符(Tab键)之后的是value。

如果不指定“parse.key=true”属性,则默认不解析消息的key,也就是发送不带key的消息。

下面命令发送不带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2


演示添加消息

因为演示过程中,我弄的3个节点老是只有一个能存活,所以把 kafka和zookeeper的存储数据的文件夹都删除了,重新启动,就好了。方便演示。

删除Kaka和zookeeper的存储数据的文件夹,重新启动

现在3个节点就能正常存活了。
在这里插入图片描述

接下来继续演示:
添加 不带key 和带key的消息,演示生产者发送消息:

解释:
1、演示生产者发送消息,消息发送到主题 test2 那里
2、发送没有带 key 的消息,kafka 采用的是轮询的策略,把消息存放到不同的分区里面;如图,test2 主题有4个分区(属于领导者分区),那么这8条消息就会轮询的发送到这4个分区里面
3、发送带key的消息,如图,key 是 ljh,kafka就会计算 ljh 这个key的hashcode 值,然后存放都某一个分区里面,因为key都是一样的,所以这几个key为ljh的消息都会被发到同一个分区里面。
但是具体发送到哪个分区,是无法指定的。
4、发送带key的消息,如图,key 和 value 之间要间隔一个 Tab 键,不要弄成空格键。

命令在上面

在这里插入图片描述

我自己再添加不同key的消息,可以看出新添加的两条消息,是存放在0分区的。

在这里插入图片描述



Kafka — 消息消费者

★ 消息消费者命令

消费者用于从消息主题读取消息,Kafka提供了kafka-console-consumer.bat工具命令从指定主题、甚至指定分区读取消息。
该工具支持如下常用选项:

 --bootstrap-server:指定要连接的Kafka主机和端口。
 --from-beginning:指定从开始读取消息。
 --group:指定组ID。
 
 --offset <String: consume offset>:指定从特定下标开始读取消息,
 比如将该选项设为1,表明从第2条消息开始读取;
 该选项还支持earliest和latest两个字符串值,
 其中earliest表示从最开始处读取(类似于--from-beginning选项的作用),
 latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。
 
 --partition <Integer: partition>:指定哪个分区。
 
 --property:用于指定一些额外属性,
   比如 print.timestamp=true 指定要输出时间戳,
       print.key=true表示输出消息key,
       print.offset=true表示打印消息的下标,
       print.partition=true表示打印分区信息。
       
 --topic:指定哪个主题。 


▲ 监听 【指定主题】 的所有消息:

这个监听命令,运行后是一直存在的,会一直监听,有新消息会马上监听出来的。

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

可以看到消息都成功存放在分区里面。
可以看到指定主题 test2 下的所有消息
但是目前还演示出轮询存放,先不理。

在这里插入图片描述



▲ 监听 【指定主题、指定分区】的所有消息:

看看分区2下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --partition 2^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

在这里插入图片描述

看看分区3下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --partition 3^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

分区3没有消息,所以没有任何显示:正确
在这里插入图片描述




查看分区0的消息:
有两条,正确
在这里插入图片描述


注意点:
–from-beginning:指定从开始读取消息。
添加这个命令,就是一直都会读取到从一开始就添加的消息,这样演示不够真实,因为我们消息消费之后,正常情况下我们不需要再查出来,所以可以用offset这个命令:
–offset <String: consume offset>:指定从特定下标开始读取消息。
下面就来演示这个offset命令:

 --offset <String: consume offset>:指定从特定下标开始读取消息,
 比如将该选项设为1,表明从第2条消息开始读取;
 该选项还支持earliest和latest两个字符串值,
 其中
 earliest表示从最开始处读取(类似于--from-beginning选项的作用),
 latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。

▲ 监听【指定主题、指定分区】的【新】消息:

(模拟传统的ActiveMQ、RabbitMQ的消息模型):

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --offset latest ^
 --partition 0^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

演示 latest 从最新处开始读取、即不读取之前的消息,latest是默认值。

如图:现在从最新开始出监听消息,为了演示能监听最新消息,我们再打开一个命令行小黑窗,来往这个 0 分区发送消息。

因为上面key 为 l 的消息是发送到 0 分区,所以接下来发送的消息key也设置为l

在这里插入图片描述

如图:生产者刚发送消息,消费者这边马上就监听到主题为test2,分区为0 的最新的消息。

在这里插入图片描述



▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:

这个 --offset 2 ^ 指定下标,查出来的消息是包括索引下标2 这条消息的。


先查出主题test2,分区0的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --partition 0^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

在这里插入图片描述


再查出 【主题test2,分区0,指定索引下标为2及之后】 的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --offset 2 ^
 --partition 0^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

上面有4条消息,所以2及之后的消息,应该有2条,为索引2 nnnnnnnnnn,索引3 mmmmmmmm

在这里插入图片描述



kafka灵活之处:

灵活之处, --offset latest ^ 这个设置,默认就是latest ,就是从最新处开始读取、不读取之前的消息,始终读取最新的消息,以前用过的消息就不会管了。
因为 kafka 内部有一个偏移主题来存储每一个分区里面的消息及消息曾经被读取到哪一条(哪个位置)。

更灵活之处:
我们可以通过 --offset 加上指定的索引下标,非常灵活的读取我们想要读取的哪个位置的消息。
从上面的消息监听可以看出,消息是一直保存在分区当中的,意味着消息被消费之后,并没有立即从分区中被删除,还可以被重复的使用,这就是kafka非常灵活的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1371021.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI与低代码解锁无限可能

前言 近年来&#xff0c;人工智能&#xff08;AI&#xff09;和低代码开发技术逐渐成为数字化转型的重要推动力。AI作为一项具有革命性潜力的技术&#xff0c;正在改变我们生活的方方面面。而低代码开发则提供了一种快速构建应用程序的方法&#xff0c;使得开发者无需深入编写…

【刷题日记】青少年CTF-A2 Crypto(全)

Caesar 题目难度&#xff1a;★ 题目描述&#xff1a;凯撒大帝在很早的时候发明了这个&#xff0c;你能解密出来吗&#xff1f;flag格式为&#xff1a;qsnctf{xxx}。 下载附件&#xff0c;题目提示告诉我们是凯撒了&#xff0c;一个简单的移位操作。 使用在线解码网站&#…

C语言基础语法跟练

题源&#xff1a;牛客网 1、输出"Hello Nowcoder!"。开始你的编程之旅吧。 #include <stdio.h>int main() {printf("Hello Nowcoder!");return 0; } 2、KiKi学会了printf在屏幕输出信息&#xff0c;他想输出一架小飞机。请帮他编写程序输出这架小…

react native中使用tailwind并配置自动补全

使用的第三方库是tailwind-react-native-classnames&#xff0c;同类的也有tailwind-rn&#xff0c;但是我更喜欢前者官方demo&#xff1a; import { View, Text } from react-native; import tw from twrnc;const MyComponent () > (<View style{twp-4 android:pt-2 b…

智能化配网故障定位技术:未来发展趋势与应用前景

在当今这个科技高速发展的时代&#xff0c;智能化技术已经渗透到了我们生活的方方面面。作为电力行业的重要组成部分&#xff0c;配电网的自动化和智能化水平也在不断提高。本文将重点介绍一种基于成熟的行波测距技术的智能化配网故障定位技术——配网行波型故障预警与定位系统…

iPhone语音备忘录怎么导出?这3种方法任你选择!

作为iPhone用户&#xff0c;我们应该会经常使用语音备忘录来记录一些重要的信息。有时候&#xff0c;我们可能需要将这些语音备忘录导出&#xff0c;以方便分享或备份。iphone语音备忘录怎么导出&#xff1f;今天&#xff0c;小编将为大家介绍3种导出iPhone语音备忘录的方法&am…

PyTorch: torch.nn 子模块及其在循环神经网络中的应用

目录 torch.nn子模块详解 nn.utils.rnn.PackedSequence 参数说明 注意事项 示例代码 nn.utils.rnn.pack_padded_sequence 参数说明 返回值 注意事项 示例代码 nn.utils.rnn.pad_packed_sequence 参数说明 返回值 注意事项 示例代码 nn.utils.rnn.pad_sequence …

FPGA之按键消抖

目录 1.原理 2.代码 2.1 key_filter.v 2.2 tb_key_filter.v 1.原理 按键分为自锁式按键和机械按键&#xff0c;图左边为自锁式按键 上图为RS触发器硬件消抖&#xff0c;当按键的个数比较多时常常使用软件消抖。硬件消抖会使用额外的器件占用电路板上的空间。 思路就是使用延…

XS5018B是一款针对 CMOS 图像传感器的高性价比图像信号处理芯片

产品概述 XS5018B 是一款针对 CMOS 图像传感器的高性价比图像信号处理芯片&#xff0c;支持 1M/2M 像素 图像传感器&#xff0c;一组 10-bit DVP 输入接口&#xff0c; ISP 具备优异的 3D 降噪功能&#xff0c;标清模拟输出支持 960H &#xff0c; 高清模拟输出支…

NR cell配置带宽时,如何设置carrierBandwidth?

NR中带宽在38.101中有规定。 如上是FR1 38.101-1中与带宽设定有关的table&#xff0c;协议中根据SCS规定的传输带宽和可以配置的RB 数如上表&#xff0c;也就是说在实网下或者lab测试配置带宽时要根据上表内容去配置&#xff0c;举例如下。 如上图分别是几种带宽的配置参数&…

设备管理系统建设方案书

1.设备管理系统 1.1.系统概述 需求描述 建立设备信息库&#xff0c;对设备相关档案的登录、整理。通过建立完善的设备档案&#xff0c;将设备的各类原始信息进行信息化统一管理&#xff0c;使设备档案查询工作方便快捷&#xff0c;维修管理&#xff0c;针对维修计划、维修工单…

2024律师课程推荐:iCourt律师执行实务集训营(赠《执行实务大礼包》)

律师行业竞争激烈&#xff0c;想要突破困境&#xff0c;就一定要把握蓝海机遇&#xff0c;实现提早布局。 如今&#xff0c;还有哪些业务是尚未被“卷起来”的“蓝海业务”&#xff1f; 从数据来看&#xff0c;执行业务一定是其中之一。 在 Alpha 系统中&#xff0c;以“执行…

JAVA静态引擎企业网站源码带文档

JAVA静态引擎企业网站源码带文档 系统介绍&#xff1a; 1.网站后台采用主流的 SSM 框架 jsp JSTL&#xff0c;网站前台采用freemaker静态化模版引擎生成html5 2.因为是生成的html&#xff0c;无需重复读取数据库&#xff0c;所以访问速度快&#xff0c;轻便&#xff0c;对服务器…

Pytorch张量通过索引获取指定数据

import torch x torch.tensor([1,2,3])x Out[3]: tensor([1, 2, 3])x[0] # 索引操作&#xff1a;取单个数字 Out[4]: tensor(1)x[0:1] # 切片操作&#xff1a;可以保持维度不变 Out[5]: tensor([1])x[torch.tensor([True,False,True])] # 布尔值索引&#xff0c;通过条件筛…

程序员面试技巧:成为HR心动的程序猿

文章目录 程序员必备的面试技巧导语一、准备充分二、突出亮点三、展示解决问题的能力四、良好的沟通能力五、积极展示学习态度示例结语&#x1f636; 写在结尾 程序员必备的面试技巧 “程序员必备的面试技巧&#xff0c;就像是编写一段完美的代码一样重要。在面试战场上&#…

探索Shadowsocks-Android:保护你的网络隐私

探索Shadowsocks-Android&#xff1a;保护你的网络隐私 I. 引言 在数字时代&#xff0c;网络隐私和安全变得愈发重要。我们越来越依赖互联网&#xff0c;但同时也面临着各种网络限制和监控。在这个背景下&#xff0c;Shadowsocks-Android应用程序应运而生&#xff0c;为用户提…

2024PMP考试新考纲-【过程领域】近期典型真题和超详细解析

前面的文章&#xff0c;华研荟讲解了三十多道PMP新考纲下的【人员People领域】的近年真题&#xff0c;这篇文章开始为大家分享【过程Process领域】的新考纲下的真题&#xff0c;进一步帮助大家体会和理解新考纲下PMP的考试特点和如何应用知识来解题&#xff0c;并且举一反三&am…

基于ZU19EG的100G-UDP解决方案

概述 本文档介绍ZU19EG与Mellanox CX6 100G网卡通信解决方案。 环境配置 FPGA硬件&#xff1a;519-ZU19EG的4路100G光纤PCIe加上计算卡 电脑&#xff1a;国产国鑫主板&#xff08;双PCU&#xff09;&#xff1a;Gooxi G2DA-B CPU:Intel Xeon Silver 2.2GHz 内存&#xff1…

React入门 - 04(从编写一个简单的 TodoList 说起)

继上一节我们已经对 React组件和 ”JSX语法“有了大概的了解&#xff0c;这一节我们继续在 react-demo这个工程里编写代码。这一节我们来简单实现一个 TodoList来更加了解编写组件的一些细节。 1、在编辑器中打开 react-demo这个工程 2、打开 index.js文件&#xff0c;将组件 …

Mysql判断一个表中的数据是否在另一个表存在

方式一&#xff1a; 判断A表中有多少条数据在B表中【存在】,并且显示这些数据–EXISTS语句 select A.ID, A.NAME from 表A where EXISTS(select * from 表B where A.IDB.ID) 判断A表中有多少条数据在B表中【不存在】&#xff0c;并且显示这些数据–NOT EXISTS语句 select …