多流转换 (分流,合流,基于时间的合流——双流联结 )

news2025/1/6 18:54:32

目录

一,分流

 1.实现分流

2.使用侧输出流 

 二,合流

1,联合  

2,连接 

三,基于时间的合流——双流联结 

 1,窗口联结

 1.1 窗口联结的调用

1.2 窗口联结的处理流程  

2,间隔联结

2.1 间隔联结的原理

2.2 间隔联结的调用


        在数据处理中,多流转换是一个重要的概念。它主要涉及分流和合流两种操作。分流通常通过侧输出流实现,有助于将数据流拆分成多个子流进行独立处理。合流则提供了多种算子,如union()、connect()和join(),根据实际需求合并不同数据流。

一,分流

        分流操作是指将一条数据流拆分为多个完全独立的数据流。基于一个DataStream,我们可以获得多个等价的子DataStream。为了实现这一过程,通常会定义特定的筛选条件,以确保符合特定标准的数据被正确地分配到相应的流中。通过这种方式,我们可以对数据进行更细致的处理和分析,同时确保每个子流中的数据都是独特的,避免重复。

                   

 1.实现分流

        根据条件筛选数据的需求确实可以通过多次独立调用filter()方法来实现。这种方法允许我们针对同一条数据流进行多次筛选,从而将数据拆分成多个子流。通过这种方式,我们可以根据不同的筛选条件对数据进行分类和分离,以满足不同的处理和分析需求。这种分流操作在数据处理中非常常见,它有助于提高数据处理的灵活性和效率。 

import org.apache.flink.api.scala._  
  
object SplitStreamExample {  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 定义数据源  
    val dataStream = env.fromElements(1, 2, 3, 4, 5)  
  
    // 定义筛选条件  
    val condition1: (Int) => Boolean = (x: Int) => x % 2 == 0  
    val condition2: (Int) => Boolean = (x: Int) => x > 3  
  
    // 对DataStream进行分流操作,得到两个子DataStream  
    val stream1 = dataStream.filter(condition1) // 筛选出偶数  
    val stream2 = dataStream.filter(condition2) // 筛选出大于3的数  
  
    // 输出结果到控制台  
    stream1.print() // 输出偶数到控制台  
    stream2.print() // 输出大于3的数到控制台  
  
    // 执行程序  
    env.execute("Split Stream Example")  
  }  
}

         在上面的代码中,我们首先导入了必要的库。然后,我们创建了一个执行环境,并定义了一个包含一些整数的DataStream。接下来,我们定义了两个筛选条件,分别为条件1和条件2。条件1用于筛选出偶数,条件2用于筛选出大于3的数。然后,我们使用filter()方法对DataStream进行分流操作,得到两个子DataStream,分别为stream1和stream2。最后,我们使用print()方法将两个子DataStream中的数据输出到控制台。最后,我们执行程序。

2.使用侧输出流 

        在Flink 1.13版本中,split()方法确实已被弃用,取而代之的是使用处理函数(process function)的侧输出流(side output)。这意味着您不再需要将数据流拆分为独立的流,而是可以通过侧输出流将数据发送到不同的目的地。

        为了将数据发送到侧输出流,您需要使用处理函数中的上下文(Context)对象的output()方法。这个方法允许您输出任意类型的数据,并将其发送到指定的侧输出流。

侧输出流的标记和提取都离不开一个“输出标签”(OutputTag)。这个标签相当于split()分流时的“戳”,用于指定侧输出流的id和类型。通过使用OutputTag,您可以轻松地标记和提取侧输出流中的数据,以便进一步处理或分析。

        总之,Flink 1.13版本通过引入处理函数的侧输出流,使得数据分流更加灵活和方便。使用OutputTag和上下文对象的output()方法,您可以轻松地将数据发送到不同的侧输出流,并根据需要对其进行处理或分析。

import org.apache.flink.api.scala._  
import org.apache.flink.util.OutputTag  
  
object SideOutputExample {  
  // 定义侧输出标签  
  val outputTag: OutputTag[Int] = OutputTag[Int]("side-output")  
  
