【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

news2025/1/11 20:54:37

1.3 入门案例:WordCount

入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。
文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example

功能演示
运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:
在这里插入图片描述
演示运行案例步骤:
第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999
第二步、打开另一个终端Terminal,执行如下命令

# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example \
--master local[2] \
--conf spark.sql.shuffle.partitions=2 \
org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
node1.itcast.cn 9999
# 测试数据
spark hadoop spark hadoop spark hive
spark spark spark
spark hadoop hive

发送数据以后,最终统计输出结果如下:
在这里插入图片描述
Socket 数据源
从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数:

参数一:host,主机名称,必须指定参数
参数二:port,端口号,必须指定参数

范例如下所示:
在这里插入图片描述
Console 接收器
将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:

参数一:numRows,打印多少条数据,默认为20条;
参数二:truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;

范例如下所示:
在这里插入图片描述
编程实现
可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:

  • Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;
  • Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,指定读取Stream数据和保存Streamn数据,具体语法格式:
    • 加载数据Load:读取静态数据【spark.read】、读取流式数据【spark.readStream】
    • 保存数据Save:保存静态数据【ds/df.write】、保存流式数据【ds/df.writeStrem】

词频统计案例:从TCP Socket实消费流式数据,进行词频统计,将结果打印在控制台Console。
第一点、程序入口SparkSession,加载流式数据:spark.readStream
第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
第三点、启动流式应用,设置Output结果相关信息、start方法启动应用

