大数据技术架构(组件)35——Spark:Spark Streaming(1)

news2024/11/15 12:20:10

2.3、Spark Streaming

2.3.0、Overview

Spark Streaming 是核心 Spark API 的扩展,它支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源(如 Kafka、Kinesis 或 TCP 套接字)获取,并且可以使用复杂的算法进行处理,这些算法由 map、reduce、join 和 window 等高级函数表示。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。当然也可以在数据流上应用机器学习和图处理。

工作原理如下:Spark Streaming 接收实时输入的数据流,并将数据分成批处理,然后由 Spark 引擎处理以批处理生成最终的结果流。其中SparkStreaming提供了一种离散流或DStream的高级抽象来代表一个连续的数据流,底层就是由一系列RDD来表示。

DStream 中的每个 RDD 都包含来自某个区间的数据,如下图:

2.3.0.1、Example

import org.apache.spark._
import org.apache .spark.streaming._
import org.apache.spark.streaming.StreamingContext_ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new streamingContext(conf, Seconds(1))
// Create a Dstream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextstream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(”"))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this Dstream to the console
wordCounts.print()
ssc.start() // start the computation
ssc.awaitTermination() // Wait for the computation to terminate

如上面的demo所示,每个输入流都会和一个Receiver对象相关联,该对象用来接收数据并将其存储在Spark内存中进行下一步的处理。因此如果你想要在流应用程序中并行接收多个数据流的话,那么就得需要创建多个Receiver对象用来接收数据。同时也需要记住的是SparkStreaming应用程序是属于常驻的,而且也是Spark程序,那么Worker/Executor也会占用一部分资源,所以为了能够保障运行Receiver以及正常处理数据,那么就需要申请到足够的资源,所以其分配的核数一定要大于receivers的个数。

2.3.0.2、Points To Remember

1、一旦Context启动之后,就不能增加或者设置新的流计算

2、一旦Context停止后,就无法重新启动。这里说的是容错方面。

3、同一时间一个JVM内只能有一个StreamingContext。

4、在StreamingContext上调用stop()方法,同时也会把SparkContext给停止;如果只是想停止StreamingContext,那么可以在调用stop()方法的时候指定stopSparkContext=false。

5、一个SparkContext可以被复用创建多个StreamingContext(即在下一个StreamingContext被创建之前停止上一个StreamingContext,且不停止SparkContext)

2.3.1、Receiver

SparkStreaming可以从任意的数据源来接收数据并处理,目前内置的数据源包括Kafka、File、Socket等等。当然目前Spark内置支持的数据源可以满足日常大部分的场景,但有些时候仍然需要自定义Receiver来定制接收数据源。这小节将来讲述如何实现一个自定义的Receiver。首先要继承Receiver,然后重写onStart和onStop方法。onStart()方法会在启动的时候负责接收数据;onStop()方法将停止这些接收数据的线程,当然还可以使用isStopped()方法来检查它们是否停止接收数据。

在 Spark Streaming 中,当一个 Receiver 启动时,每隔 spark.streaming.blockInterval 毫秒就会产生一个新的块,每个块都会变成 RDD 的一个分区,最终由 DStream 创建。例如,由 KafkaInputDStream 创建的 RDD 中的分区数由 batchInterval / spark.streaming.blockInterval 确定,其中 batchInterval 是将流数据分成批次的时间间隔(通过 StreamingContext 的构造函数参数设置)。例如,如果批处理间隔为 2 秒(默认),块间隔为 200 毫秒(默认),则RDD 将包含 10 个分区,还有一个流程路径涉及从迭代器接收数据,由 ReceivedBlockHandler 表示。创建 RDD 后,驱动程序的 JobScheduler 可以将其处理安排为作业。在 Spark Streaming 的当前实现和默认配置下,任何时间点只有一个作业处于活动状态(即正在执行)。因此,如果一个批次的处理时间比批次间隔长,那么下一个批次的作业将保持排队,将其设置为 1 的原因是并发作业可能会导致奇怪的资源共享,并且可能难以调试系统中是否有足够的资源来足够快地处理摄取的数据,当然可以通过实验性 Spark 属性 spark.streaming.concurrentJobs 进行更改,默认情况下设置为 1。一次只运行一个作业,不难看出,如果批处理时间小于批处理间隔,那么系统将是稳定的。

Receiver一旦接收到数据后,那么就会调用store(data)方法进行存储,这里有两种处理方式来保障Receiver是否可靠:

1、来一条存储一条,这种可靠性较差

2、存储整个对象/序列化集合。(阻塞的方式存储)

其自定义实现store()方法会影响到整体的容错和可靠。当应用程序发生了异常时应该要有捕获机制,并要有重试机制。

如果应用程序发生重启的时候,那么会调用Receiver类中的restart()方法,其内部会异步调用onStop方法并隔一定延迟后调用onStart()方法完成重启动作。

