【数据采集与预处理】数据接入工具Kafka

news2025/4/8 10:30:58

目录

一、Kafka简介

(一)消息队列

(二)什么是Kafka

二、Kafka架构

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

(三)Zookeeper 存储结构

(四)Kafka 消费过程

四、Kafka准备工作

(一)Kafka安装配置

(二)启动Kafka

(三)测试Kafka是否正常工作

五、编写Spark Streaming程序使用Kafka数据源


一、Kafka简介

(一)消息队列

消息队列内部实现原理

1、点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
        点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

2、发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
        发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

(二)什么是Kafka

        Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

在流式计算中,Kafka 一般用来缓存数据,Storm 通过消费 Kafka 的数据进行计算。
1、Apache Kafka 是一个开源消息系统。是由 Apache 软件基金会开发的一个开源消息系统项目。
2、Kafka 最初是由 LinkedIn 公司开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3、Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。
4、无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性。

二、Kafka架构

1、Producer :消息生产者,就是向 kafka broker 发消息的客户端;
2、Consumer :消息消费者,向 kafka broker 取消息的客户端;
3、Topic :可以理解为一个队列;
4、Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic;
5、Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic;
6、Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给consumer,不保证一个 topic 的整体(多个 partition 间)的顺序;
7、Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

Producer写入流程:

1)producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2)producer 将消息发送给该 leader
3)leader 将消息写入本地 log
4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK

(三)Zookeeper 存储结构

注意:producer 不在 zk 中注册,消费者在 zk 中注册。 

(四)Kafka 消费过程

消费者组:

        消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
        在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。

四、Kafka准备工作

(一)Kafka安装配置

1、到官网下载jar包,保存至“/usr/local/uploads”目录下。

Apache Kafkaicon-default.png?t=N7T8https://kafka.apache.org/downloads

2、解压安装Kafka,并重命名解压后的文件夹。

[root@bigdata uploads]# tar -zxvf kafka_2.11-0.8.2.2.tgz -C /usr/local
[root@bigdata uploads]# cd ..
[root@bigdata local]# mv kafka_2.11-0.8.2.2/ kafka

3、配置Spark环境

[root@bigdata local]# cd ./spark/conf
[root@bigdata conf]# vi spark-env.sh

在文件的第一行接着添加如下内容: 

:/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*

接着,在“/usr/local/spark/jars”目录下新建文件夹kafka,并将“/usr/local/kafka/libs/”目录下的所有jar包都拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata spark]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir kafka
[root@bigdata jars]# cd kafka
[root@bigdata kafka]# cp /usr/local/kafka/libs/* .

然后,将“/usr/local/uploads/”下的spark-streaming-kafka-0-8_2.11-2.4.0.jar包也拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata kafka]# cp /usr/local/uploads/spark-streaming-kafka-0-8_2.11-2.4.0.jar .

spark-streaming-kafka-0-8_2.11-2.4.0.jar的下载地址:

http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0

下图是拷贝完成后的“/usr/local/spark/jars/kafka”目录下的所有jar包。

这样,Spark环境就配好了。

(二)启动Kafka

1、启动Zookeeper服务

打开一个终端,输入下面命令启动Zookeeper服务:

[root@bigdata kafka]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/zookeeper-server-start.sh config/zookeeper.properties

千万不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。

2、启动Kafka服务

打开第二个终端,然后输入下面命令启动Kafka服务:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# bin/kafka-server-start.sh config/server.properties

千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了

(三)测试Kafka是否正常工作

再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的Topic:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181 --replication-factor  1  --partitions  1  --topic  wordsendertest
#可以用list列出所有创建的Topic,验证是否创建成功
[root@bigdata kafka]# ./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181

replication-factor:每个partition的副本个数 

下面用生产者(Producer)来产生一些数据,请在当前终端(记作“数据源终端”)内继续输入下面命令:

[root@bigdata kafka]# ./bin/kafka-console-producer.sh  --broker-list  localhost:9092  --topic  wordsendertest

上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:

hello hadoop

hello spark

现在可以启动一个消费者,来查看刚才生产者产生的数据。请另外打开第四个终端,输入下面命令:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-console-consumer.sh  --zookeeper  localhost:2181  --topic  wordsendertest  --from-beginning

可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:

五、编写Spark Streaming程序使用Kafka数据源

