Spark(21):SparkStreaming之DStream入门

news2024/9/23 23:26:15

目录

0. 相关文章链接

1. WordCount 案例实操

1.1. 需求

1.2. 添加依赖

1.3. 编写代码

1.4. 启动程序并通过netcat发送数据

2. WordCount 解析


0. 相关文章链接

 Spark文章汇总 

1. WordCount 案例实操

1.1. 需求

使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数 

1.2. 添加依赖

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.12</artifactId> 
    <version>3.0.0</version> 
</dependency> 

1.3. 编写代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest{

    def main(args: Array[String]): Unit = {

        //1.初始化Spark配置信息
        val sparkConf: SparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("StreamTest")

        //2.初始化SparkStreamingContext
        val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

        //3.通过监控端口创建DStream,读进来的数据为一行行
        val lineStreams: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        //将每一行数据做切分,形成一个个单词
        val wordStreams: DStream[String] = lineStreams.flatMap((_: String).split(" "))
        //将单词映射成元组(word,1)
        val wordAndOneStreams: DStream[(String, Int)] = wordStreams.map(((_: String), 1))
        //将相同的单词次数做统计
        val wordAndCountStreams: DStream[(String, Int)] = wordAndOneStreams.reduceByKey((_: Int)+(_: Int))
        //打印
        wordAndCountStreams.print()

        //启动SparkStreamingContext
        ssc.start()
        ssc.awaitTermination()

    }

}

1.4. 启动程序并通过netcat发送数据

# centos7中的启动netcat命令
nc -lk 9999

# win10中的启动netcat命令
nc -l -p 9999

最终输入数据和输出结果如下图片所示:

输入数据
输出结果

2. WordCount 解析

Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据:

对数据的操作也是按照 RDD 为单位来进行的:

计算过程由 Spark Engine 来完成:


注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/749049.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter系列文章-Flutter环境搭建和Dart基础

Flutter是Google推出的一个开源的、高性能的移动应用开发框架&#xff0c;可以用一套代码库开发Android和iOS应用。Dart则是Flutter所使用的编程语言。让我们来看看如何搭建Flutter开发环境&#xff0c;并了解Dart语言的基础知识。 一、Flutter环境搭建 1. 安装Flutter SDK …

Blender 3.6 LTS更新的5个新功能,一定要试试

Blender基金会已正式发布Blender 3.6 LTS&#xff08;长期支持&#xff09;。它是备受期待的该公司开源 3D 软件的长期支持版本&#xff0c;也是 Blender 3.x 系列的最终 LTS 版本。Blender 3.6有一个用于设置基于粒子的模拟的模拟节点和一个升级的 UV 封装系统&#xff0c;其中…

IDEA自动添加注释作者版本时间等信息

File | Settings | Editor | Live Templates 点击加号&#xff0c;选择第二项 设置一个名称 再次点击加号&#xff0c;选择第一项 填写名称&#xff08;设置完成后再代码中输入该名称即可插入该注释内容&#xff09;&#xff0c;描述&#xff0c;及内容 /*** author 名字…

深度图像Range Image

