Flink第六章:多流操作

news2024/11/24 3:54:37

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数
Flink第六章:多流操作


文章目录

  • 系列文章目录
  • 前言
  • 一、分流
    • 1.侧输出流(process function)
  • 二、合流
    • 1. 联合(Union)
    • 2. 连接(Connect)
    • 3.实时对账案例
    • 4.窗口联结(Window Join)
    • 5.间隔联结(Interval Join)
    • 6.窗口同组联结(Window CoGroup)
  • 总结


前言

之前我们进行的都是Flink的单流操作,接下来我们我们进行Flink的多流操作.
创建scala


一、分流

1.侧输出流(process function)

我们根据不同的点击用户进行输出

SplitStreamTest.scala

package com.atguigu.chapter05

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SplitStreamTest {
  // 定义输出标签
  val maryTag = OutputTag[(String, String, Long)]("mary-tag")
  val bobTag = OutputTag[(String, String, Long)]("bob-tag")


  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
    
    val elseStream: DataStream[Event] = stream.process(new ProcessFunction[Event, Event] {
      override def processElement(value: Event, ctx: ProcessFunction[Event, Event]#Context, out: Collector[Event]): Unit = {
        if (value.user == "Mary") {
          ctx.output(maryTag, (value.user, value.url, value.timestamp))
        } else if (value.user == "Bob") {
          ctx.output(bobTag, (value.user, value.url, value.timestamp))
        } else {
          out.collect(value)
        }
      }
    })

    elseStream.print("else")
    elseStream.getSideOutput(maryTag).print("mary")
    elseStream.getSideOutput(bobTag).print("bob")


    env.execute()
  }

}

在这里插入图片描述

二、合流

1. 联合(Union)

UnionTest.scala

package com.atguigu.chapter05

import com.atguigu.chapter02.Source.Event
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object UnionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 读取两条流进行合并
    val stream1: DataStream[Event] = env.socketTextStream("127.0.0.1", 7777).map(
      data => {
        val fields: Array[String] = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      }
    ).assignAscendingTimestamps(_.timestamp)

    val stream2: DataStream[Event] = env.socketTextStream("127.0.0.1", 8888).map(
      data => {
        val fields: Array[String] = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      }
    ).assignAscendingTimestamps(_.timestamp)

    stream1.union(stream2)
      .process(new ProcessFunction[Event,String] {
        override def processElement(value: Event, ctx: ProcessFunction[Event, String]#Context, out: Collector[String]): Unit = {
          out.collect(s"当前水位线: ${ctx.timerService().currentWatermark()}+${value}")
        }
      }).print()

    env.execute()

  }
}

在这里插入图片描述
我们发出数据的顺序是
7777
Mary,./home,1000
Mary,./home,2000
8888
Mary,./home,500
Mary,./home,1000
可以看到,当只有一条流有数据是,水位线不会推进,因为合流后的水位线默认用最晚的,当第二条流的第二条数据到达,水位线推到500-1,因为程序是先打印后推进水位线.

2. 连接(Connect)

ConnectTest.scala

package com.atguigu.chapter05

import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala._

object ConnectTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //定义两条整数流
    val stream1: DataStream[Int] = env.fromElements(1, 2, 3)
    val stream2: DataStream[Long] = env.fromElements(1L, 2L, 3L)

    // 连接两条流
    stream1.connect(stream2)
      .map(new CoMapFunction[Int,Long,String] {
        override def map1(value: Int): String = s"Int:$value"

        override def map2(value: Long): String = s"Long:$value"
      })
      .print()
    env.execute()

  }

}

在这里插入图片描述

3.实时对账案例

现在有两条流,我们用Key分组,两条流中都含有同样的Key,对账成功,否则对账失败.
BillCheckExample.scala

