Spark实时(五):InputSource数据源案例演示

news2025/1/16 7:55:35

文章目录

InputSource数据源案例演示

一、​​​​​​​File Source

1、读取text文件

2、读取csv文件

3、读取json文件

二、Socket Source 

三、Rate Source


InputSource数据源案例演示

在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。

Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。

一、​​​​​​​​​​​​​​File Source

Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。

1、读取text文件

Scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  *  Structured Streaming监控目录 text格式数据
  */
object SSReadTextData {
  def main(args: Array[String]): Unit = {

    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadTextData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import  spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.监控目录
    val ds: Dataset[String] = spark.readStream.textFile("./data/")

    val result: DataFrame = ds.map(line => {
      val arr: Array[String] = line.split("-")
      (arr(0).toInt, arr(1), arr(2).toInt)
    }).toDF("id", "name", "age")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }

}

 结果:

Java代码如下:

package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;

public class SSReadTextData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadSocketData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        Dataset<String> ds = spark.readStream().textFile("./data/");

        Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
            @Override
            public Tuple3<Integer, String, Integer> call(String line) throws Exception {
                String[] arr = line.split("-");

                return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );
            }
        }, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));

        Dataset<Row> result = ds2.toDF("id", "name", "age");

        result.writeStream()
                .format("console")
                .start()
                .awaitTermination();

    }
}

 结果:

以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:

1-zhangsan-18
2-lisi-19
3-ww-20

2、读取csv文件

Scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
  * Structured Streaming 读取CSV数据
  */
object SSReadCsvData {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadCsvData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import  spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.创建CSV数据schema
    val userSchema: StructType = new StructType().add("id", "integer")
      .add("name", "string")
      .add("gender", "string")
      .add("age", "integer")


    val result: DataFrame = spark.readStream
      .option("sep", ",")
      .schema(userSchema)
      .csv("./data/")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }

}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

/**
 * Structured Streaming 读取CSV数据
 */

public class SSReadCsvData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadCsvData")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        StructType userSchema = new StructType()
                .add("id", "integer")
                .add("name", "string")
                .add("gender", "string")
                .add("age", "integer");
        Dataset<Row> result = spark.readStream()
                .option("sep", ",")
                .schema(userSchema)
                .csv("./data/");

        result.writeStream()
                .format("console")
                .start()
                .awaitTermination();

    }
}

 结果:

以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:

1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50

3、读取json文件

Scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
  *  Structured Streaming 监控Json格式数据
  */
object SSReadJsonData {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadCsvData")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import  spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2.创建 json 数据schema
    val userSchema: StructType = new StructType().add("id", "integer")
      .add("name", "string")
      .add("age", "integer")



    val result: DataFrame = spark.readStream
      .schema(userSchema)
      .json("./data/")

    val query: StreamingQuery = result.writeStream
      .format("console")
      .start()

    query.awaitTermination()

  }

}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;


import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;

/**
 * Structured Streaming实时监控目录中json文件作为数据流
 */
public class SSReadJsonData01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
        SparkSession spark = SparkSession.builder().appName("File Source test")
            .master("local")
            .getOrCreate();

        //2.设置日志
        spark.sparkContext().setLogLevel("Error");

        //3.设置Schema
        StructType userSchema = new StructType().add("id", "integer")
            .add("name", "string")
            .add("age", "integer");

        //4.指定监控目录读取数据json数据
        Dataset<Row> ds = spark.readStream()
            .option("sep", ",")
            .schema(userSchema)
            .json("./data/");

        //5.打印数据到控制台
        StreamingQuery query =ds.writeStream()
            .format("console")
            .start();

        query.awaitTermination();

    }
}

结果:

以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:

{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}

注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。

二、Socket Source 

读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。

可以参考案例:

Spark实时(三):Structured Streaming入门案例-CSDN博客

三、Rate Source

Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。

scala代码如下:

package com.lanson.structuredStreaming.source

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * SSRateSource
  */
object SSRateSource {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("rate test")
//      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val result: DataFrame = spark.readStream
      .format("rate")
      // 配置每秒生成多少行数据,默认1行
      .option("rowsPerSecond", "10")
      .option("numPartitions", 5)
      .load()
    result.writeStream
      .format("console")
      .option("numRows","100")
      .option("truncate","false")
      .start()
      .awaitTermination()

  }

}

