spark stream入门案例:netcat准实时处理wordCount(scala 编程)

news2025/1/13 19:39:06

目录

案例需求

代码

结果

解析


         案例需求:

        使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

        -- 1. Spark从socket中获取数据:一行一行的获取
        -- 2. Driver程序执行时,streaming处理过程不能结束
        -- 3. 采集器在正常情况下启动后就不应该停止,除非特殊情况
        -- 4. 采集器位于一个executor中,是一个线程,执行时需要一个核,如果设定的总核数为1时,那么在运行时因为没有核数,所以不会有打印结果,所以sparkStreaming使用的核数至少为2个
        -- 5. print()方法,默认是打印10行结果
        -- 6. netcat的指令:
 

      在Windows下:nc -lp 9999
      在linux下: nc -lk 9999

        代码: 
package cn.olo.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamDemo {
  def main(args: Array[String]): Unit = {
    // 连接SparkStreaming
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
    /*
  1.方法:StreamingContext(形参)
  2.形参:
    形参1:conf: SparkConf:spark配置对象
    形参2:batchDuration: Duration:采集时间
 */
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    // 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

    // 1. 获取netcat工具9999端口的连接,并开始接收数据
    // 从socket中获取数据:一行一行的获取

    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)

    // 2. 数据处理
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))

    val wordToSumDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_ + _ )

    // 3. 打印数据
    wordToSumDS.print()

    // 4. Driver程序执行时,streaming处理过程不能结束

    // 采集器在正常情况下启动后就不应该停止,除非特殊情况

    // 启动采集器
    ssc.start()

    // 等待采集器的结束
    ssc.awaitTermination()


  }

}

结果:

解析:

        a、采集周期时间之间,每一个采集周期生成一个RDD,按照时间的顺序依次进行
        b、在每一个采集周期内,会执行wordcount计算,最终得出:统计出每一个采集周期时间的wordcount

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1096770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用CRM系统实现销售目标的步骤

每个销售都要有自己的目标计划,在定销售计划时要把握方面问题,一个严格执行,另一个是可控。明确销售目标后,合理分配时间,运用销售基本工作方法严格把控销售进度。那我们该如何用CRM销售管理系统实现销售目标&#xff…

js面向对象(工厂模式、构造函数模式、原型模式、原型和原型链)

1.封装 2. 工厂模式 function createCar(color, style){let obj new Object();obj.color color;obj.style style;return obj;}var car1 createCar("red","car1");var car2 createCar("green","car2"); 3. 构造函数模式 // 创建…

Mybatis-Plus3.x的使用

MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为 简化开发、提高效率而生。 一、引入 创建步骤: 1.创建Spring Boot工程 2.添加依赖 引入 Spring Boot Starter 父工程&am…

小程序开发平台源码系统+内容付费小程序功能 带完整的搭建教程

来喽来喽!今天来给大家分享的是一款小程序开发平台源码系统,这款小程序开发平台的功能很多,本文主要给大家介绍一下内容付费小程序功能。以下是部分核心代码: 系统主要功能如下: 知识付费系统开发的优势。一是提高获取…

《永远的爱犬》The forever dog英文版

爱狗人士必读经典,主页左下角有英文版下载方式 手机可阅读

C++标准模板(STL)- 类型支持 (数值极限,traps,tinyness_before)

数值极限 std::numeric_limits 定义于头文件 <limits> 定义于头文件 <limits> template< class T > class numeric_limits; numeric_limits 类模板提供查询各种算术类型属性的标准化方式&#xff08;例如 int 类型的最大可能值是 std::numeric_limits&l…

ESD静电电压监控系统的作用是什么

ESD静电电压监控系统的作用是实时监测生产环境中的静电电压&#xff0c;及时检测和预防ESD静电电压过高的情况&#xff0c;保护设备和产品的质量&#xff0c;确保生产过程的安全和稳定。 具体来说&#xff0c;ESD静电电压监控系统可以实现以下功能&#xff1a; 实时监测静电电压…

华为云应用中间件DCS系列—Redis实现(社交APP)实时评论

云服务、API、SDK&#xff0c;调试&#xff0c;查看&#xff0c;我都行 阅读短文您可以学习到&#xff1a;应用中间件系列之Redis实现&#xff08;社交APP&#xff09;实时评论 1 什么是DEVKIT 华为云开发者插件&#xff08;Huawei Cloud Toolkit&#xff09;&#xff0…

[科研琐事] 安装服务器的二三事

