Spark(38):Streaming DataFrame 和 Streaming DataSet 转换

news2024/11/28 11:47:51

目录

0. 相关文章链接

1. 基本操作

1.1. 弱类型 api

1.2. 强类型

1.3. 直接执行 sql

2. 基于 event-time 的窗口操作

2.1. event-time 窗口理解

2.2. event-time 窗口生成规则

3. 基于 Watermark 处理延迟数据

3.1. 什么是 Watermark 机制

3.2. update 模式下使用 watermark

3.3. append 模式下使用 wartermark

3.4. watermark 机制总结

4. 流数据去重

5. join操作

5.1. Stream-static Joins

5.1.1. 内连接

5.1.2. 外连接

5.2. Stream-stream Joins

5.2.1. inner join

4.2.2. outer join

6. Streaming DF/DS 不支持的操作


0. 相关文章链接

 Spark文章汇总 

1. 基本操作

在 DF/DS 上大多数通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上。

准备处理数据: people.json

{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

1.1. 弱类型 api

代码示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._


        // 创建格式,并读取数据
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: DataFrame = spark
            .readStream
            .schema(peopleSchema)
            .json("/Project/Data/json")


        // 弱类型 api
        val df: DataFrame = peopleDF
            .select("name", "age", "sex")
            .where("age > 20")
        df.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

1.2. 强类型

代码示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._


        // 创建格式,并读取数据
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: DataFrame = spark
            .readStream
            .schema(peopleSchema)
            .json("/Project/Data/json")


        // 强类型,转成 ds
        val peopleDS: Dataset[People] = peopleDF.as[People]
        val df: Dataset[String] = peopleDS
            .filter((_: People).age > 20)
            .map((_: People).name)
        df.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()
        

        // 关闭执行环境
        spark.stop()

    }
}

case class People(name: String, age: Long, sex: String)

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
|  value|
+-------+
|Michael|
|   Andy|
|zhiling|
+-------+

1.3. 直接执行 sql

代码示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._


        // 创建格式,并读取数据
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: DataFrame = spark
            .readStream
            .schema(peopleSchema)
            .json("/Project/Data/json")


        // 直接执行SQL,创建临时表
        peopleDF.createOrReplaceTempView("people")
        val df: DataFrame = spark.sql("select * from people where age > 20")

        df.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

2. 基于 event-time 的窗口操作

2.1. event-time 窗口理解

        在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作, 即基于 event-time 进行操作。在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系。因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量。我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量。 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达。现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。

统计后的结果应该是这样的:

代码示例:

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 使用socket数据源
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .option("includeTimestamp", value = true) // 给产生的数据自动添加时间戳
            .load

        // 把行切割成单词, 保留时间戳
        val words: DataFrame = lines
            .as[(String, Timestamp)]
            .flatMap((line: (String, Timestamp)) => {
                line._1.split(" ").map(((_: String), line._2))
            })
            .toDF("word", "timestamp")


        // 按照窗口和单词分组, 并且计算每组的单词的个数,最后按照窗口排序
        val wordCounts: Dataset[Row] = words
            .groupBy(
                // 调用 window 函数, 返回的是一个 Column 类型
                // 参数 1: df 中表示时间戳的列
                // 参数 2: 窗口长度
                // 参数 3: 滑动步长
                window($"timestamp", "60 seconds", "10 seconds"),
                $"word"
            )
            .count()
            .orderBy($"window")

        wordCounts
            .writeStream
            .outputMode("complete")
            .format("console")
            .option("truncate", "false") // 不截断.为了在控制台能看到完整信息, 最好设置为 false
            .start
            .awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
+------------------------------------------+----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|a   |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|abc |1    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|a   |2    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|abc |1    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|a   |2    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|abc |1    |
|[2023-08-02 21:19:40, 2023-08-02 21:20:40]|a   |2    |
+------------------------------------------+----+-----+
only showing top 20 rows

由此可以看出, 在这种窗口机制下, 无论事件何时到达, 以怎样的顺序到达, Structured Streaming 总会根据事件时间生成对应的若干个时间窗口, 然后按照指定的规则聚合。

