SparkStreaming--scala

news2025/1/12 19:39:52

文章目录

  • 第1关:QueueStream
    • 代码
  • 第2关:File Streams
    • 代码


第1关:QueueStream

任务描述
本关任务:编写一个清洗QueueStream数据的SparkStreaming程序。

相关知识
为了完成本关任务,你需要掌握:1.如何使用SparkStreaming,2.如何使用 SparkStreaming读取QueueStream。

SparkStreaming 的开发步骤
初始化SparkConf并设置相关参数
val conf = new SparkConf().setMaster(master).setAppName(appName)

说明:

appName 是应用程序在集群 UI 上显示的名称。

master 是Spark,Mesos或YARN集群的URL,或在本地模式下运行使用 local[*]

初始化JavaStreamingContext并设置处理批次的时间
val ssc = new StreamingContext(conf, Seconds(1))

设置数据源
例如:

val inputStream = ssc.queueStream(rddQueue)

批次数据处理(使用相关算子完成相应的操作)
算子 含义
map(func) 通过将源DStream的每个元素传递给函数func来返回一个新的DStream
flatMap(func) 与map类似,但每个输入项可以映射到0个或更多输出项。
filter(func) 通过仅选择func返回true的源DStream的记录来返回新的DStream
repartition(numPartitions) 通过创建更多或更少的分区来更改此DStream中的并行度级别
union(otherStream) 返回一个新的DStream,它包含源DStream和otherDStream中元素的并集
count() 通过计算源DStream的每个RDD中的元素数量,返回单元素RDD的新DStream
reduce(func) 通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream 。该函数应该是关联的和可交换的,以便可以并行计算
countByValue() 当在类型为K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率
reduceByKey(func,[ numTasks ]) 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性确定spark.default.parallelism进行分组。您可以传递可选numTasks参数来设置不同数量的任务
join(otherStream, [numTasks]) 当在(K,V)和(K,W)对的两个DStream上调用时,返回(K,(V,W))对的新DStream与每个键的所有元素对
cogroup(otherStream, [numTasks]) 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream
transform(func) 通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作
updateStateByKey(func) 返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态
foreachRDD(func) 最通用的输出运算符,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD。
启动流计算
ssc.start();

等待处理停止
ssc.awaitTermination();

QueueStream
QueueStream(队列流):推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。

编程要求
在右侧编辑器补充代码,完成以下需求:

将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss )

提取数据中的起始URL(切割符为空格)

拼接结果数据,格式如下:

Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl: https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
将最终结果写入Mysql数据库
测试说明
平台将对你编写的代码进行评测:

预期输出:

1 Ip:100.143.124.29,visitTime:2017-10-27 14:58:05,startUrl:www/1,targetUrl:https://www.baidu.com/s?wd=反叛的鲁鲁修,statusCode:404
2 Ip:30.132.167.100,visitTime:2018-12-02 11:29:39,startUrl:www/4,targetUrl:-,statusCode:302
3 Ip:30.156.187.132,visitTime:2016-05-17 17:18:56,startUrl:www/2,targetUrl:-,statusCode:200
4 Ip:29.100.10.30,visitTime:2016-10-12 01:25:47,startUrl:www/3,targetUrl:http://cn.bing.com/search?q=游戏人生,statusCode:302
5 Ip:132.187.167.143,visitTime:2017-01-08 23:21:09,startUrl:pianhua/130,targetUrl:-,statusCode:200
6 Ip:143.187.100.10,visitTime:2016-09-21 19:27:39,startUrl:www/1,targetUrl:-,statusCode:302
7 Ip:10.100.124.30,visitTime:2018-09-16 02:49:35,startUrl:www/4,targetUrl:http://cn.bing.com/search?q=来自新世界,statusCode:200
8 Ip:29.10.143.187,visitTime:2017-09-29 15:49:09,startUrl:www/1,targetUrl:-,statusCode:404
9 Ip:29.187.132.100,visitTime:2018-11-27 05:43:17,startUrl:www/1,targetUrl:-,statusCode:200
10 Ip:187.167.124.132,visitTime:2016-01-28 13:34:33,startUrl:www/6,targetUrl:-,statusCode:200
开始你的任务吧,祝你成功!

代码

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
 
