SparkStreaming

news2025/1/11 9:04:54

sparkstreaming

1.批处理与流处理

spark本身作为引擎时是批处理,从信息源全部读取数据,然后一批一批处理数据。处理sparkSQL等之后再存入hdfs。

sparkstreaming是实时引擎,在一个窗口时间内(比如1s)积攒数据,然后处理。更像伪实时处理。

DStream 就是streaming在一段十时间内收集数据的抽象,类似于RDD。

DStream内部是由一系列连续的RDD组成的.

2.DEMO

package org

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author noor9
 * @date 2021-02-01-19:55
 */
object StreamingWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[6]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    ssc.sparkContext.setLogLevel("WARN")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
      hostname = "xxx.xxx.xxx.xxxx",
      port = 9999,
      storageLevel = StorageLevel.MEMORY_AND_DISK_SER
    )

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>scala_config</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Spark 中, 一般使用 XXContext 来作为入口, Streaming 也不例外, 所以创建 StreamingContext 就是创建入口
  2. 开启 SocketReceiver, 连接到某个 TCP 端口, 作为 Socket client, 去获取数据
  3. 选择 Receiver 获取到数据后的保存方式, 此处是内存和磁盘都有, 并且序列化后保存
  4. 类似 RDD 中的 Action, 执行最后的数据输出和收集
  5. 启动流和 JobGenerator, 开始流式处理数据
  6. 阻塞主线程, 后台线程开始不断获取数据并处理

注意点

  • Spark Streaming 并不是真正的来一条数据处理一条

    20190620005146

    Spark Streaming 的处理机制叫做小批量, 英文叫做 mini-batch, 是收集了一定时间的数据后生成 RDD, 后针对 RDD 进行各种转换操作, 这个原理提现在如下两个地方

    • 控制台中打印的结果是一个批次一个批次的, 统计单词数量也是按照一个批次一个批次的统计
    • 多长时间生成一个 RDD 去统计呢? 由 new StreamingContext(sparkConf, Seconds(1)) 这段代码中的第二个参数指定批次生成的时间
  • Spark Streaming 中至少要有两个线程

    在使用 spark-submit 启动程序的时候, 不能指定一个线程

    • 主线程被阻塞了, 等待程序运行
    • 需要开启后台线程获取数据

创建 StreamingContext

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
  • StreamingContextSpark Streaming 程序的入口

  • 在创建 StreamingContext 的时候, 必须要指定两个参数, 一个是 SparkConf, 一个是流中生成 RDD 的时间间隔

  • StreamingContext
    

    提供了如下功能

    • 创建 DStream, 可以通过读取 Kafka, 读取 Socket 消息, 读取本地文件等创建一个流, 并且作为整个 DAG 中的 InputDStream
    • RDD 遇到 Action 才会执行, 但是 DStream 不是, DStream 只有在 StreamingContext.start() 后才会开始接收数据并处理数据
    • 使用 StreamingContext.awaitTermination() 等待处理被终止
    • 使用 StreamingContext.stop() 来手动的停止处理
  • 在使用的时候有如下注意点

    • 同一个 Streaming 程序中, 只能有一个 StreamingContext
    • 一旦一个 Context 已经启动 (start), 则不能添加新的数据源 **

各种算子

20190620005229

  • 这些算子类似 RDD, 也会生成新的 DStream
  • 这些算子操作最终会落到每一个 DStream 生成的 RDD
算子释义
flatMaplines.flatMap(_.split(" "))将一个数据一对多的转换为另外的形式, 规则通过传入函数指定
mapwords.map(x => (x, 1))一对一的转换数据
reduceByKeywords.reduceByKey(_ + _)这个算子需要特别注意, 这个聚合并不是针对于整个流, 而是针对于某个批次的数据

SparkStreaming原理

  1. 静态 DAG
  2. 动态切分
  3. 数据流入
  4. 容错机制

关于receiver的一些知识

  1. receiver是分片的
  2. receiver可以在每一个executer中运行
  3. receiver是专门用于接受数据的一个组件

3.MovieDemo

package org
import org.apache.spark.sql.SparkSession

