【《Kafka 入门指南:从零基础到精通》】

news2024/11/15 19:40:41

前言:
💞💞大家好,我是书生♡,本阶段和大家一起分享和探索KAFKA,本篇文章主要讲述了:消息队列的基础知识,KAFKA消息队列等等。欢迎大家一起探索讨论!!!
💞💞代码是你的画笔,创新是你的画布,用它们绘出属于你的精彩世界,不断挑战,无限可能!

个人主页⭐: 书生♡
gitee主页🙋‍♂:闲客
专栏主页💞:大数据开发
博客领域💥:大数据开发,java编程,前端,算法,Python
写作风格💞:超前知识点,干货,思路讲解,通俗易懂
支持博主💖:关注⭐,点赞、收藏⭐、留言💬

目录

  • 1. 消息队列
    • 1.1 消息队列概念
    • 1.2 消息队列的主要特点
    • 1.3 消息队列应用场景
    • 1.4 消息队列的模式
      • 1.4.1 点对点模式
      • 1.4.2 发布与订阅
      • 1.4.3 总结
    • 1.5 常用消息队列比较
  • 2. Kafka消息队列
    • 2.1 Kafka的概念
    • 2.2 Kafka 的特性
    • 2.3 Kafka 的架构
  • 3. kafka的启动
  • 4. Kafka的使用
    • 4.1 主题操作
      • 4.1.1 创建主题
      • 4.1.2 查看主题
      • 4.1.3 增加主题分区数
      • 4.1.4 删除主题
    • 4.2 读取写入消息
    • 4.3 kafka tool工具
    • 4.3.1 使用Kafka Tool
      • 4.3.2 使用工具写入数据
  • 5. spark操作kafka
    • 5.1 读取消息数据
    • 5.2 写入数据
      • 5.2.1 粘性写入策略
      • 5.2.2 指定partition写入策略
      • 5.2.3 指定key写入策略
    • 5.3 读取写入演示
  • 6. python操作kafka
    • 6.1 读取消息数据
    • 6.2 写入消息数据
    • 6.3 演示操作
  • 7. kafka消息数据存储机制
    • 7.1 kafka消息数据存储介绍
      • Kafka 存储架构概述
    • 7.2 存储文件
    • 7.3 修改段文件的配置
  • 8. kafka读写流程
    • 8.1 写入消息流程
    • 8.2 读取消息流程

1. 消息队列

1.1 消息队列概念

消息队列是一种软件系统,它允许在两个或多个进程之间发送消息。这种通信模式非常适合分布式系统,因为它提供了进程间通信的解耦机制,使得生产者和消费者可以独立地进行扩展而不影响彼此。

  • 定义: 存储计算机之间传递的消息的容器(消息队列是用来存储传递的消息)
  • 作用: 存储数据, 防止数据丢失

在这里插入图片描述

1.2 消息队列的主要特点

消息队列的主要特点包括:

  1. 异步通信:消息队列允许一个程序向另一个程序发送消息,而不需要等待接收方处理完毕。
    • 同步: 同时执行, 应用B完成上一次指令后应用A再次发送指令
    • 异步: 应用之间不需要进行等待
  2. 解耦:发送者和接收者并不需要直接相互了解。它们只需要知道消息队列的存在。(解除应用之间的耦合性(应用之间的直接关联关系))
  3. 可靠性:消息队列通常会保证消息至少被传递一次,并且在消息被成功处理之前不会丢失。
  4. 可扩展性:由于消息队列可以处理大量并发的消息,因此很容易扩展系统以处理更多的负载。

常见的消息队列技术包括:

  • AMQP (Advanced Message Queuing Protocol):一种提供统一消息服务的应用层协议。
  • RabbitMQ:基于AMQP的一个消息中间件。
  • Apache Kafka:最初由LinkedIn开发,现在是Apache项目的一部分,主要用于构建实时数据管道和流应用。
  • Amazon Simple Notification Service (SNS)Simple Queue Service (SQS):AWS提供的消息服务。

1.3 消息队列应用场景

  • 解耦

    • 解除应用之间的耦合性(应用之间的直接关联关系)
      在这里插入图片描述
  • 异步

    • 同步: 同时执行, 应用B完成上一次指令后应用A再次发送指令
    • 异步: 应用之间不需要进行等待
      在这里插入图片描述
  • 削峰

    • 解决高并发问题
    • 某一刻突然产生大量数据, 应用无法立即处理完, 可以将数据存储在消息队列中
      在这里插入图片描述

1.4 消息队列的模式

