Flink中聚合算子介绍

news2025/4/21 10:15:34

前言

在flink api中,聚合算子是非常常用的。所谓的聚合就是在分组的基础上做比较计算的操作。下面通过几个简单案例来说明聚合算子的用法和注意事项。

聚合算子案例

因为flink的api操作流程比较固定,从获取执行环境==》获取数据源==》执行数据转换操作==》输出结果。为了复用代码,参考代码使用了一个模板设计模式。

先定义一个Stream的泛型接口

package com.tml.common;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public interface StreamService<T> {

    StreamExecutionEnvironment getEnv();

    DataStream<T>  getSource(StreamExecutionEnvironment env);
}

抽象一个模板

package com.tml.common;

import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public abstract class AbsStreamCommonService<T> implements StreamService<T> {


    public void processStream(Integer parallelism) throws Exception {
        StreamExecutionEnvironment env = getEnv();
        env.setParallelism(parallelism);
        DataStream<T> stream = getSource(env);
        handle(stream);
        env.execute();
    }

    public abstract void handle(DataStream<T> source);

    @Override
    public StreamExecutionEnvironment getEnv() {

        return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    }

    public DataStream<String> getSourceFromSocket(StreamExecutionEnvironment environment) {
        return environment.socketTextStream("43.139.114.233", 9999);
    }

    public DataStream<CommonMsg> getSourceFromCollection(StreamExecutionEnvironment environment) {
        DataStreamSource<CommonMsg> source = environment.fromElements(
                new CommonMsg("11", "hello world", 11L),
                new CommonMsg("11", "hello flink", 3L),
                new CommonMsg("12", "hello kitty", 13L),
                new CommonMsg("13", "hello world", 12L),
                new CommonMsg("11", "hello java", 23L));

        return source;
    }

    public DataStream<Long> getSourceFromDataGenerator(StreamExecutionEnvironment environment) {
        DataGeneratorSource<Long> dataGeneratorSource =
                new DataGeneratorSource<>((GeneratorFunction<Long, Long>) o -> o, 100000L,RateLimiterStrategy.perSecond(2), Types.LONG);
        return environment.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource", Types.LONG);
    }


}

注:使用 

StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())可以在控制台看到flink的web-ui界面,默认是http://localhost:8081,方便看到flink job的执行参数,这种方式适用于本地调试和学习

 比如这样

 对应的pom文件依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.tml</groupId>
  <artifactId>flink-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>flink-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.18.0</flink.version> <!-- 根据你的 Flink 版本进行调整 -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Streaming API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API and SQL -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.20</version>
    </dependency>
  </dependencies>
</project>

keyBy

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class KeyByDemo extends AbsStreamCommonService<CommonMsg> {

    public static void main(String[] args) throws Exception {
        new KeyByDemo().processStream(4);
    }

    @Override
    public void handle(DataStream<CommonMsg> stream) {
        /**
         * keyby算子返回的是一个keyedStream
         * 1.keyby不是一个转换算子,只对数据进行了重分区,另外还不能设置并行度
         * 2.keyby分组和分区的概念
         *  keyby是对数据进行分组,保证同一个分组的数据会落到同一个数据分区内
         *  分区:一个子任务可以理解为一个分区,一个分区可以包含有多个分组的数据
         */
        KeyedStream<CommonMsg, String> keyBy = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
        keyBy.print();
    }


    @Override
    public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromCollection(env);
    }
}

数据源是一个有界的数组,对应的数据是程序中自己new出来的,执行结果如下

 2> CommonMsg(id=11, msg=hello world, time=11)
2> CommonMsg(id=11, msg=hello flink, time=3)
2> CommonMsg(id=11, msg=hello java, time=23)
1> CommonMsg(id=12, msg=hello kitty, time=13)
3> CommonMsg(id=13, msg=hello world, time=12)

可以看到,通过keyBy的分组操作,相同的数据放在了同一个分区去执行。 

sum/min/minBy/max/maxBy

这几个是最基本的聚合算子。

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SimpleAggregateDemo extends AbsStreamCommonService<CommonMsg> {

    public static void main(String[] args) throws Exception {
        new SimpleAggregateDemo().processStream(1);
    }


    @Override
    public void handle(DataStream<CommonMsg> stream) {
        KeyedStream<CommonMsg, String> keyStream = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
        //使用sum聚合
        //SingleOutputStreamOperator<CommonMsg> time = stream.sum("time");
        //SingleOutputStreamOperator<CommonMsg> min = stream.min("time");
        /**
         * max、maxyBy的区别在于
         * max不会对非比较字段重新赋值,而maxBy会更新非比较字段的值
         */
        SingleOutputStreamOperator<CommonMsg> minBy = keyStream.minBy("time");
        //min.print();
        minBy.print();
    }

    @Override
    public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromCollection(env);
    }
}

