大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流

news2024/9/24 19:28:17

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark Streaming 基础概述
  • Spark Streaming 架构概念
  • 编程模型
  • 优点缺点概括
  • 与 Kafka 集成

在这里插入图片描述

基础概念

基础数据源包括:

  • 文件系统(File System):Spark Streaming 支持监控 HDFS、S3、本地文件系统等目录中的新文件,并将这些文件作为数据流的一部分进行处理。这个数据源适用于处理批量生成的文件。
  • Socket 数据流(Socket Stream):这是最简单的数据源之一,Spark Streaming 可以通过 TCP 套接字连接接收文本数据流。例如,你可以使用 nc(Netcat)工具向指定端口发送数据,Spark Streaming 可以实时读取这些数据。
  • Kafka:Kafka 是一个分布式消息系统,常用于构建实时流处理应用。Spark Streaming 提供了直接和高级两种 Kafka 数据源集成方式,支持从 Kafka 主题中读取数据流。
  • Flume:Apache Flume 是一个分布式、可靠且高可用的系统,用于高效收集、聚合和传输大量日志数据。Spark Streaming 可以通过 Flume 接收数据并处理,常用于日志收集和分析。
  • Kinesis:Amazon Kinesis 是一个实时流处理服务,Spark Streaming 提供了 Kinesis 数据源的支持,能够从 Kinesis 流中读取数据,并进行实时分析。
  • 自定义数据源:Spark Streaming 允许用户实现自定义的输入源。用户可以通过实现 Receiver 类或使用 Direct DStream API 来创建新的数据源。

在这里插入图片描述

引入依赖

我们使用的话,需要引入依赖:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

文件数据流

基础概念

通过 textFileStreama 方法进行读取 HDFS 兼容的文件系统文件
Spark Streaming 将会监控 directory 目录,并不断处理移动进来的文件

  • 不支持嵌套目录
  • 文件需要有相同的数据格式
  • 文件进入 Directory 的方式需要通过移动或者重命名来实现
  • 一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据
  • 文件流不需要接收器(Receiver),不需要单独分配CPU核