2.2. event-time 窗口生成规则

可以查看 org.apache.spark.sql.catalyst.analysis.TimeWindowing 类下的如下代码:

The windows are calculated as below:
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)
  windowId <- ceil((timestamp - startTime) / slideDuration)
  windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
  windowEnd <- windowStart + windowDuration
  return windowStart, windowEnd

        将event-time 作为“初始窗口”的结束时间, 然后按照窗口滑动宽度逐渐向时间轴前方推进, 直到某个窗口不再包含该 event-time 为止。 最终以“初始窗口”与“结束窗口”之间的若干个窗口作为最终生成的 event-time 的时间窗口。

每个窗口的起始时间与结束时间都是前必后开的区间, 因此初始窗口和结束窗口都不会包含 event-time, 最终不会被使用。

得到窗口如下:

3. 基于 Watermark 处理延迟数据

3.1. 什么是 Watermark 机制

        在数据分析系统中, Structured Streaming 可以持续的按照 event-time 聚合数据, 然而在此过程中并不能保证数据按照时间的先后依次到达。 例如: 当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time。 在发生这种情况时, 往往需要结合业务需求对延迟数据进行过滤。现在考虑如果事件延迟到达会有哪些影响。 假如, 一个单词在 12:04(event-time) 产生, 在 12:11 到达应用。 应用应该使用 12:04 来在窗口(12:00 - 12:10)中更新计数, 而不是使用 12:11。 这些情况在我们基于窗口的聚合中是自然发生的, 因为结构化流可以长时间维持部分聚合的中间状态。

        但是, 如果这个查询运行数天, 系统很有必要限制内存中累积的中间状态的数量。 这意味着系统需要知道何时从内存状态中删除旧聚合, 因为应用不再接受该聚合的后期数据。为了实现这个需求, 从 spark2.1, 引入了 watermark(水印), 使用引擎可以自动的跟踪当前的事件时间, 并据此尝试删除旧状态。通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark。 针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到(max event time seen by the engine - late threshold > T)。 换句话说, 延迟时间在上限内的被聚合, 延迟时间超出上限的开始被丢弃。

        可以通过withWatermark() 来定义watermark,watermark 计算方式:watermark = MaxEventTime - Threshhod;而且, watermark只能逐渐增加, 不能减少。

Structured Streaming 引入 Watermark 机制, 主要是为了解决以下两个问题:

  • 处理聚合中的延迟数据
  • 减少内存中维护的聚合状态.

注意:在不同输出模式(complete, append, update)中, Watermark 会产生不同的影响。

3.2. update 模式下使用 watermark

在 update 模式下, 仅输出与之前批次的结果相比, 涉及更新或新增的数据。

代码示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 使用socket数据源
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 输入的数据中包含时间戳, 而不是自动添加的时间戳
        val words: DataFrame = lines
            .as[String]
            .flatMap((line: String) => {
                val split: Array[String] = line.split(",")
                split(1)
                    .split(" ")
                    .map(((_: String), Timestamp.valueOf(split(0))))
            })
            .toDF("word", "timestamp")

        // 使用 withWatermark 方法,添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.
        val wordCounts: Dataset[Row] = words
            .withWatermark("timestamp", "2 minutes")
            .groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word")
            .count()

        // 数据输出
        val query: StreamingQuery = wordCounts
            .writeStream
            .outputMode("update")
            .trigger(Trigger.ProcessingTime(1000))
            .format("console")
            .option("truncate", "false")
            .start
        query.awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

初始化的wartmark是 0,通过如下输入的几条数据,可以看到水位线的变化。

第一次输入数据:  2023-08-07 10:55:00,dog 。这个条数据作为第一批数据。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 个窗口。 由于是第一批, 所有的窗口的结束时间都大于 wartermark(0), 所以 5 个窗口都显示,如下所示:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53 。