先看一下minBy这个算子结果输出

CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello flink, time=3)

将聚合操作的api换成min(),对比一下程序的输出

CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello world, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello world, time=3)

两个对比输出可以发现,min、minBy的区别在于

min不会对非比较字段重新赋值,而minBy会更新非比较字段的值

当然max、maxBy也是一样

reduce

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo extends AbsStreamCommonService<CommonMsg> {

    public static void main(String[] args) throws Exception {
        new ReduceDemo().processStream(1);

    }

    @Override
    public void handle(DataStream<CommonMsg> source) {
        KeyedStream<CommonMsg, String> stream = source.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));

        /**
         * reduce函数是非常灵活的,可以根据业务需求,非常灵活的进行聚合计算
         * 当每个分组中只有一条数据的时候,是不会进行reduce的,因为只有一条数据,没有比较的数据,进行reduce没有必要
         */
        SingleOutputStreamOperator<CommonMsg> reduce = stream.reduce((t1, t2) -> {
            System.out.println("t1==>" + t1);
            System.out.println("t2==>" + t2);
            CommonMsg commonMsg = new CommonMsg(t1.getId(), t2.getMsg(), t1.getTime() + t2.getTime());

            return commonMsg;
        });

        reduce.print();
    }

    @Override
    public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromCollection(env);
    }
}

看一下运行结果

CommonMsg(id=11, msg=hello world, time=11)
t1==>CommonMsg(id=11, msg=hello world, time=11)
t2==>CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=11, msg=hello flink, time=14)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
t1==>CommonMsg(id=11, msg=hello flink, time=14)
t2==>CommonMsg(id=11, msg=hello java, time=23)
CommonMsg(id=11, msg=hello java, time=37) 

通过运行结果可以看到,reduce算子是非常灵活的,可以在两个数据之间做非常灵活的操作,当然,如果对应的分组中只有一条数据,自然是 不会触发reduce函数的执行了。 

richFunction

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * richfunction添加了一些额外的功能
 * 提供了一些生命周期的管理方法,比如open()\close()
 * open() 在每个子任务启动的时候调用一次
 * close() 在每个任务结束的时候调用一次,如果是flink程序挂掉,不会调用这个close方法,在控制台上点击cancel任务,这个close方法也是可以额正常调用的
 *
 * 另外多了一些运行时上下文,可以通过getRuntimeContext() 来获取上下文中的一些关键信息
 * 在close方法中可以做一些释放资源的操作,回调通知操作等一些hook函数
 */
public class RichFunctionDemo extends AbsStreamCommonService<String> {
    public static void main(String[] args) throws Exception {
        new RichFunctionDemo().processStream(1);
    }

    @Override
    public void handle(DataStream<String> stream) {
        SingleOutputStreamOperator<String> map = stream.map(new RichMapFunction<String, String>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                RuntimeContext context = getRuntimeContext();
                String taskName = context.getTaskName();
                int subtasks = context.getNumberOfParallelSubtasks();
                System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call open()");
            }

            @Override
            public void close() throws Exception {
                super.close();
                RuntimeContext context = getRuntimeContext();
                String taskName = context.getTaskName();
                int subtasks = context.getNumberOfParallelSubtasks();
                System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call close()");
            }

            @Override
            public String map(String value) throws Exception {
                return "(" + value + ")";
            }
        }, TypeInformation.of(String.class));

        map.print();
    }

    @Override
    public DataStream<String> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromSocket(env);
    }
}

运行程序前需要先运行socket,这里使用了nc,详细可以参考Flink实时统计单词【入门】

看一下运行结果 

taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call open()
(hello kitty)
(hello flink)
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call close()

总结

  • 这些聚合算子的基础实在keyBy之后,只有对数据进行了分组之后,才能执行后面的聚合操作。
  • min、minBy和max、maxBy之间有细微的区别,前者不会对非比较字段重新赋值,而后者会更新非比较字段的值
  • reduce算子是在两个数据之间进行操作的,可以非常灵活
  • richFunction不算聚合函数,这里写进来是富函数可以做非常多的额外功能,open()方法是对应的子任务启动时调用一下,close()方法是在对应的子任务结束的时候调用一次,通过这个可以做一些监控或者hook通知的操作