object QueueStream {
    def main(args: Array[String]) {
        val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
        val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream")
        
        /********** Begin **********/
 
        //1.初始化StreamingContext,设置时间间隔为1s
        val ssc = new StreamingContext(conf, Seconds(1))
 
        //2.对接队列流
        val inputStream = ssc.queueStream(rddQueue)
 
        /**
        *
        * 数据格式如下:
        *      100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的鲁鲁修,404
        * 数据从左往右分别代表:用户IP、访问时间戳、起始URL及相关信息(访问方式,起始URL,http版本)、目标URL、状态码
        *
        *
        * 原始数据的切割符为逗号,(英文逗号)
        *
        * 需求:
        *      1.将时间戳转换成规定时间(格式为:yyyy-MM-dd HH:mm:ss )
        *      2.提取数据中的起始URL(切割符为空格)
        *      3.拼接结果数据,格式如下:
        * Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
        *      4.将最终结果写入 mysql 数据库, 调用DBUtils.add(line)即可, line:String
        */
 
        //3.获取队列流中的数据,进行清洗、转换(按照上面的需求)
    val data = inputStream.map(data=>{
      val dataliat = data.split(',')
      val ip = dataliat(0)
      val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val lt = dataliat(1).toLong
      val date = new Date(lt)
      val visitTime = simpleDateFormat.format(date)
      val startUrl = dataliat(2).split(' ')(1)
      val targetUrl= dataliat(3)
      val statusCode = dataliat(4)
      val result = "Ip:" + ip + ",visitTime:" + visitTime + ",startUrl:" + startUrl + ",targetUrl:" + targetUrl + ",statusCode:" + statusCode
      result
    })
 
        //4.将最终结果写入 mysql 数据库, 调用DBUtils.add(line)即可, line:String
    data.foreachRDD(rdd => {
      rdd.foreachPartition(it => {
        it.foreach(line => {
          DBUtils.add(line)
        })
      })
    })
 
 
        //5.启动SparkStreaming
    ssc.start()
 
        /********** End **********/
        DBUtils.addQueue(ssc, rddQueue)
    }
}

在这里插入图片描述

第2关:File Streams

任务描述
本关任务:编写一个清洗File Streams数据的SparkStreaming程序。

相关知识
为了完成本关任务,你需要掌握:1. 文件流,2. SparkStreaming的编程流程。

文件流
文件流(File Streams):从与HDFS API兼容的任何文件系统上的文件读取数据

通过文件流创建Dstream:

val lines=streamingContext.fileStreamKeyClass,ValueClass, InputFormatClass

对于简单的文本文件,有一种更简单的方法:

val lines=streamingContext.textFileStream(dataDirectory)

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的任何文件(不支持嵌套目录中写入的文件)。

注意:

文件必须具有相同的数据格式。

文件移动到该目录后,不能在添加新数据,即使添加也不会读取新数据。

只会监听目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。

说明:文件流不需要运行receiver,因此不需要分配core

SparkStreaming编程流程
设置SparkConf相关参数
val conf = new SparkConf().setMaster(master).setAppName(appName)

初始化StreamingContext
val ssc = new StreamingContext(conf, Seconds(1))
Seconds(1)表示每一秒处理一个批次;

设置数据源创建Dstream
val lines = ssc.textFileStream(dataDirectory)

通过将转换和输出操作应用于DStream来定义流式计算
比如flatmap,map,foreachRDD,updateStateByKey等等;

启动流计算
ssc.start();

等待处理停止
ssc.awaitTermination();

编程要求
在右侧编辑器中补全代码,要求如下:

/root/step11_fils下有两个文件,文件内容分别为:
hadoop hadoop hadoop hadoop hadoop hadoop hadoop hadoop spark spark
hello hello hello hello hello hello hello hello study study
要求清洗数据并实时统计单词个数,并将最终结果导入MySQL
step表结构:

列名 数据类型 长度 非空
word varchar 255 √
count int 255 √
测试说明
平台会对你编写的代码进行测试:

预期输出:

hadoop 8
spark 2
hello 8
study 2

代码

package com.sanyiqi
 
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
 
