Flink-intervalJoin源码和并行度问题

news2024/11/28 12:40:33

1.源码

底层用的是connect

把两个流的数据先保存到状态中

 先判断有没有迟到,迟到就放到侧输出流

再根据范围找数据

然后根据上界删除数据

 

 

package com.atguigu.gmall.realtime.test;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author 
 * @date 2023/7/8
 * 该案例演示了Flink的intervalJoin
 */
public class Test03_IntervalJoin {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(2);

        //TODO 2.检查点相关的设置(略)
        //TODO 3.从指定的网络端口获取员工数据,指定Watermark以及提取事件时间字段
        SingleOutputStreamOperator<Emp> empDS = env
            .socketTextStream("hadoop102", 8888)
            .map(
                listStr -> {
                    String[] fieldArr = listStr.split(",");
                    return new Emp(Integer.valueOf(fieldArr[0]), fieldArr[1],
                        Integer.valueOf(fieldArr[2]), Long.valueOf(fieldArr[3]));
                }
            ).assignTimestampsAndWatermarks(
                // WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                WatermarkStrategy
                    .<Emp>forMonotonousTimestamps()
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<Emp>() {
                            @Override
                            public long extractTimestamp(Emp emp, long recordTimestamp) {
                                return emp.getTs();
                            }
                        }
                    )
            );
        empDS.print("emp:");
        //TODO 4.从指定的网络端口获取部门数据,指定Watermark以及提取事件时间字段
        SingleOutputStreamOperator<Dept> deptDS = env
            .socketTextStream("hadoop102", 8889)
            .map(
                lineStr -> {
                    String[] fieldArr = lineStr.split(",");
                    return new Dept(Integer.valueOf(fieldArr[0]), fieldArr[1], Long.valueOf(fieldArr[2]));
                }
            ).assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Dept>forMonotonousTimestamps()
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<Dept>() {
                            @Override
                            public long extractTimestamp(Dept dept, long recordTimestamp) {
                                return dept.getTs();
                            }
                        }
                    )
            );

        deptDS.print("dept:");

        //TODO 5.使用intervalJoin关联员工和部门
        empDS
            .keyBy(Emp::getDeptno)
            .intervalJoin(deptDS.keyBy(Dept::getDeptno))
            .between(Time.milliseconds(-5),Time.milliseconds(5))
            .process(
                new ProcessJoinFunction<Emp, Dept, Tuple2<Emp,Dept>>() {
                    @Override
                    public void processElement(Emp emp, Dept dept, Context ctx, Collector<Tuple2<Emp, Dept>> out) throws Exception {
                        out.collect(Tuple2.of(emp,dept));
                    }
                }
            ).print(">>>");
        env.execute();
    }
}

2.并行度问题

多个上游,一个下游,下游取最小的

一个上游,多个下游,广播

多个上游,多个下游,先广播,再取最小的

注意:水位线会减1ms

有可能出现上游出现了某个水位线,但是不触发,是因为别的并行度没有数据,水位线是long的最小值

尽量在source处设置水位线

只有kafka source是多个并行度,其他source都是一个并行度

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/741895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java基础教程】(十一)面向对象篇 · 第五讲:透彻讲解Java中的static关键字及代码块——静态属性、静态方法和普通代码块、构造块、静态块的使用~

Java基础教程之面向对象 第五讲 本节学习目标1️⃣ static 关键字1.1 static定义静态属性1.2 static定义静态方法1.3 主方法1.4 实际应用功能一&#xff1a;实现类实例化对象个数的统计功能二&#xff1a; 实现属性的自动设置 2️⃣ 代码块2.1 普通代码块2.2 构造块2.3 静态块…

gulimall-首页渲染-nginx域名搭建

首页渲染与nginx域名搭建 前言一、首页1.1 整合 thymeleaf1.2 整合 dev-tools1.3 渲染分类数据 二、Nginx 域名搭建2.1 搭建域名访问环境2.2 nginx 配置文件2.3 总结 前言 本文继续记录B站谷粒商城项目视频 P136-140 的内容&#xff0c;做到知识点的梳理和总结的作用。 一、首…

