Flink第五章:处理函数

news2025/1/11 7:50:07

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数


文章目录

  • 系列文章目录
  • 前言
  • 一、基本处理函数(ProcessFunction)
  • 二、按键分区处理函数(KeyedProcessFunction)
    • 1.处理时间定时服务
    • 2.事件时间定时服务
  • 三、TopN案例
    • 1.ProcessAllWindowFunction
    • 2.KeyedProcessFunction
  • 总结


前言

处理函数
简单来时就是比DataStream API更加底层的函数,能够处理更加复杂的问题
创建scala文件
在这里插入图片描述


一、基本处理函数(ProcessFunction)

我们用它来实现一个简单的Map操作,如果点击用户是Marry就输出用户名,是Alice就输出用户名+url

ProcessFunction.scala

package com.atguigu.chapter04

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    stream.process(new ProcessFunction[Event,String] {
      override def processElement(value: Event, ctx: ProcessFunction[Event, String]#Context, out: Collector[String]): Unit = {
        if (value.user.equals("Mary"))
          out.collect(value.user)
        else if (value.user.equals("Alice")){
          out.collect(value.user+value.url)
        }
      }
    }).print()

    env.execute()

  }
}

在这里插入图片描述

二、按键分区处理函数(KeyedProcessFunction)

在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。

1.处理时间定时服务

主要是在数据到达一段时间后进行数据操作

ProcessingTimeTimerTest.scala

package com.atguigu.chapter04

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessingTimeTimerTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    stream.keyBy(data=>true)
      .process(new KeyedProcessFunction[Boolean,Event,String] {
        override def processElement(value: Event, ctx: KeyedProcessFunction[Boolean, Event, String]#Context, out: Collector[String]): Unit = {
          val currenTime: Long = ctx.timerService().currentProcessingTime()
          out.collect("数据到达,当前时间是:"+currenTime)

          //注册一个5秒之后的定时器
          ctx.timerService().registerProcessingTimeTimer(currenTime+5*1000)
        }

        //执行逻辑
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
          out.collect("定时器触发,触发时间为:"+timestamp)
        }
      }).print()
    env.execute()
  }
}

在这里插入图片描述

2.事件时间定时服务

在数据产生一段时间后进行处理
EventTimeTimeTest.scala

package com.atguigu.chapter04

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object EventTimeTimeTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new CustomSource)
      .assignAscendingTimestamps(_.timestamp)

    stream.keyBy(data=>true)
      .process(new KeyedProcessFunction[Boolean,Event,String] {
        override def processElement(value: Event, ctx: KeyedProcessFunction[Boolean, Event, String]#Context, out: Collector[String]): Unit = {
          val currenTime: Long = ctx.timerService().currentWatermark()
          out.collect(s"数据到达,当前时间是: $currenTime,当前数据时间戳是:${value.timestamp}")

          //注册一个5秒之后的定时器
          ctx.timerService().registerEventTimeTimer(ctx.timestamp()+5*1000)
        }

        //执行逻辑
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
          out.collect("定时器触发,触发时间为:"+timestamp)
        }
      }).print()
    env.execute()
  }


  class CustomSource extends SourceFunction[Event]{
    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
      ctx.collect(Event("Mary","./home",1000L))

      Thread.sleep(5000)

      ctx.collect(Event("Mary","./home",2000L))

      Thread.sleep(5000)
      ctx.collect(Event("Mary","./home",6000L))
      Thread.sleep(5000)

      ctx.collect(Event("Mary","./home",6001L))
      Thread.sleep(5000)
    }

    override def cancel(): Unit = ???
  }
}

在这里插入图片描述

三、TopN案例

1.ProcessAllWindowFunction

TopNProcessAllWindowExample.scala

package com.atguigu.chapter04

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable

object TopNProcessAllWindowExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 直接开窗统计
    stream.map(_.url)
      .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          // 1.统计每个url的访问次数
          // 初始化Map (url,count)
          val urlCountMap: mutable.Map[String, Long] = mutable.Map[String, Long]()
          elements.foreach(
            data => urlCountMap.get(data) match {
              case Some(count) => urlCountMap.put(data, count + 1)
              case None => urlCountMap.put(data, 1)
            }
          )

          //2.对数据进行排序提取
          val urlCountList: List[(String, Long)] = urlCountMap.toList.sortBy(-_._2).take(2)

          //3.包装信息打印输出
          val result = new mutable.StringBuilder()
          result.append(s"=========窗口: ${context.window.getStart} - ${context.window.getEnd}=======\n")

          for (i <- urlCountList.indices){
            val tuple: (String, Long) = urlCountList(i)
            result.append(s"浏览量TopN ${i+1}")
              .append(s"url: ${tuple._1} ")
              .append(s"浏览量是: ${tuple._2} \n")
          }
          out.collect(result.toString())
        }
      }).print()
    env.execute()
  }
}

在这里插入图片描述

2.KeyedProcessFunction

TopNkeyProcessFunctionExample.scala

package com.atguigu.chapter04

import com.atguigu.chapter02.Source.{ClickSource, Event}
import com.atguigu.chapter03.UrlViewCount
import com.atguigu.chapter03.UrlViewCountExample.{UrlViewCountAgg, UrlViewCountResult}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
import scala.collection.mutable

object TopNkeyProcessFunctionExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 1.结合使用增量聚合函数和全窗口函数,统计每个url的访问频次

    val urlCountStream: DataStream[UrlViewCount] = stream.keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)

    // 2.按照窗口信息进行分组提取,排序输出
    val resultStream: DataStream[String] = urlCountStream.keyBy(_.windowEnd)
      .process(new TopN(2))

    resultStream.print()

    env.execute()

  }

  class TopN(n: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
    // 声明列表状态
    var urlViewCountListState: ListState[UrlViewCount] = _


    override def open(parameters: Configuration): Unit = {
      urlViewCountListState = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("list-state", classOf[UrlViewCount]))
    }

    override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
      //每来一个数据,就直接放入ListState中
      urlViewCountListState.add(value)
      //注册一个窗口结束时间1ms之后的定时器
      ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      // 先把数据提取出来放到List里
      val urlViewCountList: List[UrlViewCount] = urlViewCountListState.get().toList
      val topnList: List[UrlViewCount] = urlViewCountList.sortBy(-_.count).take(n)

      //结果包装输出
      val result = new mutable.StringBuilder()
      result.append(s"=========窗口: ${timestamp - 1 - 10000} - ${timestamp - 1}=======\n")

      for (i <- topnList.indices) {
        val urlViewCount = topnList(i)
        result.append(s"浏览量Top ${i + 1} ")
          .append(s"url: ${urlViewCount.url} ")
          .append(s"浏览量是: ${urlViewCount.count} \n")
      }
      out.collect(result.toString())
    }
  }
}

在这里插入图片描述


总结

有关Flink底层处理函数的Api就到这里.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/548675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux进阶之路】基本权限的理解

文章目录 一.用户1.分类2.su3.su-4.sudo 二.文件1.文件分类2.文件权限3.文件权限的身份4.chmod——改写文件权限第一种方式第二种方式 5.chown——改写文件拥有者身份6.chgrp ——改写文件所属组身份7.umask ——设置权限掩码8.目录权限9.粘滞位——特殊的可执行权限 一.用户 …

关于Markdown文件的处理【笔记】

关于Markdown文件的处理【笔记】 前言推荐关于Markdown文件的处理一、md文件转word文档1 准备2 打开3 转为word文档4 导出结果5 打开 二、word文档转md文件1 准备2 导入3 打开4 显示图片5 打开 三、导入到CSDN中1 选择导入2 查看 四、导入设置1 前言2 导入设置3 修改配置 最后 …

