Flink之迟到的数据

news2024/12/23 6:48:23

迟到数据的处理

  1. 推迟水位线推进: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  2. 设置窗口延迟关闭:.allowedLateness(Time.seconds(3))
  3. 使用侧流接收迟到的数据: .sideOutputLateData(lateData)
public class Flink12_LateDataCorrect {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                        }
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 水位线延迟2秒
                                .withTimestampAssigner(
                                        (event, ts) -> event.getTs()
                                )
                );

        ds.print("input");

        OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));
        //new OutputTag<WordCount>("late"){}

        SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds.map(
                event -> new WordCountWithTs(event.getUrl(), 1 , event.getTs())
        ).keyBy(
                WordCountWithTs::getWord
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(10))
        ).allowedLateness(Time.seconds(5))  // 窗口延迟5秒关闭
         .sideOutputLateData(lateOutputTag) // 捕获到侧输出流
        .aggregate(
                new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {
                    @Override
                    public UrlViewCount createAccumulator() {
                        return new UrlViewCount();
                    }

                    @Override
                    public UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {
                        accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());
                        return accumulator;
                    }

                    @Override
                    public UrlViewCount getResult(UrlViewCount accumulator) {
                        return accumulator;
                    }

                    @Override
                    public UrlViewCount merge(UrlViewCount a, UrlViewCount b) {
                        return null;
                    }
                }
                ,
                new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {
                    @Override
                    public void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {
                        UrlViewCount urlViewCount = elements.iterator().next();
                        //补充url
                        urlViewCount.setUrl(key);
                        //补充窗口信息
                        urlViewCount.setWindowStart(context.window().getStart());
                        urlViewCount.setWindowEnd(context.window().getEnd());

                        // 写出
                        out.collect(urlViewCount);
                    }
                }
        );
        urlViewCountDs.print("window") ;

        //TODO 将窗口的计算结果写出到Mysql的表中, 有则更新,无则插入
        /*
            窗口触发计算输出的结果,该部分数据写出到mysql表中执行插入操作,
            后续迟到的数据,如果窗口进行了延迟, 窗口还能正常对数据进行计算, 该部分数据写出到mysql执行更新操作。

            建表语句:
            CREATE TABLE `url_view_count` (
              `url` VARCHAR(100) NOT NULL  ,
              `cnt` BIGINT NOT NULL,
              `window_start` BIGINT NOT NULL,
              `window_end` BIGINT NOT NULL,
              PRIMARY KEY (url, window_start, window_end )  -- 联合主键
            ) ENGINE=INNODB DEFAULT CHARSET=utf8
         */

        SinkFunction<UrlViewCount> jdbcSink = JdbcSink.<UrlViewCount>sink(
                "replace into url_view_count(url, cnt ,window_start ,window_end) value (?,?,?,?)",
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()

        );

        urlViewCountDs.addSink(jdbcSink) ;


        //捕获侧输出流
        SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);
        lateData.print("late");
        //TODO 将侧输出流中的数据,写出到mysql中的表中,需要对mysql中已经存在的数据进行修正
        //转换结构  WordCountWithTs => UrlViewCount
        //调用flink计算窗口的方式, 基于当前数据的时间计算对应的窗口
        SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(
                wordCountWithTs -> {
                    Long windowStart = TimeWindow.getWindowStartWithOffset(wordCountWithTs.getTs()/*数据时间*/, 0L/*偏移*/, 10000L/*窗口大小*/);
                    Long windowEnd = windowStart + 10000L;
                    return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);
                }
        );
        // 写出到mysql中
        SinkFunction<UrlViewCount> lateJdbcSink = JdbcSink.<UrlViewCount>sink(
                "insert into url_view_count (url ,cnt , window_start ,window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt  ",
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()

        );

        mapDs.addSink(lateJdbcSink) ;

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

withIdleness关键字

解决某条流长时间没有数据,不能推进水位线,导致下游窗口的窗口无法正常计算。

public class Flink12_WithIdleness {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 8888)
                        .map(
                                line -> {
                                    String[] words = line.split(" ");
                                    return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));
                                }
                        ).assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                        .withTimestampAssigner(
                                                (event, ts) -> event.getTs()
                                        )
                                        //如果超过10秒钟不发送数据,就不等待该数据源的水位线
                                        .withIdleness(Duration.ofSeconds(10))
                        );
        ds1.print("input1");

        SingleOutputStreamOperator<Event> ds2 = env.socketTextStream("hadoop102", 9999)
                .map(
                        line -> {
                            String[] words = line.split(" ");
                            return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));
                        }
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(
                                        (event, ts) -> event.getTs()
                                )
                                //如果超过10秒钟不发送数据,就不等待该数据源的水位线