组合取球-2022年全国青少年信息素养大赛Python国赛第6题

[导读]&#xff1a;超平老师计划推出《全国青少年信息素养大赛Python编程真题解析》50讲&#xff0c;这是超平老师解读Python编程挑战赛真题系列的第8讲。 全国青少年信息素养大赛&#xff08;原全国青少年电子信息智能创新大赛&#xff09;是“世界机器人大会青少年机器人设计…

教你如何自定义 Github 首页

一、创建自定义首页仓库 若要自定义首页&#xff0c;首先需要创建一个与你 Github ID 同名的仓库 创建完成后就可以开始为你的首页添加一些有趣的内容了&#xff0c;代码格式可以是 markdown 语法&#xff0c;也可以是 HTML 语法&#xff0c;但 HTML 的扩展性更强一点&#xf…

大唐京兆韦氏,两个老来发奋的诗人

在唐朝&#xff0c;诗人文学政治家如过江之鲫&#xff0c;有两个人物是绝对不可忽略的&#xff0c;那就是韦应物和韦庄。其中韦应物是韦庄的曾祖父。 京兆韦氏是整个唐朝时期最重要的士族家族之一&#xff0c;代有人才出&#xff0c;衣冠鼎盛&#xff0c;为关中望姓之首。韦氏…

I/O多复用函数(select、poll、epoll)

目录 一、select函数二、poll函数 select函数 int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);poll函数 int poll(struct pollfd *fds, nfds_t nfds, int timeout);epoll API epoll_create epoll_wait epoll_ctl一、sel…

Spring Boot 系列3 -- 日志文件

目录 1. 日志有什么用? 2. 日志的使用 2.1 得到日志对象 2.2 使用日志对象打印日志 3. 日志级别 3.1 日志级别的用途 3.2 日志级别的分类和使用 3.3 日志的打印规则 3.4 日志级别的设置 4. 日志持久化 5. 更简单的日志输出--lombok 6. 拓展1 lombok的原理 7. 拓…

SSM整合-1

SSM整合 创建工程SSM整合 2.1 Spring: SpringConfig 2.2 SpringMvc: ServletConfig、SpringMvcConfig 3.3 Mybatis JdbcConfig、MybatisConfig、jdbc.properties 3.功能模块 表与实体类 dao(接口自动代理) service(接口实现类) 业务层接口测试&#xff08;整合Junit&#xff…

在vite创建的vue3项目中使用Cesium加载立体地形信息并调整初始化角度

在vite创建的vue3项目中使用Cesium加载立体地形信息并调整初始化角度 使用vite创建vue3项目 npm create vitelatestcd到创建的项目文件夹中 npm install安装Cesium npm i cesium vite-plugin-cesium vite -D配置 &#xff08;1&#xff09;在项目的vite.config.js文件中添加&am…

新手学习编程有什么注意事项?

为什么要学习如何编码&#xff1f; 世界正在成为一个地球村。编码是它发生的一个重要原因。 你应该学习如何编码的原因有很多&#xff0c;我将在这里触及其中的一些。 首先&#xff0c;学习编码可以大大提高你的分析和解决问题的能力。 您的收入潜力增加&#xff1a;有高级开…

NTIRE2023 图像复原和增强赛事Efficient Super-Resolution赛道冠军方案解读——DIPNet

DIPNet: Efficiency Distillation and Iterative Pruning for Image Super-Resolution 0. 简介 NTIRE 的全称为New Trends in Image Restoration and Enhancement Challenges&#xff0c;即“图像复原和增强挑战中的新趋势”&#xff0c;是CVPR(IEEE Conference on Computer V…

【JAVAEE】JVM中垃圾回收机制 GC