代码案例已经上传到了github,欢迎前来围观!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2323492.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【基础】Windows 中通过 VSCode 使用 GCC 编译调试 C++

准备 安装 VSCode 及 C 插件。通过 MSYS2 安装 MinGW-w64 工具链&#xff0c;为您提供必要的工具来编译代码、调试代码并配置它以使用IntelliSense。参考&#xff1a;Windows 中的 Linux 开发工具链 验证安装&#xff1a; gcc --version g --version gdb --version三个核心配…

知识就是力量——物联网应用技术

基础知识篇 一、常用电子元器件1——USB Type C 接口引脚详解特点接口定义作用主从设备关于6P引脚的简介 2——常用通信芯片CH343P概述特点引脚定义 CH340概述特点封装 3——蜂鸣器概述类型驱动电路原文链接 二、常用封装介绍贴片电阻电容封装介绍封装尺寸与功率关系&#xff1…

(windows)conda虚拟环境下open-webui安装与启动

一、创建conda环境 重点强调下&#xff0c;如果用python pip安装&#xff0c;一定要选择python3.11系列版本&#xff0c;我选的3.11.9。 如果你的版本不是这个系列&#xff0c;将会出现一些未知的问题。 conda create -n open-webui python3.11 -y如下就创建好了 二、安装o…

资本运营:基于Python实现的资本运作模拟

基于Python实现的一个简单的资本运营框架&#xff1b; ​企业生命周期演示&#xff1a;观察初创→成长→上市→并购全流程 ​行业对比分析&#xff1a;不同行业的财务特征和估值差异 ​资本运作策略&#xff1a;体验IPO定价、投资决策、并购整合等操作 ​市场动态观察&#xff…

当EFISH-SBC-RK3576遇上区块链:物联网安全与可信数据网络‌

在工业物联网场景中&#xff0c;设备身份伪造与数据篡改是核心安全隐患。‌EFISH-SBC-RK3576‌ 通过 ‌硬件安全模块 区块链链上验证‌&#xff0c;实现设备身份可信锚定与数据全生命周期加密&#xff0c;安全性能提升10倍以上。 1. 安全架构&#xff1a;从芯片到链的端到端防…

分布式系统面试总结:3、分布式锁(和本地锁的区别、特点、常见实现方案)

仅供自学回顾使用&#xff0c;请支持javaGuide原版书籍。 本篇文章涉及到的分布式锁&#xff0c;在本人其他文章中也有涉及。 《JUC&#xff1a;三、两阶段终止模式、死锁的jconsole检测、乐观锁&#xff08;版本号机制CAS实现&#xff09;悲观锁》&#xff1a;https://blog.…

【VSCode的安装与配置】

目录&#xff1a; 一&#xff1a;下载 VSCode二&#xff1a;安装 VSCode三&#xff1a;配置 VSCode 一&#xff1a;下载 VSCode 下载地址&#xff1a;https://code.visualstudio.com/download 下载完成之后&#xff0c;在对应的下载目录中可以看到安装程序。 二&#xff1a;安装…

脱围机制-react18废除forwardRef->react19直接使用ref的理解

采用ref&#xff0c;可以在父组件调用到子组件的功能 第一步&#xff1a;在父组件声明ref并传递ref interface SideOptsHandle {refreshData: () > Promise<void> }const sideOptsRef useRef<SideOptsHandle>(null) // 创建 ref<SideOpts ref{sideOptsRef…

Windows中安装git工具

下载好git安装包 点击next 选择安装目录 根据需要去勾选 点击next 点击next PATH环境选择第二个【Git...software】即可&#xff0c;再点击【Next】。 第一种配置是“仅从Git Bash使用Git”。这是最安全的选择&#xff0c;因为您的PATH根本不会被修改。您只能使用 Git Bash 的…

【CSS】CSS 使用全教程

CSS 使用全教程 介绍 CSS&#xff08;层叠样式表&#xff0c;Cascading Style Sheets&#xff09;是一种样式表语言&#xff0c;用于描述 HTML 或 XML 文档的布局和外观&#xff0c;它允许开发者将文档的内容结构与样式表现分离&#xff0c;通过定义一系列的样式规则来控制网页…

