Spark Structured Streaming使用教程

news2024/9/16 22:18:00

文章目录

    • 1、输入数据源
    • 2、输出模式
    • 3、sink输出结果
    • 4、时间窗口
      • 4.1、时间窗口
      • 4.2、时间水印(Watermarking)
    • 5、使用例子

Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。
Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表示为在静态表上的标准批处理查询,Spark将其作为无界输入表上的增量查询运行。

1、输入数据源

  • File source - 以数据流的形式读取写入目录中的文件。文件将按照文件修改时间的先后顺序进行处理。如果设置了latestFirst,则顺序将相反。支持的文件格式为text, CSV, JSON, ORC, Parquet。请参阅DataStreamReader接口的文档,了解最新的列表,以及每种文件格式支持的选项。注意,监视目录的文件改变,只能是原子性的改变,比如把文件放入该目录,而不是持续写入该目录中的某个文件。
  • Kafka source - 从Kafka读取数据。它兼容Kafka代理版本0.10.0或更高版本。查看Kafka集成指南了解更多细节。
  • Socket source (用于测试) - 从套接字连接读取UTF8文本数据。监听服务器套接字位于驱动程序。请注意,这应该仅用于测试,因为它不提供端到端的容错保证。
  • Rate source (用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个时间戳和值。其中timestamp为包含消息发送时间的timestamp类型,value为包含消息计数的Long类型,从0开始作为第一行。该源代码用于测试和基准测试。

2、输出模式

  • 我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。有3种模式:
  • complete mode:所有数据都会被写入外部存储。具体如何写入,是根据不同的外部存储自身来决定的。
  • append mode:只有新的数据,以追加的方式写入外部存储。只有当我们确定,result table中已有的数据是肯定不会被改变时,才应该使用append mode。
  • update mode:只有被更新的数据,包括增加的和修改的,会被写入外部存储中。
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

3、sink输出结果

  • File sink - 将输出存储到一个目录。
    输出模式支持Append
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
  • Kafka sink - 将输出存储到Kafka中的一个或多个主题。
    输出模式支持Append, Update, Complete
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
  • Console sink (用于测试) - 每次有触发器时,将输出打印到控制台/标准输出。支持两种输出模式:Append和Complete。这应该用于在低数据量上进行调试,因为在每次触发后都会收集整个输出并将其存储在驱动程序的内存中。
    输出模式支持Append, Update, Complete
writeStream
    .format("console")
    .start()
  • Memory sink (用于测试) - 输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
    输出模式支持Append, Complete
输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
  • 自定义输出Foreach和ForeachBatch - Foreach:针对每条数据的输出;ForeachBatch:针对每批次的数据输出。
    输出模式支持Append, Update, Complete
// Foreach
streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

// ForeachBatch
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }    
  }
).start();

4、时间窗口

4.1、时间窗口

在业务场景中,经常会遇到按时间段进行聚合操作,Spark提供了基于滑动窗口的事件时间集合操作,每个时间段作为一个分组,并对每个组内的每行数据进行聚合操作。
在这里插入图片描述

可以使用groupBy()和window()操作来表示窗口聚合。

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

4.2、时间水印(Watermarking)

WaterMarking的作用主要是为了解决:延迟到达的数据是否丢弃,系统可以删除过期的数据。
在这里插入图片描述

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();

5、使用例子

package com.penngo.spark;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.art.Art;

import java.io.Serializable;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class SparkStructStream {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static class DataTxt implements Serializable {
        private String text;
        private Timestamp time;

        public DataTxt(String text, LocalDateTime time) {
            this.text = text;
            this.time = Timestamp.valueOf(time);
        }
        public String getText() {
            return text;
        }
        public void setText(String text) {
            this.text = text;
        }
        public Timestamp getTime() {
            return time;
        }
        public void setTime(Timestamp time) {
            this.time = time;
        }
    }
    
    public static void socket(SparkSession spark) throws Exception{
        // 运行:nc -lk 9999
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();
        Dataset<DataTxt> words = lines
                .as(Encoders.STRING())
                .map((MapFunction<String, DataTxt>) x -> {
                    String[] strs = x.split(",");

                    LocalDateTime date = LocalDateTime.parse(strs[1],formatter);
                    Arrays.asList(x.split(",")).iterator();
                    DataTxt data = new DataTxt(strs[0], date);
                    return data;
                }, Encoders.bean(DataTxt.class));

        Dataset<Row> wordCounts = words.toDF()
                .withWatermark("time", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃
                .groupBy(
                        window(col("time"), "10 minutes", "5 minutes"),
                        col("text"))
                .count();


        wordCounts.writeStream().outputMode("append")
                .foreach(new ForeachWriter<Row>() {
                    @Override public boolean open(long partitionId, long version) {
//                        System.out.println("open==========partitionId:" + partitionId + ",version:" + version);
                        return true;
                    }

                    @Override public void process(Row record) {
                        // Write string to connection
                        System.out.println("recordxxxxxxxxxxxxxxxxxx:======" + record);

                    }

                    @Override public void close(Throwable errorOrNull) {
                        // Close the connection
//                        System.out.println("close==========errorOrNull:" + errorOrNull);
                    }
                })
//                .format("console")
                .start().awaitTermination();
    }

    public static void kafka(SparkSession spark) throws Exception{
        // Subscribe to 1 topic
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "192.168.245.1:9092")
                .option("subscribe", "topic-news")
                .option("startingOffsets","latest")
                .option("maxOffsetsPerTrigger",1000)
                .load();
        
        df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
        df.printSchema();
        df.writeStream().outputMode("append")
                .format("console")
                .start().awaitTermination();
    }
    public static void main(String[] args) throws Exception{
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);
        Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);

        System.setProperty("hadoop.home.dir", "/usr/local/hadoop-3.3.6");
        System.setProperty("HADOOP_USER_NAME", "root");

        SparkSession spark = SparkSession
                .builder()
                .appName("SparkStructStream")
                .master("local[*]")
                .getOrCreate();