ES6之生成器

文章目录 前言一、生成器是什么&#xff1f;二、生成器总结 前言 生成器 一、生成器是什么&#xff1f; 生成器就是一个特殊的函数&#xff0c;实现异步编程。格式function *名称(){...} (这个*靠近function写&#xff0c;靠近名称写&#xff0c;或者两边空格都不靠近均正确)…

[比赛简介]Parkinson‘s Freezing of Gait Prediction

比赛链接&#xff1a;https://www.kaggle.com/competitions/tlvmc-parkinsons-freezing-gait-prediction 比赛简介 本次比赛的目标是检测步态冻结&#xff08;FOG&#xff09;&#xff0c;这是一种使人衰弱的症状&#xff0c;困扰着许多帕金森病患者。您将开发一个机器学习…

YOLO V3 SPP ultralytics 第三节:关于yolo 中cfg的网络配置信息和读取cfg配置文件

目录 1. 介绍 2. 关于yolo的cfg网络配置文件 2.1 关于卷积层 2.2 关于池化层 2.3 关于捷径分支shortcut 2.4 关于route 层 2.5 关于上采样层 2.6 关于yolo层 3. 解析cfg 文件 4. 代码 1. 介绍 根据 第二节 的步骤&#xff0c;生成了属于自己的 my_yolov3.cfg 配置…

Python 墨西哥湾流(gulf stream)可视化

背景介绍 墨西哥湾流和黑潮分别是北半球两支强大的西边界流&#xff0c;墨西哥湾流的流速还要强于黑潮&#xff0c;也是温盐环流的重要组成部分。 引入涡度的概念&#xff0c;将涡度分为两个部分&#xff1a; 1、行星涡度&#xff0c;记为 f f f&#xff0c;与地球自转有关…

【软考数据库】第十四章 数据库主流应用技术

目录 14.1 分布式数据库 14.2 Web与数据库 14.3 XML与数据库 14.4 面向对象数据库 14.5 大数据与数据库 14.6 NewSQL 前言&#xff1a; 笔记来自《文老师软考数据库》教材精讲&#xff0c;精讲视频在b站&#xff0c;某宝都可以找到&#xff0c;个人感觉通俗易懂。 14.1 …

Springcloud1---->openFeign

目录 简介快速入门导入依赖开启Feign配置Feign客户端接口Feign使用小结feign feign配置负载均衡feign配置Hystix支持 简介 Feign可以把Rest的请求进行隐藏&#xff0c;伪装成类似SpringMVC的Controller一样。你不用再自己拼接url&#xff0c;拼接参数等等操作&#xff0c;一切…

WebSocket 详解,以及用QWebSocket 实现服务端和客户端(含代码例子)

目录 1、WebSocket 诞生背景 2、WebSocket的特点&#xff1a; 3、 WebSocket 简介 4、WebSocket 优点 5、QWebSocket通讯—客户端&#xff1a; 6、QWebSocket通讯—服务端&#xff1a; 1、WebSocket 诞生背景 早期&#xff0c;很多网站为了实现推送技术&#xff0c;所用的技术都…

初始Linux发展

目录 前言 Linux概念&#xff1a; 一.Linux发展历史 二.Linux的发展现状 三.发行版本 四.Linux 环境的搭建方式 主要有三种 : 4.6下载方式&#xff1a; 五.XShell软件 前言 Linux概念&#xff1a; Linux&#xff0c;全称GNU/Linux&#xff0c;是一套免费使用和自由传播的…

python中的对象和变量的关系

这里写目录标题 对象简介对象的结构变量和对象 对象简介 Python是一门面向对象的编程语言&#xff01; 一切皆对象&#xff01; 程序运行当中&#xff0c;所有的数据都是存储到内存当中然后再运行的&#xff01; 对象就是内存中专门用来存储指定数据的一块区域 对象实际上就是…