  def main(args: Array[String]): Unit = {  
    // 创建执行环境  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
  
    // 定义数据源  
    val dataStream = env.fromElements(1, 2, 3, 4, 5)  
  
    // 定义处理函数  
    val processFunc = new ProcessFunction[Int, Int] {  
      override def processElement(value: Int,  
                                    ctx: ProcessFunction[Int, Int]#Context,  
                                    out: Collector[Int]): Unit = {  
        // 检查是否属于侧输出流  
        if (ctx.outputTag(outputTag).isOutputDropped(value)) {  
          // 如果是侧输出流的数据,则忽略输出  
          return  
        }  
        // 将数据正常输出到主输出流  
        out.collect(value)  
      }  
    }  
  
    // 将侧输出标签注册到执行环境  
    env.registerOutputTag(outputTag)  
  
    // 创建数据流并连接处理函数和侧输出流  
    val resultStream = dataStream.connect(outputTag) { (in, out) => processFunc }  
  
    // 输出结果到控制台  
    resultStream.print() // 输出主输出流到控制台  
  
    // 执行程序  
    env.execute("Side Output Example")  
  }  
}

        在上面的代码中,我们首先定义了一个侧输出标签outputTag,用于标记侧输出流。然后,我们创建了一个处理函数processFunc,它实现了ProcessFunction接口。在处理函数中,我们使用ctx.outputTag(outputTag).isOutputDropped()方法来检查每个数据是否属于侧输出流。如果是侧输出流的数据,我们将其忽略;否则,我们将数据正常输出到主输出流。最后,我们将侧输出标签注册到执行环境,并创建数据流resultStream,通过使用connect()方法将处理函数和侧输出流连接起来。最后,我们将结果流输出到控制台。执行程序后,主输出流的数据将被打印到控制台。 

 二,合流

        在数据处理中,将多条流进行合并是一个常见的需求。在实际应用中,我们经常遇到来自不同源的多条数据流,需要对它们进行联合处理。因此,Flink 中的合流操作更为普遍,对应的 API 也更加丰富。

1,联合  

        最简单的合流操作是直接将多条流合在一起,被称为“联合”(union)。在Flink中,我们可以使用union()算子来实现这一操作。联合操作要求参与合并的流中的数据类型必须相同,因为只有这样,Flink才能正确地识别和处理数据。

        当多条流进行联合操作后,会形成一个新的流,这个新流包含了所有参与合并的流中的元素,并且数据类型保持不变。这种合流方式非常直接和简单,就像公路上多个车道汇集成一个车道一样。通过联合操作,我们可以将多个数据流有效地整合在一起,以便进行更全面的处理和分析。

        需要注意的是,联合操作可能会导致数据重复,因为所有流中的元素都会包含在新流中。因此,在使用联合操作时,需要谨慎处理重复数据的问题。另外,根据具体的数据处理需求,可能还需要考虑其他合流策略和算子,例如使用connect()算子进行流之间的连接操作,或者使用join()算子进行基于键的流合并等。

                 

2,连接 

        在Flink中,连接(connect)是一种方便的合流操作。与联合(union)不同,连接操作允许两条流直接对接在一起。这意味着你可以在一条流中的每个元素上执行一些操作,然后将结果连接到另一条流中的相应元素上。

 

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object FlinkConnectExample {
  def main(args: Array[String]): Unit = {
    // 创建Flink流处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 创建第一条流
    val stream1 = env.fromElements("A", "B", "C")

    // 创建第二条流
    val stream2 = env.fromElements("X", "Y", "Z")

    // 使用连接操作将两条流对接在一起
    val connectedStream = stream1.connect(stream2)

    // 对连接后的流进行处理
    connectedStream
      .map(new MapFunction[(String, String), String] {
        override def map(value: (String, String)): String = {
          s"Connected: ${value._1} - ${value._2}"
        }
      })
      .print()

    // 启动Flink作业
    env.execute("Flink Connect Example")
  }
}

        在这个示例中,我们首先创建了两个数据流stream1stream2,然后使用connect()方法将它们连接在一起。接下来,我们使用map()操作对连接后的流进行处理,将每个元组的第一个元素和第二个元素连接起来,并打印输出结果。最后,我们通过调用execute()方法启动Flink作业。

