Flink实战案例四部曲

news2025/2/3 0:50:04

Flink实战案例四部曲

第一部曲:统计5分钟内用户修改创建删除文件的操作日志数量

输入
1001,delete
1002,update
1001,create
1002,delte
输出
1001,2
1002,2

代码如下。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TestRiZhiSum {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineds=env.socketTextStream("192.168.110.156",9000);
        //这里输入string,输出1001,delete的两个字符串组成的元组
        lineds.flatMap((String line,Collector<Tuple2<String,String>> out)->{
            String[] words=line.split(",");
            out.collect(Tuple2.of(words[0],words[1]));
        }).returns(Types.TUPLE(Types.STRING,Types.STRING))
                .keyBy(ds->ds.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new MyProcessWindow())
                .print();
        env.execute();
    }
}

代码中有MyProcessWindow的窗口处理函数,窗口处理函数需要统计用户日志的数目。需要继承ProcessWindowFunction,实现代码如下所示。

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MyProcessWindow extends ProcessWindowFunction<Tuple2<String,String>,
        Tuple2<String,Long>,String,TimeWindow>{
    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, String>> iterable,
                        Collector<Tuple2<String, Long>> collector) throws Exception {
        long count=0;
        for(Tuple2<String,String> iter:iterable){
            count+=1;
        }
        collector.collect(Tuple2.of(s,count));
    }
}

注意代码中ProcessWindowFunction函数中产生4个参数,第一个参数是输入的Tuple2元组,其元组中两个值为String和String。第二个参数是输出的Tuple2元组,其元组两个值是String和Long,String代表用户,Long代表统计后的值。第三个参数是键的类型,最后一个参数是固定的TimeWindow。

运行结果后,在linux中输入相关的信息。如下图所示。

 

运行程序后的结果如下。

 

第二部曲:统计5分钟内用户订单的平均值,这里需要统计该时间内产生的用户订单和值及订单的个数,然后计算订单的均值。

输入:
1001,102.5
1002, 98.4
1001,56.4
1002,101.2
输出
1001,83.32
1002,87.32

代码如下。

import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class AmWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineds = env.socketTextStream("192.168.110.156",
                9000);
        lineds.flatMap((String line, Collector<Tuple2<String, Double>> out) -> {
            String[] words = line.split(",");
            out.collect(Tuple2.of(words[0], Double.parseDouble(words[1])));
        }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .keyBy(ds->ds.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new MyAvgProcessFunction())
                .print();
        env.execute();
    }
}

代码中其中MyAvgProcessFunction()实现平均值的计算处理,代码如下。

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MyAvgProcessFunction extends ProcessWindowFunction<Tuple2<String,Double>,
        Tuple2<String,Double>,String,TimeWindow> {
    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Double>>
            iterable, Collector<Tuple2<String, Double>> collector) throws Exception {
        double sum=0;
        double count=0;
        for(Tuple2<String,Double> iter:iterable){
            //tuple2元组的第二个数值为double的价格,可以进行累加
            sum+=iter.f1;
            count++;
        }
        collector.collect(Tuple2.of(s,sum/count));
    }
}

注意代码中ProcessWindowFunction函数中产生4个参数,第一个参数是输入的Tuple2元组,其元组中两个值为String和Double。Double是因为数值之和计算值是比较大的,第二个参数是输出的Tuple2元组,其元组两个值是String和Double,String代表用户,Double代表统计后的值。第三个参数是键的类型,最后一个参数是固定的TimeWindow。

第三部曲:统计计算五秒内每个信号灯通过的汽车数量。

通过对道路信号灯捕获的汽车数量进行统计,得出5秒内每个信号灯通过汽车数量,以便进行合理的道路交通规划和管制。由于网络延迟的问题,需要加入水印效果决定数据的先后顺序,并使用侧道输出获取后期延迟的数据。

流数据第一列为信号灯ID,第二列为汽车数量,第三列车为嵌入数据中的事件时间戳。数据格式如下。

信号灯ID,汽车数量,事件时间戳
1001,3,1000
1002,2,2000

需要建立一个java bean描述这个信号灯的特征。

CarData类就是标志这个信号灯的特征类。

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarData {
    private String id;
    private Integer count;
    private long eventTime;
}

这里需要lombok包。

然后交通灯也需要窗口,输入是一个String,输出则变成了CarData 的类型,同时得到的CarData通过assignTimestampsAndWatermarks方法添加时间戳效果的水印,随之设置水印延迟时间的目的是让水印延迟到达,从而可以解决乱序问题。通过水印延迟到达让在延迟时间范围内到达的迟到数据可以加入到窗口计算中,保证了数据的完整性。当水印到达后就会触发窗口计算,在水印之后到达的迟到数据则会被丢弃。

