大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式

news2025/1/5 10:42:50

喜大普奔!破百了!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark Streaming 基础数据源
  • 文件流、Socket流、RDD队列流
  • 引入依赖、Java编写多种流进行测试

在这里插入图片描述

DStream 转换

DStream上的操作与RDD类似,分为Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如:

  • updateStateByKey
  • transform
  • window相关操作

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

map(func)

对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。
例如,将每个记录转换为其长度。
示例:val lengths = lines.map(line => line.length)

flatMap(func)

对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。
例如,将每一行文本拆分为单词。
示例:val words = lines.flatMap(line => line.split(" "))

filter(func)

对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。
例如,过滤掉长度小于 5 的单词。
示例:val filteredWords = words.filter(word => word.length > 5)

reduceByKey(func)

对键值对 DStream 进行聚合操作,对具有相同键的元素应用 func 函数。
例如,计算每个单词的总数。
示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

groupByKey()

对键值对 DStream 中的每个键进行分组,并将具有相同键的值聚合到一个列表中。
示例:val grouped = pairs.groupByKey()

count()

统计 DStream 中每个 RDD 的元素个数。
示例:val count = words.count()

countByValue()

统计 DStream 中每个 RDD 中每个值的出现次数。
示例:val valueCounts = words.countByValue()

union(otherDStream)

将两个 DStream 合并为一个新的 DStream,包含两个 DStream 中的所有元素。
示例:val mergedStream = stream1.union(stream2)

join(otherDStream)

对两个键值对 DStream 进行连接操作,类似 SQL 中的 JOIN 操作。
示例:val joinedStream = stream1.join(stream2)

备注:

  • 在DStream与RDD上的转换操作非常类似(无状态操作)
  • DStream有自己特殊的操作(窗口操作、追踪状态变化操作)
  • 在DStream上的转换操作比RDD上的转换操作少

DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:

  • 无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的RDD转化操作,例如:map、Filter、reduceByKey等
  • 有状态转换操作,需要使用之前批次的数据或者是中间结果来计算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作

无状态转换

无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream中的每一个RDD。
常见的无状态转换包括:

  • map
  • flatMap
  • repartition
  • reduceByKey
  • groupByKey

重要的转换操作:transform,通过对源DStream的每个RDD应用RDD-To-RDD函数,创建一个新的DStream,支持在新的DStream中任何RDD操作。
这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD,也就是说开发者,可以任意提供一个RDDToRDD的函数,这个函数在数据流每个批次中都被调用,生成一个新的流。

案例1 黑名单过滤

假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据
未生效
val arr1 = Array(("spark", true), ("scala", false))
假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操
作。如"2 spark"要被过滤掉
1 hadoop
2 spark
3 scala
4 java
5 hive
结果:"2 spark" 被过滤

方案1 外连接实现

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlackListFilter1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // 黑名单
    val blackList = Array(("spark", true), ("scala", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)

    // 测试数据
    val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
      .split("\\s+")
      .zipWithIndex
      .map {
        case (word, index) => s"$index $word"
      }
    val rdd = ssc.sparkContext.makeRDD(strArray)
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // 流式数据的处理
    val clickStreamFormatted = clickStream
      .map(value => (value.split(" ")(1), value))
    clickStreamFormatted.transform(clickRDD => {
      val joinedBlockListRDD: RDD[(String, (String, Option[Boolean]))] = clickRDD.leftOuterJoin(blackListRDD)
      joinedBlockListRDD.filter {
        case (word, (streamingLine, flag)) => {
          if (flag.getOrElse(false)) {
            false
          } else {
            true
          }
        }
      }.map {
        case (word, (streamingLine, flag)) => streamingLine
      }
    }).print()

    // 启动
    ssc.start()
    ssc.awaitTermination()
  }
}

方案1 运行结果

-------------------------------------------
Time: 1721618670000 ms
-------------------------------------------
5 hive
6 hbase
1 java
7 zookeeper
3 hadoop
4 kafka

... 下一批

对应的结果如下图所示:
在这里插入图片描述