三,基于时间的合流——双流联结 

        在处理两条流的合并时,我们往往不仅仅是将所有数据简单放在一起,而是希望能够根据某个字段的值将它们联结起来,进行更细致的处理。这种需求与关系型数据库中的表连接操作非常相似。在Flink中,我们可以通过connect()操作来实现类似于SQL中的join操作。通过在connect()操作中指定键进行分组后合并,我们可以将两条流根据某个字段的值进行联结,并进行相应的处理。

        除了connect()操作外,Flink的DataStream API还提供了两种内置的join()算子,用于基于时间的合流操作。这些算子使得我们能够更方便地实现基于时间的合流操作,而无需自定义触发逻辑和设置定时器。通过使用这些内置的join()算子,我们可以更高效地处理涉及多条相关数据流的应用场景。

        综上所述,Flink提供了多种合流操作的算子和功能,使得我们能够根据实际需求选择适合的合流策略和算子。通过灵活运用这些功能,我们可以充分利用Flink的强大处理能力,实现更高效、更灵活的数据流处理和分析。

 1,窗口联结

        在处理基于时间的操作时,时间窗口是最基本的操作之一。我们之前已经介绍了Window API的用法,主要用于在特定时间段内对单一数据流进行计算和处理。如果你希望将两条流的数据进行合并,并在特定时间段内进行统计和处理,你可以使用Flink提供的窗口联结(window join)算子。

        窗口联结算子允许你定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。通过这种方式,你可以对两条流的数据进行合并,并在指定的时间窗口内进行聚合、过滤、转换等操作。

        使用窗口联结算子,你可以根据实际需求选择不同的窗口策略,例如滚动窗口、滑动窗口或会话窗口等。你还可以根据时间或事件进行窗口触发,并使用Flink提供的各种函数对窗口内的数据进行处理和分析。

        总之,窗口联结算子为基于时间的合流操作提供了一种强大而灵活的工具,使得你可以在Flink中高效地处理涉及多条相关数据流的应用场景。通过合理使用窗口联结算子,你可以更好地满足实际应用中对时间相关数据处理的需求。

 1.1 窗口联结的调用

        窗口联结在代码中的实现首先需要调用DataStream的join()方法来合并两条流,得到一个JoinedStreams对象。然后,通过where()equalTo()方法指定两条流中用于联结的键。接下来,使用window()方法来定义窗口,并根据实际需求选择窗口策略,如滚动窗口、滑动窗口或会话窗口等。最后,调用apply()方法并传入一个联结窗口函数来进行处理计算。

1.2 窗口联结的处理流程  

        在Flink的窗口联结操作中,JoinFunction是一个重要的函数类型,用于定义如何将两条流中的数据进行联结匹配。JoinFunction有两个参数,分别代表了来自两条流中匹配的数据。

        在窗口中,每成功匹配一对数据,JoinFunctionjoin()方法就会被调用一次。通过在join()方法中定义相应的逻辑,你可以对匹配的数据进行处理和计算,并输出一个结果。

除了JoinFunction,你还可以在apply()方法中传入FlatJoinFunctionFlatJoinFunction的用法与JoinFunction非常类似,主要区别在于其内部实现的join()方法没有返回值。这意味着对于每对匹配的数据,你可以通过FlatJoinFunction输出任意数量的结果,这些结果将被收集并处理。

        在Flink中,结果的输出是通过收集器(Collector)来实现的。通过将收集器作为参数传递给窗口函数,你可以将处理后的结果发送到外部系统或进行进一步的处理。

        通过合理使用JoinFunctionFlatJoinFunction,你可以在Flink的窗口联结操作中实现对两条流数据的匹配和处理,并根据实际需求输出相应的结果。

2,间隔联结

        Flink 提供了一种称为“间隔联结”(interval join)的合流操作。这种联结操作的核心思想是针对一条数据流中的每个数据,根据其时间戳确定一个时间间隔,然后查看另一条数据流中是否存在匹配的数据。间隔联结的主要目的是找到两条数据流中在特定时间范围内相关联的数据。

        在Flink中实现间隔联结操作,需要使用IntervalJoin类。通过将两条数据流作为输入,并指定时间间隔的范围,你可以在时间窗口内找到匹配的数据对。

        在处理间隔联结时,你需要考虑时间窗口的配置,以确保正确的时间范围被用于匹配操作。此外,你还需要根据实际需求选择合适的匹配条件和数据处理逻辑。

        通过合理配置间隔联结操作,你可以有效地在Flink中处理涉及时间相关性的数据流,并找到在特定时间范围内的关联数据。这有助于提高数据处理效率和准确性,为进一步的分析和决策提供有价值的信息。