消息队列是一种常用的设计模式,用于实现应用程序之间的异步通信。消息队列允许一个或多个生产者(Producer)发送消息到队列中,而一个或多个消费者(Consumer)可以从队列中取出消息并进行处理。这种模式有助于解耦系统组件,并提高系统的可扩展性和可靠性。

  1. 点对点(Point-to-Point, P2P)模式

    • 每个消息只被一个消费者消费。
    • 生产者发送消息后,消息队列负责将消息传递给一个消费者。
    • 一旦消息被消费者接收,它就会从队列中移除。
  2. 发布/订阅(Publish/Subscribe, Pub/Sub)模式

    • 生产者发布消息到一个特定的主题(Topic)。
    • 多个消费者可以订阅同一个主题。
    • 所有订阅该主题的消费者都会接收到生产者发布的消息副本。
  3. 请求/响应(Request/Response)模式

    • 一个客户端发送请求到服务器。
    • 服务器处理请求并返回响应。
    • 这种模式通常使用应答队列来处理往返通信。
  4. 广播(Broadcast)模式

    • 生产者发送消息到所有消费者。
    • 这种模式类似于 Pub/Sub,但通常不涉及订阅机制,而是所有消费者都能直接接收到消息。
  5. 扇入/扇出(Fan-In/Fan-Out)模式

    • 扇入:多个生产者将消息发送到一个队列,由一个或多个消费者处理。
    • 扇出:一个生产者将消息发送到多个队列,由不同的消费者分别处理。
  6. 优先级队列(Priority Queue)模式

    • 消息具有不同的优先级。
    • 消费者总是先处理优先级最高的消息。
  7. 工作队列(Work Queue)模式

    • 工作队列用于分发任务到多个工作者(Worker)之间。
    • 每个任务只被一个工作者处理一次。

1.4.1 点对点模式

点对点(Point-to-Point, P2P)模式:

在点对点模式下,消息被发送到队列中,每个消息只能被一个消费者接收。一旦消息被接收,它就会从队列中移除。这意味着消息是专有的,并且不会被其他消费者重复处理。

点对点模式包括三个角色:消息队列发送者(生产者)、接收者(消费者)

特点:

  • 每个消息只被一个消费者消费。
  • 每个消息只有一个接收者(Consumer),一旦被消费,消息就不再存在于消息队列中。
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息。
  • 接收者在成功接收消息之后需要向队列应答成功,以便消息队列删除当前接收的消息。
  • 消费者可以是临时的,即消费者不需要事先存在就能接收消息。
  • 如果消费者在消息到达之前没有连接,则会丢失消息,除非使用持久化或其他机制。

消息发送者生产消息发送到queue中,然后消息接收者从queue中取出并且消费消息。消息被消费后,queue中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

实际例子:
假设有一个电商网站,每当有新的订单产生时,需要通过电子邮件通知客服部门。在这个场景中,我们可以设置一个订单队列,每当有新订单时,订单系统将订单信息作为消息发送到这个队列中。客服部门的邮件服务订阅这个队列,一旦有新消息出现,就立即处理(发送邮件通知)。因为每条订单消息只需要被处理一次,所以这是一个典型的点对点模式的应用场景。

在这里插入图片描述

1.4.2 发布与订阅

发布/订阅(Publish/Subscribe, Pub/Sub)模式

在发布/订阅模式下,消息的生产者并不直接与消费者通信。相反,生产者将消息发送到一个特定的主题(Topic),而消费者则订阅这些主题。所有订阅了同一主题的消费者都会接收到生产者发送的所有消息。

发布/订阅模式包括三个角色:角色主题(Topic)、发布者(Publisher)、订阅者(Subscriber)。

特点:

  • 生产者和消费者之间是松散耦合的。
  • 消费者可以订阅多个主题。
  • 每个消息可以有多个订阅者。
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行。
  • 同一条消息可以被多个消费者接收。

实际例子:
假设有一个新闻聚合器应用,它需要从多个来源收集新闻并将其展示给用户。在这种情况下,不同类型的新闻源(如体育、科技、娱乐等)可以被视为不同的主题。新闻聚合器作为生产者,将新闻发送到相应的主题中。用户(消费者)可以根据自己的兴趣订阅不同的主题,比如订阅“科技”和“娱乐”主题。这样,每当有新的科技或娱乐新闻时,订阅了相应主题的用户就会接收到这些新闻。
在这里插入图片描述

1.4.3 总结

  • 点对点模式适用于消息需要被单一消费者处理的情况,例如订单处理或文件上传任务。
  • 发布/订阅模式适用于消息需要被多个消费者同时处理的情况,例如实时数据更新或新闻推送。

1.5 常用消息队列比较

在这里插入图片描述

  • 如果消息队列不是将要构建系统的重点,对消息队列功能和性能没有很高的要求,只需要一个快速上手易于维护的消息队列,建议使用 RabbitMQ
  • 如果系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,需要低延迟和高稳定性,建议使用 RocketMQ
  • 如果需要处理海量的消息,像收集日志、监控信息或是埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合的消息队列。

2. Kafka消息队列

2.1 Kafka的概念

Apache Kafka 是一款高性能、分布式、基于发布/订阅的消息系统,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的一个顶级项目。Kafka 被设计用于处理大量实时数据流,它的设计目标是提供高吞吐量、低延迟的数据传输能力,并且能够处理大规模的数据流。