package com.atguigu.chapter05

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BillCheckExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.来自app的支付日志流,(order-id,status,timestamp)
    val appStream: DataStream[(String, String, Long)] = env.fromElements(
      ("order-1", "success", 1000L),
      ("order-2", "success", 2000L)
    )
      .assignAscendingTimestamps(_._3)

    // 2.来自第三方支付平台的日志流,(order-id,status,timestamp)
    val thirdpartStream: DataStream[(String, String, String, Long)] = env.fromElements(
      ("order-1", "success", "wechat", 3000L),
      ("order-3", "success", "alipy", 4000L)
    )
      .assignAscendingTimestamps(_._4)

    appStream.connect(thirdpartStream)
      .keyBy(_._1,_._1)
      .process(new CoProcessFunction[(String,String,Long),(String, String, String, Long),String] {
        // 定义状态变量,用来保存一节到达的事件
        var appEvent : ValueState[(String,String,Long)]= _
        var thirdpartyEvent :ValueState[(String,String,String,Long)]= _

        override def open(parameters: Configuration): Unit = {
          appEvent = getRuntimeContext.getState(new ValueStateDescriptor[(String, String, Long)]("app-event",classOf[(String,String,Long)]))
          thirdpartyEvent=getRuntimeContext.getState(new ValueStateDescriptor[(String, String, String, Long)]("thirdparty-event",classOf[(String,String,String,Long)]))
        }


        override def processElement1(value: (String, String, Long), ctx: CoProcessFunction[(String, String, Long), (String, String, String, Long), String]#Context, out: Collector[String]): Unit = {

          if (thirdpartyEvent.value()!=null){
            out.collect(s"${value._1}对账成功")
            // 清空状态
            thirdpartyEvent.clear()
          }else{
            // 如果另一条流中事件没有到达就注册定时器,开始等待
            ctx.timerService().registerEventTimeTimer(value._3+5000)
            //保存当前时间到对应的状态
            appEvent.update(value)
          }
        }

        override def processElement2(value: (String, String, String, Long), ctx: CoProcessFunction[(String, String, Long), (String, String, String, Long), String]#Context, out: Collector[String]): Unit = {
          if (appEvent.value()!=null){
            out.collect(s"${value._1}对账成功")
            // 清空状态
            appEvent.clear()
          }else{
            // 如果另一条流中事件没有到达就注册定时器,开始等待
            ctx.timerService().registerEventTimeTimer(value._4+5000)
            //保存当前时间到对应的状态
            thirdpartyEvent.update(value)
          }
        }

        override def onTimer(timestamp: Long, ctx: CoProcessFunction[(String, String, Long), (String, String, String, Long), String]#OnTimerContext, out: Collector[String]): Unit = {
          // 判断状态是否为空,如果不为空,说明另一条流对应的事件没来
          if (appEvent.value()!=null){
            out.collect(s"${appEvent.value()._1}对账失败,第三方平台支付事件未到")
          }

          if (thirdpartyEvent.value()!=null){
            out.collect(s"${thirdpartyEvent.value()._1}对账失败,app支付事件未到")
          }
          appEvent.clear()
          thirdpartyEvent.clear()
        }
      }).print()
    env.execute()
  }
}

在这里插入图片描述

4.窗口联结(Window Join)

将两条流中,同一个窗口内的数据连接
WindowJoinTest.scala

package com.atguigu.chapter05

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowJoinTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1: DataStream[(String, Long)] = env.fromElements(
      ("a", 1000L),
      ("b", 1000L),
      ("a", 2000L),
      ("a", 2000L)
    ).assignAscendingTimestamps(_._2)
    val stream2: DataStream[(String, Long)] = env.fromElements(
      ("a", 3000L),
      ("b", 3000L),
      ("a", 4000L),
      ("b", 4000L)
    ).assignAscendingTimestamps(_._2)

    // 窗口连接操作
    stream1.join(stream2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply( new JoinFunction[(String,Long),(String,Long),String] {
        override def join(in1: (String, Long), in2: (String, Long)): String = {
          in1+"->"+in2
        }
      })
      .print()
    env.execute()
  }
}

在这里插入图片描述

5.间隔联结(Interval Join)

以一条流里的某一个事件的时间戳为原点,匹配另一条流中,同一时间戳的前后一段时内是否有对应的数据.
IntervalJoinTest.scala

package com.atguigu.chapter05

import com.atguigu.chapter02.Source.Event
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object IntervalJoinTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 订单事件流
    val orderStream: DataStream[(String, String, Long)] = env
      .fromElements(
        ("Mary", "order-1", 5000L),
        ("Alice", "order-2", 5000L),
        ("Bob", "order-3", 20000L),
        ("Alice", "order-4", 20000L),
        ("Cary", "order-5", 51000L)
      ).assignAscendingTimestamps(_._3)

    // 点击事件流
    val pvStream: DataStream[Event] = env
      .fromElements(
        Event("Bob", "./cart", 2000L),
        Event("Alice", "./prod?id=100", 3000L),
        Event("Alice", "./prod?id=200", 3500L),
        Event("Bob", "./prod?id=2", 2500L),
        Event("Alice", "./prod?id=300", 36000L),
        Event("Bob", "./home", 30000L),
        Event("Bob", "./prod?id=1", 23000L),
        Event("Bob", "./prod?id=3", 33000L)
      ).assignAscendingTimestamps(_.timestamp)

    // 两条流进行间隔连接,匹配一个订单事件前后一段时间内发生的点击事件
    orderStream.keyBy(_._1)
      .intervalJoin(pvStream.keyBy(_.user))
      .between(Time.seconds(-5),Time.seconds(10))
      .process(new ProcessJoinFunction[(String, String, Long),Event,String] {
        override def processElement(left: (String, String, Long), right: Event, ctx: ProcessJoinFunction[(String, String, Long), Event, String]#Context, out: Collector[String]): Unit = {
          out.collect(left+"=>"+right)
        }
      })
      .print()

    env.execute()

  }
}

