Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)

news2025/1/22 12:55:39

一、安装Kafka

1.执行以下命令完成Kafka的安装:
cd ~  //默认压缩包放在根目录
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd  /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端输入下面命令启动Kafka服务:
cd  /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh  config/server.properties

//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

cd ~  .jar文件默认放在根目录
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
   // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString)
          .mkString(" ")
                    print(str)
                    println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}
2.编写消费者(Consumer)程序

在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 
object KafkaWordCount{
  def main(args:Array[String]){
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(10))
    ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("wordsender")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      val lines = maped.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val pair = words.map(x => (x,1))
      val wordCounts = pair.reduceByKey(_+_)
      wordCounts.foreach(println)
    })
    ssc.start
    ssc.awaitTermination
  }
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala


/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }                                                                                                                     }                                                                                                                     } 
5.编译打包程序

现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender  3  5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1666236.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++ | Leetcode C++题解之第84题柱状图中最大的矩形

题目&#xff1a; 题解&#xff1a; class Solution { public:int largestRectangleArea(vector<int>& heights) {int n heights.size();vector<int> left(n), right(n, n);stack<int> mono_stack;for (int i 0; i < n; i) {while (!mono_stack.em…

【漏洞复现】泛微OA E-Cology portalTsLogin文件读取漏洞

漏洞描述&#xff1a; 泛微E-Cology是一款面向中大型组织的数字化办公产品&#xff0c;它基于全新的设计理念和管理思想&#xff0c;旨在为中大型组织创建一个全新的高效协同办公环境。泛微OA E-Cology portalTsLogin存在任意文件读取漏洞&#xff0c;允许未经授权的用户读取服…

物联网到底物联了个啥?——青创智通

工业物联网解决方案-工业IOT-青创智通 物联网&#xff0c;这个听起来似乎颇具科技感和未来感的词汇&#xff0c;其实早已悄然渗透到我们生活的方方面面。从智能家居到智慧城市&#xff0c;从工业自动化到医疗健康&#xff0c;物联网技术正在以其独特的魅力改变着我们的生活方式…

博通Broadcom (VMware VCP)注册约考下载证书操作手册

博通Broadcom(VMware) CertMetrics 注册约考下载证书等操作指导手册&#xff08;发布日期&#xff1a;2024-5-11&#xff09; 目录 一、原 Mylearn 账号在新平台的激活… 1 二、在新平台查看并下载证书… 5 三、在新平台注册博通账号… 6 四、在新平台下注册考试… 10 一、原…

算法提高之字串变换

算法提高之字串变换 核心思想&#xff1a;双向广搜 双向bfs 建立两个队列 一起bfs到中间态 #include <iostream>#include <cstring>#include <algorithm>#include <queue>#include <unordered_map>using namespace std;const int N 6;int n;…

自存 js course 工厂函数

如图 就是 像工厂一样 生产对象 对象里的函数可以写成简下

圆柱齿轮的旋向如何判断?

上期出了个题&#xff0c;给了两个内齿轮&#xff0c;请大家来判断他们的旋向&#xff0c;看到了有不少小伙伴评论给出了自己的答案&#xff0c;正确和错误差不多各半吧&#xff0c;错的占比要大一些。这期咱们就好好聊一聊这个问题。 外齿轮的旋向大家貌似判断都没什么问题&a…

2024年建筑施工特种作业人员安全生产知识试题

100分题库提供安全员考试试题、建筑安全员考试预测题、建筑安全员ABC考试真题、安全员证考试题库等&#xff0c;提供在线做题刷题&#xff0c;在线模拟考试&#xff0c;助你考试轻松过关。 单选题&#xff08;1-10&#xff09; 1.因生产安全事故受损害的从业人员&#xff0c;除…

如何使用 WavLM音频合成模型

微软亚洲研究院与 Azure 语音组的研究员们提出了通用语音预训练模型 WavLM。通过 Denoising Masked Speech Modeling 框架&#xff08;核心思想是通过预测被掩蔽&#xff08;即遮蔽或删除&#xff09;的语音部分来训练模型&#xff0c;同时还包括去噪的过程&#xff09;&#x…

离线修复.dll,Microsoft Visual C++

在安装mysql时遇到下面的问题&#xff0c;如果是有网络的情况下微软管网下载安装就行了&#xff0c;用的服务器不允许连接互联网。 后面经过寻找&#xff0c;找到了一个修复工具&#xff0c;可一次修复所有的问题&#xff0c;特别好用分享给宝子们。 下载链接&#xff1a;http…