《计算机网络—自顶向下方法》 Wireshark实验(九):DHCP 协议分析

DHCP&#xff08;Dynamic Host configuration protocol&#xff09;动态主机配置协议&#xff0c;它可以为客户机自动分配 IP 地址、子网掩码以及缺省网关、DNS 服务器的 IP 地址等 TCP/IP 参数&#xff0c; 简单来说&#xff0c;就是在 DHCP 服务器上有一个数据库&#xff0c;…

Go开发PaaS平台核心功能

Go开发PaaS平台核心功能 1 云原生PaaS平台介绍 随着云计算的发展&#xff0c;越来越多的企业逐步的把IT资源迁移到云上。PaaS平台作为基础设施基座&#xff0c;可以帮助企业快速构建功能丰富的容器云平台&#xff0c;提升交付效率&#xff0c;降低成本。 [1.1] 云原生平台使…

【SpringMVC框架】--01.简介、入门、@RequestMapping、获取请求参数、域对象共享数据、视图、RestFul

文章目录 SpringMVC1.简介1.1 什么是MVC1.2 什么是SpringMVC1.3 SpringMVC的特点 2.编写HelloWorld2.1 创建maven工程2.2 配置web.xml2.3 创建请求控制器2.4 创建springMVC的配置文件2.5测试HelloWorld2.6总结 3.RequestMapping注解3.1 RequestMapping注解的功能3.2 RequestMap…

Java自定义类:打造属于自己的编程世界

&#x1f9d1;‍&#x1f4bb;CSDN主页&#xff1a;夏志121的主页 &#x1f4cb;专栏地址&#xff1a;Java核心技术专栏 目录 一、自定义类示例 二、隐式参数与显式参数 三、封装的优点 自定义类是Java中最基本、也是最重要的组成部分之一&#xff0c;使用者可以根据需求创建…

【Go微服务开发】gin+grpc+etcd 重构 grpc-todolist 项目

写在前面 最近稍微重构了之前写的 grpc-todolist 模块 项目地址&#xff1a;https://github.com/CocaineCong/grpc-todoList 1. 项目结构改变 与之前的目录有很大的区别 1.1 grpc_todolist 项目总体 1.1.1 改变前 grpc-todolist/ ├── api-gatway // 网关模块 ├── ta…

【小白版】最简单的 goland package 教程包括自定义包的使用

一、Hello World 最简单的教程&#xff0c;就需要从最简单的事情开始说起&#xff1a; mkdir myappcd myappgo mod init myapp // myapp是主项目名 这行命令将生成一个go.mod文件&#xff0c;这个文件会记录所有的包的依赖关系&#xff0c;一个空的go.mod只有项目名称和go版本…

智能指针详解

概念 在c中&#xff0c;动态内存的管理式通过一对运算符来完成的&#xff1a;new,在动态内存中为对象分配空间并返回一个指向该对象的指针&#xff0c;我们可以选择对对象进行初始化&#xff1b;delete&#xff0c;接受一个动态对象的指针&#xff0c;销毁该对象&#xff0c;并…

gitlab建立新分支提交,cherry-pick部分更新

gitlab介绍 GitLab是一个基于Git的在线代码托管和协作平台&#xff0c;提供源代码管理、单元测试、CI/CD构建、代码审查等功能。它是一个开放源代码的Git仓库管理系统&#xff0c;使用 Ruby on Rails 构建GitLab 不仅具有自己的 Git 仓库管理系统&#xff0c;还具有很多其他的…

AI 加持的代码编写实战:快速实现 Nginx 配置格式化工具

本篇文章聊聊如何使用 GPT 快速完成一个开源小项目&#xff0c;解决实际的问题&#xff0c;顺手点亮 GitHub 上 Nginx 开源社区的贡献者图标。 “Talk is Cheap&#xff0c;Show you the Code。” 写在前面 整理了一篇本该上个月就发出的内容。 前段时间&#xff0c;有个投…