Spark2.x 入门:套接字流(DStream)

news2025/1/10 23:27:18

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。

新建NetworkWordCount.scala代码文件,请在该文件中输入如下内容:

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

退出vim编辑器。上面的代码,不能直接拿去sbt打包编辑,因为,里面有个 StreamingExamples.setStreamingLogLevels(),StreamingExamples来自另外一个代码文件,请在相同目录下再新建另外一个代码文件StreamingExamples.scala,文件内容如下:

package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}

/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {

  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

可以看出,StreamingExamples.scala文件主要是用来对输出的日志信息进行格式设置。

打包成功以后,就可以输入以下命令启动这个程序:

spark2-submit --class   "org.apache.spark.examples.streaming.NetworkWordCount" /home/songxitang/jars/simple-project_2.11-1.0.jar localhost 9999

执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,新打开一个窗口作为nc窗口,启动nc程序:

nc -lk 9999

这样,就可以在nc窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口每隔1秒就会打印出词频统计信息,大概会再屏幕上出现类似如下的结果:

-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
(hadoop,1)
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------
(spark,1)

如果要停止运行上述程序,只要按键盘Ctrl+Z键就可以了。

注意,如果你的电脑屏幕上,不是显示上面这样非常干净的信息,而是夹杂了很多乱七八糟的信息,如下:

//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------

遇到上面这问题,不要紧,这是和log4j的设置有关的。Log4j是Apache的一个开源项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件,甚至是套接口服务器、NT的事件记录器、UNIX Syslog守护进程等;我们也可以控制每一条日志的输出格式;通过定义每一条日志信息的级别,我们能够更加细致地控制日志的生成过程。最令人感兴趣的就是,这些可以通过一个配置文件来灵活地进行配置,而不需要修改应用的代码。

那么如何修改log4j的设置,把这些乱七八糟的信息给过滤掉,不要显示到屏幕上面呢?方法如下:
请新打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:

cd /usr/local/spark/conf
ls

这时,你可以看到一个名字为log4j.properties.template的文件,请执行如下命令复制一份到当前目录:

cp log4j.properties.template log4j.properties

这样就得到一个log4j.properties文件,然后,请使用vim编辑器打开这个文件,修改里面的一个配置选项(只修改log4j.rootCategor,其他配置选项不要修改):

//原来的原始配置如下
log4j.rootCategory=INFO,console
//请修改为下面格式
log4j.rootCategory=WARN,console

修改后,保存退出vim编辑器,就可以关闭当前终端窗口。然后,再次重新运行NetworkWordCount词频统计程序,就可以看到屏幕会输出比较纯净的信息。

在Cloudera Manager中设置Spark2 Log信息:

这里写图片描述

下面我们再前进一步,这回,我们把数据源头的产生方式修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。

新建一个名称为DataSourceSocket.scala的代码文件,用来产生Socket数据源,请在该代码文件中输入下面代码:

package org.apache.spark.examples.streaming
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source

object DataSourceSocket {
  def index(length: Int) = {

    val rdm = new java.util.Random

    rdm.nextInt(length)
  }
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }

    val fileName = args(0)
    val lines = Source.fromFile(fileName).getLines.toList
    val rowCount = lines.length

    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            val content = lines(index(rowCount))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

注意,实际上,这个时候,我们的程序目录下,就有了三个代码文件,分别是NetworkWordCount.scalaStreamingExamples.scalaDataSourceSocket.scala。sbt打包编译是同时对这三个代码文件打包编译。打包成功以后,就可以输入命令启动数据源程序和监听程序。

下面首先启动用来生成数据源的DataSourceSocket程序,不过,DataSourceSocket程序需要把一个文本文件作为输入参数,所以,在启动这个程序之前,需要首先创建一个文本文件:

cd /home/songxitang/spark/mycode/streaming/
vim word.txt

在word.txt中随便输入几行英文语句,然后保存并退出vim编辑器。
下面就启动DataSourceSocket程序,这个程序需要三个参数,第一个参数是文本文件路径,第二个参数是端口地址,第三个参数是时间间隔(单位是毫秒,也就是每隔多少毫秒发送一次信息),请执行下面命令启动这个程序:

spark2-submit --class "org.apache.spark.examples.streaming.DataSourceSocket" /home/songxitang/spark/jars/simple-project_2.11-1.0.jar /home/songxitang/spark/mycode/streaming/word.txt 9999 1000

然后,你就会看到,这个窗口会不断打印出一些随机读取到的文本信息,这些信息也是Socket数据源,会被监听程序捕捉到。所以,下面,我们就在另外一个窗口启动监听程序:

spark2-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /home/songxitang/spark/jars/simple-project_2.11-1.0.jar localhost 9999

启动成功后,你就会看到,屏幕上不断打印出词频统计信息。成功完成实验。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2049981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图像数据处理9

二、灰度变换 2.3 非线性灰度变换 以下式子中使用 f 表示输入图像的像素值&#xff0c;g 表示输出图像的像素值 2.3.1伽马校正&#xff08;Gamma Correction&#xff09; γ 是伽马值&#xff0c;通常大于0。调整 γ 的值可以改变图像的亮度。当 γ<1 时&#xff0c;图像…

一个简单的Rtmp推流客户端(QT录音,OpenCV摄像,FFmpeg编码推流)

RTMP&#xff08;Real-Time Messaging Protocol&#xff09;是一种实时流媒体传输协议&#xff0c;常用于音视频直播。 RTMP推流客户端是一种能够将音视频数据推送到直播服务器的工具。QT录音是利用Qt库实现的录音功能。OpenCV摄像是利用OpenCV库实现的对摄像头的控制和图像处理…

Chrome书签搜索插件

效果展示 这是一个chroma插件&#xff0c;可以按住 ctrl/command B 进行搜索您的书签&#xff0c;并且点击打开您的书签。支持上下切换回车打开新页面。支持文件夹搜索。多层级文件夹使用 / 分割。如&#xff1a;文件夹1/文件夹2/标签1 扩展下载地址 bookmark-search 欢迎有…

小程序学习day09-WXS脚本、自定义组件-组件的创建、引用、组件与页面的区别、组件的样式隔离

39、WXS脚本&#xff08;小程序独有的一套脚本语言&#xff09; &#xff08;1&#xff09;作用&#xff1a;结合WXML&#xff0c;可以构建出页面结构 &#xff08;2&#xff09;应用&#xff1a;在小程序中充当过滤器。&#xff08;wxml无法调用在页面的.js中定义的函数&…

DNS in Kubernetes

DNS in Kubernetes 对象分配的名称Service DNS 记录Pod DNS 记录 Cluster DNS参考 DNS for Services and Pods 这里主要讨论集群内不同对象之间的DNS解析 默认情况下&#xff0c;创建集群时&#xff0c;k8s会部署内置的DNS服务器&#xff0c;在集群内&#xff0c;我们不关注…

spring-boot-3.2.6+spring-security-6.2.4+oauth2整合github示例

一、添加依赖 在 pom.xml 中添加如下依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"h…

<数据集>航拍路面病害识别数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;3151张 标注数量(xml文件个数)&#xff1a;3151 标注数量(txt文件个数)&#xff1a;3151 标注类别数&#xff1a;7 标注类别名称&#xff1a;[Longitudinal crack, Transverse crack, Alligator crack, Oblique cr…

腾讯 图标点选 分析

声明: 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 有相关问题请第一时间头像私信联系我…

Element UI中报dateObject.getTime is not a function解决方法~

1、错误信息。 2、该报错原因是Element UI中日期组件的校验规则是type: "date",而一般我们从后台拿到的数据是字符串型的&#xff0c;不满足预期&#xff0c;就会报错。 3、解决方法。 去掉日子组件中的type: "date"校验规则即可。 rules: {newName: [{…

SOMEIP_ETS_046: echoUTF16FIXED

测试目的&#xff1a; 验证设备&#xff08;DUT&#xff09;是否能够正确处理echoUTF16FIXED方法的参数&#xff0c;并确保发送和接收的参数顺序和值保持一致。 描述 本测试用例旨在检查DUT在接收到一个使用echoUTF16FIXED方法的SOME/IP消息时&#xff0c;是否能够按照请求中…

利用PostgreSQL向量数据库和负责任的AI知识库在亚马逊云科技上构建商品推荐机器人

项目简介&#xff1a; 小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。 本次介绍的是如何在亚马逊云科技利用Postg…

软件需求规格说明书编写规范(Doc原件)

软件需求规格说明书编写规范&#xff08;Word原件&#xff09; 1.项目背景 2.项目目标 3.系统架构 4.总体流程 5.名称解释 6.功能模块 软件全套资料部分文档清单&#xff1a; 工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明…

WPF MvvmLight

关于 停止更新 官网&#xff1a;http://www.mvvmlight.net/ 源码地址&#xff1a;GitHub - lbugnion/mvvmlight: The main purpose of the toolkit is to accelerate the creation and development of MVVM applications in Xamarin.Android, Xamarin.iOS, Xamarin.Forms, Wi…

C++ //练习 17.17 更新你的程序,令它查找输入序列中所有违反“ei“语法规则的单词。

C Primer&#xff08;第5版&#xff09; 练习 17.17 练习 17.17 更新你的程序&#xff0c;令它查找输入序列中所有违反"ei"语法规则的单词。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块&#xff1a; /**********…

定制开发AI智能名片O2O商城小程序:基于限量策略与个性化追求的营销创新

摘要:随着科技的飞速发展和消费者需求的日益多元化&#xff0c;传统商业模式正经历着前所未有的变革。在数字化转型的大潮中&#xff0c;定制开发AI智能名片O2O商城小程序作为一种新兴的商业模式&#xff0c;凭借其独特的个性化定制能力、高效的线上线下融合&#xff08;O2O&am…

居住证申报系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;群众用户管理&#xff0c;警方管理&#xff0c;居住证登记管理&#xff0c;回执单管理&#xff0c;领证信息管理&#xff0c;公告栏管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页…

MySQL数据库专栏(四)数据库操作

1、创建数据库 create database if not exists [数据库名称] character set [字符集] COLLATE [排序规则]; 例如&#xff1a;create database if not exists db_demo character set utf8mb4 COLLATE utf8mb4_general_ci; if not exists&#xff1a;判断数据库是否存在&#x…

Ubuntu20, Windows10 双系统安装

1. 制作启动盘 1.1 下载 Ubuntu 系统镜像 ISO 文件 从 Ubuntu 官网下载 (https://cn.ubuntu.com/download/desktop)。官网访问慢的&#xff0c;从国内镜像点下。 1.2 烧录 Ubuntu ISO 镜像 下载 Rufus&#xff1a;从Rufus官网下载 Rufus 工具。 插入U 盘&#xff1a;将U盘插…

详解语义安全(semantically secure)

目录 一. 引入 二. 密文与明文 2.1 通俗性理解 2.2 定理 2.3 定理理解 三. 语义安全的第一个版本 3.1 基本理解 3.2 定理 3.3 定理理解 四. 语义安全的第二个版本 4.1 直观解释 4.2 小结 一. 引入 密码学中安全加密要求&#xff1a;敌手&#xff08;adversary&…

“百板齐发“ — 一个拥有上百个独特看板的代码

精选100套AXURE可视化数据大屏看板 产品经理高效工具 高保真原型模板 赋能智慧城市 元件库 AXURE9.0版本&#xff0c;所有页面均可编辑&#xff0c;部分地图与实现放大缩小&#xff0c;拖拉拽的功能。 01.水质情况实时监测预警系统 02.全国停车云实时数据监测系统 03.中国移…