Kafka 的核心概念

  1. 主题(Topics)

    • 主题是 Kafka 中消息分类的一种方式。
    • 生产者将消息发送到特定的主题,而消费者则订阅这些主题来接收消息。
    • 一个主题可以有多个分区(Partitions),以支持分布式存储和处理。
  2. 分区(Partitions)

    • 每个主题可以分为多个分区,这使得数据可以分布在多个服务器上。
    • 分区有助于提高系统的吞吐量和可用性。
    • 每个分区是一个有序的消息序列。
  3. 偏移量(Offsets)

    • 偏移量是消费者跟踪其在分区中位置的方式。
    • 消费者可以使用偏移量来确定从哪个位置开始读取消息。
  4. 生产者(Producers)

    • 生产者是向 Kafka 主题发布消息的客户端。
    • 生产者可以选择将消息发布到特定的主题和分区。
  5. 消费者(Consumers)

    • 消费者是从 Kafka 主题中读取消息的客户端。
    • 消费者可以组成消费者组(Consumer Groups),以便更好地管理消息的处理。
  6. 消费者组(Consumer Groups)

    • 消费者组是一组可以一起工作的消费者。
    • 每个消费者组内的消费者可以分配不同的分区,从而实现并行处理。
    • 当消息被一个消费者组内的任意消费者消费后,其他消费者就不会再接收到这条消息。
  7. Broker

    • Broker 是 Kafka 集群中的服务器节点。
    • Kafka 集群由一个或多个 Broker 组成。
    • Broker 负责存储消息、维护元数据、处理生产者和消费者的请求。

2.2 Kafka 的特性

Kafka 的特性

  • 高吞吐量:Kafka 能够处理大量的数据流,每秒可以处理数百万条消息。
  • 低延迟:Kafka 能够实现实时数据处理,延迟极低。
  • 持久性:Kafka 使用磁盘存储消息,因此消息不会丢失。
  • 容错性:Kafka 支持复制和分区,即使某些节点失败也能保证数据完整性和服务可用性。
  • 可伸缩性:Kafka 可以很容易地在集群中添加或删除节点。

使用场景

  • 日志收集:Kafka 常用于收集和聚合来自不同数据源的日志数据。
  • 流式处理:Kafka 可以作为实时数据流处理管道的基础,例如与 Apache Storm 或 Apache Flink 结合使用。
  • 消息系统:Kafka 可以替代传统的消息队列系统,如 RabbitMQ 或 ActiveMQ。
  • 事件处理:Kafka 可以用于处理各种事件,例如用户活动、设备状态变化等。

2.3 Kafka 的架构

在这里插入图片描述

  • 发布者(producer): 将生产的消息数据存储到消息队列的角色主题
  • 订阅者(consumer): 逻辑上是一个消费者组(由多个消费者组成), 消费消息队列中的消息数据(数据处理)
    • 消费者组中的每个消费者消费同一个主题下的不同分区消息
  • 消息队列(kafka)
    • broker: 服务器代理节点, 每台服务器上的kafka就是一个broker
    • topic: 角色主题, 人为地将消息数据分类
    • partition: 分区, 角色主题下的消息数据是分区(RDD分区,HDFS分块)存储
    • Replication: 副本, 每个分区的消息数据可以有多个副本
      • leader: 领导者副本, 处理消息的读写请求, 由broker主节点选取领导者
      • follower: 追随者副本, 只是用于备份
  • zookeeper: 管理消息队列
    • 选择kafka的broker主节点(控制器)
    • 管理消息队列中的元数据(主题名称, 偏移量等)
  • 偏移量(offset)
    • 消息数据是有顺序存储, 每个消息都有一个偏移量
    • 在同一个分区中, 消息数据按顺序存储, 偏移量是从0开始
    • 不同分区的消息数据是没有顺序, 偏移量都是从0开始

3. kafka的启动

  1. 安装 Kafka
    首先需要安装 Kafka 和 ZooKeeper。ZooKeeper 是 Kafka 集群所需的协调服务。