在这里插入图片描述

6.窗口同组联结(Window CoGroup)

和窗口连接很象,区别是,CoGroup以整个窗口为一组进行连接
CoGroupTest.scala

package com.atguigu.chapter05

import org.apache.flink.api.common.functions.{CoGroupFunction, JoinFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import java.lang

object CoGroupTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1: DataStream[(String, Long)] = env.fromElements(
      ("a", 1000L),
      ("b", 1000L),
      ("a", 2000L),
      ("a", 2000L)
    ).assignAscendingTimestamps(_._2)
    val stream2: DataStream[(String, Long)] = env.fromElements(
      ("a", 3000L),
      ("b", 3000L),
      ("a", 4000L),
      ("b", 4000L)
    ).assignAscendingTimestamps(_._2)

    // 窗口同组连接操作
    stream1.coGroup(stream2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new CoGroupFunction[(String, Long),(String, Long),String] {
        override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {
          collector.collect(iterable+"=>"+iterable1)
        }
      })
      .print()

    env.execute()

  }

}

在这里插入图片描述


总结

多流操作基本就这些了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/555586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CVE-2018-2894WebLogic未授权任意文件上传

CVE-2018-2894WebLogic未授权任意文件上传 这个洞的限制就比较多了 限制版本 Oracle WebLogic Server版本 10.3.6.0 12.1.3.0 12.2.1.2 12.2.1.3 限制配置 该漏洞的影响模块为web服务测试页,在默认情况下不启用。 /ws_utc/config.do /ws_utc/begin.do 默认情况下不…

在职字节6年,一个29岁女软件测试工程师的心声

简单的先说一下,坐标杭州,14届本科毕业,算上年前在字节跳动的面试,一共有面试了有6家公司(因为不想请假,因此只是每个晚上去其他公司面试,所以面试的公司比较少) 其中成功的有4家&a…

Linux防火墙----firewalld

文章目录 一、firewalld概述二、firewalld 与 iptables 的区别三、firewalld 区域的概念四、firewalld数据处理流程五、firewalld防火墙的配置方法5.1 使用firewall-config 图形工具5.2 编写/etc/firewalld/中的配置文件5.3使用firewall-cmd 命令行工具 一、firewalld概述 fir…

AI:帮助你更好地发声!

正文共 978 字,阅读大约需要 3 分钟 公务员必备技巧,您将在3分钟后获得以下超能力: 快速生成倡议书 Beezy评级 :B级 *经过简单的寻找, 大部分人能立刻掌握。主要节省时间。 推荐人 | Kim 编辑者 | Linda ●图片由Lex…

当你知道前后端分离与不分离的6个特点,你就不该再当点工了

Web 应用的开发主要有两种模式: 前后端不分离 前后端分离 理解它们的区别有助于我们进行对应产品的测试工作。 前后端不分离 在早期,Web 应用开发主要采用前后端不分离的方式,它是以后端直接渲染模板完成响应为主的一种开发模式。以前后端不…

linux存储技术学习资料

参考 https://www.cnblogs.com/pengdonglin137/p/16525428.html Linux I/O栈 Linux内核的I/O栈大图知乎Linux I/O专栏1Linux 块设备之Block Layer层架构演变Linux VFS机制简析(一)Linux VFS机制简析(二)Linux Kernel文件系统写I…

keycloak入门

realm:领域,指的是在某一个软件业务领域中所涉及的用户认证授权管理相关的对象,在这个realm中有用户、角色、会话session等等用于认证授权管理的对象。 假设一个公司A使用一个erp系统,那么就可以给这个公司A设置一个realm&#xf…

微信小程序nodejs+vue高校食堂餐厅点餐订餐系统ja221