2.1 间隔联结的原理

        间隔联结是一种特殊的联结操作,其核心思想是根据指定的时间间隔来匹配两条数据流中的数据。具体来说,给定两个时间点,分别称为“下界”和“上界”,对于一条数据流中的每个数据元素,可以开辟一个时间间隔,即以该数据元素的时间戳为中心,下至下界点、上至上界点的一个闭区间。这个区间被认为是可匹配另一条流数据的“窗口”范围。

        匹配的条件是,另一条流中的数据元素的时间戳必须落在该区间范围内,才能成功配对并进入计算和输出结果。需要注意的是,进行间隔联结的两条流必须基于相同的键,下界应小于等于上界,且两者都可以是正数或负数。此外,间隔联结目前仅支持事件时间语义。

        通过合理配置和使用间隔联结,可以在Flink中高效地处理涉及时间相关性的数据流,找到在特定时间范围内的关联数据,并进一步进行复杂的数据分析和处理。

        流A与流B进行间隔联结。基于流A中的每个数据元素,我们可以确定一个时间间隔。在此示例中,下界设置为-2毫秒,上界设置为1毫秒。

        对于流A中时间戳为2的元素,其可匹配的时间间隔为[0, 3]。在流B中,时间戳为0和1的两个元素落在这个区间内,因此它们与流A中的元素(2, 0)和(2, 1)匹配。同样地,流A中时间戳为3的元素的可匹配区间为[1, 4],而流B中只有时间戳为1的元素与之匹配,得到匹配数据对(3, 1)。

        值得注意的是,间隔联结是一种内连接(inner join),这意味着只有匹配的数据对才会被包括在结果中。与窗口联结不同,间隔联结的时间段是基于流中数据的,因此是不确定的。此外,流B中的数据可以在多个区间内被匹配,这意味着它可以与流A中的多个元素相匹配。

        通过合理配置和使用间隔联结,我们可以有效地处理涉及时间相关性的数据流,并找到在特定时间范围内的关联数据。这有助于提高数据处理效率和准确性,为进一步的分析和决策提供有价值的信息。

2.2 间隔联结的调用

        在代码中实现间隔联结操作,通常基于KeyedStream进行联结(join)操作。在DataStream经过keyBy()方法得到KeyedStream之后,可以调用intervalJoin()方法来合并两条流。传入的参数也是一个KeyedStream,且两者的key类型应该一致。intervalJoin()方法返回一个IntervalJoin类型,后续的操作顺序是固定的。

        首先,通过between()方法指定间隔的上下界,然后调用process()方法来定义对匹配数据对的处理操作。process()方法需要传入一个ProcessJoinFunction,它是处理函数家族中的一员,专门用于处理联结操作。

        通过合理配置和使用间隔联结操作,可以在代码中高效地处理涉及时间相关性的数据流,并找到在特定时间范围内的关联数据。这有助于提高数据处理效率和准确性,为进一步的分析和决策提供有价值的信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1408664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线程、进程、多线程

什么是线程? 当我们用bilibili看电影,我们会看到画面、听到声音、看到弹幕,这三个功能每一个就是一个线程 什么是进程? 当我们打开bilibili软件或者网站,就算什么都不干,计算机就已经形成了一个进程 学术…

网络组件、设备和关系网络图【推荐】

