Spark实时(三):Structured Streaming入门案例

news2024/11/24 11:38:05

文章目录

Structured Streaming入门案例

一、Scala代码如下

二、Java 代码如下

三、以上代码注意点如下


Structured Streaming入门案例

我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:

 <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.4.3</spark.version>
  </properties>

  <dependencies>
    <!-- Spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- SparkSQL -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- SparkSQL  ON  Hive-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!--mysql依赖的jar包-->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>
    <!--SparkStreaming-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Kafka 0.10+ Source For Structured Streaming-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- 向kafka 生产数据需要包 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>

    <!-- Scala 包-->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.12.15</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.12.15</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>2.12.15</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.12</version>
    </dependency>
    <dependency>
      <groupId>com.google.collections</groupId>
      <artifactId>google-collections</artifactId>
      <version>1.0</version>
    </dependency>

  </dependencies>

一、Scala代码如下

package com.lanson.structuredStreaming

/**
 *  Structured Streaming 实时读取Socket数据
 */

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming 读取Socket数据
 */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {

    //1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCount")
      //默认200个并行度,由于源头数据量少,可以设置少一些并行度
      .config("spark.sql.shuffle.partitions",1)
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.读取Socket中的每行数据,生成DataFrame默认列名为"value"
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作
    val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})

    //4.按照单词分组,统计个数,自动多一个列count
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5.启动流并向控制台打印结果
    val query: StreamingQuery = wordCounts.writeStream
      //更新模式设置为complete
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()

  }

}

 

二、Java 代码如下

package com.lanson.structuredStreaming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SSReadSocketData01 {

    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkSession spark = SparkSession.builder().master("local")
            .appName("SSReadSocketData01")
            .config("spark.sql.shuffle.partitions", 1)
            .getOrCreate();
        
        spark.sparkContext().setLogLevel("Error");

        Dataset<Row> lines = spark.readStream().format("socket")
            .option("host", "node3")
            .option("port", 9999)
            .load();

        Dataset<String> words = lines.as(Encoders.STRING())
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            }, Encoders.STRING());

        Dataset<Row> wordCounts = words.groupBy("value").count();

        StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();
        
        query.awaitTermination();
    }
}

 

以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:

第一次输入:a b c
第二次输入:d a c
第三次输入:a b c

可以看到控制台打印如下结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+

三、以上代码注意点如下

  • SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
  • StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
  • 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
  • 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1943743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Delphi 11.2 配置Android SDK 环境

打开 Delphi 11 点击 Tools–Options… 然后点击 Deployment–SDK Manager–Add… 这里如果配置64位就选 Android 64-bit&#xff0c;如果配置32位就选 Android 32-bit 点击 Select an SDK version–Add New… 有警告图标的就是有问题的项&#xff0c;需要手动更新一下&#xf…

NO.1 Hadoop概述

1.1 Hadoop是什么 1.2 Hadoop优势 1.3 Hadoop组成 1.3.1 HDFS架构概述 1.3.2 YARN架构概述 1.3.3 MapReduce架构概述 1.3.4 HDFS、YARN、MapReduce三者关系 1.4 大数据技术生态体系 1.5 推荐系统框架图

【UE5】可反射的射线检测

目录 效果 步骤 一、准备射线 二、生成第一次反射后的射线 三、多次反射 四、通过循环进行多次反射 效果 步骤 一、准备射线 1. 新建一个工程&#xff0c;添加一个俯视角游戏资源包 2. 双击打开俯视角游戏地图 删除大纲中的后期处理体积使得地图可以正常显示 3. 添加一…

【JavaEE初阶】线程的概念及创建

目录 &#x1f4d5; 前言 &#x1f4d5; 认识线程&#xff08;Thread&#xff09; &#x1f6a9; 概念 &#x1f60a;线程是什么 &#x1f642; 为啥要有线程 &#x1f62d; 进程和线程的区别&#xff08;面试题重点&#xff09; &#x1f92d; Java的线程和操作系统线程…

黑马JavaWeb企业级开发(知识清单)01——前端介绍,HTML实现标题:排版

文章目录 前言一、认识web前端、HTML、CSS二、VS Code开发工具&#xff08;插件弃用问题&#xff09;三、HTML结构标签介绍1. 标签页标题< title >2. 图片标签< img >1) 常见属性2) src路径书写方式 3. 标题标签< h >4. 水平分页线标签< hr > 四、用Vs…

“萝卜快跑”自动驾驶技术,夺走了谁的方向盘?

在前几年&#xff0c;科幻电影中无人驾驶车自如地穿梭在城市大街小巷的场景&#xff0c;似乎还遥不可及&#xff0c;然而&#xff0c;随着“萝卜快跑”无人驾驶车辆在多个城市的成功运营&#xff0c;这一愿景已悄然变为现实。由百度Apollo倾力打造的“萝卜快跑”&#xff0c;以…

基于FPGA的以太网设计(3)----详解各类xMII接口

1、什么是xMII接口 MII (Media Independent Interface)接口,即介质无关接口或称为媒体独立接口,它是IEEE-802.3定义的以太网行业标准。“介质无关” 表明在不对MAC硬件重新设计或替换的情况下,任何类型的PHY设备都可以正常工作。 MII接口是MAC和PHY之间的通信接口,MAC产生…

