【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

news2024/10/6 18:27:00

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.11"

lazy val root = (project in file("."))
  .settings(
    name := "SparkLearning",
    idePackagePrefix := Some("cn.lh.spark"),
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",
    libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",
    libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  
  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  
  
object NetworkWordCountStatefultoMysql {  
  
  def main(args: Array[String]): Unit = {  
    //    定义状态更新函数  
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {  
      val currentCount = values.foldLeft(0)(_ + _)  
      val previousCount = state.getOrElse(0)  
      Some(currentCount + previousCount)  
    }  
  
    //    设置log4j日志级别  
    StreamingExamples.setStreamingLogLevels()  
  
    val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  
    val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  
  
    //    设置检查点,具有容错机制  
    scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  
  
    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  
    val words: DStream[String] = lines.flatMap(_.split(" "))  
    val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  
    val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  
    // 打印出状态  
    stateDstream.print()  
    // 将统计结果保存到MySQL中  
    stateDstream.foreachRDD(rdd =>{  
      val repartitionedRDD = rdd.repartition(3)  
      repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  
    })  
  
    scc.start()  
    scc.awaitTermination()  
  
    scc.stop()  
  }  
  
  
}

StreamingSaveMySQL8.scala

package cn.lh.spark  
  
import java.sql.DriverManager  
  
object StreamingSaveMySQL8 {  
  
  // 定义写入 MySQL 的函数  
  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  
    // 保存到MySQL  
    val ip = "192.168.137.110"  
    val port = "3306"  
    val db = "sparklearning"  
    val username = "lh"  
    val pwd = "Lh123456!"  
    val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  
    val conn = DriverManager.getConnection(jdbcurl, username, pwd)  
    val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  
  
    try {  
      // 写入数据  
      iter.foreach { wc =>  
        statement.setString(1, wc._1.trim)  
        statement.setInt(2, wc._2.toInt)  
        statement.executeUpdate()  
      }  
    } catch {  
      case e:Exception => e.printStackTrace()  
    } finally {  
      if(statement != null){  
        statement.close()  
      }  
      if(conn!=null){  
        conn.close()  
      }  
    }  
  }  
  
}

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/845508.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一个月 PMP 3A上岸经验复盘

2023年5月参加的线下考试,总复习时间一个月左右,刷到3A小绿饼 作为拖延症晚期+工作任务比较多,所以全程没有跟上老师的复习,最后集中在考前一个月临时抱佛脚,成功上岸不是梦 下面分享一下报名和备考经验 1月…

python画小车

文章目录 import matplotlib.pyplot as plt from matplotlib.patches import Rectangle import matplotlib.transforms as transforms import numpy as np # 创建图形窗口和坐标轴对象 fig, ax = plt.subplots()# 绘制小车矩形 def plot_robot(x, y, yaw, robot_length=2, robo…

python编写ocr识别图片汉字

当你需要构建一个简单的图形用户界面(GUI)应用程序,并在其中实现光学字符识别(OCR)功能时,wxPython是一个强大而灵活的选择。wxPython是一个基于Python的跨平台GUI开发框架,结合了wxWidgets C库…

Spring MVC项目概述及创建

Spring MVC项目概述及创建 1.什么是Spring MVC Spring MVC是基于SevletAPI的原始Web框架。Spring MVC项目也叫做SpringWeb项目。 它是在springboot项目中引入了web框架,原本的spring项目不具备网络通信能力,而spring mvc允许http响应,当用…

芯片热处理设备 HTR-4立式4寸快速退火炉

HTR-4立式4寸快速退火炉 HTR-4立式4寸快速退火炉(芯片热处理设备)广泛应用在IC晶圆、LED晶圆、MEMS、化合物半导体和功率器件等多种芯片产品的生产,和欧姆接触快速合金、离子注入退火、氧化物生长、消除应力和致密化等工艺当中,通…

调整vscode

调整vscode 连wifi linux连接wifi

noisy_crt 题目复现

文章目录 题一([NeepuCtf 2023]loud)题目描述:题目分析: 题二([NeepuCtf 2023]loud2)题目描述:题目分析: 浅记一下 论文在此 不过吧,内容太多了,我也不想看 题一([NeepuCtf 2023]loud) 题目描述&#xff…

C语言调试实用技巧之 2