1. 机柜参数 宽度&#xff1a;一般机器都是符合的&#xff1b; 深度&#xff1a;对应服务器最长的那个边&#xff1b; 厚度&#xff08;高度&#xff09;&#xff1a;1/2/3/4U&#xff0c;就是机柜上写的刻度数字&#xff0c;1U1.75英寸。 1U4.45cm 2U4.45cm * 2 3U4.45cm * …

揭秘OLED透明拼接屏的参数规格:分辨率、亮度与透明度全解析

作为一种新型的显示技术&#xff0c;OLED透明拼接屏在市场中正在迅速崭露头角&#xff0c;有很多知名品牌厂家能设计、开发、生产高品质的显示产品。 如尼伽、起鸿、康视界、LG、YCTIMES、腾裕等&#xff0c;这些品牌在显示技术领域拥有丰富的经验和声誉&#xff0c;以其卓越的…

聚观早报 | 特斯拉发布赛博啤酒套装;小米汽车售价曝光

【聚观365】10月16日消息 特斯拉发布赛博啤酒套装 小米汽车售价曝光 新款Model Y 国内已开启交付 苹果将推出新款 iPad mini / Air 保时捷销量中国区大跌 特斯拉发布赛博啤酒套装 特斯拉在美国市场推出CyberBeerCyberStein限量套装&#xff0c;售价150美元&#xff08;约…

USB PD3.1

目前我们大多数Type-C接口仍然采用的是PD3.0快充协议&#xff0c;按当前用户的使用场景来看功率也完全够用&#xff0c;那么PD3.1快充协议是什么&#xff1f;USB PD3.1到底有没有必要&#xff1f; 不妨我们先了解一下PD3.1: 5月25日&#xff0c;USB-IF协会推出了USB Type-C线…

CSS Display(显示) 与 Visibility(可见性)

display属性设置一个元素应如何显示&#xff0c;visibility属性指定一个元素应可见还是隐藏。 隐藏元素 - display:none或visibility:hidden 隐藏一个元素可以通过把display属性设置为"none"&#xff0c;或把visibility属性设置为"hidden"。但是请注意&a…

Linux下内存检测利器Valgrind之Memcheck工具详解

目录 1、Valgrind简介 1.1、Memcheck工具 1.2、Callgrind工具 1.3、Cachegrind工具 1.4、Helgrind工具 1.5、Massif工具 2、如何使用Memcheck 2.1、启动Memcheck 2.2、输出消息解释 3、使用Memcheck检测内存问题实例 4、Valgrind和Memcheck其他命令选项 5、最后 VC…

如何处理前端错误和异常?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

【QT开发笔记-基础篇】| 第四章 事件QEvent | 4.6 定时器事件

本章要实现的整体效果如下&#xff1a; QT 中使用定时器&#xff0c;有两种方式&#xff1a; 定时器类&#xff1a;QTimer定时器事件&#xff1a;QEvent::Timer&#xff0c;对应的子类是 QTimerEvent 本节通过一个案例&#xff0c;同时讲解这两种方式 案例&#xff1a;当点击…

微信怎么加好友?竟然有5种方法!

微信是我们日常生活中不可缺少的通信工具。在我们的工作或者学习中&#xff0c;我们可能需要主动添加他人的微信来建立联系&#xff0c;以便日后进行更好地沟通。那微信怎么加好友呢&#xff1f;小编给大家总结了5种添加微信好友的方法&#xff01;没想到吧&#xff0c;居然有这…

低温下安装振弦采集仪注意事项

低温下安装振弦采集仪注意事项 振弦采集仪是一种用于测量和监测结构物振动状态的设备&#xff0c;通常用于桥梁、大型建筑物、风力发电机、船舰等设施的监测和评估。在一些寒冷地区&#xff0c;设施的使用环境会面临低温的挑战&#xff0c;因此在安装振弦采集仪时需要注意以下…

解决window电脑使用IKE VPN登录时显示上下文已过期,不能再用了的方法。

当电脑登录VPN时出现上下文已过期&#xff0c;不能再用了时解决办法。 1、使用 winR 的按键打打开运行&#xff1b; 2、输入 services.msc 在服务的列表中找到 IKE and AuthIP IPsec Keying Modules 这个服务&#xff0c;可以看到该服务为禁用状态下。 3、双击该服务&#xf…

论文复现:Active Learning by Learning

这篇文章说实在的&#xff0c;写的差强人意。 实质性内容是两个现有方法的拼凑&#xff01; 讲的模模糊糊&#xff01;对于复现代码不太友好&#xff01; 撸一点&#xff0c;通读一遍 &#xff0c;再撸一点&#xff0c;通读一遍~~~ """ 注意&#xff1a;使用…