方案2 SQL实现

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlackListFilter2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("WARN")

    // 黑名单
    val blackList = Array(("spark", true), ("scala", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)

    // 生成测试 DStream
    val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
      .split("\\s+")
      .zipWithIndex
      .map {
        case (word, index) => s"$index $word"
      }
    val rdd = ssc.sparkContext.makeRDD(strArray)
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // 流式数据的处理
    val clickStreamFormatted = clickStream
      .map(value => (value.split(" ")(1), value))
    clickStreamFormatted.transform {
      clickRDD =>
        val spark = SparkSession
          .builder()
          .config(rdd.sparkContext.getConf)
          .getOrCreate()

        import spark.implicits._
        val clickDF: DataFrame = clickRDD.toDF("word", "line")
        val blackDF: DataFrame = blackListRDD.toDF("word", "flag")
        clickDF.join(blackDF, Seq("word"), "left")
          .filter("flag is null or flag == false")
          .select("line")
          .rdd
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

方案2 SQL运行结果

-------------------------------------------
Time: 1721619900000 ms
-------------------------------------------
[6 hbase]
[4 kafka]
[7 zookeeper]
[1 java]
[3 hadoop]
[5 hive]

运行结果截图如下图所示:
在这里插入图片描述

方案3 直接过滤

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter3 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlackListFilter3")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("WARN")

    // 黑名单
    val blackList = Array(("spark", true), ("scala", true))
    val blackListBC: Broadcast[Array[String]] = ssc
      .sparkContext
      .broadcast(blackList.filter(_._2).map(_._1))

    // 生成测试DStream
    val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
      .split("\\s+")
      .zipWithIndex
      .map {
        case (word, index) => s"$index $word"
      }

    val rdd = ssc.sparkContext.makeRDD(strArray)
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // 流式数据的处理
    clickStream.map(value => (value.split(" ")(1), value))
      .filter {
        case (word, _) => !blackListBC.value.contains(word)
      }
      .map(_._2)
      .print()

    // 启动
    ssc.start()
    ssc.awaitTermination()
    
  }
}

方案3 直接过滤运行结果

-------------------------------------------
Time: 1721627600000 ms
-------------------------------------------
1 java
3 hadoop
4 kafka
5 hive
6 hbase
7 zookeeper

... 下一批

运行结果如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2069355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【精选】基于django柚子校园影院(咨询+解答+辅导)

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

[240824] 微软更新导致部分 Linux 用户无法启动系统,谁之过?| Chrome 稳定版更新(128.0.6613.84)

目录 微软更新导致部分 Linux 用户无法启动系统,谁之过?Chrome 稳定版更新 (128.0.6613.84) 微软更新导致部分 Linux 用户无法启动系统,谁之过? 最近,微软推送的一项 Windows 更新导致部分 Linux 用户无法启动系统&am…

基于Springboot + vue + mysql 藏区特产销售平台 设计实现

目录 📚 前言 📑摘要 1.1 研究背景 📑操作流程 📚 系统架构设计 📚 数据库设计 💬 E-R表 系统功能模块 系统首页 特产信息 ​编辑 个人中心 购物车 用户注册 管理员功能模块 管理员登录 管…

Stable diffusion模型如何区分?通俗易懂,入门必看!

在Stable Diffusion的基础学习中,很多小伙伴们可能看到繁杂的大模型就蒙圈了,那么多的模型后缀,究竟代表什么呢?如何区分呢?今天就带大家来学习一下~ 不同后缀模型介绍 在Stable diffusion中,…

【Tomact源码解析】——组件介绍

目录 一、简介 二、组件和体系架构简介 三、组件详情 Server Service Connector Engine ​编辑Host Context Wrapper 四、容器详情 生命周期机制 监听器机制 管道机制 五、补充内容 一、简介 Tomcat 服务器是一个免费的开放源代码的 Web 应用服务器,属于…

支持在线编辑的文件管理系统MxsDoc

DocSys是一个基于Web的文件管理系统(全平台支持:Linux,Windows,Mac),它提供了丰富的功能和特性,以满足不同用户在不同场景下的需求。 开源地址:DocSys: MxsDoc是基于Web的文件管理系统&#xff…

校友林小程序的设计

管理员账户功能包括:系统首页,个人中心,用户管理,树木管理管理,所属科管理,树木领取管理,树跟踪状态管理,用户信息统计管理,树木捐款管理,留言板管理 微信端…

【芯片往事】陈大同-展讯和TD

前言:几年前(2012),应邀为校友刊物《水木清华》写了一年创业专栏,其中有几期回忆了当年先后创办硅谷豪威科技(OmniVision)和上海展讯通信(SpreadTrum)的经历,…

ZMQ发布订阅模型

案例一 发布者Publisher(server) // server.cpp #include <zmq.hpp> #include <string> #include <iostream> #include <chrono> #include <thread> using namespace std; using namespace zmq; int main() {context_t context(1);socket_t so…

维纳滤波(Wiener Filtering)

维纳滤波&#xff08;Wiener Filtering&#xff09; 引言 维纳滤波&#xff08;Wiener Filtering&#xff09;是一种最优线性滤波方法&#xff0c;广泛应用于信号处理、图像处理和通信系统中。它旨在从含噪声的信号中恢复原始信号&#xff0c;最小化均方误差&#xff08;MSE&…