导言: 今天也给大家介绍一些调试技巧 1.如何写出好(易于调试)的代码 1.1标准: 1.2推荐技巧 1.2.1assert()//断言 用assert代替if语句 提示:assert是宏,不是函数 需要包含的头文…

配置Picgo图床之COS、OSS、Github图床

简介 PicGo是一款开源的图片上传和管理工具,它提供了简单易用的界面和丰富的功能,方便用户上传、管理和分享图片。 以下是PicGo的一些主要特点和功能: 图片上传:PicGo支持将本地图片快速上传到云存储服务,如七牛云、…

NamedParameterJdbcTemplate.queryForList 方法的使用说明

objectMapper.configure 方法是 Jackson 提供的一个用于配置 ObjectMapper 对象的方法。ObjectMapper 是 Jackson 库的核心类,用于将 Java 对象与 JSON 数据相互转换。 configure 方法的作用是设置 ObjectMapper 的配置选项,例如设置日期格式、设置序列…

博客优化差不多了

博客地址:https://blog.zysicyj.top/ 这篇文章不是教学博客,后续考虑看是否出教学 具体优化视频可以观看B站视频 https://space.bilibili.com/258577429 这个博客是HexoGithub Pages搭建的,如何搭建可以看我之前的文章,主题是…

普及100Hz高刷+1ms响应 微星发布27寸显示器:仅售799元

不论办公还是游戏,高刷及低响应时间都很重要,微星现在推出了一款27寸显示器PRO MP273A, 售价只有799元,但支持100Hz高刷、1ms响应时间,还有FreeSync技术减少撕裂。 PRO MP273A的100Hz高刷新率是其最大的卖点之一&#…

如何为网站进行全面的整站翻译?

要翻译整个网站,可以按照以下步骤进行: 确定翻译需求:确定你需要将整个网站翻译成哪种语言。这可以根据你的目标受众和市场进行决定。 寻找翻译资源:你可以选择以下几种方式来进行网站翻译: a. 人工翻译:雇…

Go微服务实践 - Rpc核心概念理解

概述 从0研究一下Golang已经Golang的微服务生态体系,Golang的微服务首先要从Rpc开始,在升级到Grpc,详细介绍这些技术点都在解决什么技术问题。 Rpc Rpc (Remote Procedure Call) 远程过程调用,简单的理解是一个节点请求另一个节…

解决:树莓派VNC连接屏幕显示不全

目录 前导:我在重新烧录玩树莓派系统,开启完VNC并连接后,发现我的树莓派远程桌面屏幕显示不全,看着很难受! PS:开启VNC服务的过程 问题如下现象: 问题分析:当树莓派通过VNC连接时&…

STDF - 基于 Svelte 和 Tailwind CSS 打造的移动 web UI 组件库,Svelte 生态里不可多得的优秀项目

Svelte 是一个新兴的前端框架,组件库不多,今天介绍一款 Svelte 移动端的组件库。 关于 STDF STDF 是一个移动端的 UI 组件库,主要用来开发移动端 web 应用。和我之前介绍的很多 Vue 组件库不一样,STDF 是基于近来新晋 js 框架 S…

ZABBIX 6.4的完全安装步骤

此安装文档是我一步一步的验证过的,按步骤来可以顺畅的安成ZABBIX6.4的部署。 Zabbix 主要有以下几个组件组成: Zabbix Server6.4:Zabbix 服务端,是 Zabbix 的核心组件。它负责接收监控数据并触发告警,还负责将监控数…

SNAT与DNAT原理

SNAT和DNAT (源地址转换和目标地址转换) SNAT:源地址转换。内网到外网转换的是源地址。 DNAT:目标地址转换:外网到内网转换的是目的地址 (把内部服务器的ip地址转换成一个所有人都可以访问的地址&#xff0…

【Python】Pandas 简介,数据结构 Series、DataFrame 介绍,CSV 文件处理,JSON 文件处理

序号内容1【Python】Pandas 简介,数据结构 Series、DataFrame 介绍,CSV 文件处理,JSON 文件处理2【Python】Pandas 数据清洗操作,常用函数总结 文章目录 1. Pandas 简介2. Pandas 数据结构1. Series(一维数据&#xff…

JavaWeb(9)——前端综合案例3(悬停显示下拉列表)

一、实例需求 ⌛ 实现类似百度首页的“一个简单的鼠标悬停显示的下拉列表效果”。 二、代码实现 ☕ <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>.dropdown-cont…