SparkStreaming_window_sparksql_reids

news2025/4/13 4:21:55

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/**
  * 统计,截止到目前为止出现的每一个key的次数
  * window窗口操作,每个多长M时间,通过过往N长时间内产生的数据
  * M就是滑动长度sliding interval
  * N就是窗口长度window length
  */
object Demo05_WCWithWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCountUpdateStateByKey")
      .setMaster("local[*]")
    val batchInterval = 2
    val duration = Seconds(batchInterval)
    val ssc = new StreamingContext(conf, duration)
    val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)
    val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​
    val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,
      windowDuration = Seconds(batchInterval * 3),
      slideDuration = Seconds(batchInterval * 2))
​
    ret.print()
​
    ssc.start()
    ssc.awaitTermination()
  }
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/**
 * SparkStreaming整合SparkSQL的案例之,热门品类top3排行
 * 输入数据格式:
 * id brand category
 * 1 huwei watch
 * 2 huawei phone
 *
 */
object Demo06_SQLWithStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingIntegerationSQL")
      .setMaster("local[*]")
    val batchInterval = 2
    val duration = Seconds(batchInterval)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, duration)
    ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")
    val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)
    //001 mi moblie
    val pairs:DStream[(String, Int)] = lines.map(line => {
      val fields = line.split("\\s+")
      if(fields == null || fields.length != 3) {
        ("", -1)
      } else {
        val brand = fields(1)
        val category = fields(2)
        (s"${category}_${brand}", 1)
      }
    }).filter(t => t._2 != -1)
​
    val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​
    usb.foreachRDD((rdd, bTime) => {
      if(!rdd.isEmpty()) {//category_brand count
        import spark.implicits._
        val df = rdd.map{case (cb, count) => {
          val category = cb.substring(0, cb.indexOf("_"))
          val brand = cb.substring(cb.indexOf("_") + 1)
          (category, brand, count)
        }}.toDF("category", "brand", "sales")
​
        df.createOrReplaceTempView("tmp_category_brand_sales")
        val sql =
          """
            |select
            |  t.category,
            |  t.brand,
            |  t.sales,
            |  t.rank
            |from (
            |  select
            |    category,
            |    brand,
            |    sales,
            |    row_number() over(partition by category order by sales desc) rank
            |  from tmp_category_brand_sales
            |) t
            |where t.rank < 4
            |;
                    """.stripMargin
        spark.sql(sql).show()
      }
    })
​
    ssc.start()
    ssc.awaitTermination()
  }
