数仓DWS层之旁路缓存优化

news2024/9/22 17:35:14

 优化原因:

外部数据源的查询常常是流式计算的性能瓶颈。以本程序为例,每次查询都要连接 Hbase,数据传输需要做序列化、反序列化,还有网络传输,严重影响时效性。可以通过旁路缓存对查询进行优化。

旁路缓存模式是一种非常常见的按需分配缓存模式。所有请求优先访问缓存,若缓存命中,直接获得数据返回给请求者。如果未命中则查询数据库,获取结果后,将其返回并写入缓存以备后续请求使用。

(1)旁路缓存策略应注意两点

a)缓存要设过期时间,不然冷数据会常驻缓存,浪费资源。

b)要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存

(2)缓存的选型

一般两种:堆缓存或者独立缓存服务(memcache,redis)

堆缓存,性能更好,效率更高,因为数据访问路径更短。但是难于管理,其它进程无法维护缓存中的数据。

独立缓存服务(redis,memcache),会有创建连接、网络IO等消耗,较堆缓存略差,但性能尚可。独立缓存服务便于维护和扩展,对于数据会发生变化且数据量很大的场景更加适用,此处选择独立缓存服务,将 redis 作为缓存介质

(3)实现步骤

从缓存中获取数据。

① 如果查询结果不为 null,则返回结果。

② 如果缓存中获取的结果为 null,则从 Phoenix 表中查询数据。

a)如果结果非空则将数据写入缓存后返回结果。

b)否则提示用户:没有对应的维度数据

注意:缓存中的数据要设置超时时间,本程序设置为 1 天。此外,如果原表数据发生变化,要删除对应缓存。为了实现此功能,需要对维度分流程序做如下修改:

i)在 MyBroadcastFunction的 processElement 方法内将操作类型字段添加到 JSON 对象中。

ii)在 DimUtil 工具类中添加 deleteCached 方法,用于删除变更数据的缓存信息。

iii)在 MyPhoenixSink 的 invoke 方法中补充对于操作类型的判断,如果操作类型为 update 则清除缓存。

图解:

 

 代码方面:

思路:当我们需要使用外部数据源的表数据时,在第一次使用的时候,从Phoenix获取维表数据,并且将这些维表数据写入Redis缓存中,在后面我们需要再次使用维表数据的时候,我们先可以从Redis中获取,如果Redis中没有,在从Phoenix中获取维表数据并且写入Redis缓存中,主要这里要设置缓存过期时间,要不然会造成冷数据,而浪费资源。当我们修改维表中的数据时,要先删除Redis缓存中的数据,然后再对Phoenix进行更新。

(1)创建连接池(与Phoenix建立连接,即与HBASE建立连接)

package com.atguigu.utils;

import com.alibaba.druid.pool.DruidDataSource;
import com.atguigu.common.GmallConfig;

public class DruidDSUtil {
    private static DruidDataSource druidDataSource=null;

    public static DruidDataSource createDataSource() {
        // 创建连接池
        druidDataSource = new DruidDataSource();
        // 设置驱动全类名
        druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
        // 设置连接 url
        druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
        // 设置初始化连接池时池中连接的数量
        druidDataSource.setInitialSize(5);
        // 设置同时活跃的最大连接数
        druidDataSource.setMaxActive(20);
        // 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0
        druidDataSource.setMinIdle(1);
        // 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待
        druidDataSource.setMaxWait(-1);
        // 验证连接是否可用使用的 SQL 语句
        druidDataSource.setValidationQuery("select 1");
        // 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除
        // 注意,默认值为 true,如果没有设置 validationQuery,则报错
        // testWhileIdle is true, validationQuery not set
        druidDataSource.setTestWhileIdle(true);
        // 借出连接时,是否测试,设置为 false,不测试,否则很影响性能
        druidDataSource.setTestOnBorrow(false);
        // 归还连接时,是否测试
        druidDataSource.setTestOnReturn(false);
        // 设置空闲连接回收器每隔 30s 运行一次
        druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
        // 设置池中连接空闲 30min 被回收,默认值即为 30 min
        druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);

        return druidDataSource;
    }
}

 (二)先判断Redis缓存是否有数据,如果没有,则从Phoenix获取维表数据并且将在Phoenix中查到的数据放入Redis缓存中

package com.atguigu.utils;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import redis.clients.jedis.Jedis;

import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

