Flink开发入门简单案例--统计实时流订单

news2025/2/21 23:51:30

Flink开发入门简单案例

  • 0.简介
  • 1.订单数据生成器
    • 1.1 新建工程TestFlink
    • 1.2 在pom.xml中引入Flink依赖包
    • 1.3 订单数据生成类
      • +订单类(Item)
      • +订单生成数据流类
      • +测试订单生成类
    • 2.订单统计
      • 2.1 仅统计订单中商品的件数
    • 2.2 同时统计商品数量和金额

0.简介

本案例通过一个简单的订单数据生成器生成随机订单数据,基于这个生成器运用Flink DataStream API开发程序统计订单数量和金额,模拟交易看板数据。

环境:IDEA作为IDE,Flink 1.13.2,Scala 2.12,Java 1.8

1.订单数据生成器

利用 Flink 提供的自定义 Source 来实现一个自定义的实时数据生成器

1.1 新建工程TestFlink

在IDEA中新建工程(new Project),选择“Maven Archetype”,catalog选最基础的类型,如下图所示:
在这里插入图片描述

1.2 在pom.xml中引入Flink依赖包

分别引入flink-java, flink-streaming-java, flink-client三个包,配置如下:

  <properties>
    <flink.version>1.13.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
    <mysql-connector.version>8.0.23</mysql-connector.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

上面的版本均采用变量定义,可以根据实际情况进行替换。

1.3 订单数据生成类

主要用到三个类,订单类,生成订单数据流类,测试类

+订单类(Item)

普通Java类,代码如下:

public class Item {
    private String name;
    private Integer id;

    Item() {}
    public String getName() {
        return name;
    }
    void setName(String name) {
        this.name = name;
    }
    private Integer getId() {
        return this.id;
    }
    void setId(int id) {
        this.id = id;
    }
    public String toString() {
        return "Item { " +
                "name='" + name + "\'" +
                ", id=" + id + "}";
    }
}

+订单生成数据流类

用于生成订单类的数据流,代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class MyStreamingSource implements SourceFunction<Item> {
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Item> sourceContext) throws Exception {
        while(isRunning) {
            Item item = generateItem();
            sourceContext.collect(item);
            Thread.sleep(1000);
        }
    }

    private Item generateItem() {
        int i = new Random().nextInt(1000);
        Item item = new Item();
        item.setId(i);
        item.setCount((i % 4) + 1);
        item.setSum(item.getCount() * (new Random().nextFloat() * 50));
        return item;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}

+测试订单生成类

用于测试上述生成类,也是程序的主函数,代码如下

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Item> text = env.addSource(new MyStreamingSource()).setParallelism(1);
        DataStream<Item> item = text.map((MapFunction<Item, Item>) value-> value);
        item.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}

运行StreamingDemo的main函数,可以看到如下的输出结果:
在这里插入图片描述

2.订单统计

接下来,编写Flink算子,以窗口的方式统计一段时间内(5秒为例)订单的数量。

2.1 仅统计订单中商品的件数

思路如下:先读入订单数据源,将读到的订单Item对象映射为一个三元组Tuple3,再按5秒钟的窗口对订单中商品数量(count)进行求和(sum),核心代码如下:

DataStreamSource<Item> order = env.addSource(new MyStreamingSource()).setParallelism(1);
DataStream<Tuple3<Integer, Integer, Float>> mappedStream =
    order.map(new MapFunction<Item, Tuple3<Integer, Integer, Float>>() {
       @Override
        public Tuple3<Integer, Integer, Float> map(Item item) throws Exception {
            return new Tuple3<>(item.getId(), item.getCount(), item.getSum());
        }
    });
    //定义窗口,每5秒一个
    DataStream<Tuple3<Integer, Integer, Float>> windowedStream = mappedStream
            .keyBy((item)->0)  // 此处的分组方式未使用任何分组,所以使用了一个常量0
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum(1); // 统计三元组中序号为1(即count属性)的和
    windowedStream.print();

运行后的结果如下:
在这里插入图片描述
可以看到,控制台中的输出,每一条数据前是5条数据,也就是5秒钟的窗口收集到5条(每秒产生1条),第2列为商品数量的和,实现了累加。

2.2 同时统计商品数量和金额

上述示例中,对windowedStream使用了sum进行统计求和,这种方式只能对一个字段进行,如果需要同时统计数量和金额,就必须采用另外一种方式,reduce进行统计。
reduce方法也是windowedStream提供的方法,代码如下:

DataStream<Tuple3<Integer, Integer, Float>> windowedStream = mappedStream
    .keyBy((item)->0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce(new ReduceFunction<Tuple3<Integer, Integer, Float>>() {
        @Override
        public Tuple3<Integer, Integer, Float> reduce(Tuple3<Integer, Integer, Float> t1, Tuple3<Integer, Integer, Float> t2) throws Exception {
            return new Tuple3<>(0, t1.f1 + t2.f1, t1.f2 + t2.f2);
        }
    });

这样,即可实现同时统计数量和金额,运行后效果如下:
在这里插入图片描述
可以看到,第2列是count的和,第3列也是sum的和,实现了同时统计。(第1列是在程序中直接映射为了0,因为id字段统计无意义)

本文章的部分案例借鉴自以下文章:Flink 常用的 DataSet 和 DataStream API,有需要的可以自行查看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2249350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI前景分析展望——GPTo1 SoraAI

引言 人工智能&#xff08;AI&#xff09;领域的飞速发展已不仅仅局限于学术研究&#xff0c;它已渗透到各个行业&#xff0c;影响着从生产制造到创意产业的方方面面。在这场技术革新的浪潮中&#xff0c;一些领先的AI模型&#xff0c;像Sora和OpenAI的O1&#xff0c;凭借其强大…

PAT1085 Perfect Sequence(25)

//判断是否是连续的数 //判断是否只能第一个数是最小值 #include <cstdio> #include <algorithm> typedef long long ll; using namespace std; int n,p; const int maxn 100010; int arr[maxn];int binary(int l, int r, ll tgt){if(arr[n-1] < tgt) return n…

QChart数据可视化

目录 一、QChart基本介绍 1.1 QChart基本概念与用途 1.2 主要类的介绍 1.2.1 QChartView类 1.2.2 QChart类 1.2.3QAbstractSeries类 1.2.4 QAbstractAxis类 1.2.5 QLegendMarker 二、与图表交互 1. 动态绘制数据 2. 深入数据 3. 缩放和滚动 4. 鼠标悬停 三、主题 …

SpringBoot源码-spring boot启动入口ruan方法主线分析(一)

一、SpringBoot启动的入口 1.当我们启动一个SpringBoot项目的时候&#xff0c;入口程序就是main方法&#xff0c;而在main方法中就执行了一个run方法。 SpringBootApplication public class StartApp {public static void main(String[] args) {// testSpringApplication.ru…

C#变量和函数如何和unity组件绑定

1.Button On_click (1)GameObject通过Add component添加上Script (2)Button选GameObject组件而不是直接选Script,直接选Script出现不了Script中的函数 2.RawImage 上面是错的 3.Text 上面是错的&#xff0c;应该是直接在GameObject里面填上对应的值 总结&#xff1a; …

Flink Sink的使用

经过一系列Transformation转换操作后&#xff0c;最后一定要调用Sink操作&#xff0c;才会形成一个完整的DataFlow拓扑。只有调用了Sink操作&#xff0c;才会产生最终的计算结果&#xff0c;这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是…

【Spring MVC】如何获取cookie/session以及响应@RestController的理解,Header的设置

前言 &#x1f31f;&#x1f31f;本期讲解关于SpringMVC的编程之参数传递~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么废…

使用 exe4j 将 Spring Boot 项目打包为 EXE 可执行文件

使用 exe4j 将 Spring Boot 项目打包为 EXE 可执行文件 文章目录 使用 exe4j 将 Spring Boot 项目打包为 EXE 可执行文件什么是 exe4j准备工作打包 Spring Boot 项目为 EXE 文件1.启动 exe4j2. 选择项目类型3. 配置项目名称和输出目录4. 配置项目类型或可执行文件名称5. java配…

前端JavaScript(一)---基本介绍

Javascript是一种由Netscape(网景)的LiveScript发展而来的原型化继承的面向对象的动态类型的区分大小写的客户端脚本语言&#xff0c;主要目的是为了解决服务器端语言&#xff0c;比如Perl&#xff0c;遗留的速度问题&#xff0c;为客户提供更流畅的浏览效果。当时服务端需要对…

阿里Qwen系列开源模型介绍

模型种类丰富 Qwen2&#xff1a;包含Qwen2-0.5b、Qwen2-1.5b、Qwen2-7b、Qwen2-57b-a14b以及Qwen2-72b等五种规模的预训练和指令微调模型&#xff0c;其在多语言处理、长文本处理、代码生成、数学和逻辑推理等能力上&#xff0c;在mmlu、gpqa、humaneval等国际测评中得到了验证…

基于Java的小程序电商商城开源设计源码

近年来电商模式的发展越来越成熟&#xff0c;基于 Java 开发的小程序电商商城开源源码&#xff0c;为众多开发者和企业提供了构建个性化电商平台的有力工具。 基于Java的电子商城购物平台小程序的设计在手机上运行&#xff0c;可以实现管理员&#xff1b;首页、个人中心、用户…

开源 AI 智能名片 2 + 1 链动模式 S2B2C 商城小程序源码助力品牌共建:价值、策略与实践

摘要&#xff1a;在当今数字化商业环境下&#xff0c;品牌构建已演变为企业与消费者深度共建的过程。本文聚焦于“开源 AI 智能名片 2 1 链动模式 S2B2C 商城小程序源码”&#xff0c;探讨其如何融入品牌建设&#xff0c;通过剖析品牌价值构成&#xff0c;阐述该技术工具在助力…

力扣797. 所有可能的路径

算法思想 深度优先搜索&#xff08;DFS&#xff09;&#xff1a; 使用递归从节点 0 开始&#xff0c;探索所有从当前节点到终点 n−1 的路径。每次访问一个节点时&#xff0c;将该节点加入当前路径 path。 回溯法&#xff1a; 在递归返回时&#xff0c;通过 path.pop_back()…

AMD(Xilinx) FPGA配置Flash大小选择

目录 1 FPGA配置Flash大小的决定因素2 为什么选择的Flash容量大小为最小保证能够完成整个FPGA的配置呢&#xff1f; 1 FPGA配置Flash大小的决定因素 在进行FPGA硬件设计时&#xff0c;选择合适的配置Flash是我们进行硬件设计必须考虑的&#xff0c;那么配置Flash大小的选择由什…

Git简单介绍

一、 Git介绍与安装 1.1 Git简介 Git是一个开源的分布式版本控制系统&#xff0c;可以有效、高速地处理从很小到非常大的项目版本管理。 1.2集中式(SVN&#xff09; VS 分布式(git) 集中式版本控制系统&#xff0c;版本库是集中存放在中央服务器的&#xff0c;工作时要先从中央…

FreeSWITCH 简单图形化界面34 - 网络环境安全的情况下,进行任意SIP注册

FreeSWITCH 简单图形化界面34 -网络环境安全的情况下&#xff0c;进行任意SIP注册 测试环境1、前言2、参数3、实践一下 测试环境 http://myfs.f3322.net:8020/ 用户名&#xff1a;admin&#xff0c;密码&#xff1a;admin FreeSWITCH界面安装参考&#xff1a;https://blog.cs…

力扣 二叉树的层序遍历-102

二叉树的层序遍历-102 class Solution { public:vector<vector<int>> levelOrder(TreeNode* root) {vector<vector<int>> res; // 二维数组用来存储每层节点if (root nullptr)return res;queue<TreeNode*> q; // 队列用来进行层序遍历q.push(r…

鸿蒙学习使用本地真机运行应用/元服务 (开发篇)

文章目录 1、前提条件2、使用USB连接方式3、使用无线调试连接方式4、运行 1、前提条件 在Phone和Tablet中运行HarmonyOS应用/元服务的操作方法一致&#xff0c;可以采用USB连接方式或者无线调试的连接方式。两种连接方式是互斥的&#xff0c;只能使用一种&#xff0c;无法同时…

数据库导论

data 数据是数据库中存储的基本数据&#xff0c;描述事物的符号称为数据。 DB 数据库是长期存储在计算机内&#xff0c;有组织&#xff0c;可共享的大量数据的集合。数据库中的数据按照一定的数据模型组织&#xff0c;描述和存储&#xff0c;具有较小的冗余度&#xff0c;较…

数据结构 ——— 归并排序算法的实现

目录 归并排序的思想 归并排序算法的实现 归并排序的思想 将已经有序的子序列合并&#xff0c;得到完全有序的序列&#xff0c;即先使每个子序列有序后&#xff0c;再使子序列段间有序 若将两个有序表合并成一个有序表&#xff0c;称为二路归并 归并排序步骤示意图&#x…