​
  def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {
    Option(seq.sum + option.getOrElse(0))
  }
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{
  val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可
  jedis.auth("root")
  jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1344238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Stable Diffusion WebUI安装合成面部说话插件SadTalker

SadTalker可以根据一张图片、一段音频&#xff0c;合成面部说这段语音的视频。图片需要真人或者接近真人。 安装ffmpeg 下载地址&#xff1a; https://www.gyan.dev/ffmpeg/builds/ 下载ffmpeg-git-full.7z 后解压&#xff0c;将解压后的目录\bin添加到环境变量的Path中。 在…

鸿蒙原生应用再添新丁!爱奇艺入局鸿蒙

鸿蒙原生应用再添新丁&#xff01;爱奇艺 入局鸿蒙 来自 HarmonyOS 微博12月29日消息&#xff0c;#爱奇艺完成鸿蒙原生应用Beta版#作为中国头部在线视频平台&#xff0c;爱奇艺 完成鸿蒙原生应用Beta版&#xff0c;将以丰富的正版高清视频资源促进鸿蒙生态的进一步繁荣&#x…

python实现图像的二维傅里叶变换——冈萨雷斯数字图像处理

原理 二维傅里叶变换是一种在图像处理中常用的数学工具&#xff0c;它将图像从空间域&#xff08;我们通常看到的像素排列&#xff09;转换到频率域。这种变换揭示了图像的频率成分&#xff0c;有助于进行各种图像分析和处理&#xff0c;如滤波、图像增强、边缘检测等。 在数学…

YOLOv5算法进阶改进(10)— 更换主干网络之MobileViTv3 | 轻量化Backbone

前言:Hello大家好,我是小哥谈。MobileViTv3是一种改进的模型架构,用于图像分类任务。它是在MobileViTv1和MobileViTv2的基础上进行改进的,通过引入新的模块和优化网络结构来提高性能。本节课就给大家介绍一下如何在主干网络中引入MobileViTv3网络结构,希望大家学习之后能够…

Stable Diffusion WebUI制作光影文字效果

在huggingface上下载control_v1p_sd15_brightness模型。 将模型放在stable-diffusion-webui\extensions\sd-webui-controlnet\models目录下。 SD参数配置 正向提示词&#xff1a; city,Building,tall building,Neon Light, gentle light shines through, anime style, paint…

冒泡排序--------(C每日一题)

冒泡排序&#xff1a; 每次将相邻的两个数比较,将小的调到前头--升序 冒泡排序一个结论&#xff1a; n个数要进行n-1轮比较&#xff0c;第j轮要进行n-j次两两比较 循环体代码&#xff1a; int main() {int i, j,n,a[10],t;//n是几个数比较for(j1;j<n-1;j)//控制轮次for…

PNG免抠素材库,免费下载,可商用~

本期分享5个高质量PNG素材网站&#xff0c;让你在工作中大大提高效率&#xff0c;节省更多的时间&#xff0c;赶紧收藏起来吧~ 1、菜鸟图库 https://www.sucai999.com/searchlist/66008----all-0-1.html?vNTYxMjky 网站主要分享设计素材为主。像平面海报、免抠元素、背景图片…

英语打卡分析12

[爱心]长难句分享第十二天解析 [玫瑰]【词汇】&#xff1a; • appropriate [əˈproʊpriət] adj. 恰当的 • in place 准备妥当 • caregiver [ˈkerɡɪvər] n. 看护人 • no more … than… 和……一样不 • newsworthy [ˈnuːzwɜːri] adj. 值得报道的 • capable […

记一次应急响应练习(Linux)

记一次应急响应练习(Linux) Linux&#xff1a; 请提交攻击者的IP地址 答&#xff1a; 192.168.31.132 思路&#xff1a; 通过查看历史命令和开放的8080端口看到这台主机上运行的是Tomcat服务。并且在历史命令中看到了Tomcat的安装路径。那么就算是找到了日志的查看点了&#x…

SpringBoot3 核心技能

1. 常用注解 SpringBoot摒弃XML配置方式&#xff0c;改为全注解驱动 1. 组件注册 Configuration、SpringBootConfiguration Bean、Scope Controller、 Service、Repository、Component Import ComponentScan 步骤&#xff1a; 1、Configuration 编写一个配置类 2、在…

中科院1区TOP,Elsevier出版社,均1-2个月录用!检索超稳!

【SciencePub学术】本期&#xff0c;小编给大家推荐的是一本Elsevier旗下、工程技术领域、影响因子为6.0的中科院1区TOP。其详情如下&#xff1a; 期刊简介 TRIBOLOGY INTERNATIONAL ISSN&#xff1a;0301-679X E-ISSN&#xff1a;1879-2464 IF&#xff08;2022&#x…

Python生成器 (Generators in Python)

Generators in Python 文章目录 Generators in PythonIntroduction 导言贯穿全文的几句话为什么 Python 有生成器Generator&#xff1f;如何获得生成器Generator&#xff1f;1. 生成器表达式 Generator Expression2. 使用yield定义生成器Generator 更多Generator应用实例表示无…

深度优先和广度优先

文章目录 前言一、深度和广度的区别二、代码演示1.准备数据,构造树2.深度优先遍历3.广度优先遍历 总结 前言 深度优先和广度优先的区别&#xff1a; 搜索方式不同 。深度优先搜索算法不全部保留结点&#xff0c;扩展完的结点从数据库中弹出删去&#xff1b;广度优先搜索算法需…

隐身之术:深入解析代理模式的神秘力量

一、定义 代理模式&#xff08;Proxy Pattern)为其他对象提供一种代理以控制对这个对象的访问,属于结构型模式。 二、解决什么问题 主要解决在直接访问对象时带来的问题&#xff0c;比如说&#xff1a;要访问的对象在远程的机器上。在面向对象系统中&#xff0c;有些对象由于…

Python中matplotlib库的使用1

1 matplotlib库简介 matplotlib是一个数学绘图库&#xff0c;可以将数据通过图形的方式显示出来&#xff0c;也就是数据可视化。 2 matplotlib库的安装 2.1 打开cmd窗口 点击键盘的“Win”“R”键&#xff0c;在弹出的“运行”对话框的“打开”栏中输入“cmd”&#xff0c;…

OCR在审核应用落地

本文字数&#xff1a;6686字 预计阅读时间&#xff1a;35分钟 01 背景 1、业务背景 在传统视频审核场景中&#xff0c;审核人员需要对进审视频中的文字内容进行逐一审核&#xff0c;避免在文字上出现敏感词、违禁词或者广告等相关词汇。这种人工审核费时费力&#xff0c;并且由…

听GPT 讲Rust源代码--src/tools(36)

File: rust/src/tools/clippy/clippy_lints/src/loops/empty_loop.rs 在Rust源代码中&#xff0c;empty_loop.rs文件位于src/tools/clippy/clippy_lints/src/loops/目录下&#xff0c;它的作用是实现并提供一个名为EMPTY_LOOP的Lint规则。Clippy是一个Rust的静态分析工具&#…

RS®FSW 信号与频谱分析仪

R&SFSW 信号与频谱分析仪 简述&#xff1a; R&SFSW 信号与频谱分析仪高性能 R&SFSW 信号与频谱分析仪可用于完成严苛任务。它具备较高的内部分析带宽&#xff0c;可对宽带组件和通信系统进行特征校准。分析仪具备出色的相位噪声&#xff0c;有助于开发适用于雷达…

关于Citrix NetScaler ADC 和网关设备受到攻击的动态情报

一、基本内容 据美国网络安全和基础设施安全局CISA的公告&#xff0c;最新披露的Citrix NetScaler应用交付控制器&#xff08;ADC&#xff09;和网关设备中存在关键的安全漏洞&#xff0c;已被攻击者滥用。这些漏洞使得攻击者能够在易受攻击的系统上投放Web shell&#xff0c;…

揭秘营销返利模式!

随着互联网的普及和发展&#xff0c;越来越多的商家开始采用营销返利模式来吸引消费者。这种模式不仅可以提高销售额&#xff0c;还可以让消费者获得实实在在的优惠。本文将详细解析营销返利模式的秘密&#xff0c;让你轻松掌握这一有效的营销策略&#xff01; 一、什么是营销返…