下载和安装 下载:访问 Apache Kafka 的官方网站 (http://kafka.apache.org/downloads)
下载最新版本的 Kafka。 解压:将下载的压缩包解压到一个合适的目录中。

  1. 配置
    ZooKeeper:编辑 config/zookeeper.properties 文件来配置 ZooKeeper 的参数。
    Kafka:编辑 config/server.properties 文件来配置 Kafka 的参数,例如监听端口、日志目录等。
  2. 启动 ZooKeeper 和 Kafka
    启动 ZooKeeper:运行 zookeeper-server-start.sh config/zookeeper.properties。
    启动 Kafka:运行 kafka-server-start.sh -daemon /export/server/kafka/config/server.properties

在这里插入图片描述
停止kafka

kafka-server-stop.sh

4. Kafka的使用

4.1 主题操作

4.1.1 创建主题

kafka-topics.sh --bootstrap-server node1:9092 --create --partitions 分区数 --replication-factor 副本数 --topic 主题名

  • –bootstrap-server: 连接kakfa
  • –create: 创建命令
  • –partitions: 分区数, 不设置默认为1
  • –replication-factor: 副本数, 不设置默认为1
kafka-topics.sh --bootstrap-server node1:9092 --create --partitions 3 --replication-factor 3 --topic itcast

在这里插入图片描述

4.1.2 查看主题

list: 查看所有主题名称
–describe: 查看所有主题详情信息
–topic:查看某个具体主题详请信息

查看全部分区列表
kafka-topics.sh --bootstrap-server node1:9092 --list

# list: 查看所有主题名称
kafka-topics.sh --bootstrap-server node1:9092 --list

在这里插入图片描述

查看全部分区的描述
kafka-topics.sh --bootstrap-server node1:9092 --describe

# --describe: 查看所有主题详情信息
kafka-topics.sh --bootstrap-server node1:9092 --describe

在这里插入图片描述

查看指定分区的信息
kafka-topics.sh --bootstrap-server node1:9092 --describe --topic 主题名


# --topic:查看某个具体主题详请信息
kafka-topics.sh --bootstrap-server node1:9092 --describe --topic it

在这里插入图片描述

分区信息
在这里插入图片描述

4.1.3 增加主题分区数

只能增加分区数, 不能减少(减少分区会丢失消息数据)

创建好主题后不能修改副本数, 副本中涉及消息元数据, 不能同步元数据

  • 必须要增加副本的话, 只能重新创建主题设置好副本数, 然后将原主题消息迁移到新主题中

语法:
kafka-topics.sh --bootstrap-server node1:9092 --alter --partitions 设置的分区数 --topic 主题名

# --alter: 修改命令
kafka-topics.sh --bootstrap-server node1:9092 --alter --partitions 3 --topic it

在这里插入图片描述

4.1.4 删除主题

–delete: 删除命令
kafka-topics.sh --bootstrap-server node1:9092 --delete --topic 主题名

# --delete: 删除命令
# 也可以在每台服务器节点上通过 rm -rf 删除目录 linux指令删除
# 延迟删除: 不会立即删除目录
kafka-topics.sh --bootstrap-server node1:9092 --delete --topic it

在这里插入图片描述

4.2 读取写入消息

此操作是kafka提供的测试服务, 测试kafka读写消息功能是否正常

  • 写入数据

创建生产者应用程序, 将其他系统产生的消息数据写入到kafka中
多个生产者可以同时将消息写入到同一个主题

kafka-console-producer.sh --broker-list node1:9092 --topic 主题名

  • kafka-console-producer.sh: 生产脚本
  • –broker-list: 连接kafka
# kafka-console-producer.sh: 生产脚本
# --broker-list: 连接kafka
kafka-console-producer.sh --broker-list node1:9092 --topic it

在这里插入图片描述

  • 读取数据

创建消费者应用程序, 读取kafka中存储的消息数据进行数据处理操作
多个消费者可以同时消费同一个主题消息

kafka-console-consumer.sh --bootstrap-server node1:9092 --topic 主题名

  • kafka-console-consumer.sh: 消费脚本
  • –bootstrap-server: 连接kafka
# kafka-console-consumer.sh: 消费脚本
# --bootstrap-server: 连接kafka
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic it

在这里插入图片描述

  • 数据传递

注意点:写入数据和读取不能在一个页面中,可以在一台服务器的两个客户端中,也可以在不同的服务器中
读取数据只能读取连接之后写入的数据,未连接之前写入的是读取不了的

node1上写入数据
在这里插入图片描述
查看node2(成功读取刚刚写入的数据)在这里插入图片描述

4.3 kafka tool工具

4.3.1 使用Kafka Tool

我们一系列Kafka的操作是可以使用工具的
这个工具就叫kafka tool

安装好之后
在这里插入图片描述

修改node1下的 /etc/ssh/sshd_config 配置文件

  • 将sshd_config文件中 UseDNS yes 改为 UseDNS no,并取消注释
  • 重启sshd服务 systemctl restart sshd.service
  • 在windows系统的 C:\Windows\System32\drivers\etc\hosts 文件中添加映射
192.168.88.100 node1 node1.itcast.cn
192.168.88.101 node2 node2.itcast.cn
192.168.88.102 node3 node3.itcast.cn

连接服务器
在这里插入图片描述
三台都连接,窗机node1 ,node2,node3 变为绿色就可以了
在这里插入图片描述
创建删除查看主题,点击tyopic,右键可以创建主题和刷新,
点击进入页面之后,就可以窗创建。删除和其他的操作。
在这里插入图片描述

4.3.2 使用工具写入数据

在这里插入图片描述
在这里插入图片描述
查看数据
在这里插入图片描述
查看其他服务器
在这里插入图片描述
写入数据的方式
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. spark操作kafka

5.1 读取消息数据

我们使用spark.read.load(format='kafka', **options)读取数据,
**options里面是我们的配置文件

  • kafka.bootstrap.servers — kafka集群地址
  • subscribe ---- 订阅角色主题
  • startingOffsets — 起始偏移量,可选参数, 起始偏移量, -2->从第一条消息开始
  • endingOffsets ---- 结束偏移量,可选参数, 结束偏移量, -1->到最后一条消息结束(包含)

注意点:
① 左闭右开原则
② 起始偏移量不能超过结束偏移量
③ 必须要指定所有分区的偏移量
④ 起始偏移量不写, 表示从第一条消息开始;结束偏移量不写, 表示到最后一条消息结束(包含)

# 当前应用程序是消费者(订阅者)
from pyspark.sql import SparkSession

# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()

# 读取kafka数据源的消息数据
"""
startingOffsets:可选参数, 起始偏移量, -2->从第一条消息开始
endingOffsets:可选参数, 结束偏移量, -1->到最后一条消息结束(包含)
注意点:
① 左闭右开原则
② 起始偏移量不能超过结束偏移量
③ 必须要指定所有分区的偏移量
④ 起始偏移量不写, 表示从第一条消息开始; 结束偏移量不写, 表示到最后一条消息结束(包含) 
"""
options = {
	# kafka集群地址: ip:port
	'kafka.bootstrap.servers': 'node1:9092',
	# 订阅角色主题
	'subscribe': 'itcast',
	# 根据偏移量获取消息数据
	# {主题名称:{'分区编号':偏移量}}
	# -2:起始偏移量, 从第一条消息开始  -1:结束偏移量, 到最后一条消息数据
	'startingOffsets': """{"itcast":{"0":0, "1":2, "2":-2}}""",
	'endingOffsets': """{"itcast":{"0":2, "1":-1, "2":-1}}"""
}
df = spark.read.load(format='kafka', **options)
df.show()
"""
key:key值, kafka中的非结构化数据是以key-value形式存储
value:消息数据, 字节bytes类型
topic:主题
partition:分区
offset:偏移量
timestamp:时间
timestampType:时间类型
+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
"""
# 将bytes字节类型消息数据转换成字符串类型
df2 = df.select(df['value'].cast('string'), 'topic', 'partition', 'offset')
df2.show()

key:key值, kafka中的非结构化数据是以key-value形式存储 value:消息数据, 字节bytes类型 topic:主题
partition:分区
offset:偏移量
timestamp:时间
timestampType:时间类型
±—±-------------------±-----±--------±-----±-------------------±-------- | key| value| topic|partition|offset| timestamp|timestampType|
±—±-------------------±-----±--------±-----±-------------------±--------

5.2 写入数据

5.2.1 粘性写入策略

2版本默认写入策略, 保证应用程序中的消息数据有顺序的存储到同一个分区中,第一条消息写入到哪个分区, 剩余消息都写入和第一条消息相同的分区中

  • 第一步创建df对象
# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()

# 创建df对象
df = spark.createDataFrame(data=[[1, '小明', 18, '男'],
								 [2, '小红', 16, '女'],
								 [3, '张三', 22, '男'],
								 [4, '李四', 20, '男']], schema='id int, name string, age int, gender string')
df.show()
  • 将df的数据进行拼接,重命名为value,kafka中的消息数据是存储在value字段中的
new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'))
new_df.show()
  • 第三步,写入到kafka中

df.write.save(format='kafka', mode='append', **options)

options = {
	'kafka.bootstrap.servers': 'node1:9092, node2:9092',
	# 指定写入的主题, 注意参数名, 和消费者的参数名不一样
	'topic': 'itheima'
}
new_df.write.save(format='kafka', mode='append', **options)
# 当前应用程序是生产者(发布者)
# 粘性写入策略: 默认策略, 第一条消息写入到哪个分区, 剩余消息都写入和第一条消息相同的分区中
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()

# 创建df对象
df = spark.createDataFrame(data=[[1, '小明', 18, '男'],
								 [2, '小红', 16, '女'],
								 [3, '张三', 22, '男'],
								 [4, '李四', 20, '男']], schema='id int, name string, age int, gender string')
df.show()

# 将df的数据保存到kafka中
# 注意点: 将df的数据保存到value字段中, kafka中的消息数据是存储在value字段中的
new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'))
new_df.show()
options = {
	'kafka.bootstrap.servers': 'node1:9092, node2:9092',
	# 指定写入的主题, 注意参数名, 和消费者的参数名不一样
	'topic': 'itheima'
}
new_df.write.save(format='kafka', mode='append', **options)

5.2.2 指定partition写入策略

指定partition写入策略: 创建partition字段, 字段名:partition 字段值:分区编号
根据字段的值将消息数据存储到对应的分区中, 消息数据存储分布均匀

只需要改动,df 拼接这一步骤,判断不同的值,为不同的分区编号即可

字段名一定是partition

new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'),
				   F.when(F.col('gender') == "男", 0).when(df['gender'] == '女', 1).otherwise(2).alias('partition'))