大屏分辨率适配插件v-scale-screen

前言&#xff1a;大屏分辨率适配繁多&#xff0c;目前我认为最简单且问题最少的的方案就是使用v-scale-screen插件&#xff0c;无需考虑单位转换&#xff0c;position定位也正常使用。 1. 效果 填充满屏幕的效果 保持宽高比的效果 2. 插件原理 原理是通过css transfom 实现…

QT+多线程TCP服务器+进阶版

针对之前的服务器&#xff0c;如果子线程工作类里面需要使用socket发送消息&#xff0c;必须要使用信号与槽的方法&#xff0c; 先发送一个信号给父进程&#xff0c;父进程调用socket发送消息&#xff08;原因是QT防止父子进程抢夺同一资源&#xff0c;因此直接规定父子进程不能…

《破碎之地》删档测试开启,射击游戏领域的超级玩家抢先体验

易采游戏网5月12日消息&#xff0c;近日网易自研的射击类游戏《破碎之地》在广州开启了删档测试&#xff0c;吸引了众多射击游戏领域的超级玩家抢先体验。这款备受期待的新版游戏在操作、攀爬、跳跃、游泳、滑翔、枪械手感等方面均有所提升&#xff0c;为玩家带来了更加流畅、真…

QT学习(2)——qt的菜单和工具栏

目录 引出qt的菜单栏工具栏菜单栏&#xff0c;工具栏状态栏&#xff0c;浮动窗口 属性设计ui编辑控件添加图片 总结 引出 QT学习&#xff08;2&#xff09;——qt的菜单和工具栏 qt的菜单栏工具栏 菜单栏&#xff0c;工具栏 1QMainWindow 1.1菜单栏最多有一个 1.1.1 QMenuBar…

新年首站 | 宝兰德教育行业信创新动力发展研讨会顺利召开

近日&#xff0c;宝兰德携手慧点数码、安超云共同举办了教育行业信创新动力发展研讨会。会议邀请了中国人民公安大学、中国戏曲学院、北京航空航天大学、北京理工大学、华北电力大学、中国矿业大学、北京服装学院、北京城市学院等数十所高校信息中心负责人、专家出席了本次会议…

【class5】建立人工智能系统(2)

【昨日内容复习】 进行监督学习时&#xff0c;第一个步骤是提取数据集的文本特征和对应的标签。 提取文本特征的具体步骤如下&#xff1a; STEP1. 构造词袋模型&#xff0c;提取数据集中的文本特征 STEP2. 使用toarray()函数&#xff0c;将X转换为一个NumPy数组&#xff0c;方…

乡村振兴与乡村环境综合整治:加强农村环境保护,开展农村环境综合整治行动,提升乡村环境质量,打造生态宜居的美丽乡村

目录 一、引言 二、乡村振兴背景下的乡村环境现状 1、乡村环境面临的挑战 2、乡村环境问题的成因 三、加强农村环境保护的重要性 1、促进乡村振兴 2、保障生态安全 3、提升居民生活质量 四、开展农村环境综合整治行动的策略 1、制定科学规划 2、加大投入力度 3、强…

树莓派、ubuntu低版本python3安装库

如果遇到树莓派中自带低版本python3&#xff0c;又不想额外去安装python3时&#xff0c;可能会遇到版本过低&#xff0c;无法安装库的情况&#xff0c;以下用我实际情况举例解决方案。 本次遇到的问题是树莓派低版本中&#xff0c;python3为3.7.3&#xff0c;需要安装numpy&am…

AI算法工程师课程学习-数学基础-高数1-微积分

机器学习数学基础学习路线&#xff1a;1.高中数学-->大学2.微积分-->3.线性代数-->4.概率论-->5.优化理论。 为尽快进入到AI算法课程的学习&#xff0c;现在高数的学习要求&#xff1a; 1.看得懂&#xff0c;知道是什么&#xff0c;能听得懂&#xff0c;能理解讲…

感知机和神经网络

引入 什么是神经网络&#xff1f; 我们今天学习的神经网络&#xff0c;不是人或动物的神经网络&#xff0c;但是又是模仿人和动物的神经网络而定制的神经系统&#xff0c;特别是大脑和神经中枢&#xff0c;定制的系统是一种数学模型或计算机模型&#xff0c;神经网络由大量的人…