object MoviesDemo {
  def main(args: Array[String]): Unit = {

    // 创建一个SparkContext来初始化Spark
    // 2.0 以前的用法
    // val conf = new SparkConf().setMaster("local").setAppName("movie demo")
    // val sc = new SparkContext(conf)

    // 2.0 以后的用法
    val spark = SparkSession.builder().master("local[*]").appName("movie demo").getOrCreate()
    val sc = spark.sparkContext

    // 加载数据,构造RDD(注:这里数据集放在项目的src/data/movielens/目录下)
    val ratingsFile = "src/data/movielens/rating.csv"      // 评分数据集
    val moviesFile = "src/data/movielens/movie.csv"        // 电影数据集

    val ratingsRDD = sc.textFile(ratingsFile)
    val moviesRDD = sc.textFile(moviesFile)

    // 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回
    // 因为第一行是标题行,所以过滤掉
    val movieAvgScore = ratingsRDD
      .filter(line => !line.startsWith("\"userId\""))
      .map(line => {val fields = line.split(","); (fields(1).trim.toInt, fields(2).trim.toDouble)})
      .groupByKey()
      .map(t => (t._1, t._2.sum/t._2.size))
      .filter(t => t._2 > 4.0)

    // 从电影数据集中抽取电影名称,以(movieId, movieName)的形式返回
    // 因为第一行是标题行,所以过滤掉
    val moviesInfo = moviesRDD
      .filter(line => !line.startsWith("\"movieId\""))
      .map(line => {val fields = line.split(","); (fields(0).toInt, fields(1))})

    // 将两个数据集连接起来,得到(movieId, movieName, avgScore)
    val result =  movieAvgScore.join(moviesInfo)
      .map(f => (f._2._1,(f._1, f._2._2, f._2._1)))
      .sortByKey(ascending = false)
      .map(t => t._2)

    // 列表显示
    result.collect.foreach(println)

    // 将查询结果保存到HDFS文件系统中
    //result.saveAsTextFile("src/data/movielens/result")
    result.repartition(1).saveAsTextFile("file:///C://Users//esvtek//IdeaProjects//scala_config//src//data//movielens//result")
  }
}

4.开发依赖项

还有,对于从Kafka、Flume以及Kinesis这类数据源提取数据的流式应用来说,还需要额外增加相应的依赖项,下表列出了各种数据源对应的额外依赖项:

数据源Maven工件
Kafkaspark-streaming-kafka_2.10
Flumespark-streaming-flume_2.10
Kinesisspark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitterspark-streaming-twitter_2.10
ZeroMQspark-streaming-zeromq_2.10
MQTTspark-streaming-mqtt_2.10

5.初始化

要初始化任何一个Spark Streaming程序,都需要在入口代码中创建一个StreamingContext对象。

而StreamingContext对象需要一个SparkConf对象作为其构造参数。

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

上面代码中的 appName 是你给该应用起的名字,这个名字会展示在Spark集群的web UI上。而 master 是Spark, Mesos or YARN cluster URL,如果支持本地测试,你也可以用”local[]”为其赋值。通常在实际工作中,你不应该将master参数硬编码到代码里,而是应用通过spark-submit的参数来传递master的值(launch the application with spark-submit )。不过对本地测试来说,”local[]”足够了(该值传给master后,Spark Streaming将在本地进程中,启动n个线程运行,n与本地系统CPU core数相同)。注意,StreamingContext在内部会创建一个 SparkContext 对象(SparkContext是所有Spark应用的入口,在StreamingContext对象中可以这样访问:ssc.sparkContext)。

StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定。详见Spark性能调优( Performance Tuning)。

StreamingContext对象也可以通过已有的SparkContext对象来创建,示例如下:

val sc = ...                // 已有的SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

context对象创建后,你还需要如下步骤:

  • 创建DStream对象,并定义好输入数据源。
  • 基于数据源DStream定义好计算逻辑和输出。
  • 调用streamingContext.start() 启动接收并处理数据。
  • 调用streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误)
  • 你可以主动调用 streamingContext.stop() 来手动停止处理流程。

需要关注的重点:

  • 一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。
  • 一旦streamingContext被stop掉,就不能restart。
  • 单个JVM虚机同一时间只能包含一个active的StreamingContext。
  • StreamingContext.stop() 也会把关联的SparkContext对象stop掉,如果不想把SparkContext对象也stop掉,可以将StreamingContext.stop的可选参数 stopSparkContext 设为false。
  • 一个SparkContext对象可以和多个StreamingContext对象关联,只要先对前一个StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可。
  • 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
  • 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。

6.可靠性

从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类:

  • 可靠接收器(Reliable Receiver) – 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。
  • *不可靠接收器(*Unreliable Receiver) – 不可靠接收器不会发送任何确认信息。不过这种接收器常用语于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源。