本文以实际运用为开发背景,运用软件工程原理和开发方法,它主要是采用 语言 node.js 框架:Express 前端:Vue.js 数据库:mysql 数据库工具:Navicat 开发软件:VScode 前端vueelementui, (1) vue引入elementu…

NFC入门介绍

缩写词 NFCNear Field Communication近场通信OEMOriginal Equipment Manufacturer原始设备制造商HWHardware硬件OMAPIOpen Mobile Application Programming Interface开发移动应用程序编程接口eSEEmbedded Secure Element嵌入式安全元件SEMSSecure Element Management Service…

5月22日比特币披萨日,今天你吃披萨了吗?

比特币披萨日 1. Laszlo Hanyecz2. 最贵披萨诞生记3. 梭哈买披萨4. 未完待续 2010年5月22日,美国佛罗里达州的程序员Laszlo Hanyecz(拉兹洛哈涅克斯)用10000个比特币购买了棒约翰(Papa Johns)比萨店一个价值25美元的奶…

Three.js--》实现3d水晶小熊模型搭建

目录 项目搭建 初始化three.js基础代码 加载背景纹理 加载小熊模型 今天简单实现一个three.js的小Demo,加强自己对three知识的掌握与学习,只有在项目中才能灵活将所学知识运用起来,话不多说直接开始。 项目搭建 本案例还是借助框架书写…

vTESTstudio概述

vTESTstudio支持的测试用例编写方式 项目层级结构 从用例编写到测试执行及生成报告的整个流程 vTESTsutido 开发,CANoe执行测试 界面简介 CANoe 创建的测试用例用Test Modules执行,vTESTstudio 创建的测试用例用Test Units执行 先在vTESTstudio里创建pr…

Quard Bayer(COMS SENSOR)

手机越做越紧凑需要模组和芯片尺寸越做越小,在尺寸一定的基础上,高像素和大像素,对于手机摄像头来说,一直是一对矛盾的存在。然而,高像素所带来的高分辨率画质,和大像素带给暗态高感度低噪声的画质&#xf…

Idea使用详解

01.idea简介 (1)idea介绍 IDEA 全称IntelliJ IDEA,是用于java语言开发的集成环境(也可用于其他语言),IntelliJ在业界被公认为最好的java开发工具之一,尤其在智能代码助手、代码自动提示、重构、…

鸿蒙Hi3861学习十七-Huawei LiteOS-M(MQTT)

一、简介 有关MQTT的相关概念介绍,请看之前的文章,这里不做过多的介绍:MQTT学习总结_t_guest的博客-CSDN博客 本章节需要使用如下软件: Mosquitto(MQTT消息代理工具) Eclipsse paho MQTT工具 二、操作说明…

十款优质企业级Java微服务开源项目(开源框架,用于学习、毕设、公司项目、私活等,减少开发工作,让您只关注业务!)

Java微服务开源项目 前言一、pig二、zheng三、SpringBlade四、SOP五、matecloud六、mall七、jeecg-boot八、Cloud-Platform九、microservices-platform十、RuoYi-Cloud 前言 这篇文章为大家推荐几款优质的 Java 开源项目框架,可以用于学习,毕业设计&…

【腾讯云FinOps Crane 集训营】 Crane入门

前言 随着云计算的快速发展和云原生应用的兴起,容器技术成为了现代化应用部署和管理的重要工具。 越来越多的公司正在选择将应用运行在云上或者自建的 Kubernetes 集群上,但是许多机构的调研 发现,绝大多数的用户集群资源利用率并不高&…

STM8 使用74HC164外扩IO

背景 在嵌入式开发过程中,经常使用时、甚至设计时候,考虑成本等因素,需要外扩IO。这里就是使用STM8S003F3P6,这个芯片比较常用的,这个芯片封装很小,只有20个管脚的MCU,实际产品上用的非常多。 …

二进制部署高可用k8s集群

第一章、前置知识点 1.1 生产环境部署K8S集群的两种方式 kubeadm Kubeadm是一个K8S部署工具,提供kubeadm init 和 kubeadm join,用于快速部署Kubernetes集群。 二进制包 从GitHub下载发行版的二进制包,手动部署每个组件,组成…

Idea部署Tomcat项目位置问题

Tomcat部署路径问题 1 默认情况 Tomcat安装目录下有webapps,是部署项目的,项目就运行在那里。 但是IDEA会为每个项目都拷贝一份足够的Tomcat文件放在c盘 ${user.home}/.IntelliJIdea/system/tomcat 或者 ${user.home/AppData/Local/JetBrains/IntelliJIdea2021.1/tomcat这样做…