object SparkStreaming {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("edu").setMaster("local")
        /********** Begin **********/
        //1.初始化StreamingContext,设置时间间隔为1s
    val ssc = new StreamingContext(conf, Seconds(1))
        //2.设置文件流,监控目录/root/step11_fils
    val lines = ssc.textFileStream("/root/step11_fils")
        /* *数据格式如下:hadoop hadoop spark spark
           *切割符为空格
           *需求:
           *累加各个批次单词出现的次数
           *将结果导入Mysql
           *判断MySQL表中是否存在即将要插入的单词,不存在就直接插入,存在则把先前出现的次数与本次出现的次数相加后插入
           *库名用educoder,表名用step,单词字段名用word,出现次数字段用count
         */
        //3.对数据进行清洗转换
   val wordcount = lines.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
 
 
//4.将结果导入MySQL
    wordcount.foreachRDD(rdd => {
      rdd.foreachPartition(f = eachPartition => {
        val connection: Connection = createConnection()
        eachPartition.foreach(f = record => {
          val querySql = "SELECT t.count FROM step t WHERE t.word = '" + record._1 + "'"
          val queryResultSet: ResultSet = connection.createStatement().executeQuery(querySql)
          val hasNext = queryResultSet.next()
          print("MySQL had word:" + record._1 + " already  :  " + hasNext)
          if (!hasNext)
          {
            val insertSql = "insert into step(word,count) values('" + record._1 + "'," + record._2 + ")"
            connection.createStatement().execute(insertSql)
          } else {
            val newWordCount = queryResultSet.getInt("count") + record._2
            val updateSql = "UPDATE step SET count = " + newWordCount + " where word = '" + record._1 + "'"
            connection.createStatement().execute(updateSql)
          }
        })
        connection.close()
      })
    })
        //5.启动SparkStreaming
    ssc.start()
 
        /********** End **********/
        Thread.sleep(15000)
        ssc.awaitTermination()
        ssc.stop()
	}
    
    /**
      *获取mysql连接
      *@return
      */
    def createConnection(): Connection ={
    	Class.forName("com.mysql.jdbc.Driver")
    	DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder","root","123123")
    }
}

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1926765.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

<数据集>光伏板缺陷识别数据集<目标检测>

数据集格式:VOCYOLO格式 图片数量:2400张 标注数量(xml文件个数):2400 标注数量(txt文件个数):2400 标注类别数:4 标注类别名称:[Crack,Grid,Spot] 序号类别名称图片数框数1Crack8688922Grid8248843S…

从汇编层看64位程序运行——栈帧(Stack Frame)边界

大纲 RBP,RSP栈帧边界总结参考资料 在《从汇编层看64位程序运行——栈帧(Stack Frame)入门》中,我们简单介绍了栈帧的概念,以及它和函数调用之间的关系。如文中所述,栈帧是一种虚拟的概念,它表达了一个执行中的函数的栈…

Python之Excel自动化处理(二)

一、Excel设置样式 1.1、常用方法与属性 函数名&属性含义xlwt.Font()创建字体样式font.name设置字体类型font.colour_index设置字体颜色font.height设置字体大小font.bold设置字体是否为加粗font.underline设置字体下划线font.italic设置字体斜体xlwt.Alignment()创建字体…

笔记 1 : 课本前 2 章

现在开始跟着彭老师学习 arm 。把重要的知识点归拢一下,便于复习。早日学有所成,为国为家为己,更幸福些。 (1)冯诺依曼架构与哈弗架构,与混合架构: 以及: (2&#xff0…

音视频入门基础:H.264专题(13)——FFmpeg源码中通过SPS属性获取视频色彩格式的实现

一、引言 通过FFmpeg命令可以获取到H.264裸流文件的色彩格式(又译作色度采样结构、像素格式): 在vlc中也可以获取到色彩格式(vlc底层也使用了FFmpeg进行解码): 这个色彩格式就是之前的文章《音视频入门基础…

03-Charles实战

一、抓包分析问题示例 1)问题描述 2)抓包分析 这是后台响应回来的错误信息,说明问题一是后台的原因;但是后台只响应了一条信息,而前端页面却显示两条错误信息,说明前端页面处理异常的时候逻辑有问题&#…

《昇思25天学习打卡营第3天|03张量Tensor》

说在开始 学习下mindspore中对tensor的处理逻辑。 Tensor属性 张量的属性包括形状、数据类型、转置张量、单个元素大小、占用字节数量、维数、元素个数和每一维步长。 形状(shape):Tensor的shape,是一个tuple。 数据类型&…

windows USB 设备驱动开发- USB Type-C支持(一)

