【实时数仓】DWM层订单宽表之实现基本的维度查询、加入旁路缓存模式

news2025/2/4 18:05:06

文章目录

  • 一 DWM层-订单宽表
    • 1 维表关联代码实现
      • (1)首先实现基本的维度查询功能
        • a 封装Phoenix查询的工具类PhoenixUtil
        • b 封装查询维度的工具类DimUtil
      • (2) 优化1:加入旁路缓存模式
        • a 缓存策略的几个注意点
        • b 缓存的选型
        • c 在pom.xml文件中添加Redis的依赖包
        • d 封装RedisUtil,通过连接池获得Jedis
        • e 在DimUtil中加入缓存,如果缓存中没有,再从Phoenix中查询
        • f 针对主键只有一个的情况进行进一步的优化
        • g 修改DimSink的invoke方法

一 DWM层-订单宽表

1 维表关联代码实现

维度关联实际上就是在流中查询存储在hbase中的数据表。但是即使通过主键的方式查询,hbase的查询速度也是不及流之间的join。外部数据源的查询常常是流式计算的性能瓶颈,所以在这个基础上还有进行一定的优化。

(1)首先实现基本的维度查询功能

a 封装Phoenix查询的工具类PhoenixUtil

实现从phoenix表中查询数据。

package com.hzy.gmall.realtime.utils;
/**
 * 从phoenix表中查询数据
 */
public class PhoenixUtil {

    private static Connection conn;