public class DimUtil {
    //启动Redis
    //   bin/redis-server.sh ./redis.conf
    //   bin/redis-cli -h hadoop107 --raw
    public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {

        //先查询Redis
        Jedis jedis = JedisUtil.getJedis();
        String redisKey="DIM"+tableName+":"+key;
        String dimJsonStr = jedis.get(redisKey);

        //如果Redis缓存中有数据,则从缓存中读取数据,如果没有,则从Phoenix(Hbase)中获取数据

        if(dimJsonStr!=null){
            //重置过期时间
            jedis.expire(redisKey,24*60*60);
            //归还连接
            jedis.close();
            //返回维表数据
            return JSON.parseObject(dimJsonStr);
        }

        else{
            //拼接SQL语句
            String querySql="select * from " + GmallConfig.HBASE_SCHEMA +"."+tableName+"where id="+ key+"'";
            System.out.println("querySql>>>"+querySql);

            //查询数据
            List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);

            //将从Phoenix查询到的数据写入Redis
            JSONObject dimInfo = queryList.get(0);
            jedis.set(redisKey, dimInfo.toJSONString());
            //设置过期时间
            jedis.expire(redisKey,24*60*60);
            //归还连接
            jedis.close();

            //返回结果
            return dimInfo;
        }

    }

    //删除Redis中的缓存数据
    public static void delDimInfo(String tableName,String key){
        //获取连接
        Jedis jedis = JedisUtil.getJedis();
        //删除数据
        jedis.del("DIM"+tableName+":"+key);
        //归还连接
        jedis.close();
    }

}

(三)当维表数据更新时,需要删除Redis对应的维表数据(删除方法在上一段代码中)

package com.atguigu.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DimUtil;
import com.atguigu.utils.DruidDSUtil;
import com.atguigu.utils.PhoenixUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.SQLException;