完整案例代码如下:

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
*/
object StructuredWordCount {
def main(args: Array[String]): Unit = {
// TODO: 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2") // 设置Shuffle分区数目
.getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 1. 从TCP Socket 读取数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn").option("port", 9999)
.load()
/*
root
|-- value: string (nullable = true)
*/
//inputStreamDF.printSchema()
// TODO: 2. 业务分析:词频统计WordCount
val resultStreamDF: DataFrame = inputStreamDF
.as[String] // 将DataFrame转换为Dataset进行操作
// 过滤数据
.filter(line => null != line && line.trim.length > 0)
// 分割单词
.flatMap(line => line.trim.split("\\s+"))
.groupBy($"value").count() // 按照单词分组,聚合
/*
root
|-- value: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()
// TODO: 3. 设置Streaming应用输出及启动
val query: StreamingQuery = resultStreamDF.writeStream
// TODO: 设置输出模式:Complete表示将ResultTable中所有结果数据输出
// .outputMode(OutputMode.Complete())
// TODO: 设置输出模式:Update表示将ResultTable中有更新结果数据输出
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "10").option("truncate", "false")
// 流式应用,需要启动start
.start()
// 流式查询等待流式应用终止
query.awaitTermination()
// 等待所有任务运行完成才停止运行
query.stop()
}
}

其中可以设置不同输出模式(OutputMode),当设置为Complete时,结果表ResultTable中所有数据都输出;当设置为Update时,仅仅输出结果表ResultTable中更新的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/388447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一个Bug让人类科技倒退几十年?

大家好,我是良许。 前几天在直播的时候,问了直播间的小伙伴有没人知道「千年虫」这种神奇的「生物」的,居然没有一人能够答得上来的。 所以,今天就跟大家科普一下这个人类历史上最大的 Bug 。 1. 全世界的恐慌 一个Bug会让人类…

Java中的自动类型提升与强制类型转换

一、自动类型提升 自动类型提升是指在程序运行时因为某种情况需要,JVM将较小的数据类型自动转换为较大的数据类型,以保证精度和正确性。在Java中,需要进行类型提升的情况有以下几种: 1. byte、short和char提升为int类型 当运算…

spark sql(五)sparksql支持查询哪些数据源,查询hive与查询mysql的区别

1、数据源介绍 sparksql默认查询的数据源是hive数据库,除此之外,它还支持其它类型的数据源查询,具体的到源码中看一下: 可以看到sparksql支持查询的数据源有CSV、parquet、json、orc、txt、jdbc。这些数据源中前面五个我还能理解&…

【Python】RPA批量生成word文件/重命名及批量删除

批量生成word文件 场景:需要新建多个类似文件名 比如:今天的事例是新建12个文件名为: ​ 保安员考试试卷1及答案.docx ​ 保安员考试试卷2及答案.docx ​ … ​ 保安员考试试卷12及答案.docx 痛点: ​ 手动操作重复性高&a…

目标检测中回归损失函数(L1Loss,L2Loss,Smooth L1Loss,IOU,GIOU,DIOU,CIOU,EIOU,αIOU ,SIOU)

文章目录L-norm Loss 系列L1 LossL2 LossSmooth L1 LossIOU系列IOU (2016)GIOU (2019)DIOU (2020)CIOU (2020)EIOU (2022)αIOU (2021)SIOU (2022…

【SpringCloud】SpringCloud详解之Eureka实战

目录前言SpringCloud Eureka 注册中心一.服务提供者和服务消费者二.需求三.搭建Eureka-Server四.搭建Eureka-Client(在服务提供者配置:用户订单)前言 微服务中多个服务,想要调用,怎么找到对应的服务呢? 这里有组件的讲解 → SpringCloud组件…

深圳大学《计算机论题》作业:大数据与人工智能技术对人类生活的影响

说明 本作业为小组作业,要求基于一场报告完成(即观后感)。共分4个小题,讨论人工智能时代的伦理思考。由于版权原因,不提供报告的具体内容,只展示答题内容。 第一题 (1) 你如何看待…

winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)

上周在看别人写的上位机demo代码时,发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库,没有程序入口方法Main,但却很神奇能调试,最后发现原来Vistual Studio启动了一个外挂程序UserContr…

LSM(日志结构合并树)_笔记

WAL:Write Ahead Log 写前日志,顺序日志文件 1 LSM tree的定义 LSM tree: Log-Structured-Merge-Tree,日志结构合并树。 Log-Structured Merge-tree (LSM-tree) is a disk-based data structure designed to provide low-cost …

Linux操作系统学习(了解文件系统动静态库)

文章目录浅谈文件系统了解EXT系列文件系统目录与inode的关系软硬链接动静态库浅谈文件系统 当我们创建一个文件时由两部分组成:文件内容文件属性,即使是空文件也有文件属性 一个文件没有被打开是存储在磁盘中的,而磁盘是计算机中的一个机械…

你想赚的钱不一定属于你

昨天一个同行跟我说,最近有个五十多万的订单,客户是拿着别人家的设计来找的他,跟了也有大半个月了,自己明明报的价格比原设计的公司要低,客户一直说会尽快下的,他原本想着能够从这个订单里赚到几万块&#…

王道计算机组成原理课代表 - 考研计算机 第六章 总线 究极精华总结笔记

本篇博客是考研期间学习王道课程 传送门 的笔记,以及一整年里对 计算机组成 知识点的理解的总结。希望对新一届的计算机考研人提供帮助!!! 关于对 “总线” 章节知识点总结的十分全面,涵括了《计算机组成原理》课程里的…

软件测试用例(3)

按照测试对象划分: 一)界面测试: 1)软件只是一种工具,软件和人的信息交流是通过界面来进行的,界面是软件和用户交流的最直接的一层,界面的设计决定了用户对于我们设计软件的第一映像,界面如同人的面孔,具有最吸引用户的…

Java中String详解(从原理理解经典面试题)

本篇文章我先通过经典面试题,筛选需要观看本篇文章的朋友,然后咱们介绍String的基本特性,通过基本特性就可以找到面试题的答案。最后咱们再深入每个面试题,通过字节码、编译原理、基本特性深入剖析所有的面试题,让大家…

jsp试卷分析管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP试卷分析管理系统是一套完善的java web信息管理系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql5.0&…

三、JavaScript

目录 一、JavaScript和html代码的结合方式 二、javascript和java的区别 1、变量 2、运算 3、数组(重点) 4、函数 5、重载 6、隐形参数arguments 7、js中的自定义对象 三、js中的事件 四、DOM模型 五、正则表达式 一、JavaScript和html代码的结合方…

代码执行漏洞 | iwebsec

文章目录00-代码执行漏洞原理环境01-eval函数示例命令执行写入webshellbash反弹shell02-assert函数示例webshell03-call_user_func函数示例04-call_user_func_array函数示例总结05-create_function函数示例06-array_map函数示例总结08-preg_replace漏洞函数示例07-preg_replace…

Centos 部署Oracle 11g

Centos 部署Oracle 11g部署Oracle 11g准备工作服务器信息oracle安装包服务器准备oracle环境安装Oracle静默方式配置监听以静默方式建立新库及实例部署Oracle 11g 在SpringMVC模式下开发web项目,必然会使用到关系型数据库来存储数据,目前使用比较多的关系…

18、多维图形绘制

目录 一、三维图形绘制 (一)曲线图绘制plot3() (二)网格图绘制 mesh() (三)曲面图绘制 surf() (四)光照模型 surfl() (五)等值线图(等高线图)绘制 cont…

电力系统系统潮流分析【IEEE 57 节点】(Matlab代码实现)

👨‍🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…