谷粒商城实战笔记-251-商城业务-消息队列-Exchange类型

文章目录 一&#xff0c;Exchange二&#xff0c;Exchange的四种类型1&#xff0c;direct2&#xff0c;fanout3&#xff0c;topic 三&#xff0c;实操1&#xff0c;创建一个exchange2&#xff0c;创建一个queue3&#xff0c;将queue绑定到exchange 一&#xff0c;Exchange AMQP …

二叉树的链式存储(代码实现)

二叉树的链式存储 用链表实现&#xff0c;基于完全二叉树规律来构建树&#xff0c;按照完全二叉树的编号方法&#xff0c;从上到下&#xff0c;从左到右。一共n个节点。 第i个节点&#xff1a; 左子节点编号&#xff1a;2*i &#xff08;2*i<n&#xff09; 右子节点编号&…

【C++题解】1146. 求S的值

欢迎关注本专栏《C从零基础到信奥赛入门级&#xff08;CSP-J&#xff09;》 问题&#xff1a;1146. 求S的值 类型&#xff1a;递归基础、函数 题目描述&#xff1a; 求 S12471116…的值刚好大于等于 5000 时 S 的值。 输入&#xff1a; 无。 输出&#xff1a; 一行&…

写作手三天速成攻略【数学建模国赛赛前必看内容】

第一天&#xff1a;准备论文模板&#xff0c;学习各类基础画图技巧 1、论文模板 对于写作手&#xff0c;除了内容的连贯性&#xff0c;排版是非常重要的&#xff0c;可以说有一个好的排版&#xff0c;只要论文是完整的&#xff0c;有结果的&#xff0c;基本上保底有省奖&#…

CSP-CCF 201412-2 Z字形扫描

目录 一、问题描述 二、解答 三、总结 一、问题描述 在图像编码的算法中&#xff0c;需要将一个给定的方形矩阵进行Z字形扫描(Zigzag Scan)。给定一个nn的矩阵&#xff0c;Z字形扫描的过程如下图所示&#xff1a; 对于下面的44的矩阵&#xff0c;   1 5 3 9   3 7 5 6  …

玩客云刷机armbian后docker启动不起来,提示bpf_prog_query(BPF_CGROUP_DEVICE) failed

/ ___| ( _ )/ |___ \ \___ \ / _ \| | __) |___) | (_) | |/ __/ |____/ \___/|_|_____|Welcome to Armbian 20.12 Bullseye with Linux 5.10.61-aml-s812Linux aml-s812 5.10.61-aml-s812 #20.12 SMP Thu Sep 2 20:11:09 CST 2021 armv7l GNU/Linux 玩客云刷机armbian后dock…

工业气膜仓储:高效、灵活的仓储解决方案—轻空间

在现代工业生产中&#xff0c;仓储设施的选择至关重要。作为一种新型的仓储解决方案&#xff0c;工业气膜仓储凭借其高效、灵活、经济的优势&#xff0c;正在逐渐取代传统建筑仓库&#xff0c;成为各类企业的理想选择。 一、快速搭建&#xff0c;满足多种需求 工业气膜仓储采用…

24年浙江事业单位考试报名流程保姆级教程

2024年浙江事业单位考试报名马上就要开始了&#xff0c;有想要参加考试报名的同学可以提前了解一下报名流程&#xff0c;以及报名照要求。 一、考试时间安排&#xff1a; 报名时间&#xff1a;8月27日9:00 9月2日16:00 资格审核时间&#xff1a;8月27日—9月3日 网上缴费时…

软件开发商业模式的思考:软件最大的竞争力就是低价格

很多程序员在工作的时候&#xff0c;都会有机会碰到做外包项目的机会&#xff0c;还有很多的专业网站&#xff0c;接项目&#xff0c;在这种外包开发中&#xff0c;经常会按照评估的开发时间和程序员的薪酬来定价。 前几年有朋友介绍过一个线下实体店经营者&#xff0c;想做一…

TF SD卡突然容量变小或者名字改变并且电脑就算格式化也恢复不了原状态或者干脆windows系统都格式化失败的解决办法

我自己是因为在使用canmv系统驱动k210时把系统镜像下载到了sd tf卡中导致tf 卡系统发生改变&#xff0c;32g变16mb而且名字也变成boot 这是因为你下载的系统镜像把原来的fat32或者其他常用sd tf卡系统代替了&#xff0c;导致电脑识别时&#xff0c;你现在的sd卡系统把总大小减…