//                                .withIdleness(Duration.ofSeconds(10))
                );
        ds2.print("input2");

        ds1.union(ds2)
                .map(event->new WordCount(event.getUrl(),1))
                .keyBy(WordCount::getWord)
                .window(
                        TumblingEventTimeWindows.of(Time.seconds(10))
                ).sum("count")
                .print("window");

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

基于时间的合流

窗口联结Window Join

WindowJoin: 在同一个窗口内的相同key的数据才能join成功。

orderDs.join( detailDs )
	  .where( OrderEvent::getOrderId )  // 第一条流用于join的key
	  .equalTo( OrderDetailEvent::getOrderId) // 第二条流用于join的key
	  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
	  .apply(
	          new JoinFunction<OrderEvent, OrderDetailEvent, String>() {
	              @Override
	              public String join(OrderEvent first, OrderDetailEvent second) throws Exception {
	                  // 处理join成功的数据
	                  return  first + " -- " + second ;
	              }
	          }
	  ).print("windowJoin");

时间联结intervalJoin

在这里插入图片描述

IntervalJoin : 以一条流中数据的时间为基准, 设定上界和下界, 形成一个时间范围, 另外一条流中相同key的数据如果能落到对应的时间范围内, 即可join成功。

核心代码:

 orderDs.keyBy(
               OrderEvent::getOrderId
       ).intervalJoin(
               detailDs.keyBy( OrderDetailEvent::getOrderId)
       ).between(
               Time.seconds(-2) , Time.seconds(2)
       )
       //.upperBoundExclusive()  排除上边界值
       //.lowerBoundExclusive()  排除下边界值
       .process(
               new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {
                   @Override
                   public void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {
                       //处理join成功的数据
                       out.collect( left + " -- " + right );
                   }
               }
       ).print("IntervalJoin");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1303394.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【华为数据之道学习笔记】3-9元数据治理面临的挑战

华为在进行元数据治理以前&#xff0c;遇到的元数据问题主要表现为数据找不到、读不懂、不可信&#xff0c;数据分析师们往往会陷入数据沼泽中&#xff0c;例如以下常见的场景。 某子公司需要从发货数据里对设备保修和维保进行区分&#xff0c;用来不对过保设备进行服务场景分析…

uniapp - 简单版本自定义tab栏切换

tab切换是APP开发最常见的功能之一&#xff0c;uniapp中提供了多种形式的tab组件供我们使用。对于简单的页面而言&#xff0c;使用tabbar组件非常方便快捷&#xff0c;可以快速实现底部导航栏的效果。对于比较复杂的页面&#xff0c;我们可以使用tab组件自由定义样式和内容 目录…

Spring 的缓存机制【记录】

一、背景 在最近的业务需求开发过程中遇到了“传说中”的循环依赖问题&#xff0c;在之前学习Spring的时候经常会看到Spring是如何解决循环依赖问题的&#xff0c;所谓循环依赖即形成了一个环状的依赖关系&#xff0c;这个环中的某一个点产生不稳定变化都会导致整个链路产生不…

WPF仿网易云搭建笔记(5):信息流控制之IOC容器

文章目录 专栏和Gitee仓库前言IOC容器Prism IOC使用声明两个测试的服务类MainWindow IOC 注入[单例]MainWindow里面获取UserController无法使用官方解决方案 使用自定义IOC容器&#xff0c;完美解决既然Prism不好用&#xff0c;直接上微软的IOC解决方案App.xaml.csViewModel里面…

axios 基础的 一次封装 二次封装

