Flink 06 聚合操作入门学习,真不难

news2025/1/19 14:24:49

抛砖引玉

  1. 让你统计1小时内每种商品的销售额,用Flink 该怎么实现。

  2. 还是让你统计1小时内每种商品的销售额,但是要过滤掉退款的订单,用Flink 该怎么实现。

学了本文两个操作,不信你还不会。

AggregateFunction

通常用于对数据流中的数据进行分组聚合。它可以将一组数据逐步合并、计算,最终得到一个聚合结果。

AggregateFunction 接口包含几个关键的方法,这些方法定义了如何进行状态初始化、累加、合并和获取结果:

createAccumulator():该方法在聚合前被调用,用于初始化聚合状态。

add(value, accumulator)该方法将新的输入值加到累加器上。在每个事件到达时调用会调用该方法。

getResult(accumulator):该方法用于返回最终聚合结果。这在聚合操作结束时被调用。

merge(acc1, acc2)(可选):该方法作用是,在并行流处理情况下,需要合并不同实例的聚合结果。

以下示例模拟统计每小时各商品的销售额

public class AggregateFunctionDemo {

    public static class Order{

        String goods;

        int amount;

        public Order(String goods, int amount) {
            this.goods = goods;
            this.amount = amount;
        }
    }

    public static class OrderACC{

        String goods;

        int amount;

        public OrderACC(String goods, int amount) {
            this.goods = goods;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "OrderACC{" +
                    "goods='" + goods + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }

    public  static class OrderACCFunction implements AggregateFunction<Order, OrderACC, OrderACC> {


        @Override
        public OrderACC createAccumulator() {
            return new OrderACC(null,0);
        }

        @Override
        public OrderACC add(Order value, OrderACC accumulator) {
            
            if (accumulator.goods == null) {
                accumulator.goods = value.goods;
            }
            accumulator.amount += value.amount;
            return accumulator;
        }

        @Override
        public OrderACC getResult(OrderACC accumulator) {

            return accumulator;
        }