第二次输入数据:  2023-08-07 11:00:00,dog 。 这条数据作为第二批数据, 计算得到 5 个窗口。 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark。 在 update 模式下, 只输出结果表中涉及更新或新增的数据。

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:58:00, 2023-08-07 11:08:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |2    |
|[2023-08-07 10:56:00, 2023-08-07 11:06:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |2    |
|[2023-08-07 11:00:00, 2023-08-07 11:10:00]|dog |1    |
+------------------------------------------+----+-----+

其中: count 是 2 的表示更新, count 是 1 的表示新增。 没有变化的就没有显示(但是内存中仍然保存着)。此时的的 watermark = 11:00 - 2min = 10:58 。如下数据为在内存中保存着,但是没有打印出来的数据:

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |

第三次输入数据:   2023-08-07 10:55:00,dog  。 这条数据作为第 3 批次,相当于一条延迟数据,计算得到 5 个窗口。此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58。

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |

则立即删除这两个窗口在内存中的维护状态。 同时, 当前批次中新加入的数据所划分出来的窗口, 如果窗口结束时间低于 11:58, 则窗口会被过滤掉。

所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |2    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |3    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |3    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变(因为 watermask 只能增加不能减少)。

3.3. append 模式下使用 wartermark

代码示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 使用socket数据源
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 输入的数据中包含时间戳, 而不是自动添加的时间戳
        val words: DataFrame = lines
            .as[String]
            .flatMap((line: String) => {
                val split: Array[String] = line.split(",")
                split(1)
                    .split(" ")
                    .map(((_: String), Timestamp.valueOf(split(0))))
            })
            .toDF("word", "timestamp")

        // 使用 withWatermark 方法,添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.
        val wordCounts: Dataset[Row] = words
            .withWatermark("timestamp", "2 minutes")
            .groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word")
            .count()

        // 数据输出
        val query: StreamingQuery = wordCounts.writeStream
            .outputMode("append")
            .trigger(Trigger.ProcessingTime(0))
            .format("console")
            .option("truncate", "false")
            .start
        query.awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

在 append 模式中, 仅输出新增的数据, 且输出后的数据无法变更。

第一次输入数据: 2023-08-07 10:55:00,dog  。 这个条数据作为第一批数据。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 个窗口。 由于此时初始 watermask=0, 当前批次中所有窗口的结束时间均大于 watermask。但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容。 因此, 基于 Append 模式的特点, 这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark。 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53

第二次输入数据: 2023-08-07 11:00:00,dog  。这条数据作为第二批数据, 计算得到 5 个窗口。 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark, 仍然不会输出。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后计算 watermark = 11:00 - 2min = 10:58

第三次输入数据: 2023-08-07 10:55:00,dog 。相当于一条延迟数据,这条数据作为第 3 批次, 计算得到 5 个窗口。 此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58。

|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |

则意味着这两个窗口的数据不会再发生变化, 此时输出这个两个窗口的聚合结果, 并在内存中清除这两个窗口的状态。所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变。(因为 watermask 只能增加不能减少)

3.4. watermark 机制总结

  • watermark 在用于基于时间的状态聚合操作时, 该时间可以基于窗口, 也可以基于 event-timeb本身。
  • 输出模式必须是append或update。 在输出模式是complete的时候(必须有聚合), 要求每次输出所有的聚合结果。 我们使用 watermark 的目的是丢弃一些过时聚合数据, 所以complete模式使用wartermark无效也无意义。
  • 在输出模式是append时, 必须设置 watermask 才能使用聚合操作。 其实, watermask 定义了 append 模式中何时输出聚合聚合结果(状态), 并清理过期状态。
  • 在输出模式是update时, watermask 主要用于过滤过期数据并及时清理过期状态。
  • watermask 会在处理当前批次数据时更新, 并且会在处理下一个批次数据时生效使用。 但如果节点发送故障, 则可能延迟若干批次生效。
  • withWatermark 必须使用与聚合操作中的时间戳列是同一列。df.withWatermark("time", "1 min").groupBy("time2").count() 无效。
  • withWatermark 必须在聚合之前调用。 f.groupBy("time").count().withWatermark("time", "1 min") 无效。

4. 流数据去重

需求内容:根据唯一的 id 实现数据去重

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 使用socket数据源
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load

        // 数据预处理
        val words: DataFrame = lines
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), Timestamp.valueOf(arr(1)), arr(2))
            })
            .toDF("uid", "ts", "word")

        // 去重重复数据 uid 相同就是重复.  可以传递多个列
        val wordCounts: Dataset[Row] = words
            .withWatermark("ts", "2 minutes")
            .dropDuplicates("uid")

        // 输出数据
        wordCounts.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()


        // 关闭执行环境
        spark.stop()

    }
}