public class JavaCustomReceiver extends Receiver<String> {
    String host = null;
    int port = -1;
    public JavaCustomReceiver(String host_ , int port_) {
        super(storageLevel.MEMORY_AND_DISK_2());
        host = host_;
        port = port_;
    }

    @Override
    public void onstart() {
        // Start the thread that receives data over a connection
        new Thread(this::receive).start();
    }

    @override
    public void onstop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself if isStopped() returns false
    }

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {
        Socket socket = nul1;
        String userInput = null;
        try {
            // connect to the server
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(
                new InputstreamReader(socket.getInputstream(), StandardCharsets.UTF 8))
            // Until stopped or connection broken continue reading
            while (!isStopped() && (userInput = reader.readLine()) != null) {
                System.out.println("Received data "" + userInput + "");
                store(userInput);
            }
            reader.close();
            socket.close();
            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
        } catch(ConnectException ce) {
            // restart if could not connect to server
            restart("Could not connect", ce);
        } catch(Throwable t) f
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }
}

// 调用自定义Receiver:
// Assuming ssc is the JavastreamingContext
JavaDStream<String> customReceiverstream = ssc.receiverstream(
    new JavaCustomReceiver(host, port));
JavaDstream<String> words = customReceiverstream.flatMap(s -> ...);
...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342780.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jieba+wordcloud 词云分析 202302 QCon 议题 TOP 关键词

效果图 步骤 &#xff08;1&#xff09;依赖 python 库 pip install jieba wordcloud数据 概览 $ head -n 5 input.txt 中国软件技术发展洞察和趋势预测报告 2023 QCon 大会内容策划思路 FinOps&#xff1a;从概念到落地 开源芯片的发展现状、机遇和未来 乐观者前行&#xff0…

Axure 9 实战案例,动态面板的应用 4.1,省市区三级联动下拉菜单(重制简易版)

前言 Hello&#xff01;欢迎来到Axure 9 实战案例教程专栏。 本次课程我们继续来学习一下&#xff0c;动态面板的应用。本篇我们来讲解一下&#xff0c;如何绘制省市区联动下拉菜单&#xff08;重新撰写简易版&#xff09;。 下拉菜单初稿为了节省时间&#xff0c;这里提前把…

vue实现打印浏览器页面功能(两种方法)

推荐使用方法二 方法一&#xff1a;通过npm 安装插件 1&#xff0c;安装 npm install vue-print-nb --save 2&#xff0c;引入 安装好以后在main.js文件中引入 import Print from vue-print-nbVue.use(Print); //注册 3&#xff0c;现在就可以使用了 div id"printTest…

ChatGPT爆火,释放了什么不寻常信号?

ChatGPT&#xff0c;真的火了&#xff01; 相信许多朋友都听说过 ChatGPT&#xff0c;但并不清楚它是个啥。 体制内让ChatGPT写材料&#xff0c;广告行业让ChatGPT写策划案&#xff0c;媒体让ChatGPT写新闻稿&#xff0c;程序员让ChatGPT写代码甚至还带修BUG服务。 可以说是“…

告诉ChatGPT,我想读博了!

告诉ChatGPT&#xff0c;我想读博了&#xff01; 上篇文章详细写了如何体验ChatGPT。在实际使用中发现它对固定模板式的文字工作做的比较好。于是我瞬间想起了毕业前被论文支配的恐惧&#xff0c;我突然有一个大胆的想法&#xff0c;那么ChatGPT是否能帮我写一篇毕业论文呢&am…

【求解器-COPT】COPT的版本更新中,老版本不能覆盖的问题

【求解器-COPT】COPT的版本更新中&#xff0c;老版本不能覆盖的问题方法1方法2如果license还是找不到作者&#xff1a;刘兴禄 参考网址&#xff1a; COPT的下载和配置步骤如下&#xff1a; 教程 | Windows系统下如何安装COPT求解器并配置许可文件&#xff1a; https://zhuan…

山洪灾害监测预警平台 山洪灾害监测预警系统解决方案 以人为本 科学防御

平升电子山洪灾害监测预警平台 山洪灾害监测预警系统解决方案&#xff0c;集信息采集、传输、分析和预警等功能于一体&#xff0c;实现预警信息及时、准确地上传下达&#xff0c;提升监测预警能力&#xff0c;使可能受灾区域能够及时采取措施&#xff0c;最大程度减少人员伤亡和…

典型相关分析与R语言实现

典型相关分析学习目标学习内容典型相关分析的原理典型相关分析的理论内容例子具体实现方法内容小结注意解决方法学习目标 我们所采用的学习内容来自B站的Lizongzhang老师的R语言的学习分享 今天学习的主要内容是关于 典型相关分析 学习内容 首先声明,典型相关分析的内容理解…

性能技术分享|Jmeter+InfluxDB+Grafana搭建性能平台(四)

