Flink加载维度数据

news2024/9/24 1:22:17

Flink加载维度数据

1、为何要加载维度数据?

在我们构建实时数仓时,不能光有事实数据,也需要加载维度数据来标明这些事实数据的具体含义。若只含有事实数据的话,就相当于只有数据本身在不断地变化,而并不知道这些数据具体表示什么意思。因此,我们应当加载维度数据进来。

2、加载维度数据的方式

此处,将提供两种常见的用于加载维度数据的方式。

方式一:缓存文件

district.txt文件:存放于resources资源目录下

1   nanjing
2   suzhou
3   changzhou
4   xuzhou

主体代码

package recovery;

import modules.env.Environments;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 缓存文件的注册与获取
 */
public class TestCache {
    public static void main(String[] args) throws Exception {
        // 创建环境
        StreamExecutionEnvironment see = new Environments()
                .build()
                .enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1)
                .enableRetries(3, 1)
                .enableStateBackend("hashmap", true, false)
                .finish(RuntimeExecutionMode.STREAMING, 1, 3);

        // 1.注册缓存文件
        String path = Thread.currentThread()
                .getContextClassLoader()
                .getResource("district.txt").getPath();// 获取静态文件district.txt的路径
        see.registerCachedFile(path,"district"); // 缓存至环境中

        // 2.注册任务侦听器
        see.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
                // 任务提交时
                // 任务正常:输出jobClient,任务异常:throwable
                if (Objects.nonNull(jobClient)) {
                    // 输出ID
                    System.out.println(jobClient.getJobID().toString());
                    // 输出状态
                    try {
                        System.err.println(jobClient.getJobStatus().get(10, TimeUnit.SECONDS).name());
                    } catch (Exception e) {
                        System.err.println(e.getMessage());
                    }
                }else if (Objects.nonNull(throwable)) {
                    // 异常不为空
                    System.err.println(throwable.getMessage());
                }
            }

            @Override
            public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
                // 任务执行
                // 任务正常:输出jobExecutionResult,任务异常:throwable
                if (Objects.nonNull(jobExecutionResult)) {
                    System.out.println(jobExecutionResult);
                }else if (Objects.nonNull(throwable)){
                    System.err.println(throwable.getMessage());
                }
            }
        });

        // 3.数据:ID,温度,时间戳
        // 生成水位线
        TimestampAssignerSupplier<Tuple3> supplier = new TimestampAssignerSupplier<Tuple3>() {
            @Override
            public TimestampAssigner<Tuple3> createTimestampAssigner(Context context) {
                return (element,recordTimestamp) -> (Long) element._3();
            }
        };
        WatermarkStrategy<Tuple3> watermark = WatermarkStrategy
                .<Tuple3>forMonotonousTimestamps()
                .withTimestampAssigner(supplier);
        // 数据
        see.fromCollection(Arrays.asList(
                new Tuple3(1,34,System.currentTimeMillis()),
                new Tuple3(2,36,System.currentTimeMillis()+1000),
                new Tuple3(1,35,System.currentTimeMillis()+2000),
                new Tuple3(3,32,System.currentTimeMillis()+3000),
                new Tuple3(2,33,System.currentTimeMillis()+4000)
        ))
                // 4.将缓存文件中地址内容来替代数据中的ID号【通过ID关联】
                .setParallelism(1)
                .assignTimestampsAndWatermarks(watermark)
                .map(new RichMapFunction<Tuple3, Tuple3>() {
                    Map<Integer,String> idName = new HashMap<>(); // 全局Map

                    // 初始化资源
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 读取缓存文件
                        File district = getRuntimeContext().getDistributedCache().getFile("district");
                        try(BufferedReader br = new BufferedReader(new FileReader(district))){ // 会自动释放()内资源
                            String line;
                            while (Objects.nonNull(line = br.readLine())) {
                                String[] s = line.split("\\s+");
                                idName.put(Integer.valueOf(s[0]),s[1]);
                            }
                        }catch (Exception ex){
                            ex.printStackTrace();
                        }
                    }
                    @Override
                    public Tuple3 map(Tuple3 value) throws Exception {
                        return new Tuple3(idName.get(value._1()),value._2(),value._3());
                    }
                    // 释放资源
                    @Override
                    public void close() throws Exception {
                        idName.clear();
                    }
                }).print();

        see.execute("cache-test");
    }
}

结果展示

(nanjing,34,1727094791401)
(suzhou,36,1727094792401)
(nanjing,35,1727094793401)
(changzhou,32,1727094794401)
(suzhou,33,1727094795401)

方式二:广播变量

主要代码

package recovery;

import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * 广播变量的发送与获取
 * 连接流 connect
 */
public class TestBroadcastConnect {
    public static void main(String[] args) throws Exception {
        // 1.创建环境
        StreamExecutionEnvironment see = new Environments()
                .build()
                .enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1)
                .enableRetries(3, 1)
                .enableStateBackend("hashmap", true, false)
                .finish(RuntimeExecutionMode.STREAMING, 1, 3);