在“/home/zhc/mycode/”路径下新建文件夹sparkstreaming,再在该文件夹下新建py文件KafkaWordCount.py。

#/home/zhc/mycode/sparkstreaming/KafkaWordCount.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: KafkaWordCount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

新建一个终端(记作“流计算终端”),执行KafkaWordCount.py,命令如下:

[root@bigdata zhc]# cd /home/zhc/mycode
[root@bigdata mycode]# mkdir sparkstreaming
[root@bigdata mycode]# cd sparkstreaming
[root@bigdata sparkstreaming]# vi KafkaWordCount.py
[root@bigdata sparkstreaming]# spark-submit KafkaWordCount.py localhost:2181 wordsendertest

这时再切换到之前已经打开的“数据源终端”,用键盘手动敲入一些英文单词,在流计算终端内就可以看到类似如下的词频统计动态结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1356248.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

源头厂家定制直线度测量仪 在线与离线检测均可

直线度的检测不再局限于直尺法、重力法等人工检测方式&#xff0c;随着自动化的发展&#xff0c;直线度检测也更需要自动化方便快捷的检测仪器。为此&#xff0c;研发了在线直线度测量仪与离线直线度测量仪&#xff0c;根据不同的需要&#xff0c;选择合适的设备即可。 数据计…

Java集合框架和泛型

1.Java集合框架 架构图&#xff1a; Java的集合框架是一组用于存储和操作数据的类和接口。它提供了各种数据结构&#xff0c;如列表、集合、映射等&#xff0c;以及用于操作这些数据结构的算法和工具。Java集合框架位于Java.util包中&#xff0c;并且是Java编程中常用的核心组…

ROS学习笔记(二):话题通信、服务通信的了解和对应节点的搭建(C++)

ROS学习笔记&#xff08;二&#xff09;&#xff1a;话题通信、服务通信的了解和对应节点的搭建&#xff08;C和Python&#xff09; 前言一、Topics话题通信&#xff08;C&#xff09;0、自定义msg消息类型文件1、发布者&#xff08;Publisher&#xff09;2、订阅者&#xff08…

引导和服务

目录 一、Linux操作系统引导过程 1、引导过程总览图 2、引导过程的详细步骤 二、系统初始化进程 1、init进程&#xff08;串行启动&#xff09; 2、Systemd&#xff08;并行启动&#xff09; 3、Centos6与Centos7的区别&#xff1a; 4、Systemd单元类型 5、运行级别所…

buuctf 逆向 findkey wp

首先看看怎么个事 点开也就这样了&#xff0c;没有输入的点&#xff0c;感觉和之前的 “刮开有奖” 有一点点相像 winmain长这个样子 看到消息循环了&#xff0c;下一步肯定就是找回调函数了 乍一看还没有&#xff0c;函数一个个点进去看发现sub_401023(hInstance&#xff09…

网站迁移和SEO:损害排名的常见错误

正在规划站点迁移&#xff1f; 迁移是更困难的 - 通常是可怕的 - SEO任务之一。 为了让它发挥作用&#xff0c;你需要避免常见的陷阱&#xff0c;这些陷阱可能会影响你的知名度&#xff0c;并导致流量和收入的损失。 8 月 11 日&#xff0c;我主持了一场赞助的搜索引擎杂志网…

分享10篇优秀论文,涉及图神经网络、大模型优化、表格分析

引言 第38届AAAI人工智能年度会议将于2024年2月在加拿大温哥华举行。今天给大家分享十篇AAAI2024论文&#xff0c;主要涉及图神经网络&#xff0c;大模型幻觉、中文书法文字生成、表格数据分析、KGs错误检测、多模态Prompt、思维图生成等。 论文获取方式&#xff0c;回复&am…

Win32 TEXT()宏学习

之前学习了_T()宏&#xff1b; _T()是MFC的&#xff1b; TEXT()是win32的&#xff1b; _T("")定义于tchar.h&#xff1b; TEXT宏是windows程序设计中经常遇到的宏&#xff0c;定义在 <winnt.h>中&#xff1b; 如果使用UNICODE字符集&#xff0c;则TEXT&…

小兔鲜儿 uniapp - 项目打包

目录 微信小程序端​ 核心步骤​ 步骤图示​ 条件编译​ 条件编译语法​ 打包为 H5 端​ 核心步骤​ 路由基础路径​ 打包为 APP 端​ 微信小程序端​ 把当前 uni-app 项目打包成微信小程序端&#xff0c;并发布上线。 核心步骤​ 运行打包命令 pnpm build:mp-weix…