四、Jmeter配置InfluxDB4.1 后端监听器(BackendListener)介绍1、什么是后端监听器(BackendListener)&#xff1f;源码给出的解释是&#xff1a;BackendListener是一种异步监听并获取到测试结果的实现类。也就是说发出的如http等响应请求的结果&#xff0c;都会被封装在SampleRe…

[chatGPT]问题分析示例一,mtu太小ip6地址加不进去

根据这两个条件&#xff0c;去查询chatGPT&#xff0c;发现可以找到对应的代码片段。看着chatGPT&#xff0c;已经将Linux代码整合过来了。很强大的一个功能。 Human: if mtu set to 64, why add ipv6 failure? AI: If the MTU is set to 64, it can cause IPv6 fragmentation…

Hudi系列17:离线批量导入

文章目录一. 离线批量导入概述二. 数据源准备三. 案例1&#xff1a;COW表导入(写checkpoint&#xff0c;并行度:1)3.1 Flink SQL端操作3.2 查看任务运行情况四. 案例2&#xff1a;COW表导入(写checkpoint&#xff0c;并行度:4)4.1 Flink SQL 端操作4.2 查看任务运行情况4.2 使用…

CSP-《有趣的数》-感悟

题目 做题过程 注&#xff1a;黄色高亮表示需要注意的地方&#xff0c;蓝色粗体表示代码思路 根据题意可以第一位数字为2&#xff0c;因此只需要考虑后面n-1位的排列&#xff0c;在这n-1位数字中&#xff0c;0和1的总数可能取2,3,……,n-2&#xff0c;当总数为 i 时&#xff0…

“深度学习”学习日记。--加深网络

2023.2.13 深度学习 是加深了层的深度神经网络的学习过程。基于之前介绍的网络&#xff0c;只需要通过 叠加层&#xff0c; 就可以创建深度网络 之前的学习&#xff0c;已经学习到了很多东西&#xff0c;比如构成神经网络的各种层、参数优化方法、误差反向传播法&#xff0c;…

android kotlin 协程(二) 基本入门2

android kotlin 协程(二) config: system: macOS android studio: 2022.1.1 Electric Eel gradle: gradle-7.5-bin.zip android build gradle: 7.1.0 Kotlin coroutine core: 1.6.4 tips:前面几篇全都是协程的基本使用,没有源码,等后面对协程有个基本理解之后,才会简单的…

(一)初识Streamlit(附安装)

本入门指南介绍Streamlit的工作原理、如何在您首选的操作系统上安装Streamlit&#xff0c;以及如何创建第一个Streamlit应用程序&#xff01; 1 安装 1.1 先决条件 Python 3.7 – Python 3.11 **注&#xff1a;我这里使用的是anaconda的虚拟环境&#xff0c;用pycharm编写代…

JavaWeb--MavenMybatis基础

JavaWeb--Maven&Mybatis基础1 Maven1.1 Maven简介1.1.1 Maven模型1.1.2 仓库1.2 Maven基本使用1.2.1 Maven 常用命令1.2.2 Maven 生命周期1.3 IDEA使用Maven1.3.1 IDEA配置Maven环境1.3.2 Maven 坐标详解1.3.3 IDEA 创建 Maven项目1.3.4 IDEA 导入 Maven项目1.4 依赖管理1.…

UVa 11212 Editing a Book 编辑书稿 IDA* Iterative Deepening A Star 迭代加深搜剪枝

题目链接&#xff1a;Editing a Book 题目描述&#xff1a; 给定nnn个(1<n<10)1<n<10)1<n<10)数字&#xff0c;数字分别是1,2,3,...,n1, 2, 3, ...,n1,2,3,...,n&#xff0c;但是顺序是打乱的&#xff0c;你可以选择一个索引区间的数字进行剪切操作。问最少进…

即便考分很好也不予录取的研究生复试红线,都是原则性问题

在浙大研究生招生录取政策文件中有这么一句话&#xff1a;坚持“按需招生、全面衡量、择优录取、宁缺毋滥”的原则&#xff0c;以提高人才选拔质量为核心&#xff0c;在确保安全性、公平性和科学性的基础上&#xff0c;做到统筹兼顾、精准施策、严格管理。字字体现出研究生招生…

保姆级手把手教你如何使用HTTP远程连接Docker?

为什么要远程访问Docker? 可以使用http协议&#xff0c;获取json格式数据&#xff0c;很方便使用代码控制镜像&#xff0c;so easy 怎么配置才可以远程访问呢&#xff1f; 新建或修改这个文件&#xff1a;如果没有就新增哦~ /etc/systemd/system/docker.service.d/overrid…

Flink中核心重点总结

目录 1. 算子链 1.1. 一对一&#xff08;One-to-one&#xff0c; forwarding&#xff09; 1.2. 重分区&#xff08;Redistributing&#xff09; 1.3. 为什么有算子链 2. 物理分区&#xff08;Physical Partitioning&#xff09; 2.1. 什么是分区 2.2. 随机分区&#xff…