编写代码

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object FileDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("FileDStream")
      .setMaster("local[*]")

    // 时间间隔
    val ssc = new StreamingContext(conf, Seconds(5))
    // 本地文件,也可以使用 HDFS 文件
    val lines = ssc.textFileStream("goodtbl.java")
    val words = lines.flatMap(_.split("\\s+"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // 打印信息
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

代码解析

  • object FileDStream: 定义了一个名为 FileDStream 的单例对象,包含 main 方法,这是 Scala 中的入口点,相当于 Java 的 public static void main 方法。
  • Logger.getLogger(“org”).setLevel(Level.ERROR): 这行代码将日志级别设置为 ERROR,以减少控制台输出的日志信息,只显示错误级别的信息。这通常是为了避免不必要的日志干扰核心的输出。
  • val conf = new SparkConf(): 创建一个 SparkConf 对象,包含了应用程序的配置信息。
  • setAppName(“FileDStream”): 设置应用程序的名称为 “FileDStream”。这个名称会在 Spark Web UI 中显示,用于识别应用。
  • setMaster("local[]"): 设置 Spark 的运行模式为本地模式(local[]),这意味着应用程序将在本地运行,并使用所有可用的 CPU 核心。
  • val ssc = new StreamingContext(conf, Seconds(5)): 创建一个 StreamingContext 对象,负责管理 Spark Streaming 应用程序的上下文。Seconds(5) 指定了微批处理的时间间隔为 5 秒,也就是每 5 秒钟会处理一次数据。
  • val words = lines.flatMap(_.split(“\s+”)): 对每一行文本内容进行处理,使用空格或其他空白字符(\s+)进行分割,将每行文本拆分成单词。flatMap 操作会将结果展开为一个包含所有单词的 DStream。
  • val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _): 通过 map 操作将每个单词映射为 (word, 1) 形式的键值对,然后使用 reduceByKey 按键(即单词)进行聚合,计算每个单词的出现次数。
  • wordCounts.print(): 将计算结果打印到控制台,每 5 秒钟输出一次当前批次中每个单词的计数结果。
  • ssc.start(): 启动 Spark Streaming 的计算,这会使得 Spark 开始监听数据源并开始处理数据流。
  • ssc.awaitTermination(): 阻塞当前线程,等待流计算结束,通常是等待手动停止应用程序。这个方法会让程序保持运行,直到手动终止或遇到异常。

运行结果

【备注:使用 local[],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,使用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理接受到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

在这里插入图片描述

Socket数据流

编写代码

Spark Streaming 可以通过Socket端口监听并接受数据,然后进行相应处理:
打开一个新的命令窗口,启动 nc 程序。(在Flink中也这么用过)

# 如果没有的话 你需要安装一下
nc -lk 9999

编写运行的代码:

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apachea.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("SocketStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1));
    val lines = ssc.socketTextStream("0.0.0.0", 9999)
    val words = lines.flatMap(_.split("\\s+"))
    val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

随后可以在nc窗口中随意输入一些单词,监听窗口会自动获取单词数据流信息,在监听窗口每X秒就会打印出词频的统计信息,可以在屏幕是上出现结果。

运行结果

【备注:使用 local[],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,使用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理接受到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

在这里插入图片描述
此时,从控制台启动后,输入内容
在这里插入图片描述

RDD队列流

基础概念

调用 Spark Streaming应用程序的时候,可使用 streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
备注:

  • oneAtTime:缺省为true,一次处理一个RDD,设为False,一次处理全部RDD
  • RDD队列流可以使用 local[1]
  • 涉及到同时出队和入队操作,所以要做同步

每秒创建一个RDD(RDD存放1-100的整数),Streaming每隔1秒就对数据进行处理,计算RDD中数据除10取余的个数。

队列流优点

  • 适用于测试和开发:RDD 队列流主要用于开发和调试阶段,它允许你在没有真实数据源的情况下测试 Spark Streaming 应用程序。
  • RDD 队列:你可以创建一个包含 RDD 的队列(Queue),Spark Streaming 会从这个队列中逐一获取 RDD,并将其作为数据流的一部分进行处理。
  • 灵活性:由于是手动创建的 RDD 队列,因此你可以完全控制数据的内容、数量以及生成的速度,从而测试各种场景下的应用表现。

编写代码

编写代码如下:

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.Queue

object RDDQueueDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf()
      .setAppName("RDDQueueStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val rddQueue = new Queue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()

    for (i <- 1 to 5) {
      rddQueue.synchronized {
        val range = (1 to 100).map(_ * i)
        rddQueue += ssc.sparkContext.makeRDD(range, 2)
      }
      Thread.sleep(2000)
    }

    ssc.stop()
  }
}

运行结果

运行结果如图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2070927.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GATK ReadsPathDataSource类介绍

GATK(Genome Analysis Toolkit)是一个广泛使用的基因组分析工具包,它的核心库之一是htsjdk,用于处理高通量测序数据。在GATK中,ReadsPathDataSource类是负责管理和提供读取高通量测序数据文件(如BAM、SAM、CRAM)的类。 常见使用场景 数据加载:在GATK的基因组分析工具链…

MySQL的MRR(Multi-Range Read)优化原理详解

❃博主首页 &#xff1a; 「码到三十五」 &#xff0c;同名公众号 :「码到三十五」&#xff0c;wx号 : 「liwu0213」 ☠博主专栏 &#xff1a; <mysql高手> <elasticsearch高手> <源码解读> <java核心> <面试攻关> ♝博主的话 &#xff1a…

LeetCode:反转区间内的链表

&#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;力扣刷题日记 &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 文章目录 反转区间内的链表题目链接方法一&#xff1a;拆开反转…

【TB作品】PIC16F1719单片机,EEPROM,PFM,读写

对于PIC16F1719单片机&#xff0c;没有直接的EEPROM&#xff0c;而是使用高耐久度的程序闪存&#xff08;PFM&#xff09;作为非易失性数据存储区域。这个区域特别适合存储那些需要频繁更新的数据。读写这个内存区域需要操作一些特殊功能寄存器&#xff0c;比如用于地址的PMADR…

2.K8s集群搭建

K8s搭建 搭建方案kubeadm搭建系统初始化操作k8s Master节点初始化将node节点加入集群安装网络插件Calico集群测试 搭建方案 minikube&#xff1a;轻量化的Kubernetes集群&#xff0c;为了能够更好学习和体验k8s功能而推出的&#xff0c;借助个人PC的虚拟化环境就可以实现Kuber…

如何使用ssm实现基于java web的网上书城系统的设计与实现+vue

TOC ssm123基于java web的网上书城系统的设计与实现vue JAVA简介 Java主要采用CORBA技术和安全模型&#xff0c;可以在互联网应用的数据保护。它还提供了对EJB&#xff08;Enterprise JavaBeans&#xff09;的全面支持&#xff0c;java servlet API&#xff0c;JSP&#xff…

【Redis】Redis客户端——Jedis(Java)

Redis Java使用案例 环境配置引入依赖配置端⼝转发连接 Redis Server Java基础代码操作Redisset 和 getexsits 和 del 环境配置 引入依赖 Java 操作 redis 的客⼾端有很多. 其中最知名的是 jedis. 创建 maven 项⽬, 把 jedis 的依赖拷⻉到 pom.xml 中. <!-- https://mvnr…

ssrf--web-ssrfme例题

将web-ssrfme.zip解压缩在Ubuntu下 Docker-compose up -d 更新后的镜像重新启动容器 可以看到已经拉取成功ssrfme镜像 我们使用端口访问文件&#xff0c;可以看到有一个过滤条件&#xff0c;它限制了file&#xff0c;dict协议&#xff0c;127.0.0.1和localhost 也不能用&…

【55-90】结构型模式

目录 一.结构型模式概述 二.代理模式 2.1 概述 2.2 结构 2.3 静态代理 2.4 JDK动态代理 2.5 CGLIB动态代理 2.6 三种代理的对比 2.7 优缺点 三.适配器模式 3.1 概述 3.2 结构 3.3 类适配器模式 3.4 对象适配器模式 3.5 应用场景 四.装饰者模式 4.1 概述 4.2 结…

从并发20到并发120之laravel性能优化

调优成果 遇到问题 单台服务并发20&#xff0c;平均响应时间1124ms&#xff0c;通过htop观察&#xff0c;发现cpu占用率达到100%&#xff08;包括sleep的进程&#xff09;&#xff0c;内存几乎没怎么用。 调优后 单机最大吞吐量达到120 响应时长不超过1000ms 硬件信息 …

数学建模----线性回归分析(引入热力图的绘制方法)

目录 0.直击重点 1.一元线性回归分析 1.1散点图的绘制 1.2相关性的分类 1.3计算相关系数 1.4模型的检验 1.5模型的预测 2.多重线性回归分析&#xff08;上&#xff09; 2.1多重线性的概念 2.2散点图的分类 2.3热力图的绘制 2.4根据结果确定新的变量 3.多重线性…

【开端】 如何判断手机号码属于哪个国家(手机号判断正则)汇总

import org.apache.commons.lang3.StringUtils; /** * 手机号判断正则 */ public enum MobileRegularExp { /** * 国家 正则 */ CN("中国", 86, "^(\\?0?86\\-?)?1[3456789]\\d{9}$"), TW("中国台湾", 886, "…

第七节 循环结构;goto语句

目录 7.1 while循环 7.1.1 if 和 while的对⽐ 7.1.2 while的执行流程 7.1.3 while的练习 7.2 for循环 7.2.1 语法形式 7.2.2 for循环的执⾏流程 7.2.3 for 循环的练习 7.3 while 和 for 循环的对比 7.4 do while 循环 7.4.1 do while 的语法形式 7.4.2 do while循…

Jamba前生今世:1.5开源来袭

AI21服务于企业&#xff0c;为企业构建基础模型和AI系统以加速GenAI在生产中的使用。AI21 成立于2017年&#xff0c;已从NVIDIA、Intel、Google等公司共筹集了3.36亿美元。它是最早将生成式AI推向大众的公司之一&#xff0c;借助AI21平台&#xff0c;企业可以构建自己的生成式A…

菲菲更名宝贝:批量处理,文件命名不再繁琐

你是否有这样的经历&#xff1f;曾几何时&#xff0c;在堆积如山的文件中迷失方向&#xff0c;为了一个个手动重命名文件而加班到深夜&#xff1f;是否渴望有一种魔法&#xff0c;能瞬间让你的文件整理得井井有条&#xff0c;让繁琐的命名工作变得轻松愉快&#xff1f;那么&…

大数据毕业设计开题报告100例

文章目录 &#x1f6a9; 1 前言1.1 选题注意事项1.1.1 难度怎么把控&#xff1f;1.1.2 题目名称怎么取&#xff1f; 1.2 开题选题推荐1.2.1 起因1.2.2 核心- 如何避坑(重中之重)1.2.3 怎么办呢&#xff1f; &#x1f6a9;2 选题概览&#x1f6a9; 3 项目概览题目1 : 深度学习社…

前端网站优化-Brotli 压缩

杨绛先生说:“岁不声不响&#xff0c;你且不慌不忙。在凡俗的烟火里&#xff0c;愿以素心&#xff0c;阅来日方长。生活总是一地鸡毛&#xff0c;繁杂琐碎的日常&#xff0c;无力掌控的局面&#xff0c;以及猝不及防的变化&#xff0c;让日子多了几分慌张”。 市井长巷&#xf…

ssrf漏洞复现

环境搭建 zhuifengshaonianhanlu/pikachu: 一个好玩的Web安全-漏洞测试平台 (github.com) 直接将其复制到linux环境下拉取docker就行 我这里已经拉去过了&#xff0c;如果拉去速度慢话&#xff0c;可以在/etc/docker下的daemon.json中配置镜像加速 vim /etc/docker/daemon.js…

大模型学习笔记 - LLM 对齐优化算法 DPO

LLM - DPO LLM - DPO DPO 概述DPO 目标函数推导DPO 目标函数梯度的推导 DPO 概述 大模型预训练是从大量语料中进行无监督学习&#xff0c;语料库内容混杂&#xff0c;训练的目标是语言模型损失&#xff0c;任务是next token prediction&#xff0c;生成的token 不可控&…

MyBatis-Plus分页插件使用详解

一、简述 在使用mybatis开发项目的时候我们通常使用pagehelper来进行分页操作&#xff0c; 但是我们在使用MyBatis-Plus 开发时&#xff0c;MyBatis-Plus内置已经有分页功能了&#xff0c;其实不需要在额外引入pagehelper依赖了&#xff0c;而且两者同时引入有时候还会导致分页…