结果:

Java代码如下:

package com.lanson.structuredStreaming.source;

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class ssratesource01 {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建对象
       SparkSession spark = SparkSession.builder().master("local")
                .appName("rate test")
                .getOrCreate();
       spark.sparkContext().setLogLevel("Error");

        Dataset<Row> result = spark.readStream()
                .format("rate")
                // 配置每秒生成多少行数据,默认1行
                .option("rowsPerSecond", "10")
                .option("numPartitions", 5)
                .load();

        result.writeStream()
                .format("console")
                .option("numRows","100")
                .option("truncate","false")
                .start()
                .awaitTermination();
    }
}

结果: 

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1949892.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

客户服务知识库最佳实践 7 个步骤

介绍 每个公司的声誉都依赖于其客户。 如果全世界都向你敞开了大门&#xff0c;但你最终在与客户打交道方面做得很糟糕&#xff0c;那么消息就会传出去&#xff0c;无论你的捕鼠器有多好&#xff0c;你都会失去销售。 正如营销依赖于与潜在客户的关系一样&#xff0c;公司的…

【Linux】信号3——信号的捕捉

1.信号的捕捉 我们都说信号被收到了&#xff0c;可能不会立马处理 信号是什么时候被处理的呢&#xff1f; 前提是我们得知道自己收到了信号&#xff0c;进程就得在合适的时候去查自己的pending表和block表&#xff0c;这些属于内核数据结构&#xff0c;进程一定要处于内核态&a…

Easy es问题总结

官网教程&#xff1a;https://www.easy-es.cn/pages/ac41f0/#settings 一 测试项目 1 pom <dependencies><!-- 排除springboot中内置的es依赖,以防和easy-es中的依赖冲突--><dependency><groupId>org.springframework.boot</groupId><artifa…

Java语言程序设计基础篇_编程练习题*15.26 (改变透明度)

*15.26 (改变透明度) 重写编程练习题15.24&#xff0c;当球摆动的时候改变球的透明度 思路&#xff1a;很简单&#xff0c;改写编程练习题15.24&#xff0c;为小圆Circle添加一个FadeTransition&#xff0c;再把暂停和开始方法设置在鼠标事件中 代码示例&#xff1a;编程练习…

基于多种机器学习的豆瓣电影评分预测与多维度可视化【可加系统】

有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主 在本研究中&#xff0c;我们采用Python编程语言&#xff0c;利用爬虫技术实时获取豆瓣电影最新数据。通过分析豆瓣网站的结构&#xff0c;我们设计了一套有效的策略来爬取电影相关的JSON格式数据。…

鸿蒙(HarmonyOS)自定义Dialog实现时间选择控件

一、操作环境 操作系统: Windows 11 专业版、IDE:DevEco Studio 3.1.1 Release、SDK:HarmonyOS 3.1.0&#xff08;API 9&#xff09; 二、效果图 三、代码 SelectedDateDialog.ets文件/*** 时间选择*/ CustomDialog export struct SelectedDateDialog {State selectedDate:…

【数据结构初阶】单链表经典算法题十二道——得道飞升(上篇)

目录 1、移除元素 2、反转链表 3、链表的中间节点 4、合并两个有序链表 Relaxing Time&#xff01;&#xff01;&#xff01; ———————————————— 天气之子幻 ———————————————— 1、移除元素 思路&#xff1a; 创建一个新链表&#xff0…

CPU350% JVM GC频繁并GC不掉EXCEL导出

背景&#xff1a; 有个Excel导出的需求&#xff0c;测试的时候&#xff0c;只要连续导出大量的数据就会导致FAT机器反请求反应迟钝&#xff0c;甚至卡死&#xff0c;无法恢复。 排查&#xff1a; 1 跳板机跳到机器上&#xff0c;查看 项目 ipd 执行ps -ef | grep 项目名称.j…

FFmpeg音视频流媒体的顶级项目

搞音视频、流媒体的圈子,没法躲开ffmpeg这个神级项目。 FFmpeg 是一个功能强大且广泛使用的多媒体处理工具。FFmpeg 具备众多出色的特性。它支持多种音频和视频格式的转换,能轻松将一种格式的文件转换为另一种,满足不同设备和应用的需求。不仅如此,它还可以进行视频的裁剪、…

