Flink Study Notes: Basic Transformation Operators

news2024/12/26 15:28:43

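Contents

I. The filter operator

II. The map operator

III. Aggregation operators

1. keyBy: partitioning by key

2. Simple aggregations

(1) min: computes the minimum of the specified field over the input stream

(2) minBy: returns the entire record that contains the minimum value of the field

(3) max: computes the maximum of the specified field over the input stream

(4) maxBy: returns the entire record that contains the maximum value of the field

(5) sum: computes a running sum of the specified field over the input stream

3. reduce: reduction aggregation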


I. The filter operator

import source.SensorReading
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Transform {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO filter usage one: anonymous function
//    val filterStream: DataStream[SensorReading] = dataStream.filter(data => data.id.contains("sensor_1"))
//    val filterStream: DataStream[SensorReading] = dataStream.filter(data => data.id=="sensor_1")
    val filterStream: DataStream[SensorReading] = dataStream.filter(data => data.id.equals("sensor_1"))
    filterStream.print()

    // TODO filter usage two: anonymous FilterFunction instance
    val filterStream1: DataStream[SensorReading] = dataStream.filter(new FilterFunction[SensorReading] {
      override def filter(value: SensorReading): Boolean = {
        value.id.contains("sensor_1")
      }
    })
    //    filterStream1.print()

    // TODO filter usage three: custom FilterFunction class
    val filterStream2: DataStream[SensorReading] = dataStream.filter(new FilterTest)
//    filterStream2.print()

    env.execute()
  }
}

class FilterTest extends FilterFunction[SensorReading] {
  // Keep only the readings whose id is exactly "sensor_1"
  override def filter(value: SensorReading): Boolean = {
    value.id.equals("sensor_1")
  }
}

Output:
SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_1,1684201910,36.8)
SensorReading(sensor_1,1684202012,44.7)
SensorReading(sensor_1,1684201973,38.16)
SensorReading(sensor_1,1684201973,38.16)
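
The listing imports `source.SensorReading` without showing it, and its three filter variants do not test quite the same condition: usages one and three compare for equality, while usage two uses `contains`. The sketch below is plain Scala with no Flink dependency. It assumes a `SensorReading` case class whose fields are inferred from how the listing uses them, and the `sensor_10` line is made up purely to show where the two predicates diverge.

```scala
// Hypothetical stand-in for source.SensorReading, inferred from the fields used above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object FilterSketch {
  // Same parsing logic as the map step in the listing above.
  def parse(line: String): SensorReading = {
    val arr = line.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
  }

  // Exact match, as in usage one and usage three.
  def exactMatch(r: SensorReading): Boolean = r.id == "sensor_1"

  // Substring match, as in usage two.
  def substringMatch(r: SensorReading): Boolean = r.id.contains("sensor_1")

  // Illustrative input; the "sensor_10" line is invented for this sketch.
  val lines: Seq[String] = Seq(
    "sensor_1, 1684201947, 39.8",
    "sensor_4, 1684202000, 17.7",
    "sensor_10, 1684202100, 20.0"
  )

  def main(args: Array[String]): Unit = {
    val readings = lines.map(parse)
    println(readings.filter(exactMatch).map(_.id))     // List(sensor_1)
    println(readings.filter(substringMatch).map(_.id)) // List(sensor_1, sensor_10)
  }
}
```

On the sample data in this post the two predicates select the same records, so the difference only matters once ids such as `sensor_10` appear.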

II. The map operator

import source.SensorReading
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Transform {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO map usage one: anonymous function
    val mapStream: DataStream[String] = dataStream.map(data => data.id + " temp")
    //    mapStream.print()

    // TODO map usage two: anonymous MapFunction instance
    val mapStream1: DataStream[String] = dataStream.map(new MapFunction[SensorReading, String] {
      override def map(value: SensorReading): String = {
        value.id + " temp"
      }
    })
    //    mapStream1.print()
    
    // TODO map usage three: custom MapFunction class
    val mapStream2: DataStream[String] = dataStream.map(new MapTest)
    mapStream2.print()
    
    env.execute()
  }
}

class MapTest extends MapFunction[SensorReading, String] {
  override def map(value: SensorReading): String = {
    value.id + " temp"
  }
}

Output:
sensor_1 temp
sensor_4 temp
sensor_3 temp
sensor_7 temp
sensor_1 temp
sensor_1 temp
sensor_1 temp
sensor_1 temp

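III. Aggregation operators

1. keyBy: partitioning by key

import source.SensorReading
import org.apache.flink.streaming.api.scala._

object Transform {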
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy: partition the stream by sensor id
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)
    keyStream.print()

    env.execute()
  }
}
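
keyBy does not change the records themselves; it only decides which parallel subtask each record is routed to, so that all records sharing a key are processed together. With the parallelism of 1 used here, every record ends up in the same subtask anyway. The plain-Scala sketch below models that routing under a deliberately simplified assumption (hash code modulo parallelism). Flink's real assignment goes through key groups and a different hash, so the actual subtask indices will differ; the only property illustrated is "same key, same subtask". `subtaskFor` is a hypothetical helper, not a Flink API.

```scala
object KeyBySketch {
  // Simplified routing rule: an assumption for illustration, not Flink's actual formula.
  def subtaskFor(key: String, parallelism: Int): Int =
    java.lang.Math.floorMod(key.hashCode, parallelism)

  // Sensor ids in the order they appear in the sample data.
  val ids: Seq[String] = Seq("sensor_1", "sensor_4", "sensor_3", "sensor_7", "sensor_1")

  def main(args: Array[String]): Unit = {
    val parallelism = 4
    // Both "sensor_1" records print the same subtask index.
    ids.foreach(id => println(s"$id -> subtask ${subtaskFor(id, parallelism)}"))
  }
}
```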

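2. Simple aggregations

(1) min: computes the minimum of the specified field over the input stream

min updates only the specified field with the running minimum; every other field keeps the value from the first record seen for that key.

import source.SensorReading
import org.apache.flink.streaming.api.scala._

object Transform {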
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)

    // TODO min argument style one: field: String
    val minStream: DataStream[SensorReading] = keyStream.min("temperature")
    minStream.print()

    // TODO min argument style two: position: Int
    val minStream1: DataStream[SensorReading] = keyStream.min(2)
    minStream1.print()

    env.execute()
  }
}

Output:

SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_4,1684202000,17.7)
SensorReading(sensor_3,1684202064,27.3)
SensorReading(sensor_7,1684202068,13.8)
SensorReading(sensor_1,1684201947,36.8)
SensorReading(sensor_1,1684201947,36.8)
SensorReading(sensor_1,1684201947,36.8)
SensorReading(sensor_1,1684201947,36.8)

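(2) minBy: returns the entire record that contains the minimum value of the field

import source.SensorReading
import org.apache.flink.streaming.api.scala._

object Transform {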
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)

    // TODO minBy argument style one: field: String
    val minByStream: DataStream[SensorReading] = keyStream.minBy("temperature")
    minByStream.print()

    // TODO minBy argument style two: position: Int
    val minByStream1: DataStream[SensorReading] = keyStream.minBy(2)
    minByStream1.print()

    env.execute()
  }
}

Output:

SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_4,1684202000,17.7)
SensorReading(sensor_3,1684202064,27.3)
SensorReading(sensor_7,1684202068,13.8)
SensorReading(sensor_1,1684201910,36.8)
SensorReading(sensor_1,1684201910,36.8)
SensorReading(sensor_1,1684201910,36.8)
SensorReading(sensor_1,1684201910,36.8)
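
The min and minBy outputs for sensor_1 differ only in the timestamp: min keeps 1684201947 from the first record, while minBy switches to 1684201910, the timestamp of the record that actually holds 36.8. The plain-Scala sketch below replays that difference without Flink. `SensorReading`, `minLike`, `minByLike` and `rolling` are hypothetical names for this sketch, the record order is the one implied by the outputs above, and keeping the earlier record on ties is an assumption of the sketch.

```scala
// Hypothetical stand-in for source.SensorReading, inferred from the fields used above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object MinVsMinBy {
  // min-like: keep the first record's other fields, update only the temperature.
  def minLike(acc: SensorReading, in: SensorReading): SensorReading =
    acc.copy(temperature = math.min(acc.temperature, in.temperature))

  // minBy-like: keep the whole record with the smallest temperature (earlier record wins ties).
  def minByLike(acc: SensorReading, in: SensorReading): SensorReading =
    if (in.temperature < acc.temperature) in else acc

  // Rolling aggregation for a single key: one result per input record.
  // Requires a non-empty input.
  def rolling(rs: Seq[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): Seq[SensorReading] =
    rs.tail.scanLeft(rs.head)(f)

  // The sensor_1 records, in the order implied by the outputs above.
  val sensor1: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1684201947L, 39.8),
    SensorReading("sensor_1", 1684201910L, 36.8),
    SensorReading("sensor_1", 1684202012L, 44.7),
    SensorReading("sensor_1", 1684201973L, 38.16),
    SensorReading("sensor_1", 1684201973L, 38.16)
  )

  def main(args: Array[String]): Unit = {
    println("min-like:")
    rolling(sensor1)(minLike).foreach(println)
    println("minBy-like:")
    rolling(sensor1)(minByLike).foreach(println)
  }
}
```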

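(3) max: computes the maximum of the specified field over the input stream

import source.SensorReading
import org.apache.flink.streaming.api.scala._

object Transform {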
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)

    // TODO max argument style one: position: Int
    val maxStream: DataStream[SensorReading] = keyStream.max(2)
    maxStream.print()

    // TODO max argument style two: field: String
    val maxStream1: DataStream[SensorReading] = keyStream.max("temperature")
    maxStream1.print()

    env.execute()
  }
}

Output:

SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_4,1684202000,17.7)
SensorReading(sensor_3,1684202064,27.3)
SensorReading(sensor_7,1684202068,13.8)
SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_1,1684201947,44.7)
SensorReading(sensor_1,1684201947,44.7)
SensorReading(sensor_1,1684201947,44.7)

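(4) maxBy: returns the entire record that contains the maximum value of the field

import source.SensorReading
import org.apache.flink.streaming.api.scala._

object Transform {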
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)

    // TODO maxBy argument style one: field: String
    val maxByStream: DataStream[SensorReading] = keyStream.maxBy("temperature")
    maxByStream.print()

    // TODO maxBy argument style two: position: Int
    val maxByStream1: DataStream[SensorReading] = keyStream.maxBy(2)
    maxByStream1.print()

    env.execute()
  }
}

Output:

SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_4,1684202000,17.7)
SensorReading(sensor_3,1684202064,27.3)
SensorReading(sensor_7,1684202068,13.8)
SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_1,1684202012,44.7)
SensorReading(sensor_1,1684202012,44.7)
SensorReading(sensor_1,1684202012,44.7)

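(5) sum: computes a running sum of the specified field over the input stream

import source.SensorReading
import org.apache.flink.streaming.api.scala._

object Transform {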
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)

    // TODO sum argument style one: field: String
    val sumStream: DataStream[SensorReading] = keyStream.sum("temperature")
    sumStream.print()

    // TODO sum argument style two: position: Int
    val sumStream1: DataStream[SensorReading] = keyStream.sum(2)
    sumStream1.print()

    env.execute()
  }
}

Output:

SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_4,1684202000,17.7)
SensorReading(sensor_3,1684202064,27.3)
SensorReading(sensor_7,1684202068,13.8)
SensorReading(sensor_1,1684201947,76.6)
SensorReading(sensor_1,1684201947,121.3)
SensorReading(sensor_1,1684201947,159.45999999999998)
SensorReading(sensor_1,1684201947,197.61999999999998)
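
The trailing digits in totals such as 159.45999999999998 are a property of binary floating-point arithmetic rather than of the operator. The plain-Scala sketch below replays the sensor_1 temperatures as a running total, once with `Double` and once with `BigDecimal` for comparison. It assumes the sum is an ordinary `Double` addition applied in arrival order; `RollingSum` and its members are hypothetical names for this sketch.

```scala
object RollingSum {
  // sensor_1 temperatures in the order implied by the outputs above.
  val temps: Seq[Double] = Seq(39.8, 36.8, 44.7, 38.16, 38.16)

  // Running totals with plain Double addition: one total per input value.
  val doubleSums: Seq[Double] = temps.tail.scanLeft(temps.head)(_ + _)

  // The same totals with BigDecimal, which represents these decimal fractions exactly.
  val exactSums: Seq[BigDecimal] = {
    val ds = temps.map(t => BigDecimal(t.toString))
    ds.tail.scanLeft(ds.head)(_ + _)
  }

  def main(args: Array[String]): Unit = {
    println("Double totals:")
    doubleSums.foreach(println)
    println("BigDecimal totals:")
    exactSums.foreach(println)
  }
}
```

If exact decimal totals matter downstream, rounding at presentation time is usually simpler than changing the field type.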

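3. reduce: reduction aggregation

import source.SensorReading
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object Transform {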
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Load the data source
    val path = "D:\\javaseprojects\\flinkstu\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(path)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // TODO keyBy
    val keyStream: KeyedStream[SensorReading, String] = dataStream.keyBy(x => x.id)

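    // TODO reduce usage one: anonymous function (emulates min on temperature)
    val reduceStream: DataStream[SensorReading] = keyStream.reduce((x, y) => {
      if (x.temperature <= y.temperature) {
        x
      } else {
        // Keep the accumulated id and timestamp, take the smaller temperature
        SensorReading(x.id, x.timestamp, y.temperature)
      }
    })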
    reduceStream.print()

    // TODO reduce usage two: custom ReduceFunction class
    val reduceStream1: DataStream[SensorReading] = keyStream.reduce(new ReduceTest)
    reduceStream1.print()

    env.execute()
  }
}

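class ReduceTest extends ReduceFunction[SensorReading] {
  override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
    // Emulating min with reduce: keep the first record's timestamp
    // SensorReading(value1.id, value1.timestamp, value1.temperature.min(value2.temperature))

    // Variant used here: smallest temperature so far, paired with the newest timestamp.
    // Unlike minBy, the timestamp does not come from the record that holds the minimum.
    SensorReading(value1.id, value2.timestamp, value1.temperature.min(value2.temperature))
  }
}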

Output (from reduceStream1, the ReduceTest variant):

SensorReading(sensor_1,1684201947,39.8)
SensorReading(sensor_4,1684202000,17.7)
SensorReading(sensor_3,1684202064,27.3)
SensorReading(sensor_7,1684202068,13.8)
SensorReading(sensor_1,1684201910,36.8)
SensorReading(sensor_1,1684202012,36.8)
SensorReading(sensor_1,1684201973,36.8)
SensorReading(sensor_1,1684201973,36.8)
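
In the output above, the sensor_1 timestamp keeps advancing to that of the newest record while the temperature stays at the smallest value seen so far, which is not what minBy printed earlier. The plain-Scala sketch below replays the ReduceTest logic on the sensor_1 records without Flink, next to an alternative that returns the whole record holding the minimum. `SensorReading`, `ReduceSketch` and its members are hypothetical names, and the record order is the one implied by the outputs in this post.

```scala
// Hypothetical stand-in for source.SensorReading, inferred from the fields used above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceSketch {
  // Same logic as ReduceTest: smallest temperature so far, timestamp of the newest record.
  def minTempLatestTs(v1: SensorReading, v2: SensorReading): SensorReading =
    SensorReading(v1.id, v2.timestamp, v1.temperature.min(v2.temperature))

  // minBy-like alternative: return whichever whole record has the smaller temperature.
  def wholeRecordMin(v1: SensorReading, v2: SensorReading): SensorReading =
    if (v2.temperature < v1.temperature) v2 else v1

  // Rolling reduce for a single key: one output per input record.
  // Requires a non-empty input.
  def rolling(rs: Seq[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): Seq[SensorReading] =
    rs.tail.scanLeft(rs.head)(f)

  // The sensor_1 records, in the order implied by the outputs above.
  val sensor1: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1684201947L, 39.8),
    SensorReading("sensor_1", 1684201910L, 36.8),
    SensorReading("sensor_1", 1684202012L, 44.7),
    SensorReading("sensor_1", 1684201973L, 38.16),
    SensorReading("sensor_1", 1684201973L, 38.16)
  )

  def main(args: Array[String]): Unit = {
    println("ReduceTest logic:")
    rolling(sensor1)(minTempLatestTs).foreach(println)
    println("Whole-record minimum:")
    rolling(sensor1)(wholeRecordMin).foreach(println)
  }
}
```

The first variant suits a "latest reading time plus lowest temperature so far" view; the second is the one to reach for when the result must be a record that actually occurred in the stream.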