7.缓存/持久化

和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。

对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。

注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。

8.检查点/checkpoint

streaming一般长时间运行,所以需要有一个备份机制,防止意外宕机时数据重头计算。

检查点一般保存以下两种数据。

元数据检查点(Metadata checkpointing)

– 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括:

  • Configuration – 创建Streaming应用程序的配置信息。
  • DStream operations – 定义流式处理逻辑的DStream操作信息。
  • Incomplete batches – 已经排队但未处理完的批次信息。

数据检查点(Data checkpointing)

– 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间。

检查点的启用时间

  • 使用了有状态的转换算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。
  • 支持驱动器故障中恢复(Recovering from failures of the driver running the application) – 这时候需要元数据检查点以便恢复流式处理的进度信息。

注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/25091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring IOC源码:实例化前的准备工作

前言 上篇文章我们讲解了IOC比较重要的后置处理器注册方法&#xff0c;本篇文章讲解实例化前的准备工作&#xff0c;包括国际化、多播器创建、监听器注册等节点。 正文 进入refresh方法中&#xff0c;可以看到在正式实例化初始化方法前&#xff0c;还有4个方法&#xff1a; …

Day09--小程序API的Promise化

1.基于回调函数的异步API的缺点 ************************************************************************************************************** 2.啥子是API Promise化呢&#xff1f; *****************************************************************************…

【Java八股文总结】之MyBatisPlus

文章目录MybatisPlus一、MyBatis Plus介绍1、Mybatis 和 Mybatis Plus 的区别Q&#xff1a;MyBatis的优缺点Q&#xff1a;MyBatis Plus的优点Q&#xff1a;MyBatis-Plus中的lambda表达式&#xff1f;Q&#xff1a;MyBatis中的动态标签有哪些&#xff1f;2、MyBatis Plus常用注解…

ZYNQ之FPGA学习----RAM IP核使用实验

1 RAM IP核介绍 RAM 的英文全称是 Random Access Memory&#xff0c; 即随机存取存储器&#xff0c; 它可以随时把数据写入任一指定地址的存储单元&#xff0c;也可以随时从任一指定地址中读出数据&#xff0c;其读写速度由时钟频率决定 Xilinx 7 系列器件具有嵌入式存储器结…

【OpenCV 例程 300篇】248. 特征描述之HOG描述符

『youcans 的 OpenCV 例程300篇 - 总目录』 【youcans 的 OpenCV 例程 300篇】248. 特征描述之HOG描述符 1. 方向梯度直方图 方向梯度直方图&#xff08;Histogram of Oriented Gradient, HOG&#xff09;使用梯度方向的分布作为特征来构造描述符&#xff0c;应用非常广泛。 梯…

07-HTTPS双向认证及Java案例

1.双向认证流程 客户端发起建立HTTPS连接请求&#xff0c;将SSL协议版本的信息发送给服务端&#xff1b;服务器端将本机的公钥证书&#xff08;server.crt&#xff09;发送给客户端&#xff1b;客户端读取公钥证书&#xff08;server.crt&#xff09;&#xff0c;取出了服务端公…

wordpress的手工迁移

我的场景 将某个在阿里云服务器&#xff08;windows操作系统&#xff09;上apache容器下的wordpress服务迁移到另一个linux主机上的apache上。 迁移要点 1、迁移源主机下的wordpress文件夹&#xff0c;在apache容器下的htdocs文件夹中 2、迁移数据库 3、根据目标&#xff0…

表弟大学毕业要学前端,我给他制定了一份亲属自学计划

表弟也终于到了马上要大学毕业的时间&#xff0c;然后听说我在做前端开发工作&#xff0c;就想着能不能和我一起搞一搞。 我说这又不是小时候一起去地里抓兔子&#xff0c;说走就一起走&#xff0c;拿上工具一起走了&#xff0c;这得学啊。看着表弟期待的眼神&#xff0c;他问了…

Scientific Reports|比较转录组分析揭示了杀菌剂氰烯菌酯对尖孢镰刀菌的抗性调控机制和杀菌活性

TITLE&#xff1a;Comparative transcriptome analysis reveals the resistance regulation mechanism and fungicidal activity of the fungicide phenamacril in Fusarium oxysporum 译名&#xff1a;比较转录组分析揭示了杀菌剂氰烯菌酯对尖孢镰刀菌的抗性调控机制和杀菌活性…

Java代码审计——文件操作漏洞