forBoundedOutOfOrderness(Duration.ofSeconds(5))

起到了这样的目的,后面设置水印的具体内容。

设置水印的内容如下。

DataStream<CarData> watermarkDs=cardataStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<CarData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<CarData>(){
            @Override
            public long extractTimestamp(CarData carData, long l) {
                return carData.getEventTime();
            }
        })
);

还需要侧道输出。

OutputTag lateOutputTag=new OutputTag<CarData>("late-data"){};

再设置滑动窗口,处理滑动窗口的reduce处理函数。

具体代码如下 。

SingleOutputStreamOperator<CarData> result=watermarkDs.keyBy(CarData::getId)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(3))
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyReduceFunction(),new MyProcessOldFunction());

这里设置有窗口允许延迟方法allowLateness(Time.Seconds(3))

这里有两个reduce的两个参数,第一个方法决定了交通灯的汇总计算方法,第二个方法决定交通灯参数的输出。

MyReduceFunction()的函数内容如下。

import org.apache.flink.api.common.functions.ReduceFunction;
public class MyReduceFunction implements ReduceFunction<CarData>{
    @Override
    public CarData reduce(CarData carData, CarData t1) throws Exception {
        return new CarData(carData.getId(),carData.getCount()+t1.getCount(),carData.getEventTime());
    }
}

MyProcessOldFunction()方法如下。

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MyProcessOldFunction extends ProcessWindowFunction<CarData,String,String,TimeWindow>{
    @Override
    public void process(String s, Context context, Iterable<CarData> iterable, Collector<String> collector) throws Exception {
        CarData cardata=iterable.iterator().next();
        collector.collect("窗口1("+context.window().getStart()+"~"+context.window().getEnd()+")的计算结果:"+s+":"+cardata.getCount());
    }
}

整个交通灯主程序的代码如下。

public class JiaoTongWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineds=env.socketTextStream("192.168.110.156",9000);
        DataStream<CarData> cardataStream=lineds.map(new MapFunction<String,CarData>(){
            public CarData map(String s) throws Exception {
                String[] words=s.split(",");
                return new CarData(words[0],Integer.parseInt(words[1]),Long.parseLong(words[2]));
            }
        });
        DataStream<CarData> watermarkDs=cardataStream.assignTimestampsAndWatermarks(
            WatermarkStrategy.<CarData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<CarData>(){
                    @Override
                    public long extractTimestamp(CarData carData, long l) {
                        return carData.getEventTime();
                    }
                })
        );
        OutputTag lateOutputTag=new OutputTag<CarData>("late-data"){};
        SingleOutputStreamOperator<CarData> result=watermarkDs.keyBy(CarData::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(lateOutputTag)
                .reduce(new MyReduceFunction(),new MyProcessOldFunction());
        result.print();
        env.execute();

    }
}

第四部:五、使用Flink SQL计算5秒内用户订单总金额

这里包括每个用户的订单总数,订单总金额, 可能有数据延迟的问题,使用水印解决数据延迟的问题,用户订单包括字段:订单ID,用户ID,订单金额,订单时间。具体对应的java bean代码如下。

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderA {
    private String id;
    private Integer userId;
    private Integer money;
    private Long createTime;
}

主程序初始化时需要跟Table的StreamTableEnvironment环境变量相融合。代码如下 。

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env,settings);

案例中需要定义数据源,add_source方法产生定单的相关属性,代码如下 。

DataStreamSource<OrderA> wordStream=env.addSource(new RichSourceFunction<OrderA>(){
    private Boolean isRunning=true;
    @Override
    public void run(SourceContext<OrderA> sourceContext) throws Exception {
        Random random=new Random();
        while(isRunning){
            OrderA ordera=new OrderA(
                    UUID.randomUUID().toString(),
                    random.nextInt(5),
                    random.nextInt(200),
                    System.currentTimeMillis()
            );
            TimeUnit.SECONDS.sleep(1);
            sourceContext.collect(ordera);
        }
    }
    @Override
    public void cancel() {
        isRunning=false;
    }
});

产生数据时,有1秒钟的延迟后产生第二个订单。

对订单的产生时间制造水印,代码如下。

DataStream<OrderA> watermarkds=wordStream.assignTimestampsAndWatermarks(
       WatermarkStrategy.<OrderA>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<OrderA>(){
            @Override
            public long extractTimestamp(OrderA ordera, long l) {
                return ordera.getCreateTime();
            }
        })
);