全分辨率免ROOT懒人精灵-自动化编程思维-设计思路-实战训练

全分辨率免ROOT懒人精灵-自动化编程思维-设计思路-实战训练 1.2025新版懒人精灵-实战红果搜索关键词刷视频&#xff1a;https://www.bilibili.com/video/BV1eK9kY7EWV 2.懒人精灵-全分辨率节点识别&#xff08;红果看广告领金币小实战&#xff09;&#xff1a;https://www.bili…

如何在IDEA中借助深度思考模型 QwQ 提高编码效率?

通义灵码上新模型选择功能&#xff0c;不仅引入了 DeepSeek 满血版 V3 和 R1 这两大 “新星”&#xff0c;Qwen2.5-Max 和 QWQ 也强势登场&#xff0c;正式加入通义灵码的 “豪华阵容”。开发者只需在通义灵码智能问答窗口的输入框中&#xff0c;单击模型选择的下拉菜单&#x…

LVS的 NAT 模式实验

文章目录 目录 文章目录 概要 IP规划与题目分析 实验步骤 一、nginx配置&#xff08;rs1、rs2、rs3&#xff09; 二、LVS配置 三、客户端配置 四、防火墙和selinux配置 实验结果 痛点解答 概要 LVS/NAT lvs/nat网络地址转换模式&#xff0c;进站/出站的数据流量经过分发器(IP负…

【MacOS】2025年硬核方法清理MacOS中的可清除空间(Purgeable space)

背景 MacOS使用一段时间之后&#xff0c;硬盘空间会越来越少&#xff0c;但自己的文件没有存储那么多&#xff0c;在储存空间中可以发现可用空间明明还剩很多&#xff0c;但磁盘工具却显示已满&#xff0c;见下图。 尝试解决 df -h 命令却发现磁盘已经被快被占满。使用du命…

ue材质学习感想总结笔记

2025 - 3 - 27 1.1 加法 对TexCoord上的每一个像素加上一个值&#xff0c;如果加上0.1&#xff0c;0.1&#xff0c; 那么左上角原来0,0的位置变成了0.1,0.1 右上角就变成了1.1,1.1&#xff0c;那么原来0,0的位置就去到了左上角左上边&#xff0c;所以图像往左上偏移。 总而言…

信而泰PFC/ECN流量测试方案:打造智能无损网络的关键利器

导语&#xff1a; AI算力爆发的背后&#xff0c;如何保障网络“零丢包”&#xff1f; 在当今数据中心网络中&#xff0c;随着AI、高性能计算&#xff08;HPC&#xff09;和分布式存储等应用的飞速发展&#xff0c;网络的无损传输能力变得至关重要。PFC&#xff08;基于优先级的…

CNN和LSTM的计算复杂度分析

前言&#xff1a;今天做边缘计算的时候&#xff0c;在评估模型性能的时候发现NPU计算的大部分时间都花在了LSTM上&#xff0c;使用的是Bi-LSTM&#xff08;耗时占比98%&#xff09;&#xff0c;CNN耗时很短&#xff0c;不禁会思考为什么LSTM会花费这么久时间。 首先声明一下实…

UniApp 表单校验两种方式对比:命令式与声明式

目录 前言1. 实战2. Demo 前言 &#x1f91f; 找工作&#xff0c;来万码优才&#xff1a;&#x1f449; #小程序://万码优才/r6rqmzDaXpYkJZF 以下主要针对Demo讲解&#xff0c;从实战中的体会 何为命令式 何为声明式 命令式的体验&#xff0c;随时都会有提交的按钮&#xff…

LCR 187. 破冰游戏(python3解法)

难度&#xff1a;简单 社团共有 num 位成员参与破冰游戏&#xff0c;编号为 0 ~ num-1。成员们按照编号顺序围绕圆桌而坐。社长抽取一个数字 target&#xff0c;从 0 号成员起开始计数&#xff0c;排在第 target 位的成员离开圆桌&#xff0c;且成员离开后从下一个成员开始计数…

centOS 7.9 65bit 修复Openssh漏洞

一、背景&#xff1a; 在使用centos 7.9 64bit版本操作系统时有扫描出如下的漏洞&#xff1a; 二、修复openssh漏洞操作 升级注意事项 (一下所有的操作默认都是root或者管理员权限&#xff0c;如果遇到权限问题每个指令以及指令组合都要在前面加sudo) 1、查看CentOS操作系统信…