5.2.3 指定key写入策略

指定key写入策略: 创建key字段, 字段名:key 字段值:值可以是某列的值
hash(字段值)%分区数=结果值…余数
根据字段的值进行哈希取余计算(余数=分区编号), 将余数相同的消息数据存储到相同的分区中, 消息数据存储分布均匀

只需要改动,df 拼接这一步骤,新添加一列

字段名一定是key

new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'),
				   df['gender'].alias('key'))

5.3 读取写入演示

  1. 启动写入数据的程序

在这里插入图片描述
在这里插入图片描述
2. 启动读取数据程序代码
在这里插入图片描述

6. python操作kafka

6.1 读取消息数据

  • 创建消费者对象

consumer = KafkaConsumer(‘主题名’,bootstrap_servers=‘node1:9092’)

# 消费者应用程序
# 需要借助kafka-python第三方库, 需要安装 pip install kafka-python
from kafka import KafkaConsumer

# 创建消费者对象
consumer = KafkaConsumer('itheima', bootstrap_servers='node1:9092')
print(consumer)
# 遍历消息,此时应用程序会一直阻塞(死循环),等待接受消息
for msg in consumer:
    # ConsumerRecord()类的对象
	print(msg)
	print(type(msg))
	# 获取对象属性, bytes字节类型
	print(msg.value)
	# 字符串类型
	print(msg.value.decode('utf-8'))

6.2 写入消息数据

  • 创建生产者对象

producer = KafkaProducer(bootstrap_servers=[‘node1:9092’])
producer.send(topic=‘主题名’, value=‘写入的数据’.encode(‘utf-8’))

# 生产者应用程序
# 需要借助kafka-python第三方库, 需要安装 pip install kafka-python
from kafka import KafkaProducer