通过Stream建立Table的TemporaryView,通过createTemporaryView方法,代码如下。

tableEnv.createTemporaryView("t_order",watermarkds,
        $("id"),
        $("userId"),
        $("money"),
        $("createTime")
);

接下来使用sql语句进行查询统计,平均值和需要的总钱数。

代码如下。

String sql="select userId,count(*) as totalCount,sum(money) as sumMoney from t_order group by userId,tumble(createTime,interval '5' second)";
Table table=tableEnv.sqlQuery(sql);
tableEnv.toRetractStream(table,Row.class).print();
env.execute();

最终整体代码如下。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import java.time.Duration;
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.types.Row;
public class MyOrderWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env,settings);
        DataStreamSource<OrderA> wordStream=env.addSource(new RichSourceFunction<OrderA>(){
            private Boolean isRunning=true;
            @Override
            public void run(SourceContext<OrderA> sourceContext) throws Exception {
                Random random=new Random();
                while(isRunning){
                    OrderA ordera=new OrderA(
                            UUID.randomUUID().toString(),
                            random.nextInt(5),
                            random.nextInt(200),
                            System.currentTimeMillis()
                    );
                    TimeUnit.SECONDS.sleep(1);
                    sourceContext.collect(ordera);
                }
            }
            @Override
            public void cancel() {
                isRunning=false;
            }
        });
        DataStream<OrderA> watermarkds=wordStream.assignTimestampsAndWatermarks(
               WatermarkStrategy.<OrderA>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderA>(){
                    @Override
                    public long extractTimestamp(OrderA ordera, long l) {
                        return ordera.getCreateTime();
                    }
                })
        );
        tableEnv.createTemporaryView("t_order",watermarkds,
                $("id"),
                $("userId"),
                $("money"),
                $("createTime")
        );
        String sql="select userId,count(*) as totalCount,sum(money) " +
                "as sumMoney from t_order group by userId,tumble(createTime,interval '5' second)";
        Table table=tableEnv.sqlQuery(sql);
        tableEnv.toRetractStream(table,Row.class).print();
        env.execute();
    }
}

至此,flink的四步应用程序结束,希望对大家有帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/103889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Anlios装grouplist 组件之后报错,安装tiger-vncserver

因为之前升级了一个epel-release源&#xff0c;然后containerd也装进去了&#xff0c;但是版本太低 然后以为是runc挡住了&#xff0c;发现没有runc 删完了containerd就可以装了 rpm -ivh http://mirrors.wlnmp.com/centos/wlnmp-release-centos.noarch.rpm dnf install wntp…

【性能篇】30 # 怎么给WebGL绘制加速?

说明 【跟月影学可视化】学习笔记。 常规绘图方式的性能瓶颈 例子&#xff1a;在一个画布上渲染 3000 个不同颜色的、位置随机的三角形&#xff0c;并且让每个三角形的旋转角度也随机。 <!DOCTYPE html> <html lang"en"><head><meta charse…

SpringBoot+Vue实现前后端分离的高校思政课实践教学管理系统

文末获取源码 开发语言&#xff1a;Java 使用框架&#xff1a;spring boot 前端技术&#xff1a;JavaScript、Vue.js 、css3 开发工具&#xff1a;IDEA/MyEclipse/Eclipse、Visual Studio Code 数据库&#xff1a;MySQL 5.7/8.0 数据库管理工具&#xff1a;phpstudy/Navicat JD…

基于SSM的大学生心理健康系统设计与实现

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 网站前台&#xff1a;关于我们、联系信息、文章信息、咨询师信息、服务信息、测试信喜 管理员功能&#xff1a; 1、管理…

继续谈谈从Rxjava迁移到Flow的背压策略

前言 对于背压问题不久前就讨论过了&#xff0c;这里就不过多介绍了&#xff0c;总之它是一个非常复杂的话题&#xff0c;本文的主要目的是分析我们如何从Rxjava迁移到Flow并且使用其背压方案&#xff0c;由于本身技术的限制以及协程内部的复杂性&#xff0c;不会做过多的深入…

下载安装PyTorch

1、下载并安装Visual Studio Code选择合适版本安装 2、下载安装conda并配置环境 下载方式一&#xff1a;官网下载 下载方式二&#xff1a;清华镜像安装 3、conda配置环境 打开电脑高级系统配置点开系统环境变量&#xff1a; 找到path然后点击Edit或者直接双击&#xff1a; 之后…

利用LSTM识别篇章关系实战代码+数据