数据输入(按顺序从上到下):

1,2023-08-09 11:50:00,dog
2,2023-08-09 11:51:00,dog
1,2023-08-09 11:50:00,dog
3,2023-08-09 11:53:00,dog
1,2023-08-09 11:50:00,dog
4,2023-08-09 11:45:00,dog

注意点:

  • dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates 
  • 使用watermask - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。 
  • 没有watermask - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态。

测试:

  • 第一次输入数据:1,2023-08-09 11:50:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  1|2023-08-09 11:50:00| dog|
+---+-------------------+----+
  • 第二次输入数据:2,2023-08-09 11:51:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  2|2023-08-09 11:51:00| dog|
+---+-------------------+----+
  • 第三次输入数据:1,2023-08-09 11:50:00,dog (id 重复无输出)
  • 第四次输入数据:3,2023-08-09 11:53:00,dog (此时 watermask=11:51)
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  3|2023-08-09 11:53:00| dog|
+---+-------------------+----+
  • 第五次输入数据:1,2023-08-09 11:50:00,dog (数据重复, 并且数据过期, 所以无输出)
  • 第六次输入数据:4,2023-08-09 11:45:00,dog (数据过时, 所以无输出)

5. join操作

        Structured Streaming 支持 streaming DataSet/DataFrame 与静态的DataSet/DataFrame 进行 join, 也支持 streaming DataSet/DataFrame与另外一个streaming DataSet/DataFrame 进行 join。join 的结果也是持续不断的生成, 类似于前面的 streaming 的聚合结果。

5.1. Stream-static Joins

静态数据:

lisi,male
zhiling,female
zs,male

流式数据:

lisi,20
zhiling,40
ww,30

5.1.1. 内连接

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 1. 静态 df
        val arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));
        val staticDF: DataFrame = spark
            .sparkContext
            .parallelize(arr)
            .toDF("name", "sex")

        // 2. 流式 df
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load()
        val streamDF: DataFrame = lines
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt)
            })
            .toDF("name", "age")

        // 3. join   等值内连接  a.name=b.name
        val joinResult: DataFrame = streamDF.join(staticDF, "name")

        // 4. 输出
        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

数据输出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|   lisi| 20|  male|
+-------+---+------+

5.1.2. 外连接

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 1. 静态 df
        val arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));
        val staticDF: DataFrame = spark
            .sparkContext
            .parallelize(arr)
            .toDF("name", "sex")

        // 2. 流式 df
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load()
        val streamDF: DataFrame = lines
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt)
            })
            .toDF("name", "age")

        // 3. join
        val joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")

        // 4. 输出
        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}

数据输出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|     ww| 30|  null|
|   lisi| 20|  male|
+-------+---+------+

5.2. Stream-stream Joins

        在 Spark2.3, 开始支持 stream-stream join。Spark 会自动维护两个流的状态, 以保障后续流入的数据能够和之前流入的数据发生 join 操作, 但这会导致状态无限增长。 因此, 在对两个流进行 join 操作时, 依然可以用 watermark 机制来消除过期的状态, 避免状态无限增长。

第 1 个数据格式:姓名,年龄,事件时间

lisi,female,2023-08-09 11:50:00
zs,male,2023-08-09 11:51:00
ww,female,2023-08-09 11:52:00
zhiling,female,2023-08-09 11:53:00
fengjie,female,2023-08-09 11:54:00
yifei,female,2023-08-09 11:55:00

第 2 个数据格式:姓名,年龄,事件时间

