Flink双流Join

news2025/1/16 2:40:20

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join

  • coGroup

  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

:::color3 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

代码演示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果
        env.setParallelism(1);

        //2. source-加载数据   key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("绿色:"+ Arrays.toString(arr));
                        return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("绿色的时间:"+timeStamp);
                                        System.out.println(element.f0);
                                        return timeStamp;
                                    }
                                })
                );
                ;
        // 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Tuple3<String,Integer,String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("橘色:"+ Arrays.toString(arr));

                        return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("橘色的时间:"+timeStamp);

                                        return timeStamp;
                                    }
                                })
                );

        //3. transformation-数据处理转换
        DataStream resultStream = greenStream.join(orangeStream)
                .where(tup3 -> tup3.f0)
                .equalTo(tup3 -> tup3.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {

                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {
                        System.out.println(t1.f2);
                        System.out.println(t2.f2);
                        return Tuple3.of(t1.f0, t1.f1, t2.f1);
                    }
                });

        //4. sink-数据输出
        resultStream.print();

        //5. execute-执行
        env.execute();
    }
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

:::

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
​
橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python-Open3D学习笔记】005Mesh相关方法

TriangleMesh相关方法 文章目录 TriangleMesh相关方法1. 查看mesh三角形面信息2. 可视化三角形3. 上采样4. 计算mesh形成的面积和体积 1. 查看mesh三角形面信息 def view_hull_triangles(hull: o3d.geometry.TriangleMesh):"""查看mesh三角形面信息&#xff08…

【LeetCode热题100】优先级队列

这盘博客记录了关于优先级队列的几道题&#xff0c;包括最后一块石头的重量、数据流中的第K大元素、前K个高频单词、数据流的中位数。 class Solution { public:int lastStoneWeight(vector<int>& stones) {priority_queue<int> heap;for(auto s : stones) hea…

node.js基础学习-cheerio模块-简单小爬虫(五)

学习cheerio模块&#xff0c;简单做一个爬取图片网站的图片&#xff0c;并且将这些图片下载到本地指定的文件夹下&#xff0c;很多图片网站都有一些反爬取的机制&#xff0c;找的好几个都会报302错误&#xff0c;所以我找了一个小的图片网站&#xff0c;这个没有反爬取机制&…

技术创新与人才培养并重 软通动力子公司鸿湖万联亮相OpenHarmony人才生态大会

11月27日&#xff0c;由开放原子开源基金会指导&#xff0c;OpenHarmony项目群工作委员会主办的OpenHarmony人才生态大会2024在武汉隆重举办。软通动力子公司鸿湖万联作为OpenHarmony项目群A类捐赠人应邀出席。大会期间&#xff0c;鸿湖万联不仅深度参与了OpenHarmony人才生态年…

FFmpeg 推流给 FreeSWITCH

FFmpeg 推流&#xff0c;貌似不难&#xff0c;网上有很多资料, 接到一个任务&#xff0c;推流给 FreeSWITCH&#xff0c;最开始以为很容易&#xff0c; 实则不然&#xff0c;FreeSWITCH uuid_debug_media <uuid>&#xff0c; 一直没人任何反应 仔细一查&#xff0c;Fr…

彻底理解quadtree四叉树、Octree八叉树 —— 点云的空间划分的标准做法

1.参考文章&#xff1a; &#xff08;1&#xff09;https://www.zhihu.com/question/25111128 这里面的第一个回答&#xff0c;有一幅图&#xff1a; 只要理解的四叉树的构建&#xff0c;对于八叉树的构建原理类比方法完全一样&#xff1a;对于二维平面内的随机分布的这些点&…

【工具变量】上市公司企业数字化转型指数(甄红线版本,战略引领、技术驱动、环境支撑、数字化成果及应用)2011-2022年

一、测算方式&#xff1a;参考《经济研究》甄红线&#xff08;2023&#xff09;老师研究的做法&#xff0c;本文采用 &#xff23;&#xff33;&#xff2d;&#xff21;&#xff32; 数据库中国上市公司数字化转型研究数据库中企业数字化转型指数来衡量企业数字化转型水平。 数…

长短期记忆网络 (LSTM) 简介

文章目录 一、说明二、传统 RNN 的问题三、为什么梯度消失&#xff1f;四、长短期记忆网络简介五、忘记门六、Update Gate (Input Gate)七、Output Gate八、数学上的内存九、从 LSTM 到 Transformer十、总结 一、说明 机器学习取得进步的领域之一是自然语言处理。对于用于机器…

安卓悬浮窗应用外无法穿透事件问题

现象&#xff1a; 应用内悬浮窗如何设置了 WindowManager.LayoutParams.FLAG_NOT_FOCUSABLE WindowManager.LayoutParams.FLAG_NOT_TOUCHABLE在自己应用内事件穿透正常&#xff0c;但到应用外就无法点击。 原因&#xff1a; 解决方法&#xff1a; layoutParams.alpha 0.8f …

linux minio安装

安装minio&#xff08;Centos&#xff09; 1. 查看服务器版本uname -a 2. 到minio官网下载对应的版本 官网地址&#xff1a;minio官网下载 根据上面查看的信息是x86_64系统所以我们下载linu-amd64 3. 上传到服务器 新建minioServer目录 上传至该目录下 赋权 chmod x mi…

JavaScript字符串

这张图片主要介绍了JavaScript中的字符串类型&#xff08;string&#xff09;。 字符串 1. 字符串的定义 在JavaScript中&#xff0c;通过单引号&#xff08;&#xff09;、双引号&#xff08;"&#xff09;或反引号&#xff08;&#xff09;包裹的数据都叫字符串。单引…

Linq中的投影运算 (C#):Select、SelectMany、Zip

投影是指将对象转换为一种新形式的操作&#xff0c;该形式通常只包含那些将随后使用的属性。 通过使用投影&#xff0c;您可以构造从每个对象生成的新类型。 可以投影属性&#xff0c;并对该属性执行数学函数。 还可以在不更改原始对象的情况下投影该对象。 1、Select 下面的…

RabbitMQ在手动消费的模式下设置失败重新投递策略

最近在写RabbitMQ的消费者&#xff0c;因为业务需求&#xff0c;希望失败后重试一定次数&#xff0c;超过之后就不处理了&#xff0c;或者放入死信队列。我这里就达到重试次数后就不处理了。本来以为很简单的&#xff0c;问了kimi&#xff0c;按它的方法配置之后&#xff0c;发…

Maven install java heap space

Maven install java heap space 打包报错 Maven install java heap space 解决&#xff1a; vm option: -Xms1024m -Xmx1024m如果 vm配置了&#xff0c;还是一样报错&#xff0c;就重新选择JRE看看是否正确&#xff0c;idea会默认自己的环境&#xff0c;导致设置vm无效&…

OpenCV_Code_LOG

孔洞填充 void fillHole(const Mat srcBw, Mat &dstBw) {Size m_Size srcBw.size();Mat TempMat::zeros(m_Size.height2,m_Size.width2,srcBw.type());//延展图像srcBw.copyTo(Temp(Range(1, m_Size.height 1), Range(1, m_Size.width 1)));cv::floodFill(Temp, Point(…

C7.5【x86汇编】底层分析范围for的执行过程

目录 1.反汇编代码 2.分析 1.栈区初始化 2.设置数组元素的值 3. 逐条分析范围for 1.arr的地址被放到[ebp-2Ch]处 2.[ebp-2Ch]指向的值被复制一份到[ebp-30h]处 3.eax暂存[ebp-2Ch]指向的值,加28h后存储到[ebp-34h]处 4.跳转指令 5.比较[ebp-30h]和[ebp-34h]指向的值,…

hue 4.11容器化部署,已结合Hive与Hadoop

配合《Hue 部署过程中的报错处理》食用更佳 官方配置说明页面&#xff1a; https://docs.gethue.com/administrator/configuration/connectors/ 官方配置hue.ini页面 https://github.com/cloudera/hue/blob/master/desktop/conf.dist/hue.ini docker部署 注意&#xff1a; …

51单片机快速入门之中断的应用 2024/11/23 串口中断

51单片机快速入门之中断的应用 基本函数: void T0(void) interrupt 1 using 1 { 这里放入中断后需要做的操作 } void T0(void)&#xff1a; 这是一个函数声明&#xff0c;表明函数 T0 不接受任何参数&#xff0c;并且不返回任何值。 interrupt 1&#xff1a; 这是关键字和参…

ffmpeg安装(windows)

ffmpeg安装-windows 前言ffmpeg安装路径安装说明 前言 ffmpeg的安装也是开箱即用的,并没有小码哥说的那么难 ffmpeg安装路径 这就下载好了! 安装说明 将上面的bin目录加入到环境变量,然后在cmd中测试一下: C:\Users\12114\Desktop\test\TaskmgrPlayer\x64\Debug>ffmpe…