# 创建生产者对象
producer = KafkaProducer(bootstrap_servers=['node1:9092'])

# 调用producer对象的send方法, 发送消息
# str.encode('utf-8'): 将字符串类型转换成字节bytes类型
# bytes.decode('utf-8'): 将字节bytes类型转换成字符串类型
producer.send(topic='itheima', value='hello world'.encode('utf-8'))

# 关闭生产对象
producer.close()

6.3 演示操作

  • 先启动读取数据的程序
    在这里插入图片描述
  • 启动写入数据的程序
    在这里插入图片描述
  • 再次查看读取数据的控制台
    在这里插入图片描述

7. kafka消息数据存储机制

7.1 kafka消息数据存储介绍

Apache Kafka 是一个分布式流处理平台,它提供了一种高效、可靠的方式来发布和订阅消息。Kafka 的消息存储机制是其核心特性之一,下面将详细介绍 Kafka 如何存储消息。

Kafka 存储架构概述

Kafka 的消息存储在磁盘上,并以一种高效的文件格式进行组织。每条消息都归属于一个特定的主题(Topic),每个主题可以被划分为多个分区(Partition),每个分区对应一个有序的消息序列。

主题(Topic)

  • 定义:主题是 Kafka 中消息分类的逻辑单位。每个主题可以有多个分区。
  • 分区:主题中的每个分区都是一个有序的消息队列。

分区(Partition)

  • 定义:分区是物理存储的基本单位,每个分区对应一个文件夹,该文件夹下存储着一系列的消息文件。
  • 复制因子:为了提高可用性和容错能力,每个分区可以有多个副本(Replica),这些副本分布在不同的 Broker 上。

消息存储

  • 消息文件:每个分区的消息存储在一个或多个消息文件中,这些文件按照时间顺序被分成多个段(Segment)。
  • 段文件:每个段文件都有一个唯一的名称,表示其在时间轴上的位置。例如,一个段文件可能命名为 00000000000.offset,其中 00000000000 表示该段的起始偏移量。
  • 索引文件:为了快速定位消息,每个段文件都有一个对应的索引文件,它记录了消息的位置信息。

存储细节

  • 文件格式:消息文件使用二进制格式存储,以提高读写效率。
  • 偏移量:每条消息都有一个全局唯一的偏移量,用于标识消息的位置。偏移量是在分区内的唯一标识符。
  • 保留策略:Kafka 支持两种消息保留策略:基于时间的保留和基于大小的保留。可以根据需要配置保留策略,以决定何时删除旧消息。

总结

  • 高吞吐量:Kafka 的消息存储设计使其能够实现高吞吐量和低延迟。
  • 持久化:消息存储在磁盘上,即使 Kafka Broker 故障也能保证数据不丢失。
  • 容错性:通过复制因子和分区副本,Kafka 提供了高度的容错性。

7.2 存储文件

每个段文件包含【.index、.timeindex、.log】三个文件, 每个段文件默认存储数据量为1G, 超过1G会创建新的segment段文件, 存储时间为168小时

消息数据是以二进制形式存储在.log文件中,需要通过特定的kafka指令查看数据

# --files: log文件名
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log

您提到的关于 Kafka 段文件的描述是正确的。Kafka 的每个段文件确实包含 .index.timeindex.log 三个文件,并且默认情况下,每个段文件的最大大小为 1GB。当达到这个大小限制时,Kafka 会自动创建一个新的段文件。此外,您提到的存储时间为 168 小时,这通常是指基于时间的消息保留策略。

下面是对 Kafka 段文件及其相关文件的详细说明:

段文件组成
每个段文件由以下三个文件组成:

  1. .log 文件:这是主要的消息文件,包含了所有消息的数据。消息是以二进制格式存储的,这使得读写效率非常高。
  2. .index 文件:这是索引文件,记录了消息的位置信息。它允许快速定位到消息在 .log 文件中的位置。
  3. .timeindex 文件:这是时间索引文件,记录了消息的时间戳和位置信息。它允许根据时间戳快速定位消息。

段文件的命名
段文件的命名规则如下:

  • .log 文件的命名格式为:<start_offset>.log,例如 00000000000.log
  • .index 文件的命名格式为:<start_offset>.index,例如 00000000000.index
  • .timeindex 文件的命名格式为:<start_offset>.timeindex,例如 00000000000.timeindex

段文件的大小限制
默认情况下,每个段文件的最大大小为 1GB。当段文件达到这个大小时,Kafka 会自动创建一个新的段文件,并将后续写入的消息存储在这个新的段文件中。这可以通过配置 log.segment.bytes 参数来更改。

消息保留策略
Kafka 支持两种消息保留策略:

  1. 基于时间的保留:可以配置消息保留的时间长度。例如,您可以设置消息保留 168 小时(7 天),这意味着超过 168 小时的消息会被删除。
  2. 基于大小的保留:可以配置消息保留的总大小。当达到设定的大小限制时,较旧的消息会被删除。

总结

  • 高效存储:Kafka 使用 .log 文件以二进制格式高效地存储消息。
  • 快速定位.index.timeindex 文件使得根据偏移量或时间戳快速定位消息成为可能。
  • 灵活的保留策略:可以通过配置灵活地设置消息的保留时间或大小。