public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private static DruidDataSource druidDataSource=null;

    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.createDataSource();
    }

    /*主流数据
    value数据格式:(消费的topic_db)
    {"database":"gmall-211126-flink","table":"base_trademark","type":"insert","ts":1652499161,"xid":167,
    "commit":true,"data":{"id":13,"tm_name":"atguigu","logo_url":"/aaa/aaa"}}
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {

        //获取连接
        DruidPooledConnection connection = druidDataSource.getConnection();

        String sinkTable=value.getString("sinkTable");
        JSONObject data=value.getJSONObject("data");

        //获取数据类型
        String type=value.getString("type");
        //如果为更新类型,则需要删除Redis中的数据
        if("update".equals(type)){
            DimUtil.delDimInfo(sinkTable.toUpperCase(),data.getString("id"));
        }

        //写出数据
        PhoenixUtil.upsertValues(connection,sinkTable,data);

        //归还连接
        connection.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/69293.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用Python海龟绘图画一个世界杯的足球

利用Python海龟绘图画一个世界杯的足球 花有重开日 人无再少年 四年一次的世界杯快要结束&#xff0c;为了纪念此次世界杯&#xff0c;特意用Python画了一个足球。 1.设计思路以及实现效果 世界杯足球实现思路&#xff1a; 首先使用海龟画一个圆形作为足球的外边框。然后在足…

3天带你走向实战!阿里顶配版Spring全家桶面试进阶笔记有多强?

Spring框架自从诞生以来就一直备受开发者青睐&#xff0c;它涵盖了Spring、Springboot、SpringCloud等诸多解决方案&#xff0c;一般我们都会统称为Spring全家桶&#xff01;出于Spring框架在Java开发者心中中的统治地位&#xff0c;所以不管是面试还是工作&#xff0c;Spring都…

夜神模拟器+fiddler抓包(抓取APPhttps请求,删除sll证书校验)

1.安装fiddler https://telerik-fiddler.s3.amazonaws.com/fiddler/FiddlerSetup.exe &#xff08;下载不了直接去官网找&#xff09; 2.配置 开启https请求抓取&#xff0c;不抓https可忽略2.修改或查看端口&#xff08;使用默认8888端口&#xff0c;要自定义端口可修改&#…

Arduino 定时器中断

Arduino 定时器中断 Circuits Arduino 查看原文 简介&#xff1a;Arduino 定时器中断 奥雷里&#xff08;地球、月亮和太阳&#xff09; 立式兰花播种机 胶合板书柜扬声器 计时器中断允许您以非常特定的时间间隔执行任务&#xff0c;而不管代码中发生了什么其他事情。我…

Unity ILRuntime Debugger使用及常见问题

目录前言1.安装2.使用3.常见问题前言 ILRuntime支持在VS中断点调试&#xff0c;下面说一下ILRuntime Debugger的使用及常见问题。 1.安装 需要下载对应版本的ILRuntime Debugger VS插件。我是在Unity中PackageManager安装的ILRuntime&#xff0c;可以在插件信息中查看版本。…

记SQL插入emoji成功,但是程序插入失败问题

在执行单测时&#xff0c;碰到了以下熟悉的问题 org.springframework.jdbc.UncategorizedSQLException: ### Error updating database. Cause: java.sql.SQLException: Incorrect string value: \xF0\x9F\x92\x8B for column name at row 1 ### The error may involve com.*…

Java入门教程(16)——条件判断语句

文章目录1. if结构1.1 if 单分支结构1.2 if-else 双分支结构1.3 if-else if-else 多分支结构switch 语句switch 多分支结构1. if结构 1.1 if 单分支结构 语法结构: if(布尔表达式){ 语句块 }实例&#xff1a;掷色子游戏 这里给大家扩展一个Math函数 Math.Random()&#xff0c…

动态规划算法

1.简介 1.动态规划(Dynamic Programming)算法的核心思想是: 将大问题划分为小问题进行解决,从而一步步获取最优解的处理算法; 2.动态规划算法与分治算法类似,其基本思想也是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到原问题的解; 3.与分治法不同…

项目统一规范包管理器

一般来说每个团队都会统一规定项目内只使用一个包管理器&#xff0c;譬如&#xff1a;npm、yarn、pnpm等&#xff0c;我们可以在文档中或者项目根目录REDEM.md中进行描述来形成共识&#xff0c;但毕竟是文档&#xff0c;并不能真正的进行约束&#xff0c;如果有项目成员没有看文…

SpringBoot自动装配原理分析,看完你也能手写一个starter组件

什么是 SpringBoot 2012 年 10 月&#xff0c;一个叫 Mike Youngstrom 的人在 Spring Jira 中创建了一个功能请求&#xff0c;要求在 Spring Framework 中支持无容器 Web 应用程序体系结构&#xff0c;提出了在主容器引导 Spring 容器内配置 Web 容器服务。这件事情对 SpringBo…

Linux 进程间通信

目录 进程间通信的必要性 进程间通信的技术背景 进程间通信的本质理解&#xff1a; 管道IPC&#xff1a;匿名管道 示意图 匿名管道的本质原理&#xff1a; demo示例代码&#xff1a; pipe 系统调用 注意&#xff1a; 管道读写的4种情况&#xff1a; 管道的特点&…

H5UI库和二维码

一、H5UI库 1、使用方法&#xff1a; ​ &#xff08;1&#xff09;页面中引入css文件 ​ h5ui.css &#xff08;h5ui.min.css&#xff09; ​ &#xff08;2&#xff09;页面中引入js文件 ​ jquery.min.js ​ h5ui.min.js 2、组件的用法 ​ &#xff08;1&#xff09…

为您的高速SPI添加强大和可靠的隔离交流

介绍 串行外设接口&#xff08;SPI&#xff09;是工业设备中常用于数字处理器核心和外围设备之间通信的一种协议。然而&#xff0c;为了安全使用&#xff0c;有必要对外围设备和核心进行电隔离。虽然隔离和SPI都是成熟的技术&#xff0c;但将两者接口并不像预期的那么简单。 …

SAP ABAP——数据类型(五)【LIKE系列关键字】

&#x1f4ac;个人网站&#xff1a;【芒果个人日志】​​​​​​ &#x1f4ac;原文地址&#xff1a;SAP ABAP——数据类型&#xff08;五&#xff09;【LIKE系列关键字】 - 芒果个人日志 (wyz-math.cn) &#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税…

【git】简洁实用教程

虽然之前有git的笔记了&#xff0c;但是操作和命令太多&#xff0c;有点冗余&#xff0c;下面整理出最常见的一些场景和git需求。 零、Git速查表 好习惯&#xff1a;每次提交后和开发代码前&#xff0c;都应该pull下 常见命令&#xff1a; git clone拉取服务器代码&#xff0…

深度解读 | 如何构建以指标为核心的ABI平台?

在上期一文中&#xff0c;我们了解到BI不同发展阶段运行模式及遇到的问题。“报表阶段”是以报表粒度进行管理&#xff0c;数据和报表完全耦合在一起&#xff0c;在不同报表间产生数据和指标的冗余和重复&#xff0c;形成报表爆炸、技术债&#xff0c;导致数据不可信、分析不敏…

Windows 7下安装oracle12c报错:O/S-Error:(OS 1385)

查看报错日志&#xff1a;C:\Program Files\Oracle\Inventory\logs\ installActions2015-04-21_09-29-15AM.log, 提示查看&#xff1a; D:\app\Administrator\cfgtoollogs\netca\trace_OraDB12Home1-150421 11上午1616.log &#xff0c; 打开该log&#xff0c;在尾部发现如下错…

LaTeX页眉页脚自定义【有图有代码】

LaTeX页眉页脚自定义【有图有代码】一、自定义页眉页脚示例【双页文档】\fancyhead \fancyfoot1、代码讲解2、自定义代码3、页眉和页脚的装饰线4、总页数二、自定义页眉页脚示例【单页文档】\rhead \rfoot三、\pagestyle{}介绍四、设置当前页面样式\thispagestyle{}平时在写报告…

中级软件设计师备考上午题总结

中级软件设计师备考上午题总结 前言 10月末11月初备考了中级软件设计师&#xff0c;备考时间总计20天整&#xff0c;由于预留的备考时间并不多&#xff0c;上午题复习策略主要是以看别人整理好的笔记为主&#xff0c;不懂的地方以看zst_2001的视频为辅&#xff0c;最后预留了…

JDBC Java对数据库增删改查(完整案例)

目录 一.综合上述7个步骤&#xff0c;实现向student表中插入一条数据。 1、注册驱动 2 、获取数据库连接对象 3、获取发送SQL语句对象 4、编写SQL语句&#xff0c;SQL语句最好是先在SQLyog里面写一遍并运行一下&#xff0c;保证SQL语句没有语法 错误&#xff0c;这里sid是…