一、平常axios的请求发送方式 修改起来麻烦的一批 代码一大串 二、axios的一次封装 我们会在src/utils创建一个request.js的文件来存放我们的基地址与拦截器 /* 封装axios用于发送请求 */ import axios from axios/* (1)request 相当于 Axios 的实例对象 (2)为什么要有reque…

python自动化测试实战 —— WebDriver API的使用

软件测试专栏 感兴趣可看&#xff1a;软件测试专栏 自动化测试学习部分源码 python自动化测试相关知识&#xff1a; 【如何学习Python自动化测试】—— 自动化测试环境搭建 【如何学习python自动化测试】—— 浏览器驱动的安装 以及 如何更…

Web安全-SQL注入【sqli靶场第11-14关】(三)

★★实战前置声明★★ 文章中涉及的程序(方法)可能带有攻击性&#xff0c;仅供安全研究与学习之用&#xff0c;读者将其信息做其他用途&#xff0c;由用户承担全部法律及连带责任&#xff0c;文章作者不承担任何法律及连带责任。 0、总体思路 先确认是否可以SQL注入&#xff0…

深拷贝、浅拷贝 react的“不可变值”

知识获取源–晨哥&#xff08;现实中的人 嘿嘿&#xff09; react中如果你想让一个值始终不变 或者说其他操作不影响该值 它只是作用初始化的时候 使用了浅拷贝–改变了初始值 会改变初始值(selectList1) 都指向同一个地址 const selectList1 { title: 大大, value: 1 };con…

ES-分析器

分析器 两种常用的英语分析器 1 测试工具 #可以通过这个来测试分析器 实际生产环境中我们肯定是配置在索引中来工作 GET _analyze {"text": "My Moms Son is an excellent teacher","analyzer": "english" }2 实际效果 比如我们有下…

前端框架(Front-end Framework)和库(Library)的区别

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含python、JS工程源码)+数据集+模型(三)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 模块实现1. 数据预处理1&#xff09;爬取功能2&#xff09;下载功能 2. 创建模型并编译1&#xff09;定义模型结构2&#xff09;优化…

appium安卓app自动化,遇到搜索框无搜索按钮元素时无法搜索的解决方案

如XX头条&#xff0c;搜索框后面有“搜索”按钮&#xff0c;这样实现搜索操作较为方便。 但有些app没有设置该搜索按钮&#xff0c;初学者就要花点时间去学习怎么实现该功能了&#xff0c;如下图。 这时候如果定位搜索框&#xff0c;再点击操作&#xff0c;再输入文本后&#x…

java工程(ajax/axios/postman)向请求头中添加消息

1、问题概述 在项目中我们经常会遇到需要向请求头中添加消息的场景&#xff0c;然后后端通过request.getRequest()或者RequestHeader获取请求头中的消息。 下面提供几种前端向请求头添加消息的方式 2、创建一个springmvc工程用于测试 2.1、创建工程并引入相关包信息 sprin…

Maven项目引入本地jar

Maven项目引入本地jar 1.对应maven模块项目中建lib目录&#xff0c;将jar放入进去 2.在对应的模块pom.xml中引入此依赖jar 3.在对应的maven-plugin插件打包的pom.xml中指定需要includeSystemScope为true的jar

做数据分析为何要学统计学(10)——如何进行时间序列分析

时间序列是由随时间变化的值构成&#xff0c;如产品销量、气温数据等等。通过对时间序列展开分析&#xff0c;能够回答如下问题&#xff1a; &#xff08;1&#xff09;被研究对象的活动特征是否有周期性&#xff08;也称季节性&#xff09;&#xff08;2&#xff09;被研究对…

strict-origin-when-cross-origin

严格限制同源策略 &#xff08;1&#xff09;允许服务器的同源IP地址访问 &#xff08;2&#xff09;允许Referer --- 后端服务器要配置

2023年阿里云云栖大会-核心PPT资料下载

一、峰会简介 历经14届的云栖大会&#xff0c;是云计算产业的建设者、推动者、见证者。2023云栖大会以“科技、国际、年轻”为基调&#xff0c;以“计算&#xff0c;为了无法计算的价值”为主题&#xff0c;发挥科技平台汇聚作用&#xff0c;与云计算全产业链上下游的先锋代表…

树莓派,opencv,Picamera2利用舵机云台追踪人脸

一、需要准备的硬件 Raspiberry 4b两个SG90 180度舵机&#xff08;注意舵机的角度&#xff0c;最好是180度且带限位的&#xff0c;切勿选360度舵机&#xff09;二自由度舵机云台&#xff08;如下图&#xff09;Raspiberry CSI 摄像头 组装后的效果&#xff1a; 二、项目目标…

排序-选择排序与堆排序

文章目录 一、选择排序二、堆排序三、时间复杂度四、稳定性 一、选择排序 思想&#xff1a; 将数组第一个元素作为min&#xff0c;然后进行遍历与其他元素对比&#xff0c;找到比min小的数就进行交换&#xff0c;直到最后一个元素就停止&#xff0c;然后再将第二个元素min&…

温湿度传感器DHT11的简单应用

文章目录 一、DHT11是什么&#xff1f;二、使用步骤1.硬件1.硬件连接2.工作原理1.串行单总线2.温湿度数据采集原理 2.软件1.DHT11初始化如下&#xff08;示例&#xff09;&#xff1a;2.DHT11复位如下&#xff08;示例&#xff09;&#xff1a;3.等待DHT11的回应如下&#xff0…