STM32(七):STM32指南者-串口实验

目录 一、基本概念通讯基本概念1、串行和并行2、同步通讯与异步通讯3、全双工、半双工、单工4、通讯速率 串口基本概念1、串口通讯基本概念2、物理层3、协议层 指南者的串口USART 二、串口实验前期准备1、安装安装 USB 转串口驱动_CH3402、野火多功能调试助手3、使用USB转串口&…

RedHat9 | Ansible 编写循环和条件任务

环境版本说明 RedHat9 [Red Hat Enterprise Linux release 9.0]Ansible [core 2.13.3]Python [3.9.10]jinja [3.1.2] 1. 利用循环迭代任务 通过利用循环&#xff0c;管理员无需编写多个使用同一模块的任务。Ansible支持使用loop关键字对一组项目迭代任务&#xff0c;通过配置…

基于单片机控制的变压器油压油温故障检测

摘 要 在电力系统的运行中&#xff0c;通过对其核心设备变压器的故障进行检测&#xff0c;以此能够及时、准确的发现变压器的故障&#xff0c;基于单片机控制的变压器油压油温的故障检测的方法&#xff0c;利用压力传感器、温度传感器对变压器的油压、油温进行采集并送入单片机…

靶机Metasploitable2的安装

Metasploitable2是一款基于Ubuntu Linux的操作系统。Metasploitable2是一个虚拟机文件&#xff0c;从网上下载解压之后就可以直接使用&#xff0c;无需安装。该系统本身设计作为安全工具测试和演示常见漏洞攻击的靶机&#xff0c;所以它存在大量未打补丁漏洞&#xff0c;并且开…

Xilinx Ultrascale+ FPGA 驱动MIPI DSI屏显示源码工程

作者&#xff1a;Hello&#xff0c;Panda 大家早上好&#xff0c;中午好&#xff0c;下午好&#xff0c;我是熊猫君。 曾记否&#xff0c;之前熊猫家发了一篇博文《分享一下使用Xilinx FPGA驱动MIPI DSI屏的心路历程》&#xff0c;此文发布以后&#xff0c;后台收到了不少朋友…

Ubuntu 22.04安装Visual Studio Code(VS Code)配置C++,Python

目录 1,下载 通过命令行安装 2,配置 2.1 vscode安装C/C 2.1.1 vscode安装运行环境 3,测试 vscode测试 4&#xff0c;配置python 选择解释器Python是一个解释性语言&#xff0c;现在需告知VSCode使用哪个解释器 ctrlshiftp 输入&#xff1a;Python: Select Interprete…

记一次因敏感信息泄露而导致的越权+存储型XSS

1、寻找测试目标 可能各位师傅会有苦于不知道如何寻找测试目标的烦恼&#xff0c;这里我惯用的就是寻找可进站的思路。这个思路分为两种&#xff0c;一是弱口令进站测试&#xff0c;二是可注册进站测试。依照这个思路&#xff0c;我依旧是用鹰图进行了一波资产的搜集&#xff…

学习笔记:MySQL数据库操作4

一、数据库和表的创建 创建数据库&#xff1a; 使用create database语句创建一个新的数据库&#xff0c;例如&#xff1a; 选择数据库&#xff1a; 使用use语句来指定后续操作的数据库&#xff0c;例如&#xff1a; 创建表&#xff1a; 使用create table语句来创建表&#xff0…

Java面试八股之后Spring、spring mvc和spring boot的区别

Spring、spring mvc和spring boot的区别 Spring, Spring Boot和Spring MVC都是Spring框架家族的一部分&#xff0c;它们各自有其特定的用途和优势。下面是它们之间的主要区别&#xff1a; Spring: Spring 是一个开源的轻量级Java开发框架&#xff0c;最初由Rod Johnson创建&…

CSS:position属性

一、属性值 1.1 fixed 固定位置的元素&#xff0c;相对于浏览器窗口进行定位。 元素的位置通过 “left”, “top”, “right” 以及 “bottom” 属性进行规定。 网站中的固定 header 和 footer 就是用固定定位来实现的&#xff1b; header效果图 footer效果图 1.2 absol…

“微软蓝屏”事件,给IT行业带来的宝贵经验和教训

“微软蓝屏”事件是指2024年7月19日发生的一次全球性技术故障&#xff0c;主要涉及微软视窗&#xff08;Windows&#xff09;操作系统及其相关应用和服务。 以下是对该事件的详细解析&#xff1a; 一、事件概述 发生时间&#xff1a;2024年7月19日事件影响&#xff1a;全球多个…

linux 解决端口占用

1.查询被占用的端口 netstat -tln | grep 60602.查询该端口对应的服务 lsof -i :60603.杀死该进程 //14868是第二步的PID kill -9 14868

Docker容器逃逸漏洞-CVE-2024-21626

Snyk 在 Docker 引擎以及其他容器化技术(例如 Kubernetes)使用的 runc <=1.1.11 的所有版本中发现了一个漏洞。利用此问题可能会导致容器逃逸到底层主机操作系统,无论是通过执行恶意映像还是使用恶意 Dockerfile 或上游映像构建映像(即使用时FROM) CVE-2024-21626原理…