        // 2.广播变量
        MapStateDescriptor desc1 = new MapStateDescriptor("idCity", Integer.class, String.class); // 描述特征
        BroadcastStream<Tuple2> broadcastStream = see.fromCollection(Arrays.asList(
                // 广播出去的内容
                new Tuple2(1, "nanjing"),
                new Tuple2(2, "suzhou"),
                new Tuple2(3, "wuxi")
        )).broadcast(desc1); // 广播流

        // 3.数据:ID,温度,时间戳
        see.fromCollection(Arrays.asList(
                        new Tuple3(1,34,System.currentTimeMillis()),
                        new Tuple3(2,36,System.currentTimeMillis()+1000),
                        new Tuple3(1,35,System.currentTimeMillis()+2000),
                        new Tuple3(3,32,System.currentTimeMillis()+3000),
                        new Tuple3(2,33,System.currentTimeMillis()+4000)
                ))
                .setParallelism(1)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        (SerializableTimestampAssigner<Tuple3>) (element,recordTimestamp) -> (Long) element._3()
                                )
                )
                // 4.连接流:与广播流数据进行连接(获取广播变量,变为广播连接流)
                .connect(broadcastStream)
                // 5.将广播变量中地址内容来替代数据中的ID号【通过ID关联】
                .process(new BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>() {
                    @Override
                    public void processElement(Tuple3 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.ReadOnlyContext ctx, Collector<Tuple3> out) throws Exception {
                        Object v = ctx.getBroadcastState(desc1).get(value._1()); // 取
                        out.collect(new Tuple3(v,value._2(),value._3()));
                    }
                    @Override
                    public void processBroadcastElement(Tuple2 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.Context ctx, Collector<Tuple3> out) throws Exception {
                        ctx.getBroadcastState(desc1).put(value._1,value._2); // 存
                    }
                })
                // 6.业务: 平均温度
                .keyBy(t3->t3._1().toString())
                .window(Timer.tumbling(5,0,TimeUnit.SECONDS ,WindowStagger.NATURAL))
                .process(new ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>() {
                    @Override
                    public void process(String city, ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>.Context context, Iterable<Tuple3> elements, Collector<Tuple2> out) throws Exception {
                        float avg = 0.0f;
                        int count = 0;
                        Iterator<Tuple3> it = elements.iterator();
                        while(it.hasNext()){
                            count++;
                            avg += (Integer) it.next()._2();
                        }
                        avg /= count;
                        // 将平均温度往后送
                        out.collect(new Tuple2(city,avg));
                    }
                })
            	// 相当于print()操作
                .addSink(new SinkFunction<Tuple2>() {
                    @Override
                    public void invoke(Tuple2 value, Context context) throws Exception {
                        System.out.println(value);
                    }
                });

        see.execute("broadcast-connect");
    }
}

结果展示

(nanjing,34.5)
(suzhou,36.0)
(wuxi,32.0)
(suzhou,33.0)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2158977.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenAI o1团队突破性论文:『过程推理』中数学推理能力大幅提升,从正确中学习的新方法

原创 超 近年来&#xff0c;大型语言模型(LLMs)在复杂的多步推理任务中取得了令人瞩目的进展。这些模型能够生成逐步的思维链&#xff0c;解决从小学数学到高等微积分的各种问题。然而&#xff0c;即使是最先进的模型也常常陷入逻辑陷阱&#xff0c;产生看似合理但实际错误的推…

PHP校园外卖跑腿小程序带后台(商业版)

有需要请加文章底部Q哦 可远程调试 PHP校园外卖跑腿小程序带后台(商业版) 一 介绍 此校园外卖跑腿小程序端基于原生开发&#xff0c;后端基于ThinkPHP5框架开发&#xff0c;数据库mysql&#xff0c;系统角色分为用户&#xff0c;商家(自配送)&#xff0c;跑腿员&#xff0c;管…

Python+requests+pytest+allure自动化测试框架

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 1、核心库 requests request请求 openpyxl excel文件操作 loggin 日志 smtplib 发送邮件 configparser unittest.mock mock服务 2、目录结构 base utils …

如何使用ssm实现社区流浪动物救助领养系统的设计与开发+vue

TOC ssm666社区流浪动物救助领养系统的设计与开发vue 第一章 课题背景及研究内容 1.1 课题背景 信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安…

kafka 消费者线程安全问题详细探讨

内容概要 主要内容 常见错误案例 下面这段代码大概逻辑 初始化时 实例化KafkaConsumer, 开启线程拉取消息并且处理 资源释放回调 停止线程、调用kafkaConsumer.close进行资源释放 表面上没有问题&#xff0c;但实际上可能出现线程安全问题&#xff0c;因为poll 和 close 两…

python按照财年分组案例

有如下数据&#xff1a;需要按照如下要求进行分组。 需求是对Site进行分组 条件当值是Act得时候&#xff0c;分组名字就是 条件当值是Rebase*得时候&#xff0c;分组名字就是FY?1/?1 条件当值是FIRM 得时候&#xff0c;分组名字就是 每年得7月到次年得6月为一个财年&#xff…