从点云创建深度图并显示 函数原型 RangeImage::createFromPointCloud (const PointCloudType& point_cloud, float angular_resolution, float max_angle_width, float max_angle_height, …

Leaflet Ant Path(水系流动效果)

一、源代码&#xff1a; 用leaflet库中的Leaflet.AntPath插件 将通量动画&#xff08;如蚂蚁行走&#xff09;放入折线中 <!DOCTYPE html> <html><head><meta http-equiv"Content-Type" content"text/html; charsetutf-8" /><…

【UE】运行游戏时就获取鼠标控制

问题描述 我们经常在点击运行游戏后运行再在视口界面点击一下才能让游戏获取鼠标控制。其实只需做一个设置就可以在游戏运行后自动获取鼠标控制。 解决步骤 点击编辑器偏好设置 如下图&#xff0c;点击“播放”&#xff0c;再勾选“游戏获取鼠标控制” 这样当你运行游戏后直…

12、k8s Namespaces 资源隔离

Kubernetes Namespaces _ Kubernetes(K8S)中文文档_Kubernetes中文社区 Kubernetes Namespaces 实现资源隔离和配额的隔离,比如下面的信息: 所有对象都在Namespace中? 大多数Kubernetes资源(例如pod、services、replication controllers或其他)都在某些Namespace中,…

【LeetCode热题100】打卡第36天:多数元素打家劫舍

文章目录 【LeetCode热题100】打卡第36天&#xff1a;多数元素&打家劫舍⛅前言 多数元素&#x1f512;题目&#x1f511;题解 打家劫舍&#x1f512;题目&#x1f511;题解 【LeetCode热题100】打卡第36天&#xff1a;多数元素&打家劫舍 ⛅前言 大家好&#xff0c;我是…

pytorch安装问题【超级简单版】

pytorch安装问题 当前遇到的问题&#xff1a; python3.9无法安装读取coco数据集的 pycocotools-windows,那么需要切换版本到3.6/7/8&#xff0c;但是切换到python 3.6之后&#xff0c;无法安装torchvision和pytorch【在python就叫torch】&#xff0c;显示没有这个版本 pip i…

MS1205N激光测距用高精度时间测量(TDC)电路

MS1205N 是一款高精度时间测量 (TDC) 电路&#xff0c;具有四通 道、多脉冲的采样能力、高速 SPI 通讯、多种测量模式&#xff0c;适合 于激光雷达和激光测距。 主要特点  单精度模式 60ps  双精度模式 30ps  非校准测量范围 3.5ns(0ns) 至 25μs  单…

案例分析:成功的APP开发背后的故事

如今&#xff0c;我们生活在一个信息化时代&#xff0c;在这个信息时代&#xff0c;不管是工作还是生活都离不开手机 APP。因为有了手机 APP&#xff0c;我们的生活变得更加便捷、智能。但随着移动 APP开发的火热&#xff0c;很多企业都想要制作一个自己的 APP。然而在众多的 A…

822. 走方格

链接&#xff1a; 链接 题目&#xff1a; 给定一个 nmnm 的方格阵&#xff0c;沿着方格的边线走&#xff0c;从左上角 (0,0)(0,0) 开始&#xff0c;每次只能往右或者往下走一个单位距离&#xff0c;问走到右下角 (n,m)(n,m) 一共有多少种不同的走法。 输入格式 共一行&#xff…

【Docker】简单的Linux安装Redis

目录 Docker 安装 Redis拉取镜像安装容器修改配置文件容器随docker启动自动运行redis客户端 史上最详细Docker安装Redis &#xff08;含每一步的图解&#xff09;实战 Docker 安装 Redis 拉取镜像 docker pull redis安装容器 创建redis配置文件目录&#xff1a;如果内部没有相…

数据结构初阶--顺序表

目录 一.顺序表的定义 二.顺序表的分类 2.1.静态顺序表 2.2.动态顺序表 三.顺序表的特点 四.顺序表的功能实现 4.1.顺序表的定义 4.2.顺序表的初始化 4.3.顺序表的销毁 4.4.顺序表的容量检查 4.5.顺序表的打印 4.6.顺序表的尾插 4.7.顺序表的头插 4.8.顺序表的尾…

用ChatGPT搞定12 种编程语言:看看它如何表现

众所周知ChatGPT可以写代码&#xff0c;但当有一堆语言一起抛向它时&#xff0c;它的表现如何呢&#xff1f;答案是&#xff1a;还不错&#xff08;但并不完美&#xff09;。 在过去的几个月里&#xff0c;我们已经领教了ChatGPT的编码能力。我对它进行了PHP和WordPress的测试…

前端学习——Web API (Day6)

正则表达式 语法 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice…

SPEC CPU 2017 1.0.5 不同版本CentOS 7 8 安装笔记

CentOS 7.9.2009 x86_64 gcc版本 安装成功 runcpu编译报错 gcc版本太低&#xff0c;不识别-fno-tree-loop-vectorize 去掉config/gcc.cfg中 -fno-tree-loop-vectorize编译优化参数。 用例编译中 CentOS 8.3.2011 x86_64 gcc版本 安装失败&#xff0c;需要自行编译tools 手动…

#{} 和 ${} 的区别?

一、区别概述 1.1、主要区别&#xff1a; 1、#{} 是预编译处理&#xff0c;${} 是直接替换&#xff1b;2、${} 存在SQL注入的问题&#xff0c;而 #{} 不存在&#xff1b;Ps&#xff1a;这也是面试主要考察的部分~ 1.2、细节上&#xff1a; 1、${} 可以实现排序查询&#xff…

亚马逊云科技联合Nolibox定制工业设计AIGC解决方案

从机器学习算法到深度学习再到强化学习&#xff0c;AI创新浪潮奔流不息。而AIGC&#xff08;AI-generated Content&#xff0c;人工智能生成内容&#xff09;的到来&#xff0c;更是让AI成为众多企业的得力助手&#xff0c;开拓了文本、图像、音视频等领域的天花板。 在洞悉到…

企业云性能监控是一项关键的任务

企业云性能监控是一项关键的任务&#xff0c;它不仅可以保障企业云服务的稳定性和可靠性&#xff0c;还可以加强企业对云服务的掌控和管理&#xff0c;提供卓越的用户体验。 首先&#xff0c;企业云性能监控可以保障云服务的稳定和可靠。在云计算环境下&#xff0c;企业的核心业…