Flink实时电商数仓之旁路缓存

news2025/4/6 20:35:18

撤回流的处理

撤回流是指流式处理过程中,两表join过程中的数据是一条一条跑过来的,即原本可以join到一起的数据在刚开始可能并没有join上。

  • 撤回流的格式:
    在这里插入图片描述
  • 解决方案
    • 定时器:使用定时器定时10s(数据最大的时间差值),定时器触发时将状态中的数据发送过来
    • 如果重复计算这些数据,如何保持结果正确即可;通过每次度量值修改为当次度量值 - 上次度量值即可

异步IO

在这里插入图片描述

  • 减少等待的时间,充分利用已有的资源
  • 使用异步IO时,必须保证从头到尾都是异步的操作;即使用异步的连接器
/**
     * 获取到 redis 的异步连接
     *
     * @return 异步链接对象
     */
    public static StatefulRedisConnection<String, String> getRedisAsyncConnection() {
        RedisClient redisClient = RedisClient.create("redis://hadoop102:6379/2");
        return redisClient.connect();
    }



    /**
     * 关闭 redis 的异步连接
     *
     * @param redisAsyncConn
     */
    public static void closeRedisAsyncConnection(StatefulRedisConnection<String, String> redisAsyncConn) {
        if (redisAsyncConn != null) {
            redisAsyncConn.close();
        }
    }

 /**
     * 获取到 Hbase 的异步连接
     *
     * @return 得到异步连接对象
     */
    public static AsyncConnection getHBaseAsyncConnection() {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            return ConnectionFactory.createAsyncConnection(conf).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭 hbase 异步连接
     *
     * @param asyncConn 异步连接
     */
    public static void closeAsyncHbaseConnection(AsyncConnection asyncConn) {
        if (asyncConn != null) {
            try {
                asyncConn.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

异步IO关联

  1. AsyncDataStream.unorderedWait(异步核心逻辑, 60, TimeUnit.SECONDS) 异步关联维度表
  2. CompletableFuture.supplyAsync(new Supplier<>(){ 异步访问读取Redis中的数据 }),返回的数据类型是Future类型
    • 先拼写访问的redisKey
    • 获取到dimSkuInfoFuture期货
    • 使用dimSkuInfoFuture.get()获取异步结果
  3. thenApplyAsync(new Function<>()), 旁路缓存判断,判断是否在redis中读取到相关数据,如果没有读取到,需要访问HBase.
  4. 需要重写HBase的getCells方法,改为getAsyncCells方法
    • 连接更换为异步连接
    • Future类型数据需要再get()方法获取具体的值
    • 无需关闭连接
  5. 将从HBase读取的数据保存到redis, redisAsyncConnection.async().setex(redisKey,24*60*60,dimJsonObj.toJSONString());

异步维度关联封装

  1. 继承RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean>接口
  2. 将表名和rowkey拼接的方法抽象化,让方法调用者自己传进来
  3. 封装join方法, join(TradeSkuOrderBean input, JSONOjbect dim); join方法里面填写度量值的聚合逻辑
  4. 将抽象方法和具体方法分离,把抽象方法放到接口中,在实现该接口
  5. TradeSkuOrderBean类改为泛型方法T
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T>
implements DimJoinFunction<T>{

    StatefulRedisConnection<String, String> redisAsyncConnection;
    AsyncConnection hBaseAsyncConnection;
    String tableName;


    @Override
    public void open(Configuration parameters) throws Exception {
        redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
        hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();
    }

    @Override
    public void close() throws Exception {
        RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
        HBaseUtil.closeAsyncHbaseConnection(hBaseAsyncConnection);
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        //java的异步编程方式
        String tableName = getTableName();
        String rowKey = getId(input);
        String redisKey = RedisUtil.getRedisKey(tableName, rowKey);
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {

                //第一步异步访问得到的数据

                RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);
                String dimInfo = null;
                try {
                    dimInfo = dimSkuInfoFuture.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return dimInfo;
            }
        }).thenApplyAsync(new Function<String, JSONObject>() {
            @Override
            public JSONObject apply(String dimInfo) {
                JSONObject dimJsonObj = null;
                //旁路缓存判断
                if (dimInfo == null || dimInfo.isEmpty()) {
                    try {
                        //需要访问HBase
                        dimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);
                        //将读取的数据保存到redis
                        redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    //redis中存在缓存数据
                    dimJsonObj = JSONObject.parseObject(dimInfo);
                }
                return dimJsonObj;
            }
        }).thenAccept(new Consumer<JSONObject>() {
            public void accept(JSONObject dim) {
                //合并维度信息
                if (dim == null) {
                    //无法关联到维度信息
                    System.out.println("无法关联到当前的维度信息:" + tableName + ":" + rowKey);
                } else {
                    join(input,dim);
                }

                //返回结果
                resultFuture.complete(Collections.singletonList(input));
            }
        });
    }



}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1356374.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

02-微服务-Eureka注册中心

Eureka注册中心 假如我们的服务提供者user-service部署了多个实例&#xff0c;如图&#xff1a; 大家思考几个问题&#xff1a; order-service在发起远程调用的时候&#xff0c;该如何得知user-service实例的ip地址和端口&#xff1f;有多个user-service实例地址&#xff0c;…

Idea如何配置git

打开Ideal&#xff0c;点击Settings&#xff0c;找到Version Control这一栏&#xff0c;然后点开&#xff0c;找到Git 如果我们电脑是已经有git&#xff0c;那我们就点击那个有点像文件夹的标致&#xff0c;然后找到我们安装在电脑上面的自己安装的git的exe结尾的文件&#xff…

【论文阅读笔记】Stable View Synthesis 和 Enhanced Stable View Synthesis

目录 Stable View Synthesis摘要引言 Enhanced Stable View Synthesis 从Mip-NeRF360的对比实验中找到的两篇文献&#xff0c;使用了卷积神经网络进行渲染和新视角合成&#xff0c;特此记录一下 ToDo Stable View Synthesis paper&#xff1a;https://readpaper.com/pdf-ann…

JS中 focus 和 blur 焦点事件

发现的一个小知识点 focus 获取焦点事件 代码如下&#xff1a; <body><input type"text" placeholder"input输入框"><script>let input document.querySelector(input)input.addEventListener(focus, function (e) {e.target.style.…

(读书笔记)网络是如何连接的

1.1 生成 HTTP 请求消息 浏览器是一个具备多种客户端功能的综合性客户端软件,因此它需要 一些东西来判断应该使用其中哪种功能来访问相应的数据,而各种不同的 URL(Uniform Resource Locator,统一资源定位符。) 就是用来干这个的,比如访问 Web 服务器时用“http:”,而访…

三段式电流保护与自动重合闸MATLAB仿真模型

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 前加速、后加速的区别&#xff1a; 前加速是保护装置不判别是永久性故障还是瞬时故障&#xff0c;直接跳闸&#xff0c;然后经重合闸装置来纠正&#xff1b;后加速是保护装置是先判别故障类型有选择性跳闸 …

【Linux Shell】4. 数组

文章目录 【 1. 数组的定义 】【 2. 读取数组 】【 3. 关联数组 】【 4. 获取数组中的所有元素 】【 5. 获取数组的长度 】 数组中可以存放多个值。 Bash Shell 只支持一维数组&#xff08;不支持多维数组&#xff09;&#xff0c;初始化时不需要定义数组大小。与大部分编程语言…

【动态规划】LeetCode-10. 正则表达式匹配

10. 正则表达式匹配。 给你一个字符串 s 和一个字符规律 p&#xff0c;请你来实现一个支持 ‘.’ 和 ‘*’ 的正则表达式匹配。 ‘.’ 匹配任意单个字符‘*’ 匹配零个或多个前面的那一个元素 所谓匹配&#xff0c;是要涵盖 整个 字符串 s的&#xff0c;而不是部分字符串。 …

Microsoft Word去除页面多余的换行符

大家写论文的时候或者排版的时候可能遇到换行符多出来了导致页面的不美观。像下面这张图一样&#xff0c;虽然latex不会出现这种问题。 处理方式 点击插入然后点击分页 结果展示

设计模式 七大原则

1.单一职责原则 单一职责原则&#xff08;SRP&#xff1a;Single responsibility principle&#xff09;又称单一功能原则 核心&#xff1a;解耦和增强内聚性&#xff08;高内聚&#xff0c;低耦合&#xff09;。 描述&#xff1a; 类被修改的几率很大&#xff0c;因此应该专注…

(Java企业 / 公司项目)Nacos的怎么搭建多环境配置?(含相关面试题)(二)

上一篇讲了一个单体服务中配置&#xff0c;传统的Nacos配置但是在微服务架构当中肯定都是多环境下配置&#xff0c;比如生产环境&#xff0c;dev测试环境等等。 第一种方式模拟开始&#xff1a; 首先展示在生产环境中nacos如何配置&#xff0c;在模块下新建一个配置文件&…

javascript 常见工具函数(一)

1.将JSON数据根据相同值&#xff0c;进行归类划分&#xff1a; var arr [{ time: "1", img: "22222" }, { time: "2", img: "555" }, { time: "1", img: "888888" }, { time: "2", img: "4444&q…

D50|单调栈

739.每日温度 初始思路&#xff1a; 暴力解法但是会超时。 class Solution {public int[] dailyTemperatures(int[] temperatures) {int[] answer new int[temperatures.length];for(int i 0;i<temperatures.length;i){for(int j i;j<temperatures.length;j){if(te…

NeRF-RPN: A general framework for object detection in NeRFs 全文翻译

摘要 Abstract 本文提出了第一个重要的物体检测框架 NeRF-RPN&#xff0c;它直接在 NeRF 上运行。给定一个预先训练好的 NeRF 模型&#xff0c;NeRF-RPN 的目标是检测场景中所有物体的边界框。通过利用包含多尺度三维神经体积特征的新颖体素表示法&#xff0c;我们证明…

双击shutdown.bat关闭Tomcat报错:未设置关闭端口~

你们好&#xff0c;我是金金金。 场景 当我startup.bat启动tomcat之后&#xff0c;然后双击shutdown.bat关闭&#xff0c;结果报错了~ 排查 看报错信息很明显了&#xff0c;未配置关闭端口&#xff0c;突然想起来了我在安装的时候都选的是默认的配置&#xff0c;我还记得有这…

深度学习课程实验二深层神经网络搭建及优化

一、 实验目的 1、学会训练和搭建深层神经网络&#xff1b; 2、掌握超参数调试正则化及优化。 二、 实验步骤 初始化 1、导入所需要的库 2、搭建神经网络模型 3、零初始化 4、随机初始化 5、He初始化 6、总结三种不同类型的初始化 正则化 1、导入所需要的库 2、使用非正则化…

k8s中的容器探针

pod的容器健康检查---探针 probe&#xff1a;k8s对容器执行的定期检查&#xff0c;诊断。 探针的三种规则 所有的探针都是针对容器不是针对pod 1、 存活探针---livenessProbe&#xff1a;探测容器是否正常运行。如果发现探测失败&#xff0c;会杀掉容器。容器会根据重启策略…

Python从入门到网络爬虫(面向对象详解)

前言 Python从设计之初就已经是一门面向对象的语言&#xff0c;正因为如此&#xff0c;在Python中创建一个类和对象是很容易的。本章节我们将详细介绍Python的面向对象编程。如果你以前没有接触过面向对象的编程语言&#xff0c;那你可能需要先了解一些面向对象语言的一些基本…

【Java集合类篇】HashMap的数据结构是怎样的?

HashMap的数据结构是怎样的? ✔️HashMap的数据结构✔️ 数组✔️ 链表 ✔️HashMap的数据结构 在Java中&#xff0c;保存数据有两种比较简单的数据结构: 数组和链表&#xff08;或红黑树&#xff09;。 HashMap是 Java 中常用的数据结构&#xff0c;它实现了 Map 接口。Has…

Jupyter Notbook+cpolar内网穿透实现公共互联网访问使用数据分析工作

​​ 文章目录 1.前言2.Jupyter Notebook的安装2.1 Jupyter Notebook下载安装2.2 Jupyter Notebook的配置2.3 Cpolar下载安装 3.Cpolar端口设置3.1 Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 ​​​​ 1.前言 在数据分析工作中&#xff0c;使用最多的无疑就是各…