目录 网络上的设备: 设备和台式计算机: 防火墙: 服务器: 集线器和交换机: 路由器: 调制解调器和无线接入点调制解调器: 无线接入点: 网络架构(有时称为网络设计&…

[UI5 常用控件] 02.Title,Link,Label

文章目录 前言1. Title1.1 结合Panel1.2 结合Table1.3 Title里嵌套Link 2. Link3. Label3.1 普通用法3.2 在Form里使用 前言 本章节记录常用控件Title,Link,Label。 其路径分别是: sap.m.Titlesap.m.Linksap.m.Label 1. Title Title可以结合其他控件一起使用 1.…

2. figure 常见属性

2. figure 常见属性 一 figsize二 dpi三 facecolor四 edgecolor五 frameon 数据可视化是数据分析中不可或缺的一环,而Matplotlib作为Python中最流行的绘图库之一,扮演着重要的角色。在Matplotlib中,matplotlib.figure.Figure对象是构建图形的…

Spring5系列学习文章分享---第四篇(JdbcTemplate+概念配置+增删改查数据+批量操作 )

目录 JdbcTemplateJdbcTemplate(概念和准备)JdbcTemplate 操作数据库(新增update)JdbcTemplate 操作数据库(修改和删除update)JdbcTemplate 操作数据库(查询返回某个值queryForObject&#xff0…

深入浅出AI落地应用分析:AI视频生成Top 5应用

接下俩会每周集中体验一些通用或者垂直的AI落地应用,主要以一些全球或者国外国内排行较前的产品为研究对象,「AI 产品榜: aicpb.com」以专题的方式在博客进行分享。 一、Loom 二、Runway 产品链接:https://app.runwayml.com/ …

防御实验:(部分)

步骤一:了解前提: 1.1 题目要求: 需求一:DMZ区存在两台服务器,现在要求生产区的设备仅能在办公时间(9:00 - 18:00)访问,办公区的设备全天都可以访问。 需求二…

记录centos安装nginx过程和问题

今天在centos上安装了nginx,遇到了些问题,记录一下。 使用yum直接安装的话安装的版本是1.20.1,使用源码包安装可以装到1.25.0(最新稳定版)。很有意思的一点是两种安装方法下安装的路径是不同的,且源码安装…

ASP.NET Core基础之用扩展方法封装服务配置

阅读本文你的收获 了解C#中的扩展方法机制学会在ASP.NET Core 中,用扩展方法封装服务配置,使得代码更加简洁 一、什么是扩展方法 扩展方法使能够向现有类型添加方法,而无需创建新的派生类型、重新编译或以其他方式修改原始类型。 扩展方法…

Go实现LRU算法

LRU是什么? LRU是内存淘汰策略,LRU (Least recently used:最近最少使用)算法在缓存写满的时候,会根据所有数据的访问记录,淘汰掉未来被访问几率最低的数据。也就是说该算法认为,最近…

惠普战66笔记本进PE系统无硬盘解决方法

1 问题描述 针对惠普战66笔记本,在使用优启通进行系统重装时,当进人 PE 系统后,看不到笔记本自带的固态硬盘,因而无法将系统重装到笔记本中。 现在,介绍一种方法,各位读者可以尝试,博主已经尝试…

自动化Web页面性能测试介绍

随着越来越多的用户使用移动设备访问 Web 应用,使得 Web 应用需要支持一些性能并不是很好的移动设备。为了度量和测试 Web 应用是不是在高复杂度的情况下,页面性能能满足用户的需求。 同时,随着 Web 应用的空前发展,前端业务逐渐…

伸向Markdown的黑手,知名博客平台曝出LFI漏洞

如果你至今依然在坚持写博客,在知乎或其他自媒体平台上发表文章,那你应该对Markdown很熟悉了。这是一种轻量级标记语言,借此可以用纯文本格式编写文档,并用简单的标记设置文档格式,随后即可轻松转换为具备精美排版的内…

红外热成像 ~ 基于matlab的非均匀校正code

红外芯片由于工艺问题存在严重的分均匀性,所以非均匀矫正一直是影响红外图像质量的第一因素。分均匀矫正的算法也是红外图像处理研究的重点区域,建立了一些矫正的方式方法。其中最常用最简单的就应该算是两点温度定标算法。 应用两点法校正有两个前提条…

openresty 安装, nginx与 openresty

openresty VS nginx Nginx 是一款高性能的 Web 服务器和反向代理服务器,具备基础的功能如HTTP服务、负载均衡、反向代理以及动静分离等。它是许多互联网应用的核心组件,因其模块化和可扩展的设计而受到欢迎。1 OpenResty 是基于 Nginx 的 Web 平台&…

Spring如何使用自定义注解来实现自动管理事务?

人可以做他(她)想做的,但不能要他(她)想要的 一个目录 前言业务代码展示手动挡自动挡事务失效的问题代码地址 前言 在两年半以前,我写了一篇博客:框架的灵魂之注解基础篇: 在那篇博客的结尾,我埋了一个坑&#xff1a…

写点东西《检查和更新NPM包》

写点东西《检查和更新NPM包》 检查和更新 NPM 包 TL;DR; 用于检查和更新软件包的 NPM 命令# [](#npm-outdated)npm outdatednpm updatenpm update --save-dev --savenpm update -g npm-check-updates 检查和更新软件包的命令npm install -g npm-check-updatesnpx np…

SQL 系列教程(二)

目录 SQL DELETE 语句 DELETE 语句 演示数据库 DELETE 实例 删除所有行 SQL TOP, LIMIT, ROWNUM 子句 TOP 子句 演示数据库 SQL TOP、LIMIT 和 ROWNUM 示例 SQL TOP PERCENT 实例 添加WHERE子句 SQL MIN() 和 MAX() 函数 MIN() 和 MAX() 函数 演示数据库 MIN() …

ASUS华硕无畏Pro15笔记本电脑(M6500QB,M6500QH)工厂模式原厂OEM预装Windows11.22H2系统 含Recovery恢复

原装出厂Windows11系统适用于华硕无畏15笔记本电脑型号:M6500QB和M6500QH 链接:https://pan.baidu.com/s/1AVGLN6-ILIRogOMj48Mk1w?pwdmi7d 提取码:mi7d 带有ASUS RECOVERY恢复功能、自带所有驱动、出厂主题专用壁纸、系统属性联机支持…

千兆以太网测试仪可以做什么

网络性能测试仪是一种用于测量和监测网络质量的工具。它可以帮助用户评估网络的性能,包括带宽、延迟、丢包率等指标,并及时发现网络故障,以保证网络的高效运行。网络性能测试仪可以应用于多个领域,如网络运营商、企业网络、数据中…