        @Override
        public OrderACC merge(OrderACC a, OrderACC b) {
            a.amount += b.amount;
            return a;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Order> dataStream = env.addSource(new SourceFunction<Order>() {

            boolean running = true;

            List<String> goods = Arrays.asList("书包","本子","笔");

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {

                Random random = new Random();
                while (running){
                  int goodsIndex =   random.nextInt(goods.size());
                  int amount = random.nextInt(1000);
                    Order order = new Order(goods.get(goodsIndex), amount);
                    ctx.collect(order);


                    Thread.sleep(200);
                }

            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<OrderACC> resultStream =
                dataStream.keyBy(order -> order.goods).
                        window(TumblingProcessingTimeWindows.of(Time.hours(5))).
                aggregate(new OrderACCFunction());

        resultStream.print();

        env.execute();

    }
}


AggregateFunction 小结

  • AggregateFunction 常用于对窗口内的数据进行聚合计算。

例如,你可能需要计算某个时间窗口内某个指标的平均值、总和、最大值或最小值等。

  • 在分布式计算环境中,通过实现 merge 方法,Flink 可以在不同的节点上并行地执行聚合计算,并在最后将结果合并。

ProcessWindowFunction

ProcessWindowFunction 是 Flink 提供的一个强大的窗口函数接口,允许开发者对窗口中的元素进行自定义处理,包括访问窗口的元数据和状态。

来看看ProcessWindowFunction中 process方法的定义

 void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

图片

从上面方法定义我们基本可以推断ProcessWindowFunction 的特点

  • Iterable<IN> elements 窗口中所有元素 ,这与 ReduceFunction 或 AggregateFunction 不同,后者主要关注于元素之间的聚合操作。我们可以遍历elements,实现自己的聚合逻辑。

  • Context context:你可以通过Context获取到窗口的元数据,如窗口的开始和结束时间戳。甚至进行状态管理

ProcessWindowFunction 的使用

public class AggregateFunctionDemo2 {

    public static class Order{

        String goods;

        int amount;

        boolean refund;

        public Order(String goods, int amount, boolean refund) {
            this.goods = goods;
            this.amount = amount;
            this.refund = refund;
        }
    }

    public static class OrderACC{

        String goods;

        int amount;


        public OrderACC(String goods, int amount) {
            this.goods = goods;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "OrderACC{" +
                    "goods='" + goods + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }

    public  static class OrderProcessWindowFunction extends ProcessWindowFunction<Order,OrderACC,String, TimeWindow> {


        @Override
        public void process(String key, ProcessWindowFunction<Order, OrderACC, String, TimeWindow>.Context context, Iterable<Order> elements, Collector<OrderACC> out) throws Exception {

            int sum = 0;
            for(Order order : elements){
                if(!order.refund){
                    sum += order.amount;
                }
            }
            out.collect(new OrderACC(key,sum));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Order> dataStream = env.addSource(new SourceFunction<Order>() {

            boolean running = true;

            List<String> goods = Arrays.asList("书包","本子","笔");

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {

                Random random = new Random();
                while (running) {
                    int goodsIndex = random.nextInt(goods.size());
                    int amount = random.nextInt(1000);
                    boolean refund = random.nextBoolean();
                    Order order = new Order(goods.get(goodsIndex), amount, refund);
                    ctx.collect(order);
                    Thread.sleep(100);
                }

            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<OrderACC> resultStream = dataStream.keyBy(order -> order.goods).
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new OrderProcessWindowFunction());
        
        resultStream.print();

        env.execute();
        

    }
}

图片

ProcessWindowFunction小结

  • 可以实现复杂的聚合逻辑,比如对窗口内元素进行过滤、排序之后 再进行聚合。

  • 可以获取窗口的状态信息,(如窗口的开始和结束时间)来满足一些特定的需求

总结

本文介绍了如何使用ProcessWindowFunction/AggregateFunction 完成一些聚合操作。通过对比两端代码,相信聪明的你已经体会到两者差异。再回到开头的问题,相信已经不是问题,信手拈来了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2221560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【决策树】- 二分法处理连续值

二分法处理连续值 在决策树算法中&#xff0c;处理连续特征通常采用二分法&#xff0c;将其转化为离散特征。此方法通过寻找最佳分割点&#xff0c;将连续特征划分为两个区间。 1. 原理 二分法的核心思想是将连续值特征转换为离散值&#xff0c;以便于决策树的构建。通过选择…

[Linux#67][IP] 报头详解 | 网络划分 | CIDR无类别 | DHCP动态分配 | NAT转发 | 路由器

目录 一. IP协议头格式 学习任何协议前的两个关键问题 IP 报头与有效载荷分离 分离方法 为什么需要16位总长度 如何交付 二. 网络通信 1.IP地址的划分理念 2. 子网管理 3.网络划分 CIDR&#xff08;无类别域间路由&#xff09; 目的IP & 当前路由器的子网掩码 …

R语言机器学习算法实战系列(九)决策树分类算法 (Decision Trees Classifier)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍教程下载数据加载R包导入数据数据预处理数据描述数据切割调节参数构建模型模型的决策树预测测试数据评估模型模型准确性混淆矩阵模型评估指标ROC CurvePRC Curve特征的重要性保存模…

TCP协议四次挥手

1.第一次挥手&#xff1a;客户端发送FIN报文&#xff0c;请求断开连接。这一过程为主动关闭。客户端由ESTABLISHED转换为FIN-WAIT-1。 FIN和ACK置为1&#xff0c;表示这是一个请求结束报文。seqU表示在之前客户端已经发送的字节数。ackV则表示服务端向客户端发送确认消息累计的…

磁盘分区工具 DiskGenius Pro v5.5.0.1488 中文汉化版

DiskGenius 是一款专家级数据恢复软件&#xff0c;集数据恢复、硬盘分区、系统备份还原等多种功能于一身的超级工具软件&#xff0c;功能全面&#xff0c;安全可靠。可以提供磁盘的数据找回、备份、分区、修复、删除、格式化等操作&#xff0c;也能帮助用户修复磁盘坏道、彻底删…

从零构建大模型训练流程及原理(一)

这是一个大模型训练的系列文章&#xff0c;将从零开始手把手带大家构建大模型训练全流程。话不多说&#xff0c;开始发车~什么是大语言模型 1. LLMs是什么&#xff1f; 大型语言模型&#xff08;LLMs&#xff09;是基于Transformer架构的深度学习模型&#xff0c;旨在理解、生…

[枚举坤坤]二进制枚举基础

我们都知道数据是以二进制形式存储在计算机中的。当我们使用十进制数进行编程时&#xff08;如a10&#xff09;实际上计算机要先进行一步转码&#xff0c;将其化为二进制的形式进行计算。如果在编程的过程中我们可以直接越过转码这一步去操纵二进制形式进行运算&#xff0c;程序…

EMNLP 2024 个性化/风格化 文本生成 论文汇总(19篇主会论文)

引言&#xff1a;调研&#xff0c;搬砖… &#x1f60a; &#x1f318;&#x1f317;&#x1f316; &#x1f607; ✅ 笔者简介&#xff1a;Wang Linyong&#xff0c;NPU&#xff0c;2023级&#xff0c;计算机技术 研究方向&#xff1a;文本生成、大语言模型 文章目录 1 Perso…

【C++】STL----map和set

&#x1f525;个人主页&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收录专栏&#x1f308;&#xff1a;C从小白到高手 &#x1f339;往期回顾&#x1f339;&#xff1a;[C]二叉搜索树 &#x1f516; 流水不争&#xff0c;争的是滔滔不息 文章目录 一、set和map的简介setm…

Java设计模式梳理:行为型模式(策略,观察者等)

行为型模式 行为型模式关注的是各个类之间的相互作用&#xff0c;将职责划分清楚&#xff0c;使得我们的代码更加地清晰。 策略模式 策略模式太常用了&#xff0c;所以把它放到最前面进行介绍。它比较简单&#xff0c;我就不废话&#xff0c;直接用代码说事吧。 下面设计的…

某ai gpt的bug

某ai gpt的bug 背景 遇到了一个奇怪的现象&#xff1a; 输入内容 2024-10-21 10:09:31,052 ERROR o.a.j.t.JMeterThread: Test failed! java.lang.IllegalArgumentException:输出结果

Android开发教程二维码扫描功能

Android开发教程二维码扫描功能 二维码扫描大一点的app都有的功能&#xff0c;因为扫一下真的很方便 一、思路&#xff1a; 用zxing库 二、效果图&#xff1a; 看视频更加直观点&#xff1a; Android开发教程实战案例源码分享-二维码扫描功能 三、关键代码&#xff1a; c…

音频声音怎么调大?将音频声音调大的几个简单方法

音频声音怎么调大&#xff1f;在现代生活中&#xff0c;音频内容无处不在&#xff0c;从在线课程和播客到音乐和电影&#xff0c;音频已经成为我们获取信息和娱乐的重要方式。然而&#xff0c;许多人在使用音频时可能会遇到一个常见问题&#xff1a;音频声音太小&#xff0c;无…

手机投屏到电脑上的scrcpy软件 scrcpy v2.4

下载&#xff1a;https://drive.uc.cn/s/b1285b1fb9f94?public1 最近的工作需要用到用手机演示一些操作&#xff0c;但是手机屏幕比较小&#xff0c;我就想把手机投到我的电脑上&#xff0c;然后电脑连接投影仪就行了。 scrcpy是一款开源的软件&#xff0c;在githus上可以下…

少儿编程学习,如何走,之点评一二

前言&#xff1a; 不少孩子在少儿机构学习编程的家长跟我反馈&#xff0c;机构学习孩子学了记不住&#xff0c;有些家孩子索性就不去&#xff0c;不愿意再谈编程学习之事。 从一位专业信息学教师出身的老师&#xff0c;稍作点评一二&#xff1a; 【同时也引用了一些主流媒体的…

力扣OJ算法题:合并两个有序链表

—————————————————————————————————————————— 正文开始 OJ算法题&#xff1a;合并两个有序链表 思路 创建一个新的空链表&#xff08;可以用malloc优化&#xff09;和两个指针L1、L2分别指向两个链表&#xff0c;遍历两个链表&am…

python poetry包管理的安装和使用

目录 设置国内(清华)镜像源和升级pip 安装poetry pycharm中使用 常用的poetry命令 本文背景为已经安装好python 设置国内(清华)镜像源和升级pip pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple python -m pip install --upgrade pip 安装po…

【峟思仪器】高边坡安全监测起到哪些作用

高边坡安全监测起到哪些作用?在当今的工程建设领域&#xff0c;高边坡监测犹如一道坚实的安全防线&#xff0c;其重要性不容小觑。无论是大型水利工程&#xff0c;还是交通基础设施建设&#xff0c;高边坡的稳定性都直接关系到工程的安全和周边环境的安危。高边坡监测在工程建…

在Debian上安装向日葵

说明&#xff1a; 因为之前服务器上安装了 PVE (Proxmox VE)&#xff0c;之前是用 Proxmox VE 进行服务器资源管理的。出于某些原因&#xff0c;现在不再通过 PVE构建的虚拟机来使用计算资源&#xff0c;而是通过 PVE 自带的 Debian 系统直接使用虚拟机资源&#xff08;因为积…

使用Python抓取房源信息

1. 引言 在当今大数据时代&#xff0c;网络爬虫成为获取信息的重要手段之一。本文将以某家二手房为例&#xff0c;演示如何使用Python爬虫抓取房源信息&#xff0c;并将这些信息保存到Excel文件中。 目标网站 2. 准备工作 2.1 安装必要的库 在开始之前&#xff0c;请确保你…