博主简介&#xff1a;想进大厂的打工人博主主页&#xff1a;xyk:所属专栏: JavaEE初阶 上篇文章我们讲了java运行时内存的各个区域。 传送门&#xff1a;【JavaEE】JVM的组成及类加载过程_xyk:的博客-CSDN博客 对于程序计数器、虚拟机栈、本地方法栈这三部分区域而言&#x…

6.S081——CPU调度部分(CPU的复用和调度)——xv6源码完全解析系列(10)

0.briefly speaking 终于到这里了&#xff0c;我们在之前阅读很多地方的内核代码时&#xff0c;总是习惯性地绕开CPU调度的部分(比如yield函数)。现在我们总算可以深入进去一探究竟了&#xff0c;这次总算是将整个操作系统中的一块重要拼图拼上去了。 有操作系统相关基础概念…

5 字符串拼接

5 字符串拼接作者: 赵晓鹏时间限制: 1S章节: 动态规划与贪心 输入说明 : 见问题描述。 输出说明 : 见问题描述。 输入范例 : aaaaaaaaaab aaaaaaaac aaaaaaaaaacaaaaaaaab 输出范例 : YES aa_________aaaaaaaab #include<iostream> #include<vector> using …

【数据分析 - 基础入门之pandas篇②】- pandas数据结构——Series

文章目录 前言一、Series的创建1.1 列表创建1.2 NumPy数组创建1.3 字典创建 二、Series索引2.1 显式索引2.2 隐式索引 三、Series切片2.1 显式切片2.2 隐式切片 四、Series基本属性和方法4.1 属性4.2 方法4.3 案例——使用 bool 值去除空值 五、Series运算六、Series多层行索引…

带你全面了解四大内存操作函数memset(),memcpy(),memmove(),memcmp()(附模拟实现)

内存操作函数 文章目录 内存操作函数memcpymemmovememcmpmemset 注&#xff1a;点击蓝色标题可以跳转到官方网站查看更权威的解析哦。 memcpy void * memcpy ( void * destination, const void * source, size_t num );函数memcpy从source的位置开始&#xff0c;向后复制num个…

Linux - CentOS 7 源码安装 MySQL 8.0.31

一、mysql-boost-8.0.31.tar.gz 源码下载 下载地址&#xff1a;https://dev.mysql.com 二、源码安装 MySQl 要求 cmake、Boost C库、ncurses库、OpenSSL库 //需要cmake3&#xff0c;gcc-5.3以上&#xff1b; 三、源码搭建 MySQL 环境 1、创建用户名和组 groupadd mysql …

ChatLaw团队招实习生啦!真格基金的创业、投资与AI详细指南;远程工作的8个安全法则;游戏开发者的数学教程 | ShowMeAI日报

&#x1f440;日报&周刊合集 | &#x1f3a1;生产力工具与行业应用大全 | &#x1f9e1; 点赞关注评论拜托啦&#xff01; &#x1f916; 北大 ChatLaw 团队招聘实习生&#xff0c;开放算法和前后端岗位 ChatLaw 是一个开源的中文法律大模型&#xff0c;由北京大学与北大-兔…

vim编辑器中实现左边目录,右边内容布局的方法(vim插件:显示树形目录插件NERDTree安装和使用)

NERDTree&#xff1a;是Vim编辑器的文件系统资源管理器。使用此插件&#xff0c;用户可以直观地浏览复杂的目录层次结构&#xff0c;快速打开文件进行读取或编辑&#xff0c;并执行基本的文件系统操作。 它允许轻松浏览文件&#xff0c;并在不离开vim的情况下执行一些基本操作…

Spring Cloud的基本应用

上篇文章我们的eureka的集群已经搭建完毕,但是我们还没有开始使用,之前我们的page访问的方法是直接写死的,现在我们就可以改为集群的方式来写 Autowired//注册中心对应的客户端对象private DiscoveryClient discoveryClient;RequestMapping("query/{id}")public Prod…