7.3 修改段文件的配置

配置信息存储在kafka/config/server.properties文件中

# 指定存储数据路径
log.dirs=/export/server/kafka/data
# 指定数据的保存时间 单位:小时
log.retention.hours=168
# 指定存储数据的文件最大存储空间
log.segment.bytes=1073741824

8. kafka读写流程

8.1 写入消息流程

  • 基本写入流程
    • 生产者应用程序将消息写入到leader副本中
    • leader副本将写入的消息同步给follower副本
  • 多副本消息数据写入流程
    • 生产者应用程序将消息写入到leader副本中
    • leader副本将写入的消息根据ISR列表的副本编号顺序同步给follower副本
  • 多条消息数据写入时间间隔(如何保证消息数据不丢失)
    • 每条消息写入后都是有一个时间间隔, 可以进行设置
    • ack应答机制
      • 0: 不管当前条消息是否成功写入到leader副本, 直接写入下一条消息
      • 1: 默认机制, 保证当前条消息成功写入到leader副本后, 再写入下一条消息
      • -1或all: 保证当前条消息成功写入到leader和follower副本, 再写入下一条消息

在这里插入图片描述

8.2 读取消息流程

  • 单分区读取

    • 根据偏移量读取消息数据
    • 将消费过的消息元数据保存到 __consumer__offsets 主题下
      • 主题名称, 分区编号, 偏移量
    • 当前消费者对应当前分区
  • 多分区读取

    spark操作kafka, spark会自行进行资源分配, 分区数由executor执行器创建task线程进行处理, 设置了多少个executor, 当前订阅者(消费者组)就有多少个消费者

    • 分区数=消费者数:一个消费者对应一个分区, 不能读取其他分区消息, 负载均衡
    • 分区数>消费者数:采用轮询方式, 读取完第一个分区的第一条数据后再读取第二个分区的第一条数据
    • 分区数<消费者数:部分消费者不工作, 造成资源浪费问题

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2037845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用7EPhone云手机进行TikTok的矩阵运营

“根据市局机构Statista发布的报告显示&#xff0c;截至2024年4月&#xff0c;TikTok全球下载量超过49.2亿次&#xff0c;月度活跃用户数超过15.82亿。TikTok的流量受欢迎程度可想而知&#xff0c;也一跃成为了全球第五大最受欢迎的社交APP。” 人群密集的地方社区也是适合推广…

《pygame游戏开发实战指南》第八节 Sprite类和Group类

Sprite(精灵)是游戏中一个非常重要的概念。在游戏开发中&#xff0c;‌Sprite指的是一个可以移动、‌旋转或变换的二维图像&#xff0c;‌它负责管理游戏中的图像元素&#xff0c;‌使得开发者可以轻松地在游戏中创建各种动态效果和角色。‌pygame.sprite模块提供了Sprite类和G…

分类预测|基于鲸鱼优化-卷积-长短期记忆网络-注意力数据分类预测Matlab程序 WOA-CNN-LSTM-Attention

分类预测|基于鲸鱼优化-卷积-长短期记忆网络-注意力数据分类预测Matlab程序 WOA-CNN-LSTM-Attention 文章目录 前言分类预测|基于鲸鱼优化-卷积-长短期记忆网络-注意力数据分类预测Matlab程序 WOA-CNN-LSTM-Attention 一、WOA-CNN-LSTM-Attention模型1. 鲸鱼优化算法&#xff0…

产品经理的具体职责有哪些?

产品经理作为产品团队的核心角色&#xff0c;负责从概念到市场发布的整个过程&#xff0c;确保产品能够满足用户需求&#xff0c;实现商业目标。其工作职责广泛且深入&#xff0c;涵盖了产品规划、设计、开发、运营、推广等多个方面。以下是详细的工作职责描述以及产品经理的核…

一文彻底搞懂Transformer - 注意力机制

Transformer 一、注意力机制 Seq2Seq 注意力机制目标 Attention模块的主要作用是确定在给定上下文中哪些嵌入向量与当前任务最相关&#xff0c;并据此更新或调整这些嵌入向量的表示。 Transformer注意力机制 注意力机制案例 注意力机制计算公式 生成Q、K、V向量&#xff1a;对…

智能升降晾衣架:NRK3301语音识别模块ic让家务变得更轻松

对于经常做家务的人来说&#xff0c;洗衣服和晾衣服是一件非常耗费体力和时间的任务。传统的晾衣架安装在了一个固定的高度&#xff0c;挂衣服和取衣服需要通过撑衣杆来晾取衣物&#xff0c;即便是电动升降的晾衣架&#xff0c;也需要人手动去操作&#xff0c;增加了工作量。然…

Vue 项目中导入文件时如何默认找寻该文件夹下的 index.vue 文件

文章目录 需求分析 需求 如下图&#xff0c;在Vue 项目中导入 frequencyChange 文件夹时如何默认找寻该文件夹下的 index.vue 文件 分析 确保项目结构和命名约定 首先&#xff0c;确保你的 Vue 单文件组件按照约定命名&#xff0c;例如&#xff1a; components/Example/inde…

Python酷库之旅-第三方库Pandas(080)