    private static void initConn() {
        try {
            // 注册驱动
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            // 获取连接
            conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
            // 设置操作的表空间
            conn.setSchema(GmallConfig.HBASE_SCHEMA);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 执行查询sql,将查询结果封装为T类型对象,放到List中
    public static <T>List<T> queryList(String sql,Class<T> clz){
        if (conn == null){
            initConn();
        }
        List<T> resList = new ArrayList<>();
        ResultSet rs = null;
        PreparedStatement ps = null;
        try {
            // 创建数据库操作对象
             ps = conn.prepareStatement(sql);
            // 执行SQL语句
             rs = ps.executeQuery();
             // 获取查询结果集的元数据信息
            ResultSetMetaData metaData = rs.getMetaData();

            // 处理结果集
            // ID| TM_NAME
            // 1 | zhangsan
            while (rs.next()){
                // 通过反射创建要封装的对象
                T obj = clz.newInstance();
                // 对所有列进行遍历,以获取列的名称
                // jdbc操作时列从1开始
                for (int i = 1; i <= metaData.getColumnCount(); i++){
                    String columnName = metaData.getColumnName(i);
                    // 通过BeanUtils工具类给对象的属性赋值
                    BeanUtils.setProperty(obj,columnName,rs.getObject(i));
                }
                // 将封装的对象放到List集合中
                resList.add(obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放资源
            if (rs != null){
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (ps != null){
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return resList;
    }


    // 测试
    public static void main(String[] args) {
        List<JSONObject> jsonObjectList = queryList("select * from dim_base_trademark", JSONObject.class);
        System.out.println(jsonObjectList);
    }
}

b 封装查询维度的工具类DimUtil

直接按照维度的主键查询Phoenix中维度数据的工具类。

package com.hzy.gmall.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.List;

/**
 * 查询维度数据的工具类
 */
public class DimUtil {
    // 从phoenix表中查询维度数据
    // 返回值类型:{"ID":"1","TM_NAME":"zhangsan"}
    // 参数类型:"select * from dim_base_trademark where id = '1' and TM_NAME = 'AAA' ", JSONObject.class
    public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String,String> ... colNameAndValues){
        // 拼接查询维度的SQL
        StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where 1=1 ");
        for (int i = 0; i < colNameAndValues.length; i++) {
            // 去掉1 = 1,则第一个条件前不加或最后一个条件后不加
//            if(i >= 1){
//                selectDimSql.append(" and ");
//            }
            Tuple2<String, String> colNameAndValue = colNameAndValues[i];
            String colName = colNameAndValue.f0;
            String colValue = colNameAndValue.f1;
            selectDimSql.append("and " + colName + "='" + colValue + "'");
            // 最后一个条件后不加
//            if(i < colNameAndValues.length - 1){
//                selectDimSql.append(" and ");
//            }
        }
        System.out.println("查询sql的语句:" + selectDimSql);
        // 底层调用的还是之前封装的查询phoenix表数据的方法
        List<JSONObject> dimList = PhoenixUtil.queryList(selectDimSql.toString(), JSONObject.class);
        JSONObject dimInfoJsonObj = null;
        if (dimList != null && dimList.size() > 0){
            // 根据维度数据的主键去查询,所以只会返回一条数据
            dimInfoJsonObj = dimList.get(0);
        } else {
            System.out.println("维度数据没找到:" + selectDimSql);
        }
        return dimInfoJsonObj;
    }

    public static void main(String[] args) {
        JSONObject dimInfo = DimUtil.getDimInfoNoCache("dim_base_trademark", Tuple2.of("id", "13"));
        System.out.println(dimInfo);
    }
}

(2) 优化1:加入旁路缓存模式

在上面实现功能中,直接查询的Hbase。外部数据源的查询常常是流式计算的性能瓶颈,所以需要在上面实现的基础上进行一定的优化。这里使用旁路缓存(cache-aside-pattern)。

旁路缓存模式是一种非常常见的按需分配缓存的模式。如下图,任何请求优先访问缓存,缓存命中,直接获得数据返回请求。如果未命中则,再查询数据库,同时把结果写入缓存以备后续请求使用。

在这里插入图片描述

a 缓存策略的几个注意点

缓存要设过期时间,不然冷数据会常驻缓存浪费资源。

要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存。

b 缓存的选型

一般两种:堆缓存或者独立缓存服务(redis,memcache)。

堆缓存,从性能角度看更好,毕竟访问数据路径更短,减少过程消耗。但是管理性差,其他进程无法维护缓存中的数据,如Flink的状态。

独立缓存服务(redis,memcache)本身性能也不错,不过会有创建连接、网络IO等消耗。但是考虑到数据如果会发生变化,那还是独立缓存服务管理性更强,而且如果数据量特别大,独立缓存更容易扩展。

因为这里的维度数据都是可变数据,所以还是采用Redis管理缓存。

c 在pom.xml文件中添加Redis的依赖包

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>

d 封装RedisUtil,通过连接池获得Jedis

package com.hzy.gmall.realtime.utils;

import jdk.nashorn.internal.scripts.JD;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * 获取redis的java客户端Jedis
 */
public class RedisUtil {
    // 声明JedisPool连接池
    private static JedisPool jedisPool;
    public static Jedis getJedis(){
        if(jedisPool == null){
            initJedisPool();
        }
        System.out.println("获取Redis连接...");
        return jedisPool.getResource();
    }

    // 初始化连接池对象
    private static void initJedisPool() {
        // 连接池配置对象
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 最大连接数
        jedisPoolConfig.setMaxTotal(100);
        // 每次连接时是否进行ping pong测试
        jedisPoolConfig.setTestOnBorrow(true);
        // 连接耗尽是否等待
        jedisPoolConfig.setBlockWhenExhausted(true);
        // 等待时间
        jedisPoolConfig.setMaxWaitMillis(2000);
        // 最小空闲连接数
        jedisPoolConfig.setMinIdle(5);
        // 最大空闲连接数
        jedisPoolConfig.setMaxTotal(5);

        jedisPool = new JedisPool(jedisPoolConfig,"hadoop101",6379,10000);
    }

    public static void main(String[] args) {
        Jedis jedis = getJedis();
        String pong = jedis.ping();
        System.out.println(pong);
    }
}

测试:输出pong,则可以正常运行。

关于redis服务的常见问题:

  • 想要在外部访问redis,启动redis前需要配置文件中如下选项
    • 将此选项bind 127.0.0.1打开
    • 将此选项设置为noprotected-mode
  • 启动redis服务一定要加上配置文件的路径,否则还是读取的默认配置
    • 命令redis + 配置文件路径
  • 关于redis持久化权限内的问题
    • 在配置文件中默认配置dir ./,默认在进行rdb持久化的时候,文件会保存到当前启动命令所在的目录下,如果没有对该目录的操作权限,落盘会出现错误。

e 在DimUtil中加入缓存,如果缓存中没有,再从Phoenix中查询

public static JSONObject getDimInfo(String tableName, Tuple2<String,String> ... colNameAndValues){
    // 拼接查询维度的SQL
    StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where ");
    // 拼接redis的key
    StringBuilder redisKey = new StringBuilder("dim:"+tableName.toLowerCase()+":");

    for (int i = 0; i < colNameAndValues.length; i++) {
        Tuple2<String, String> colNameAndValue = colNameAndValues[i];
        String colName = colNameAndValue.f0;
        String colValue = colNameAndValue.f1;

        selectDimSql.append(colName + "='" + colValue + "'");
        redisKey.append(colValue);

        if (i < colNameAndValues.length - 1){
            selectDimSql.append(" and ");
            redisKey.append("_");
        }
    }
    // 先根据key到redis中查询缓存的维度数据
    // 声明操作redis的客户端
    Jedis jedis = null;
    // 声明变量,用于接受从redis中查询出来的缓存数据
    String jsonStr = null;
    // 声明变量,用于处理返回的维度对象
    JSONObject dimInfoJsonObj = null;

    try {
        jedis = RedisUtil.getJedis();
        // 从redis中获取维度数据
        jsonStr = jedis.get(redisKey.toString());
    } catch (Exception e){
        e.printStackTrace();
        System.out.println("从redis中查询维度数据发生了异常...");
    }
    
    // 判断是否从redis中获取到了维度缓存数据
    if (jsonStr != null && jsonStr.length() > 0){
        // 从redis中查到了维度的缓存数据,将缓存的维度字符串转换为json对象
        dimInfoJsonObj = JSON.parseObject(jsonStr);
    } else {
        // 从redis中没有查到维度的缓存数据,发送请求到phoenix库中去查询
        System.out.println("查询sql的语句:" + selectDimSql);
        // 底层调用的还是之前封装的查询phoenix表数据的方法
        List<JSONObject> dimList = PhoenixUtil.queryList(selectDimSql.toString(), JSONObject.class);

        if (dimList != null && dimList.size() > 0){
            // 根据维度数据的主键去查询,所以只会返回一条数据
            dimInfoJsonObj = dimList.get(0);
            // 将从phoenix中查询出来的维度数据,写到redis缓存中
            if (jedis != null){
                jedis.setex(redisKey.toString(),3600*24,dimInfoJsonObj.toJSONString());
            }
        } else {
            System.out.println("维度数据没找到:" + selectDimSql);
        }
    }

    if (jedis != null){
        jedis.close();
        System.out.println("关闭redis连接。");
    }

    return dimInfoJsonObj;
}

测试:运行两次以观察提升性能。

public static void main(String[] args) {
//        JSONObject dimInfo = DimUtil.getDimInfoNoCache("dim_base_trademark", Tuple2.of("id", "13"));
        JSONObject dimInfo = DimUtil.getDimInfo("dim_base_trademark", Tuple2.of("id", "13"));
        System.out.println(dimInfo);
    }

f 针对主键只有一个的情况进行进一步的优化

public static JSONObject getDimInfo(String tableName, String id){
    return getDimInfo(tableName,Tuple2.of("ID",id));
}
    public static void main(String[] args) {
//        JSONObject dimInfo = DimUtil.getDimInfoNoCache("dim_base_trademark", Tuple2.of("id", "13"));
//        JSONObject dimInfo = DimUtil.getDimInfo("dim_base_trademark", Tuple2.of("id", "13"));
        JSONObject dimInfo = DimUtil.getDimInfo("dim_base_trademark", "13");
        System.out.println(dimInfo);
    }

g 修改DimSink的invoke方法

如果维度数据发生了变化,同时失效该数据对应的Redis中的缓存,先写入数据库,再失效缓存。

正常数据如果在缓存中查到,直接返回结果,否则去Hbase的维度表中进行查询,查询到返回结果,并加入到redis缓存。如果数据已经缓存到redis中,且Hbase维度表数据发生变化,需要将缓存数据删除。业务系统对数据进行修改,maxwell将变化采集到ods层,Flink程序从ods层拿到数据,对维度数据进行处理后将数据放到维度侧输出流中。在DimSink的invoke方法进行实际的业务操作,完成之后即完成了对Hbase表中数据的修改,修改完成之后需要将已经缓存的原数据从redis中删除。

在invoke方法最后添加

public void invoke(JSONObject jsonObj, Context context) throws Exception {
    // 上游传递过来的数据格式如下:
    // {"database":"gmall2022",
    // "data":{"tm_name":"a","id":13},
    // "commit":true,
    // "sink_table":"dim_base_trademark",
    // "type":"insert",
    // "table":"base_trademark","
    // ts":1670131087}

    // 获取维度表表名
    String tableName = jsonObj.getString("sink_table");
    // 获取数据
    JSONObject dataJsonObj = jsonObj.getJSONObject("data");
    // 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
    String upsertSql = genUpsertSql(tableName,dataJsonObj);

    System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);

    PreparedStatement ps = null;
    try {
        // 创建数据库操作对象
        ps = conn.prepareStatement(upsertSql);
        // 执行sql语句
        ps.executeUpdate();
        // 手动提交事务,phoenix的连接实现类不是自动提交事务
        conn.commit();
    }catch (SQLException e){
        e.printStackTrace();
        throw new RuntimeException("向phoenix维度表中插入数据失败了");
    } finally {
        // 释放资源
        if (ps != null){
            ps.close();
        }
    }
    // 如果当前维度数据进行删除或者修改,清空redis缓存中的数据
    if (jsonObj.getString("type").equals("update") || jsonObj.getString("type").equals("delete")){
        DimUtil.deleteCached(tableName,dataJsonObj.getString("id"));
    }
}
// 根据redis中的key删除redis中的记录
public static void deleteCached(String tableName, String id) {
    String redisKey = "dim:"+tableName.toLowerCase()+":"+ id;
    try {
        Jedis jedis = RedisUtil.getJedis();
        jedis.del(redisKey);
        jedis.close();
    }catch (Exception e){
        e.printStackTrace();
        System.out.println("删除redis缓存发生了异常");
    }
}

测试:

启动BaseDBApp,增加配置表配置如下图:

在这里插入图片描述

修改base_trademark中已经缓存到redis中的数据信息,BaseDBApp执行upsert操作,输出以下信息

维度数据::2> {"database":"gmall2022","xid":94206,"data":{"tm_name":"d","id":13},"old":{"tm_name":"b"},"commit":true,"sink_table":"dim_base_trademark","type":"update","table":"base_trademark","ts":1670593664}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('d','13')
获取Redis连接...

查看phoenix中数据是否改变,redis缓存数据是否删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/103402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AnimateGAN 迁移部署

文章目录1. 模型概述2. 迁移过程2.1 将ckpt的权重文件转换为pb的权重文件。2.2 将pb的权重文件迁移为 BM1684 bmodel模型2.3 迁移后pipeline搭建2.4 使用streamlit部署3. 效果展示AnimateGAN 是一个基于 GAN 的动漫生成模型&#xff0c;可以将真实的场景照片转换成动漫形式。本…

CASA(Carnegie-Ames-Stanford Approach)模型

植被作为陆地生态系统的重要组成部分对于生态环境功能的维持具有关键作用。植被净初级生产力&#xff08;Net Primary Productivity, NPP&#xff09;是指单位面积上绿色植被在单位时间内由光合作用生产的有机质总量扣除自养呼吸的剩余部分。植被NPP是表征陆地生态系统功能及可…

设计模式之美总结(创建型篇)

title: 设计模式之美总结&#xff08;创建型篇&#xff09; date: 2022-11-03 13:58:36 tags: 设计模式 categories:技术书籍及课程 cover: https://cover.png feature: false 文章目录1. 单例模式&#xff08;Singleton Design Pattern&#xff09;1.1 为什么要使用单例&…

如何在高密度的IB学习中杀出重围?

建议选择IB所需具备的能力/特点 ▣ 敢于挑战自我&#xff0c;愿意通过努力换取个人能力的飞跃 ▣ 如果擅长或喜欢写作&#xff08;中英文&#xff09;&#xff0c;IB对于你来说可能不会那么难。 ▣ 有自主学习、自主研究的能力。有些老师可能教的并不太让人满意&#xff0c;因此…

OpenTelemetry系列 (三)| 神秘的采集器 - Opentelemetry Collector

前言 上个篇章中我们主要介绍了OpenTelemetry的客户端的一些数据生成方式&#xff0c;但是客户端的数据最终还是要发送到服务端来进行统一的采集整合&#xff0c;这样才能看到完整的调用链&#xff0c;metrics等信息。因此在这个篇章中会主要介绍服务端的采集能力。 客户端数…

学Python能做哪些副业?我一般不告诉别人

前两天一个朋友找到我吐槽&#xff0c;说工资一发交完房租水电&#xff0c;啥也不剩&#xff0c;搞不懂朋友圈里那些天天吃喝玩乐的同龄人钱都是哪来的&#xff1f; 确实如此&#xff0c;刚毕业的大学生工资起薪都很低&#xff0c;在高消费、高租金的城市&#xff0c;别说存钱…

日志篇- ES+Logstash+Filebeat+Kibana+Kafka+zk 安装配置与使用详解

1- 学习目标 ELK基本概念&#xff0c;特点安装部署 Kibana ES集群 Logstash Filebeat Kafka集群性能瓶颈以及优化QA汇总 2- 介绍 2.1- 基本概念 Elasticsearch 分布式搜索和分析引擎&#xff0c;具有高可伸缩、高可靠和易管理等特点。基于 Apache Lucene 构建&#xff0c…

xv6---Lab4 traps

参考&#xff1a; Lab: Traps 关于寄存器s0和堆栈https://pdos.csail.mit.edu/6.828/2020/lec/l-riscv-slides.pdf RISC-V assembly Q: 哪些寄存器包含函数的参数?例如&#xff0c;哪个寄存器在main对printf的调用中保存了传参13 ? A: a2保存13(通过gdb调试可看出寄存器a2的…

【设备管理系统】如何助力制造企业实现精益生产?

随着企业对于机械设备的依赖性越来越高&#xff0c;生产设备日益大型化、自动化&#xff0c;流程线生产流程问题逐渐浮于表面&#xff0c;现阶段设备管理的各项制度已经不能够满足日常的生产工作。企业逐渐都面临着设备管理的复杂问题&#xff0c;尤其是设备的保养、维修、日常…

JMeter—HTTP压测

目录&#xff1a;导读 一、创建线程组 二、添加HTTP 三、查看结果树 四、响应断言 五、聚合报告 六、自定义变量 七、CSV可变参数压测 结语 一、创建线程组 右击-->添加-->Threads(Users)-->线程组 下面对比较重要的几个参数&#xff0c;讲解下&#xff1a; …

Vue基础7

Vue基础7生命周期引出生命周期用css animation实现用定时器实现错误&#xff1a;用methods实现使用生命周期函数mounted实现生命周期定义分析生命周期挂载流程beforeCreate()created()beforeMount()mounted()template的作用更新流程beforeUpdate()updated()销毁流程beforeDestr…

【数据库】二阶段锁

Two-phase locking (2PL) is a concurrency controlprotocol that determines whether a txn can access an object in the database on the fly. The protocol does not need to know all the queriesthat a txn will execute ahead of time. 分为两个阶段&#xff1a; 一阶…

颅内EEG记录揭示人类DMN网络的电生理基础

使用无创功能磁共振成像&#xff08;fMRI&#xff09;的研究为人类默认模式网络&#xff08;DMN&#xff09;的独特功能组织和深远重要性提供了重要的见解&#xff0c;但这些方法在跨多个时间尺度上解决网络动力学的能力有限。电生理技术对于应对这些挑战至关重要&#xff0c;但…

RAID 0 添加新磁盘

1&#xff1a;查看当前可用挂载磁盘 lsblk 2&#xff1a;可见 sda 与 sdb 已被挂载&#xff0c;需要挂载 sdc 和 sdd 由于硬盘的默认分区格式是MBR&#xff0c;这种格式的硬盘支持的最大挂载容量为2T&#xff0c;为了满足我们的要求&#xff0c;需要将硬盘格式转化为MBR&…

Node.js 编写接口入门学习(GET、POST)

一、简介 nvm 安装、卸载与使用&#xff08;详细步骤&#xff09;&#xff0c;用于管理/切换 Node 多版本环境。 node 是否安装成功 $ node -v安装完成之后&#xff0c;通过 node 直接运行 test.js。 // test.js console.log(Hello Node)# 命令行执行 $ node test.js二、简单的…

[ 数据结构 -- 手撕排序算法第七篇 ] 归并排序

文章目录前言一、常见的排序算法二、归并排序的基本思想三、归并排序3.1 归并排序的递归版本3.2 归并排序的非递归版本四、归并排序的特性总结前言 手撕排序算法第七篇&#xff1a;归并排序&#xff01; 从本篇文章开始&#xff0c;我会介绍并分析常见的几种排序&#xff0c;例…

深度学习秘籍

显式构造 隐式构造 loss通常是一个标量 batchsize越小其实越好 回归 预测的是一个连续 softmax回归是一个多分类问题 分类 预测是一个离散值 Huber RoBust Loss, 也就是通常所说SmoothL1损失 常用命令 import torch import torchvision from torchvision import transformsso…

ContentProvider的介绍和使用

文章目录ContentProviderContentProvider简介运行时权限Android权限机制详解在程序运行时申请权限访问其他程序当中数据ContentResolver的基本用法读取系统联系人信息创建自己的ContentProvider创建ContentProvider的步骤实现跨程序数据共享ContentProvider 如果我们想要实现跨…

浅拷贝深拷贝递归

常见的基本数据类型&#xff1a;Number、String 、Boolean、Null和Undefined 引用数据类型&#xff1a;Object、Array、Function 1&#xff09;基本数据类型&#xff1a;存储在栈内存中,可以直接访问到该变量的值。 2&#xff09;引用数据类型&#xff1a;存储在堆内存中,每…

有哪些数据统计软件适合初学者使用?

前段时间写过一篇“数据分析工具”的内容&#xff0c;周末有伙伴私信问我有没有什么适合初学者、业务人员的&#xff0c;更简单一点的数据可视化软件。 所以今天来分享下我在做数据分析时用过的几个简单易上手的数据可视化软件。 先放上目录&#xff1a; 数据统计收集类——简…