//        socket(spark);
        kafka(spark);
    }
}

参考自官方文档:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1293102.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023-2024-1-高级语言程序设计-第2次月考编程题

注&#xff1a;此前已发布过的题解不再发布&#xff08;原题请在下面位置进行搜索&#xff09;。 7-1-2 排序(算法任意) 本题要求将给定的n个整数从大到小排序后输出&#xff08;可使用任意排序算法&#xff09;。 输入格式: 输入第一行给出一个不超过10的正整数n。第二行给…

WVP-RPO开源项目搭建实践

0.拉取代码 GitHub - 648540858/wvp-GB28181-pro: WEB VIDEO PLATFORM是一个基于GB28181-2016标准实现的网络视频平台&#xff0c;支持NAT穿透&#xff0c;支持海康、大华、宇视等品牌的IPC、NVR、DVR接入。支持国标级联&#xff0c;支持rtsp/rtmp等视频流转发到国标平台&…

详细解读电力DLT698.45-2017通信规约--正向有功总电能

建立连接请看这篇&#xff1a;详细解读DLT698.45-2017通信规约--预连接响应http://mp.weixin.qq.com/s?__bizMzA3NjAwMjQzMQ&mid2652026396&idx1&sna0a17f005d23136c922a7c381ddb7e75&chksm8481f30cb3f67a1a94e66db77e61fe73c22b1904fcdbb0144108e132b265e7b4…

dtaidistance笔记:dtw_ndim (高维时间序列之间的DTW)

1 数据 第一个维度是sequence的index&#xff0c;每一行是多个元素&#xff08;表示这一时刻的record&#xff09; from dtaidistance.dtw_ndim import *s1 np.array([[0, 0],[0, 1],[2, 1],[0, 1],[0, 0]], dtypenp.double) s2 np.array([[0, 0],[2, 1],[0, 1],[0, .5],[0…

水果党flstudio用什么midi键盘?哪个版本的FL Studio更适合我

好消息&#xff01;好消息&#xff01;特大好消息&#xff01; 水果党们&#xff01;终于有属于自己的专用MIDI键盘啦&#xff01; 万众期待的Novation FLKEY系列 正式出炉&#xff01; 话有点多话&#xff0c;先分享一份干货&#xff0c;尽快下载 FL Studio 21 Win-安装包&…

搜索推荐技术-爱奇艺搜索引擎技术

一、爱奇艺的搜索引擎框架示意图 即通过召回系统&#xff0c;即基于文本匹配的matching system&#xff0c;得到大量视频资源的候选集&#xff0c;经过粗排和精排&#xff0c;最后返回给用户。重点在于召回模块和排序模块。 二、召回模块 召回模块比较重要的是基础相关性&am…

Unity Meta Quest 一体机开发(九):【手势追踪】通过录制抓取手势实现自定义抓取姿势

文章目录 &#x1f4d5;教程说明&#x1f4d5;录制前的准备&#x1f4d5;第一种录制方法&#xff08;Hand Grab Pose Tool 场景&#xff09;⭐在运行模式中确认录制⭐保存录制的手势&#xff0c;将物体做成 Prefab⭐在编辑阶段调整抓取手势&#x1f50d;Fingers Freedom&#x…

leetcode 622. 设计循环链表

这道题讲了两种方法&#xff0c;第一个代码是用数组实现的&#xff0c;第二个是用链表实现的&#xff0c;希望对你们有帮助 &#xff08;最好在VS自己测试一遍&#xff0c;再放到 leetcode上哦&#xff09; 下面的是主函数&#xff08;作参考&#xff09;&#xff0c;静下心来…

第21章网络通信