RK3399平台入门到精通系列讲解(实验篇)IO 多路复用实验之poll实验

🚀返回总目录 文章目录 一、IO 多路复用:poll介绍二、实验源码2.1、Makefile2.2、poll 实验驱动2.3、poll 驱动测试应用程序一、IO 多路复用:poll介绍 IO 多路复用是一种同步的 IO 模型。IO 多路复用可以实现一个进程监视多个文件描述符。 一旦某个文件描述符准备就绪,就通…

三款推荐的 FTP 工具

&#x1f947; 版权: 本文由【墨理学AI】原创、在CSDN首发、各位大佬、敬请查阅&#x1f389; 声明: 作为全网 AI 领域 干货最多的博主之一&#xff0c;❤️ 不负光阴不负卿 ❤️ 文章目录 三款推荐的 FTP 工具filezillawinscpFinalShell SSHXftp❤️ 人生苦短&#xff0c; 欢迎…

Excelize 入选“2023开源创新榜”优秀开源项目

近日&#xff0c;由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办&#xff0c;CSDN 承办的 2023 开源创新榜专家评审会在国家科技传播中心成功举办。Excelize 电子表格文档开源基础库入选“2023开源创新榜”优秀开源项目。 评审委员…

Javaweb之Mybatis的基础操作之删除的详细解析

1.3 删除 1.3.1 功能实现 页面原型&#xff1a; 当我们点击后面的"删除"按钮时&#xff0c;前端页面会给服务端传递一个参数&#xff0c;也就是该行数据的ID。 我们接收到ID后&#xff0c;根据ID删除数据即可。 功能&#xff1a;根据主键删除数据 SQL语句 -- 删除…

java每日一题——输出星星塔(答案及编程思路)

前言&#xff1a; 打好基础&#xff0c;daydayup! 题目&#xff1a;请编写输出如下图的星星塔 编程思路&#xff1a;1&#xff0c;计算要输入几行&#xff1b;2&#xff0c;计算每行的⭐数量&#xff0c;及空格的数量&#xff1b;计算相应的关系&#xff1b; 如图&#xff1a;假…

SpringBoot项目处理 多数据源问题(把本地库数据 推送 到另外一个平台的库)

一、需求梳理 把我方数据库的表中数据 ----------> 推送到第三方的数据库 相当于库对库的数据插入, 但是需要的是用代码的方式实现; 二、解决思维 (1) 首先,平台与平台之间的数据库对接; 处理点1: 字段转换 (库表之间的数据字段不一致问题) 解决方式: 挨个字段的对应,如…

软件测试基础理论学习-软件测试方法论

软件测试方法论 软件测试的方法应该建立在不同的软件测试类型上&#xff0c;不同的测试类型会存在不同的方法。本文以软件测试中常见的黑盒测试为例&#xff0c;简述常见软件测试方法。 黑盒测试用例设计方法包括等价类划分法、边界值分析法、因果图法、判定表驱动法、正交试…

(NeRF学习)NeRF复现 win11

目录 一、获取源码二、环境三、准备数据集方法一&#xff1a;官方命令方法二&#xff1a;官网下载数据集 四、开始训练1.更改迭代次数2.开始训练方法一&#xff1a;方法二&#xff1a; 3.使用预训练模型 五、NeRF源码学习 一、获取源码 git clone https://github.com/bmild/ne…

LeetCode-141环形链表 LeetCode-142环形链表二

一、前言 本篇文章在我之前讲完的链表、链表与递归的基础上进行讲解&#xff0c;本次我们以leetcode为例&#xff0c;讲解链表的其他题型&#xff0c;今天我们先了解一下环形链表&#xff0c;这里我们以leetCode141和leetCode142为例。 二、LeetCode141 首先关于这道题&#…

【MongoDB】关于MongoDB更新文档update的操作,十分详细,建议收藏!!!

&#x1f601; 作者简介&#xff1a;一名大四的学生&#xff0c;致力学习前端开发技术 ⭐️个人主页&#xff1a;夜宵饽饽的主页 ❔ 系列专栏&#xff1a;MongoDB数据库学习 &#x1f450;学习格言&#xff1a;成功不是终点&#xff0c;失败也并非末日&#xff0c;最重要的是继…