目录 一、用法精讲 331、pandas.Series.str.repeat方法 331-1、语法 331-2、参数 331-3、功能 331-4、返回值 331-5、说明 331-6、用法 331-6-1、数据准备 331-6-2、代码示例 331-6-3、结果输出 332、pandas.Series.str.replace方法 332-1、语法 332-2、参数 33…

【QT常用技术讲解】QTableView添加QCheckBox、QPushButton

前言 QT展示列表信息的时候通常用到列表&#xff08;比如用户信息、机构信息、设备信息等菜单&#xff09;&#xff0c;当需要对某列进行修改、删除操作时&#xff0c;就需要加入按钮&#xff08;QPushButton&#xff09;&#xff0c;当需要对多列进行右键菜单操作时&#xff0…

DjangoRF-15-分布式celery应用

前面我们同步实现了测试任务的执行&#xff0c;但是它有一个致命的问题。 实际项目测试任务耗时会非常长&#xff0c;而django框架的请求是有超时的&#xff0c;哪怕没有超时&#xff0c;这么做显然不妥。所以需要使 用异步任务的方式来执行测试任务。 发送一个执行任务的请求&…

沐风老师3DMAX纹理工具箱TexTools使用方法详解

DMAX纹理工具箱TexTools是一组工具,可帮助任何纹理艺术家完成UV和纹理相关任务。主要理念是将典型步骤简化为简单的上下文相关单击。 大多数功能仅在3dMax中处于editUVW模式时才起作用(展开UVW修改器,然后单击编辑按钮)。 【版本要求】 3dMax9及更高版本 【安装方法】 将…

EmbeddedBuilder_v1.4.1.23782 - 在工程中添加自己的C实现文件

文章目录 EmbeddedBuilder_v1.4.1.23782 - 在工程中添加自己的C实现文件概述笔记添加自己的文件夹在文件夹中建立新文件在文件夹中载入已经存在的文件修改工程编译时的包含路径和库路径添加包含路径添加实现路径 在main.c或其他实现中添加自己的头文件引用和自己的函数调用保存…

Seaborn库

目录 主要功能和特点 使用方法 实例应用 Seaborn库的最新版本有哪些新功能和改进&#xff1f; 如何在Seaborn中实现复杂的数据预处理步骤&#xff0c;例如数据清洗和转换&#xff1f; Seaborn与其他数据可视化库&#xff08;如Matplotlib、Plotly&#xff09;相比有哪些优…

【图像去雾系列】使用暗通道先验去雾算法对图像进行去雾处理

目录 一 暗通道先验去雾算法 1 雾形成机理-大气散射模型 2 暗通道先验的整体思想 二 实践 一 暗通道先验去雾算法 论文名称:Single Image Haze Removal Using Dark Channel Prior 论文地址:Single Image Haze Removal Using Dark Channel Prior | IEEE Journals & …

合合信息的OCR技术在智能文档处理方面有哪些具体的应用案例?

智能文档处理(IDP)是利用人工智能技术,自动从复杂的非结构化和半结构化文档中抽取关键数据,并将其转换成结构化数据的技术。能够自动识别、提取并结构化处理文档中的关键信息。这种技术通常基于自然语言处理&#xff08;NLP&#xff09;和计算机视觉等先进技术&#xff0c;可以…

【连续4届EI检索,SPIE 出版】第五届信号处理与计算机科学国际学术会议(SPCS 2024,8月23-25)

第五届信号处理与计算机科学国际学术会议&#xff08;SPCS 2024) 将于2024年8月23-25日在中国哈尔滨举行。会议主要围绕信号处理与计算机科学等研究领域展开讨论。 会议旨在为从事信号处理与计算机科学研究的专家学者、工程技术人员、技术研发人员提供一个共享科研成果和前沿技…

如何使用Wireshake解密Wi-Fi QoS Data报文?

1. 使用Wireshake解密Wi-Fi数据报文 通常当Wi-Fi发生某些问题时&#xff0c;我们都会抓取Wi-Fi sniffer log&#xff0c;用以协助分析问题&#xff0c;但是如果Wi-Fi使用了加密&#xff0c;则我们无法从sniffer log中获取到IP数据的层级&#xff0c;因为在Wi-Fi报文中&#xf…

非专业人士的编程梦:低代码开发平台的崛起与挑战

文章目录 每日一句正能量前言技术概览基本概念主要特点市场现状适用性分析结论 效率与质量的权衡效率提升质量与安全的挑战企业应用开发中的利弊应对策略结论 挑战与机遇挑战机遇应对策略结论 后记 每日一句正能量 书读的越多而不加思考&#xff0c;你就会觉得你知道得很多&…

24/8/14算法笔记 复习_逻辑回归sigmoid

import numpy as np import matplotlib.pyplot as pltdef sigmoid(x):return 1/(1np.exp(-x))x np.linspace(-5,5,100) y sigmoid(x)plt.plot(x,y,colorgreen) #损失函数 from sklearn import datasets from sklearn.linear_model import LogisticRegression from mpl_toolki…

SpringBoot教程(二十一) | SpringBoot实现定时任务

SpringBoot教程&#xff08;二十一&#xff09; | SpringBoot实现定时任务 单点定时任务方式一&#xff1a;使用ScheduledEnableScheduling注解巨坑&#xff08;Scheduled任务都用了同一个线程去执行&#xff0c;导致定时任务存在堵塞&#xff09;解决办法一&#xff1a;添加自…