Internet 提供了大量有用的信息&#xff0c;很少有人能在接触过Internet后拒绝它的诱惑。计算机网络实现了多台计算机间的互联&#xff0c;使得它们彼此之间能够进行数据交流。网络应用程序就是在已连接的不同计算机上运行的程序&#xff0c;这些程序借助于网络协议&#xff0c…

孩子都能学会的FPGA:第二十四课——用FPGA和格雷码实现异步FIFO

&#xff08;原创声明&#xff1a;该文是作者的原创&#xff0c;面向对象是FPGA入门者&#xff0c;后续会有进阶的高级教程。宗旨是让每个想做FPGA的人轻松入门&#xff0c;作者不光让大家知其然&#xff0c;还要让大家知其所以然&#xff01;每个工程作者都搭建了全自动化的仿…

加载离线镜像包:在线镜像离线为tar包、tar离线镜像包加载并根据imageId打tag

第一步&#xff1a;在线环境压缩离线镜像&#xff1a; 需要两个文件&#xff0c;第一个是脚本文件image_offline_load.sh脚本&#xff0c;第二个是image_list.txt 按行 存放需要离线的镜像名称 ./image_offline_load.sh save image_list.txt output.tar第二步&#xff1a;在离…

Nginx 简单入门操作

前言:之前的文章有些过就不罗嗦了。 Nginx 基础内容 是什么? Nginx 是一个轻量级的 HTTP 服务器,采用事件驱动、异步非阻塞处理方式的服务器,它具有极好的 IO 性能,常用于 HTTP服务器(包含动静分离)、正向代理、反向代理、负载均衡 等等. Nginx 和 Node.js 在很多方…

Docker快速理解及简介

docker快速理解及简介 1.Docker为什么出现&#xff1f; 迁移一个项目时&#xff0c;运行文档、配置环境、运行环境、运行依赖包、操作系统发行版、内核等都需要重新安装配置&#xff0c;比较麻烦。 2.Docker是什么&#xff1f; Docker是基于Go语言实现的云开源项目。解决了运行…

Altium Designer实用系列(五)----整理并导出PCB的BOM表

一、引言 最近老师安排了一个小的任务&#xff0c;就是把我们项目的两个电路板BOM整合一下&#xff0c;要注明元器件的耐温、耐压、购买渠道等等内容。    一开始我觉得这工作内容太简单了&#xff0c;两分钟的事。但是当我实际开始干的时候&#xff0c;才发现&#xff0c;好…

【react】动态页面转换成html文件下载,解决样式问题

需求 今天遇到一个需求&#xff0c;挺恶心人的&#xff0c;将一个在线文档页面&#xff0c;可以导出成为html页面查看。 看到网上有使用fs模块&#xff0c;通过react的ReactDOMServer.renderToStaticMarkup将组件转成html字符串&#xff0c;输出文件了。 但是我尝试了&#x…

CodeSys学习笔记

文章目录 1.运动控制的两种方式1.1.SM3_CNC1.2.SM3_Robotics 2.两种运动控制方式的速度、加速度等参数的控制2.1.SM3_CNC2.2.SM3_Robotics 3.CNC的M指令的使用&#xff08;实现&#xff09;逻辑。4.SM3_Robotics中的坐标系5.SM3_Robotics如何实现插补并连续执行&#xff1f; 记…

【Linux】diff命令使用

diff命令 是一个用于比较两个文件或目录之间差异的命令。它可以显示两个文件之间的行级别差异&#xff0c;并以易于阅读的格式输出结果。 著者 由保罗艾格特、迈克海特尔、大卫海耶斯、理查德史泰尔曼和Len Tower撰写。 diff命令 -Linux手册页 语法 diff [选项] [文件1]…

设备间的指令通信

指令通信的概念 要进行设备和设备之间的交流就需要通过串口发送数据进行交流 而串口发送简单的数据只需要传输介质 但是要发送复杂的数据就需要介质和传输的规则了 三种应用场景 比如在上位机和mcu之间 通过上位机管理控制器 从而控制电池 单片机和单片机之间 用户输入数据到…

MySQl int(1)、int(20) 的区别到底在哪里

MySQl int(1)、int(20) 的区别到底在哪里 常思一二&#xff0c;便得自然… int(1)数据类型介绍 在MySQL中&#xff0c;INT(1) 是一种定义整数类型的数据字段&#xff0c;其中的数字表示显示宽度而不是存储范围。具体说&#xff0c;INT(1) 中的数字 1 表示显示宽度&#xff0…

VividTalk创新AI语音匹配图片技术:照片+语音=逼真说话视频!

VividTalk是一个由南京大学、阿里巴巴、字节跳动和南开大学共同开发的项目工具。它通过先进的音频到3D网格映射技术和网格到视频的转换技术&#xff0c;实现了高质量、逼真的音频驱动的说话头像视频生成。这一创新技术使得只需提供一张人物的静态照片和一段语音录音&#xff0c…