机械学习—零基础学习日志(高数11——三角函数)

零基础为了学人工智能&#xff0c;真的开始复习高数 三角函数之所以比较困难&#xff0c;是因为过于抽象&#xff0c;距离生活太过遥远&#xff0c;这里搜集一些资料&#xff0c;帮助大家能加深对三角函数的理解。 三角函数作用——能测距离 三角函数从应用层&#xff0c;开…

增量同步与全量同步:深入解析数据同步的两种策略

目录 一、增量同步 二、全量同步 三、如何选择合适的数据同步策略&#xff1f; 1.增量同步的场景 2.全量同步的场景 数据同步在后端是非常常见的场景&#xff0c;数据同步的稳定性和实时性对业务有非常重要的影响。数据同步的方式主要有全量同步和增量同步两种&#xff0c;本文…

如何学习ClickHouse:糙快猛的大数据之路(技术要点概览)

这个系列文章用"粗快猛大模型问答讲故事"的创新学习方法&#xff0c;让你轻松理解复杂知识&#xff01;涵盖Hadoop、Spark、MySQL、Flink、Clickhouse、Hive、Presto等大数据所有热门技术栈&#xff0c;每篇万字长文。时间紧&#xff1f;只看开头20%就能有收获&#…

[STM32]FlyMcu同时烧写BootLoader和APP文件-HEX文件组成

目录 一、前言 二、HEX文件的格式 三、组合HEX文件 四、使用FlyMcu烧录 一、前言 如题&#xff0c;BootLoader每次烧写都是全部擦除&#xff0c;当我们烧写APP程序的时候&#xff0c;BootLoader程序将不复存在&#xff0c;很多开发者或许只有USB转TTL模块&#xff0c;没有其…

QML ListView snapMode

属性&#xff1a; snapMode 此属性确定视图滚动在拖动或轻拂之后的解决方式 NoSnap:列表滚动停止时可以停在任意位置&#xff0c;即便第一项显示不全 SnapToItem:当放开鼠标时&#xff0c;移动距离超过半个Item时&#xff0c;自动滑动到下一个Item&#xff0c;否则自动滑动回…

Unity中有关Animation的一点笔记

也许更好的阅读体验 Animation Unity中Animation类并不是直接记载了和播放动画有关的信息&#xff0c;可以简单理解Animation为一个动画播放器&#xff0c;播放的具体内容就像卡带一样&#xff0c;当我们有了卡带后我们可以播放动画。 对应的则是编辑器中的组件 所以Anima…

一文解决 | Linux(Ubuntn)系统安装 | 硬盘挂载 | 用户创建 | 生信分析配置

原文链接&#xff1a;一文解决 | Linux&#xff08;Ubuntn&#xff09;系统安装 | 硬盘挂载 | 用户创建 | 生信分析配置 本期教程 获得本期教程文本文档&#xff0c;在后台回复&#xff1a;20240724。请大家看清楚回复关键词&#xff0c;每天都有很多人回复错误关键词&#xf…

ffmpeg ffplay.c 源码分析二:数据读取线程

本章主要是分析 数据读取线程read_thread 中的工作。如上图红色框框的部分 从ffplay框架分析我们可以看到&#xff0c;ffplay有专⻔的线程read_thread()读取数据&#xff0c; 且在调⽤av_read_frame 读取数据包之前需要做&#xff1a; 1.例如打开⽂件&#xff0c; 2.查找配置解…

Springboot集成Elasticsearch High Level REST Client实现增删改查实战

获取源码&#x1f6a9; 需要完整代码资料&#xff0c;请一键三连后评论区留下邮箱&#xff0c;安排发送&#xff01;&#xff01;&#xff01;&#x1f916; 什么是High Level REST Client&#xff1f; Elasticsearch 的 High Level REST Client 是一个用于与 Elasticsearch…

科技与占星的融合:AI 智能占星师

本文由 ChatMoney团队出品 在科技的前沿领域&#xff0c;诞生了一位独特的存在——AI占星师。它并非传统意义上的占星师&#xff0c;而是融合了先进的人工智能技术与神秘的占星学知识。 这能够凭借其强大的数据分析能力和精准的算法&#xff0c;对星辰的排列和宇宙的能量进行深…