lisi,18,2023-08-09 11:50:00
zs,19,2023-08-09 11:51:00
ww,20,2023-08-09 11:52:00
zhiling,22,2023-08-09 11:53:00
yifei,30,2023-08-09 11:54:00
fengjie,98,2023-08-09 11:55:00

5.2.1. inner join

对 2 个流式数据进行 join 操作,输出模式仅支持append模式。

不带 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 第 1 个 stream
        val nameSexStream: DataFrame = spark
            .readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1), Timestamp.valueOf(arr(2)))
            }).toDF("name", "sex", "ts1")

        // 第 2 个 stream
        val nameAgeStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 8888)
            .load
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
            }).toDF("name", "age", "ts2")

        // join 操作
        val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")

        // 数据输出
        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .trigger(Trigger.ProcessingTime(0))
            .start()
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}


//      数据输出:
//      +-------+------+-------------------+---+-------------------+
//      |   name|   sex|                ts1|age|                ts2|
//      +-------+------+-------------------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00| 20|2023-08-09 11:52:00|
//      |  yifei|female|2023-08-09 11:55:00| 30|2023-08-09 11:54:00|
//      |     zs|  male|2023-08-09 11:51:00| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+---+-------------------+

带 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 第 1 个 stream
        val nameSexStream: DataFrame = spark
            .readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1), Timestamp.valueOf(arr(2)))
            }).toDF("name1", "sex", "ts1")

        // 第 2 个 stream
        val nameAgeStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 8888)
            .load
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
            }).toDF("name2", "age", "ts2")
            .withWatermark("ts2", "1 minutes")

        // join 操作
        val joinResult: DataFrame = nameSexStream
            .join(
                nameAgeStream,
                expr(
                    """
                      |name1=name2 and
                      |ts2 >= ts1 and
                      |ts2 <= ts1 + interval 1 minutes""".stripMargin
                )
            )

        // 数据输出
        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .trigger(Trigger.ProcessingTime(0))
            .start()
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}


//      数据输出:
//      +-------+------+-------------------+-------+---+-------------------+
//      |  name1|   sex|                ts1|  name2|age|                ts2|
//      +-------+------+-------------------+-------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//      |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+-------+---+-------------------+

4.2.2. outer join

外连接必须使用 watermast,和内连接相比, 代码几乎一致, 只需要在连接的时候指定下连接类型即可:joinType = "left"。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.sql.Timestamp

object StreamTest {

    def main(args: Array[String]): Unit = {

        // 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamTest")
            .getOrCreate()
        import spark.implicits._

        // 第 1 个 stream
        val nameSexStream: DataFrame = spark
            .readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1), Timestamp.valueOf(arr(2)))
            }).toDF("name1", "sex", "ts1")

        // 第 2 个 stream
        val nameAgeStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 8888)
            .load
            .as[String]
            .map((line: String) => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
            }).toDF("name2", "age", "ts2")
            .withWatermark("ts2", "1 minutes")

        // join 操作
        val joinResult: DataFrame = nameSexStream
            .join(
                nameAgeStream,
                expr(
                    """
                      |name1=name2 and
                      |ts2 >= ts1 and
                      |ts2 <= ts1 + interval 1 minutes""".stripMargin
                ),
                joinType = "left"
            )

        // 数据输出
        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .trigger(Trigger.ProcessingTime(0))
            .start()
            .awaitTermination()

        // 关闭执行环境
        spark.stop()

    }
}


//      数据输出:
//        +-------+------+-------------------+-------+---+-------------------+
//        |  name1|   sex|                ts1|  name2|age|                ts2|
//        +-------+------+-------------------+-------+---+-------------------+
//        |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//        |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//        |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//        |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//        |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//        +-------+------+-------------------+-------+---+-------------------+

6. Streaming DF/DS 不支持的操作

