Kafka和Spark Streaming的组合使用(Spark 3.5.1)

news2025/1/12 6:56:45

一、安装Kafka

1.执行以下命令完成Kafka的安装:
cd ~  //默认压缩包放在根目录
sudo tar -zxf  kafka_2.11-2.3.1.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-2.3.1 kafka-2.3.1
sudo chown -R qiangzi ./kafka-2.3.1

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd  /usr/local/kafka-2.3.1
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端,输入下面命令启动Kafka服务:
cd  /usr/local/kafka-2.3.1
./bin/kafka-server-start.sh  config/server.properties

//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.3.1
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.3.1
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.2.0表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark/jars”目录)。此外,还需要把“/usr/local/kafka/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
   // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString)
          .mkString(" ")
                    print(str)
                    println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}
2.编写消费者(Consumer)程序

在“/usr/local/spark/mycode/kafka/src/main/scala”目录下创建代码文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 
object KafkaWordCount{
  def main(args:Array[String]){
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(10))
    ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("wordsender")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      val lines = maped.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val pair = words.map(x => (x,1))
      val wordCounts = pair.reduceByKey(_+_)
      wordCounts.foreach(println)
    })
    ssc.start
    ssc.awaitTermination
  }
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala


/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }                                                                                                                     }                                                                                                                     } 
5.编译打包程序

现在在“/usr/local/spark/mycode/kafka/src/main/scala”目录下,就有了如下3个代码文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" %% "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
> --class "KafkaWordProducer"   \
> ./target/scala-2.12/simple-project_2.12-1.0.jar  \
> localhost:9092  wordsender  3  5
 /usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" --packages org.apache.kafkkafka-clients:2.6.0 ./target/scala-2.12/simple-project_2.12-1.0.jar localhost:9092 wordsender 3 5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
> --class "KafkaWordCount"  \
> ./target/scala-2.12/simple-project_2.12-1.0.jar
出现报错信息,暂时无法解决

待完善!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1654441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java内存是怎样分配的

Java内存是怎样分配的 一、 1. 有些编程语言编写的程序会直接向操作系统请求内存&#xff0c;而 Java 语言为保证其平台无关性&#xff0c;并不允许程序直接向操作系统发出请求&#xff0c;而是在准备执行程序时由Java虚拟机&#xff08;JVM&#xff09;向操作系统请求一定的…

大模型方向好书推荐

我们已经加速进入了大模型的时代。以ChatGPT为首的一些超强模型服务&#xff0c;背后是百亿或千亿参数的基础模型&#xff0c;它们学到了丰富的世界知识&#xff0c;领悟了“与人类打交道”的门路&#xff0c;甚至开始连接和使用外部工具、成为“万物接口”。 新的时代有新的机…

源代码加密的重要性

在数字化时代&#xff0c;企业面临的最大挑战之一是如何保护其核心数据不被泄露。企业源代码防泄密是指企业采取措施保护其软件或应用程序源代码不被未授权的人员获取、泄露或盗用的一种安全措施。源代码是软件的核心组成部分&#xff0c;其中包含了程序员编写的具体指令和算法…

C++反射之检测struct或class是否实现指定函数

目录 1.引言 2.检测结构体或类的静态函数 3.检测结构体或类的成员函数 3.1.方法1 3.2.方法2 1.引言 诸如Java, C#这些语言是设计的时候就有反射支持的。c没有原生的反射支持。并且&#xff0c;c提供给我们的运行时类型信息非常少&#xff0c;只是通过typeinfo提供了有限的…

Julia 语言环境安装与使用

1、Julia 语言环境安装 安装教程&#xff1a;https://www.runoob.com/julia/julia-environment.html Julia 安装包下载地址为&#xff1a;https://julialang.org/downloads/。 安装步骤&#xff1a;注意&#xff08;勾选 Add Julia To PATH 自动将 Julia 添加到环境变量&…

【Linux 性能详解】CPU性能分析工具篇

目录 uptime mpstat 实时监控 查看特定CPU核心 pidstart 监控指定进程 组合多个监控类型 监控线程资源 按用户过滤进程 vmstart 用途 基本用法 输出字段 perf execsnoop dstat 通俗解释 技术层面解释 使用示例 总结 uptime uptime 是一个在 Linux 和 Unix…

暴打前任互动玩法开播教程

暴打前任互动玩法开播教程 暴打前任互动玩法开播教程【相关直播互动插件咩播需要额外官方购买】 网盘自动获取 链接&#xff1a;https://pan.baidu.com/s/1lpzKPim76qettahxvxtjaQ?pwd0b8x 提取码&#xff1a;0b8x

小微公司可用的开源ERP系统

项目介绍 华夏ERP是基于SpringBoot框架和SaaS模式的企业资源规划&#xff08;ERP&#xff09;软件&#xff0c;旨在为中小企业提供开源且易用的ERP解决方案。它专注于提供进销存、财务和生产功能&#xff0c;涵盖了零售管理、采购管理、销售管理、仓库管理、财务管理、报表查询…