1.显式篇章关系分类概述 案例知识点: 任务描述:篇章关系分析是自然语言中处理篇章级基础语言分析任务,其目的是利用规则或机器学习等计算机处理手段判别篇章各组成成分之间的修辞逻辑关系,从而从整体上理解篇章。其中论元之间有连接词连接的此类关系称为显式篇章关系。本教…

实验七、MOS管分压式偏置共源放大电路的静态和动态参数

一、题目 搭建MOS管分压式偏置共源放大电路。利用Multisim研究下列问题&#xff1a; &#xff08;1&#xff09;确定一组电路参数&#xff0c;使电路的 QQQ 点合适。 &#xff08;2&#xff09;若输出电压波形底部失真&#xff0c;则可采取哪些措施&#xff1f;若输出电压波形…

Mysql概念知识

Mysql数据库基础知识为什么要使用数据库数据保存在内存数据保存在文件数据保存在数据库什么是SQL&#xff1f;什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有几种录入格式&#xff1f;分别有什么区别&#xff1f;数据类型mysql有哪些数据类型引…

计算机毕设Python+Vue学生资源管理系统(程序+LW+部署)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

websocket使用方法

前言 最近项目用到了websocket接口&#xff0c;用来做长连接&#xff0c;监听服务器数据变化&#xff0c;保持各终端数据同步。 用下来发现确实很好用&#xff0c;避免了轮询&#xff0c;开销小&#xff0c;而且最重要的是没有同源策略限制。 websocket WebSocket 是一种在…

Nginx-安装和部署全过程

前言 OpenResty是一个基于Nginx与 Lua 的高性能 Web 平台&#xff0c;其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。openresty官网&#xff1a;OpenResty - 中文官方站 …

软件著作权到底享有哪些权利?

根据《计算机软件保护条例》相关规定 软件著作权人享有下列各项权利: &#xff08;一&#xff09;发表权&#xff0c;即决定软件是否公之于众的权利&#xff1b; &#xff08;二&#xff09;署名权&#xff0c;即表明开发者身份&#xff0c;在软件上署名的权利&#xff1b; &a…

iOS的启动优化

应用的启动优化 当我们参与到大型应用的时候 会遇到一些启动时间过长的情况 这时候就需要使用到相关的操作。 总结来说&#xff0c;main()方法调用前&#xff0c;启动过程大体分为如下步骤&#xff1a; 先是LLVM把项目翻译成IR文件然后到backend&#xff0c;PRE_MAIN,main。 …

感冒咳嗽土法子

目录介绍 01.常见的感冒药02.止咳的土方法03.感冒的土方法 01.常见的感冒药 感冒是生活中最常见的疾病 患者往往会有&#xff1a;头昏、发烧、浑身酸痛、鼻塞、流鼻涕等症状 注意点 注意休息&#xff0c;适当补充水分&#xff0c;保持室内空气流通。 常见感冒药 主要成分的作…

CSS规范

CSS规范 命名规范 页面外围控制整体布局宽度&#xff1a;wrapper、页头&#xff1a;header、页面主体&#xff1a;main、内容&#xff1a;content、页脚&#xff1a;footer、导航&#xff1a;nav、主导航&#xff1a;mainbav、子导航&#xff1a;subnav、顶导航&#xff1a;t…

利用FormData上传本地文件

前言 最近接了个小项目&#xff0c;有个用客户端本地文件的需求。 正常这种需求都是前台传文件&#xff0c;后台去解析。 但这次C的老哥非让我给文件路径&#xff0c;说公司平台有解析文件的能力。 我说web不是桌面端&#xff0c;拿不到真实路径&#xff0c;他还不信&#…

已解决1. Downgrade the protobuf package to 3.20.x or lower.

已解决TypeError: Descriptors cannot not be created directly. If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc > 3.1.0If you cannot immediately regenerate your protos, some other possible worka…

【Ctfer训练计划】——(二)

作者名&#xff1a;Demo不是emo 主页面链接&#xff1a;主页传送门创作初心&#xff1a;舞台再大&#xff0c;你不上台&#xff0c;永远是观众&#xff0c;没人会关心你努不努力&#xff0c;摔的痛不痛&#xff0c;他们只会看你最后站在什么位置&#xff0c;然后羡慕或鄙夷座右…

kubernetes管理应用配置之ConfigMap和Secret

目录 一、ConfigMap 二、Secret 一、ConfigMap 应用部署的一个最佳实践是将应用所需的配置信息与程序进行分离&#xff0c;这样可以使得应用程序被更好地复用&#xff0c;通过不同的配置也能实现更灵活的功能。 将应用打包为容器镜像后&#xff0c;可以通过环境变量或者外挂文…