到目前, DF/DS 的有些操作 Streaming DF/DS 还不支持:

  • 多个Streaming 聚合(例如在 DF 上的聚合链)目前还不支持
  • limit 和取前 N 行还不支持
  • distinct 也不支持
  • 仅仅支持对 complete 模式下的聚合操作进行排序操作
  • 仅支持有限的外连接
  • 有些方法不能直接用于查询和返回结果, 因为他们用在流式数据上没有意义
    • count() 不能返回单行数据, 必须是s.groupBy().count()
    • foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)
    • show() 不能直接使用, 而是使用 console sink

如果执行上面操作会看到这样的异常: operation XYZ is not supported with streaming DataFrames/Datasets。


注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/866669.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机视觉|生成对抗】条件生成对抗网络(CGAN)

本系列博文为深度学习/计算机视觉论文笔记&#xff0c;转载请注明出处 标题&#xff1a;Conditional Generative Adversarial Nets 链接&#xff1a;[1411.1784] Conditional Generative Adversarial Nets (arxiv.org) 摘要 生成对抗网络&#xff08;Generative Adversarial…

04_Hudi 集成 Spark、保存数据至Hudi、集成Hive查询、MergeInto 语句

本文来自"黑马程序员"hudi课程 4.第四章 Hudi 集成 Spark 4.1 环境准备 4.1.1 安装MySQL 5.7.31 4.1.2 安装Hive 2.1 4.1.3 安装Zookeeper 3.4.6 4.1.4 安装Kafka 2.4.1 4.2 滴滴运营分析 4.2.1 需求说明 4.2.2 环境准备 4.2.2.1 工具类SparkUtils 4.2.2.2 日期转换…

读《Flask Web开发实战》(狼书)笔记 | 第1、2章

前言 2023-8-11 以前对网站开发萌生了想法&#xff0c;又有些急于求成&#xff0c;在B站照着视频敲了一个基于flask的博客系统。但对于程序的代码难免有些囫囵吞枣&#xff0c;存在许多模糊或不太理解的地方&#xff0c;只会照葫芦画瓢。 而当自己想开发一个什么网站的时&…

限流在不同场景的最佳实践

目录导读 限流在不同场景的最佳实践1. 前言2. 为什么要限流3. 有哪些限流场景3.1 限流场景分类3.2 限流与熔断降级之间的关系3.3 非业务限流3.4 业务限流 4. 有哪些限流算法4.1 计数器限流算法4.2 漏桶限流算法4.3 令牌桶限流算法4.4 滑动时间窗限流算法4.5 限流算法选型 5. 限…

【数据结构与算法】稀疏数组

文章目录 一&#xff1a;为什么会使用稀疏数组1.1 先看一个实际的需求1.2 基本介绍1.2.1 稀疏数组的处理方法1.2.2 数组的举例说明1.2.3 应用实例1.2.4 整体思路分析二维数组转稀疏数组的思路稀疏数组转原始的二维数组的思路 二&#xff1a;代码实现2.1 创建一个原始的11*11二维…

​LeetCode解法汇总1572. 矩阵对角线元素的和

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给你一个正…

探秘金和OA:解析任意文件读取漏洞的潜在威胁

是喜是悲&#xff0c;但可以慰藉的是&#xff0c;你总不枉在这世界上活了一场&#xff0c;有了这样的认识&#xff0c;你就会珍重生活&#xff0c;而不会玩世不恭&#xff1b;同时也会给人自身注入一种强大的内在力量…… 漏洞复现 访问url&#xff1a; 构造payload /C6/Jh…

【网络编程(二)】NIO快速入门

NIO Java NIO 三大核心组件 Buffer&#xff08;缓冲区&#xff09;&#xff1a;每个客户端连接都会对应一个Buffer&#xff0c;读写数据通过缓冲区读写。Channel&#xff08;通道&#xff09;&#xff1a;每个channel用于连接Buffer和Selector&#xff0c;通道可以进行双向读…

日常问题——使用Java转将long类型为date类型,日期是1970年

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;日常BUG、BUG、问题分析☀️每日 一言 &#xff1a;存在错误说明你在进步&#xff01; 一、问题描述 long类型的日期为&#xff1a;1646718195 装换为date类型&#xff1a; Date date new Dat…

SQL | 使用函数处理数据