C++之初识STL(概念)

STL&#xff08;标准模板库&#xff09; STL广义分类为&#xff1a;容器&#xff0c;算法&#xff0c;迭代器 * **容器**和**算法**之间通过**迭代器**进行无缝连接 意义&#xff1a;C的**面向对象**和**泛型编程**思想&#xff0c;目的就是**复用性的提升** STL六大组件 1. 容…

MODELS 2024:闪现奥地利,现场直击报道

周末出逃&#xff01;小编闪现至奥地利林茨&#xff0c;亲临第27届MODELS 2024国际会议&#xff0c;以第一视角引领你深入会议现场&#xff0c;领略其独特风采。利用午饭时间&#xff0c;小编紧急码字&#xff0c;只为第一时间将热点资讯呈现给你~ 会议介绍&#xff1a; MODEL…

计算机毕业设计之:微信小程序的校园闲置物品交易平台(源码+文档+讲解)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

JavaEE: 深入探索TCP网络编程的奇妙世界(六)

文章目录 TCP核心机制TCP核心机制九: 面向字节流TCP核心机制十: 异常处理 小小的补充(URG 和 PSH)~TCP小结TCP/UDP 对比用UDP实现可靠传输(经典面试题) 结尾 TCP核心机制 上一篇文章JavaEE: 深入探索TCP网络编程的奇妙世界(五) 书接上文~ TCP核心机制九: 面向字节流 TCP是面…

开关频率与谐振频率对应的模态图

当fsfr时 当fr2<fs<fr1时 当fs>fr1时 开关频率对应输入电压的频率 谐振频率对应的是谐振电流的频率

JavaSE - 面向对象编程05

01 正则表达式 【1】概念&#xff1a;正则表达式是由一些特定字符组成的&#xff0c;代表的是一个规则。 【2】可以用来做什么&#xff1f; ① 用于校验数据格式的合法性 ② 用于在文本中爬取满足要求的内容 ③ 用于String类的replace方法&#xff0c;split方法的替换和分割 …

动态时间【JavaScript】

这个代码实现了一个动态显示当前日期和时间的功能。具体来说&#xff0c;它会每秒更新一次时间并在页面上显示出来。 实现效果&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><…

GUI编程之MATLAB入门详解(01)

⛄前言 图形用户界面的设计是MATLAB的核心应用之一。当用户与计算机之间或用户与计算机程序之间进行交互操作时&#xff0c;舒服高效的用户接口功能则会对用户产生极大的吸引力。图形用户界面&#xff08;GUI&#xff09;则通过窗口、图标、按钮、菜单、文本等图形对象构成用户…

美业SaaS收银系统如何收银?博弈美业实操/美业门店管理系统源码

1.打开博弈美业APP 2.工作台上方的【收银台】、【扫码核销】、【密码核销】均可完成收银 3.【收银台】可直接选择商品/服务/课程&#xff0c;再选择客户后提交订单收款 4.【扫码核销】【密码核销】可直接扫描二维码、输入核销码进行收银

大模型日报|7 篇必读的大模型论文

大家好&#xff0c;今日必读的大模型论文来啦&#xff01; 1.中科大团队提出人像视频编辑方法 PortraitGen 中国科学技术大学团队提出了 PortraitGen&#xff0c;这是一种功能强大的人像视频编辑方法&#xff0c;它能通过多模态提示实现一致且富有表现力的风格化。 传统的人…

SLAM面经1(百度)

百度面经 百度共三面,如果面试效果俱佳,会增加一个hr面。前二面主要是技术面,分为在线coding+代码知识+专业知识+工程能力。第三面是主管面,偏向于管理方面,和hr面相似。 一面 1)在线coding 在线coding的考试内容为下面力扣的变种。 2)专业面 (1)VINS-FUSION与ORB…

鲲鹏计算这五年:硬生态基本盘稳住,才能放手进击软生态

文 | 智能相对论 作者 | 叶远风 数智化深入发展、新质生产力成为主旋律的当下&#xff0c;本土计算产业的发展被寄予越来越多的关注和期待。自2019年开启以来&#xff0c;鲲鹏计算产业生态已经整整走过5个年头。 因此&#xff0c;今年华为全联接大会的鲲鹏之夜&#xff0c;在…

【网络安全】依赖混淆漏洞实现RCE

未经许可&#xff0c;不得转载。 文章目录 正文 依赖混淆是一种供应链攻击漏洞&#xff0c;发生在企业的内部依赖包错误地从公共库&#xff08;如npm&#xff09;下载&#xff0c;而不是从其私有注册表下载。攻击者可以在公共注册表中上传一个与公司内部包同名的恶意包&#xf…

java基础(2)方法的使用

目录 1.前言 2.正文 2.1方法的定义 2.2方法的调用过程 2.3方法的实参与形参 2.3.1形参 2.3.2实参 2.3.3参数传递 2.4方法的重载 3.小结 1.前言 哈喽大家好啊&#xff0c;今天博主继续带领大家学习java的基本语法&#xff0c;java的基础语法部分打算用六到七篇博文完…