传统的 USB 连接使用两端都有 USB A 和 USB B 接头的电缆。 USB A 连接器始终插入主机端,USB B 连接器连接功能端,该功能端是手机) 或外设 (鼠标、键盘) 的设备 (。 使用这些连接器,只能将主机连接到函数;绝不是另一个主机的主机或另一个函数…

如何安装dotenv,避坑指南,安装包的包名有误?

嗨,大家好,我是蓝若姐姐。最近在研究AI大模型,想写一个调用openai接口的demo,结果发现在装一个三方库的时候一直报错,mac电脑安装dotenv报错,具体情况是 执行这个命令: pip install dotenv 遇…

提升 Kubernetes 日志记录能力,提高可观察性

介绍 在微服务和容器化应用时代,有效管理和监控应用的健康和性能至关重要。Kubernetes是一个用于自动部署、扩展和管理容器化应用的开源系统,已成为寻求敏捷性和弹性的企业的首选解决方案。 然而,由于 Kubernetes 的分布式架构、高日志量和…

【数据结构】栈和队列的深度探索,从实现到应用详解

💎所属专栏:数据结构与算法学习 💎 欢迎大家互三:2的n次方_ 🍁1. 栈的介绍 栈是一种后进先出的数据结构,栈中的元素只能从栈顶进行插入和删除操作,类似于叠盘子,最后放上去的盘子最…

Git代码管理工具 — 4 Git分支详解

目录 1 Git 分支概念 2 Git 分支基本操作 2.1 git branch查看与创建本地分支 2.2 git checkout切换分支 2.3 git merge合并分支 2.4 删除分支 3 解决冲突 1 Git 分支概念 Git 分支允许你从当前开发线上分离出来,进行独立的开发工作,而不会影响主…

用 AI 写歌词,让音乐表达与众不同

在音乐的广袤天地中,我们都渴望通过独特的表达来触动人心,展现自我。而如今,AI 技术的崛起为音乐创作带来了全新的突破,让我们能够以一种前所未有的方式赋予音乐独特的灵魂。 “妙笔生词智能写歌词软件(veve522&#…

Window10下安装WSL-Ubuntu20.04

1.开启并更新WSL 1.1开启WSL 首先先来看一下电脑是否能够开启WSL:待补充... 然后再来看一下如何开启WSL:win->设置->应用->应用和功能->程序和功能,如下所示: 最后选择启用或关闭Windows功能,开启两个选项:1.Hyper-V…

大语言模型诞生过程剖析

过程图如下 📚 第一步:海量文本的无监督学习 得到基座大模型🎉 🔍 原料:首先,我们需要海量的文本数据,这些数据可以来自互联网上的各种语料库,包括书籍、新闻、科学论文、社交媒体帖…

K8S系列-Kubernetes基本概念及Pod、Deployment、Service的使用

一、Kubernetes 的基本概念和术语 一、资源对象 ​ Kubernetes 的基本概念和术语大多是围绕资源对象 Resource Object 来说的,而资源对象在总体上可分为以下两类: 1、某种资源的对象 ​ 例如节点 Node) Pod 服务 (Service) 、存储卷 (Volume)。 2、…

记录些Redis题集(4)

Redis 通讯协议(RESP) Redis 通讯协议(Redis Serialization Protocol,RESP)是 Redis 服务端与客户端之间进行通信的协议。它是一种二进制安全的文本协议,设计简洁且易于实现。RESP 主要用于支持客户端和服务器之间的请求响应交互…

Adminer-CVE-2021-21311

在其4.0.0到4.7.9版本之间,连接 ElasticSearch 和 ClickHouse 数据库时存在一处服务端请求伪造漏洞(SSRF)。 VPS开启HTTP服务 VPS 开启HTTP 再同时跑POC 确保能访问poc里的链接文件 第一是目标地址 第二个是跳转地址 第三个是监听地址 如果…

昇思25天学习打卡营第21天|DCGAN生成漫画头像

DCGAN原理 DCGAN(深度卷积对抗生成网络,Deep Convolutional Generative Adversarial Networks)是GAN的直接扩展。不同之处在于,DCGAN会分别在判别器和生成器中使用卷积和转置卷积层。 它最早由Radford等人在论文Unsupervised Re…

【软件建模与设计】-03-软件生存周期模型和过程

目录 1、瀑布模型 2、抛弃型原型 3、演化式-增量模型 4、螺旋模型 5、统一软件开发过程RUP 6、设计验证和确认 6.1、软件质量保证 6.2、软件设计的性能分析 7、软件生存周期的活动 7.1、需求分析和规约 7.2、体系结构设计 7.3、详细设计 7.4、编码 8、软件测试 …