8-使用函数处理数据 8.1-函数 SQL可以用函数来处理数据。函数一般是在数据上执行的&#xff0c;为数据的转换和处理提供了方便。 8.1.1 函数带来的问题 每种DBMS都有特定的函数&#xff0c;只有很少一部分函数&#xff0c;是被所有主要的DBMS等同的支持。 虽然所有的类型的…

Linux 基础(五)常用命令-文件属性

文件属性 文件权限文件属性修改文件权限属性 文件所有者 文件权限 文件属性 Linux中文件权限 可以通过文件属性体现&#xff1b; 使用 ll 查看文件列表 最前面的 l d 表示文件类型 1 5 表示硬链接数 或者 子文件夹个数 所属用户 所属用户组 文件大小 创建/更新时间 文件&…

【前端二次开发框架关于关闭eslint】

前端二次开发框架关于关闭eslint 方法一方法二方法三方法四&#xff1a;以下是若想要关闭项目中的部分代码时&#xff1a; 方法一 在vue.config.js里面进行配置&#xff1a; module.exports {lintOnSave:false,//是否开启eslint保存检测 ,它的有效值为 true || false || err…

一个简单实用的线程池及线程池组的实现!

1.线程池简介 线程池&#xff0c;顾名思义&#xff0c;就是一个“池子”里面放有多个线程。为什么要使用线程池呢&#xff1f;当我们编写的代码需要并发异步处理很多任务时候&#xff0c;一般的处理办法是一个任务开启一个线程去处理&#xff0c;处理结束后释放线程。可是这样…

Docker安装 Kibana

目录 前言安装Kibana步骤1&#xff1a;准备1. 安装docker2. 搜索可以使用的镜像。3. 也可从docker hub上搜索镜像。4. 选择合适的redis镜像。 步骤2&#xff1a;拉取 kibana 镜像拉取镜像查看已拉取的镜像 步骤3&#xff1a;创建容器创建容器方式1&#xff1a;快速创建容器 步骤…

vue父页面给iframe子页面传值

在vue父页面有两个个参数 名称和图标&#xff0c;需要把这两个参数传到iframe的地图里面&#xff0c;在地图触发绘点事件的时候&#xff0c;获取到传来的参数并且展示 vue:传值给子页面iframe // 传值给子页面iframe(2个参数)handleIframeLoad() {const iframeWindow this.$re…

海康威视iVMS综合安防系统任意文件上传(0Day)

漏洞描述 攻击者通过请求/svm/api/external/report接口任意上传文件,导致获取服务器webshell权限,同时可远程进行恶意代码执行。 免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危害国家安全、荣誉和…

安全第二次

一&#xff0c;iframe <iframe>标签用于在网页里面嵌入其他网页。 1&#xff0c;sandbox属性 如果嵌入的网页是其他网站的页面&#xff0c;因不了解对方会执行什么操作&#xff0c;因此就存在安全风险。为了限制<iframe>的风险&#xff0c;HTML 提供了sandb…

HCIA---动态路由---RIP协议

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 目录 前言 一.动态路由 二.动态路由协议分类 IGP&#xff1a;内部网关协议 EGP:外部网关协议 三.RIP协议概述 RIP版本分类&#xff1a; RIP三要素&#xff1a; 思维…

TypeScript 关于对【泛型】的定义使用解读

目录 概念导读泛型函数多个泛型参数泛型约束泛型别名泛型接口泛型类总结&#xff1a; 概念导读 泛型&#xff08;Generics&#xff09;是指在定义函数、接口或类的时候&#xff0c;不预先指定具体的类型&#xff0c;而在使用的时候再指定类型的一种特性。使用泛型 可以复用类型…

【非科班如何丝滑转码?】探索计算机领域跳槽之路

近年来&#xff0c;计算机领域的蓬勃发展吸引着越来越多非计算机科班出身的人士投身其中。本文将就如何顺利实现非科班转码&#xff0c;计算机岗位的发展前景&#xff0c;以及现阶段转码的建议&#xff0c;结合个人经验和观察&#xff0c;为您阐述详细全面的观点。 一、如何规划…