目录 &#xff08;一&#xff09;、 文件操作漏洞简介 &#xff08;二&#xff09; 、漏洞发现与修复案例 2.1 文件包含漏洞 2.2 文件上传漏洞 &#xff08;三&#xff09; 文件下载/读取漏洞 &#xff08;四&#xff09;&#xff0e;文件写入漏洞 &#xff08;五&…

Arcgis建筑面shp由DSM和DEM获取高度拉伸并可视化

效果 1、准备数据 DEM、DSM数据精度尽量高一些 1)DEM 2)DSM 3)建筑shp 所有数据坐标统一,而且加载后位置能对上,DEM和DSM具有相同的像元大小 2、准备数据前的一些操作 1)矢量shp裁剪

C#实现最大公约数和最小公倍数

最大公约数&#xff1a; 最大公因数&#xff0c;也称最大公约数、最大公因子&#xff0c;指两个或多个整数共有约数中最大的一个。a&#xff0c;b的最大公约数记为&#xff08;a&#xff0c;b&#xff09;&#xff0c;同样的&#xff0c;a&#xff0c;b&#xff0c;c的最大公约…

net.sf.json.JSONObject 类的日常使用,非阿里巴巴的JSONObject,附上作者的jsonDemo

文章目录Json介绍作者的Demo项目地址常见的转化使用测试json的添加属性&#xff0c;打印bean与json互转deepBean与json互转list与json互转map与json互转demo所用到的实体类StudentGrade个人使用的依赖常用方法其他参考文档Json介绍 1、JSONObject只是一种数据结构&#xff0c;可…

DJYGUI系列文章七:GDD窗口系统

目录 1 窗口分类及关系 2 窗口的客户区与非客户区 3 坐标系统 4 窗口句柄与窗口ID的作用与区别 5 窗口的关闭、销毁、退出过程 6 API说明 6.1 ScreenToClient&#xff1a; 屏幕坐标转换为客户区坐标 6.2 ClientToScreen&#xff1a; 客户区坐标转换为屏幕坐标 6.3 Scre…

linux篇【11】:linux下的线程

目录 一.linux下的线程 1.linux下的线程概念 &#xff08;1&#xff09;教材上粗略的 线程 定义 &#xff08;2&#xff09;线程的引入 &#xff08;3&#xff09;线程真正定义 以及 示意图 &#xff08;4&#xff09;linux 和 windows等其他操作系统的线程对比 2.重新定…

22-python异常

异常一. 了解异常二. 异常的写法2.1 语法2.2 快速体验2.3 捕获指定异常2.3.1 语法2.3.2 体验2.3.3 捕获多个指定异常2.3.4 捕获异常描述信息2.3.5 捕获所有异常2.4 异常的else2.5 异常的finally三. 异常的传递四. 自定义异常五. 总结一. 了解异常 当检测到一个错误时&#xff…

Hibernate多表的关联关系、懒加载

一、一对多关系&#xff1a;插入&#xff1a; “一”的一方为主表&#xff0c;“多”的一方为副表&#xff0c;主表关联副表&#xff0c;应该在主表中加入副表对象作为属性。 根据顾客ID插入顾客信息 &#xff08;一&#xff09; &#xff0c;同时将顾客名下所有订单插入 &…

Python实现人脸识别检测,对主播进行颜值排行

前言 嗨嗨&#xff0c;我亲爱的家人们 今天来整点不一样的&#xff0c;嘿嘿 用Python简单实现对人脸识别的检测&#xff0c;对某平台主播照片进行评分排名 应该对女主播这个词不陌生吧&#xff0c;怎么说应该还是蛮多人看过一些女主播吧 我无聊的时候也会看看&#xff0c;…

2009年数学二真题复盘

选择题: 间断点的判断的前置芝士: 间断点的定义 设函数f(x)在点的去心领域内有定义,若f(x)满足以下条件之一: 在x=没有定义在x=有定义,但是不存在,或者存在,但是极限值不等于函数值。 类型定义 相关概念第一类间断点

CMS垃圾回收器

概述 CMS(Concurrent Mark-Sweep)是以牺牲吞吐量为代价来获得最短回收停顿时间的垃圾回收器。对于要求服务器响应速度的应用上&#xff0c;这种垃圾回收器非常适合。在启动JVM参数加上-XX:UseConcMarkSweepGC&#xff0c;这个参数表示对于老年代的回收采用CMS。CMS采用的基础算…