Redis集群.md

Redis集群 本章是基于 CentOS7 下的 Redis 集群教程&#xff0c;包括&#xff1a; 单机安装RedisRedis主从Redis分片集群 1.单机安装Redis 首先需要安装Redis所需要的依赖&#xff1a; yum install -y gcc tcl然后将课前资料提供的Redis安装包上传到虚拟机的任意目录&#xf…

将ESP工作为AP路由模式并当成服务器

将ESP8266模块通过usb转串口接入电脑 ATCWMODE3 //1.配置成双模ATCIPMUX1 //2.使能多链接ATCIPSERVER1 //3.建立TCPServerATCIPSEND0,4 //4.发送4个字节在链接0通道上 >ATCIPCLOSE0 //5.断开连接通过wifi找到安信可的wifi信号并连接 连接后查看自己的ip地址变为192.168.4.…

【系统架构师】-选择题(十二)

1、网闸的作用&#xff1a;实现内网与互联网通信&#xff0c;但内网与互联网不是直连的 2、管理距离是指一种路由协议的路由可信度。15表示该路由信息比较可靠 管理距离越小&#xff0c;它的优先级就越高&#xff0c;也就是可信度越高。 0是最可信赖的&#xff0c;而255则意味…

【管理咨询宝藏95】SRM采购平台建设内部培训方案

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏95】SRM采购平台建设内部培训方案 【格式】PDF版本 【关键词】SRM采购、制造型企业转型、数字化转型 【核心观点】 - 重点是建设一个适应战略采…

ue引擎游戏开发笔记(35)——为射击添加轨道,并显示落点

1.需求分析&#xff1a; 我们只添加了开枪特效&#xff0c;事实上并没有实际的效果产生例如弹痕&#xff0c;落点等等。所以逐步实现射击的完整化&#xff0c;先从实现落点开始。 2.操作实现&#xff1a; 1.思路&#xff1a;可以这样理解&#xff0c;每次射击的过程是一次由摄…

MM模块学习一(供应商创建,物料类型的定义及功能)

物料管理流程&#xff1a; 源头&#xff1a;采购需求->采购申请 MRP&#xff1a;物料需求计划。运行物料需求计划的结果&#xff0c;根据物料的性质来判断是外购&#xff08;采购申请&#xff09;或者是生产&#xff08;计划订单->生产订单&#xff09;。 采购申请&am…

Bugku Crypto 部分题目简单题解

抄错的字符 题目描述&#xff1a; 老师让小明抄写一段话&#xff0c;结果粗心的小明把部分数字抄成了字母&#xff0c;还因为强迫症把所有字母都换成大写。你能帮小明恢复并解开答案吗&#xff1a; QWIHBLGZZXJSXZNVBZW 观察疑似base64解码&#xff0c;尝试使用cyberchef解码…

多模态EDA论文小记

论文地址 该论文主要改进点是&#xff1a;通过动态化局部搜索中每个集群大小&#xff0c;高斯和柯西分布共同产生个体。总的来说改进点不多&#xff0c;当然也可能是笔者还没发现。 局部搜索 划分集群 划分集群有两个策略分别是&#xff1a; 随机生成一个点作为中心点&…

Spring Boot | Spring Boot 整合 “RabbitMQ“ ( 消息中间件 ) 实现

目录: Spring Boot 整合 "RabbitMQ" ( 消息中间件 )实现 &#xff1a;一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作…

【智能楼宇秘籍】一网关多协议无缝对接BACnet+OPC+MQTT

在繁华的都市中心&#xff0c;一座崭新的大型商业综合体拔地而起&#xff0c;集购物、餐饮、娱乐、办公于一体&#xff0c;是现代城市生活的缩影。然而&#xff0c;这座综合体的幕后英雄——一套高度集成的楼宇自动化系统&#xff0c;正是依靠多功能协议网关&#xff0c;实现了…

VALSE 2024 Workshop报告分享┆ 大规模自动驾驶仿真系统研究

视觉与学习青年学者研讨会&#xff08;VALSE&#xff09;旨在为从事计算机视觉、图像处理、模式识别与机器学习研究的中国青年学者提供一个广泛而深入的学术交流平台。该平台旨在促进国内青年学者的思想交流和学术合作&#xff0c;以期在相关领域做出显著的学术贡献&#xff0c…

精通GDBus:Linux IPC的现代C接口

目录标题 1. GDBus介绍2. GDBus的优点3. 安装GDBus4. 使用GDBus连接到D-Bus总线实现D-Bus服务调用D-Bus方法发送和接收信号 5. 总结 在Linux环境下&#xff0c;不同的程序需要通过某种方式进行通信和协同工作。GDBus是GLib库的一部分&#xff0c;提供了一个基于GObject系统的、…