文章目录
- 一 DWM层-订单宽表
- 1 维表关联代码实现
- (1)首先实现基本的维度查询功能
- a 封装Phoenix查询的工具类PhoenixUtil
- b 封装查询维度的工具类DimUtil
- (2) 优化1:加入旁路缓存模式
- a 缓存策略的几个注意点
- b 缓存的选型
- c 在pom.xml文件中添加Redis的依赖包
- d 封装RedisUtil,通过连接池获得Jedis
- e 在DimUtil中加入缓存,如果缓存中没有,再从Phoenix中查询
- f 针对主键只有一个的情况进行进一步的优化
- g 修改DimSink的invoke方法
一 DWM层-订单宽表
1 维表关联代码实现
维度关联实际上就是在流中查询存储在hbase中的数据表。但是即使通过主键的方式查询,hbase的查询速度也是不及流之间的join。外部数据源的查询常常是流式计算的性能瓶颈,所以在这个基础上还有进行一定的优化。
(1)首先实现基本的维度查询功能
a 封装Phoenix查询的工具类PhoenixUtil
实现从phoenix表中查询数据。
package com.hzy.gmall.realtime.utils;
/**
* 从phoenix表中查询数据
*/
public class PhoenixUtil {
private static Connection conn;
private static void initConn() {
try {
// 注册驱动
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
// 获取连接
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
// 设置操作的表空间
conn.setSchema(GmallConfig.HBASE_SCHEMA);
} catch (Exception e) {
e.printStackTrace();
}
}
// 执行查询sql,将查询结果封装为T类型对象,放到List中
public static <T>List<T> queryList(String sql,Class<T> clz){
if (conn == null){
initConn();
}
List<T> resList = new ArrayList<>();
ResultSet rs = null;
PreparedStatement ps = null;
try {
// 创建数据库操作对象
ps = conn.prepareStatement(sql);
// 执行SQL语句
rs = ps.executeQuery();
// 获取查询结果集的元数据信息
ResultSetMetaData metaData = rs.getMetaData();
// 处理结果集
// ID| TM_NAME
// 1 | zhangsan
while (rs.next()){
// 通过反射创建要封装的对象
T obj = clz.newInstance();
// 对所有列进行遍历,以获取列的名称
// jdbc操作时列从1开始
for (int i = 1; i <= metaData.getColumnCount(); i++){
String columnName = metaData.getColumnName(i);
// 通过BeanUtils工具类给对象的属性赋值
BeanUtils.setProperty(obj,columnName,rs.getObject(i));
}
// 将封装的对象放到List集合中
resList.add(obj);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放资源
if (rs != null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (ps != null){
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return resList;
}
// 测试
public static void main(String[] args) {
List<JSONObject> jsonObjectList = queryList("select * from dim_base_trademark", JSONObject.class);
System.out.println(jsonObjectList);
}
}
b 封装查询维度的工具类DimUtil
直接按照维度的主键查询Phoenix中维度数据的工具类。
package com.hzy.gmall.realtime.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
/**
* 查询维度数据的工具类
*/
public class DimUtil {
// 从phoenix表中查询维度数据
// 返回值类型:{"ID":"1","TM_NAME":"zhangsan"}
// 参数类型:"select * from dim_base_trademark where id = '1' and TM_NAME = 'AAA' ", JSONObject.class
public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String,String> ... colNameAndValues){
// 拼接查询维度的SQL
StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where 1=1 ");
for (int i = 0; i < colNameAndValues.length; i++) {
// 去掉1 = 1,则第一个条件前不加或最后一个条件后不加
// if(i >= 1){
// selectDimSql.append(" and ");
// }
Tuple2<String, String> colNameAndValue = colNameAndValues[i];
String colName = colNameAndValue.f0;
String colValue = colNameAndValue.f1;
selectDimSql.append("and " + colName + "='" + colValue + "'");
// 最后一个条件后不加
// if(i < colNameAndValues.length - 1){
// selectDimSql.append(" and ");
// }
}
System.out.println("查询sql的语句:" + selectDimSql);
// 底层调用的还是之前封装的查询phoenix表数据的方法
List<JSONObject> dimList = PhoenixUtil.queryList(selectDimSql.toString(), JSONObject.class);
JSONObject dimInfoJsonObj = null;
if (dimList != null && dimList.size() > 0){
// 根据维度数据的主键去查询,所以只会返回一条数据
dimInfoJsonObj = dimList.get(0);
} else {
System.out.println("维度数据没找到:" + selectDimSql);
}
return dimInfoJsonObj;
}
public static void main(String[] args) {
JSONObject dimInfo = DimUtil.getDimInfoNoCache("dim_base_trademark", Tuple2.of("id", "13"));
System.out.println(dimInfo);
}
}
(2) 优化1:加入旁路缓存模式
在上面实现功能中,直接查询的Hbase。外部数据源的查询常常是流式计算的性能瓶颈,所以需要在上面实现的基础上进行一定的优化。这里使用旁路缓存(cache-aside-pattern)。
旁路缓存模式是一种非常常见的按需分配缓存的模式。如下图,任何请求优先访问缓存,缓存命中,直接获得数据返回请求。如果未命中则,再查询数据库,同时把结果写入缓存以备后续请求使用。
a 缓存策略的几个注意点
缓存要设过期时间,不然冷数据会常驻缓存浪费资源。
要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存。
b 缓存的选型
一般两种:堆缓存或者独立缓存服务(redis,memcache)。
堆缓存,从性能角度看更好,毕竟访问数据路径更短,减少过程消耗。但是管理性差,其他进程无法维护缓存中的数据,如Flink的状态。
独立缓存服务(redis,memcache)本身性能也不错,不过会有创建连接、网络IO等消耗。但是考虑到数据如果会发生变化,那还是独立缓存服务管理性更强,而且如果数据量特别大,独立缓存更容易扩展。
因为这里的维度数据都是可变数据,所以还是采用Redis管理缓存。
c 在pom.xml文件中添加Redis的依赖包
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
d 封装RedisUtil,通过连接池获得Jedis
package com.hzy.gmall.realtime.utils;
import jdk.nashorn.internal.scripts.JD;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* 获取redis的java客户端Jedis
*/
public class RedisUtil {
// 声明JedisPool连接池
private static JedisPool jedisPool;
public static Jedis getJedis(){
if(jedisPool == null){
initJedisPool();
}
System.out.println("获取Redis连接...");
return jedisPool.getResource();
}
// 初始化连接池对象
private static void initJedisPool() {
// 连接池配置对象
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 最大连接数
jedisPoolConfig.setMaxTotal(100);
// 每次连接时是否进行ping pong测试
jedisPoolConfig.setTestOnBorrow(true);
// 连接耗尽是否等待
jedisPoolConfig.setBlockWhenExhausted(true);
// 等待时间
jedisPoolConfig.setMaxWaitMillis(2000);
// 最小空闲连接数
jedisPoolConfig.setMinIdle(5);
// 最大空闲连接数
jedisPoolConfig.setMaxTotal(5);
jedisPool = new JedisPool(jedisPoolConfig,"hadoop101",6379,10000);
}
public static void main(String[] args) {
Jedis jedis = getJedis();
String pong = jedis.ping();
System.out.println(pong);
}
}
测试:输出pong,则可以正常运行。
关于redis服务的常见问题:
- 想要在外部访问redis,启动redis前需要配置文件中如下选项
- 将此选项
bind 127.0.0.1
打开 - 将此选项设置为no
protected-mode
- 将此选项
- 启动redis服务一定要加上配置文件的路径,否则还是读取的默认配置
- 命令
redis + 配置文件路径
。
- 命令
- 关于redis持久化权限内的问题
- 在配置文件中默认配置dir ./,默认在进行rdb持久化的时候,文件会保存到当前启动命令所在的目录下,如果没有对该目录的操作权限,落盘会出现错误。
e 在DimUtil中加入缓存,如果缓存中没有,再从Phoenix中查询
public static JSONObject getDimInfo(String tableName, Tuple2<String,String> ... colNameAndValues){
// 拼接查询维度的SQL
StringBuilder selectDimSql = new StringBuilder("select * from " + tableName + " where ");
// 拼接redis的key
StringBuilder redisKey = new StringBuilder("dim:"+tableName.toLowerCase()+":");
for (int i = 0; i < colNameAndValues.length; i++) {
Tuple2<String, String> colNameAndValue = colNameAndValues[i];
String colName = colNameAndValue.f0;
String colValue = colNameAndValue.f1;
selectDimSql.append(colName + "='" + colValue + "'");
redisKey.append(colValue);
if (i < colNameAndValues.length - 1){
selectDimSql.append(" and ");
redisKey.append("_");
}
}
// 先根据key到redis中查询缓存的维度数据
// 声明操作redis的客户端
Jedis jedis = null;
// 声明变量,用于接受从redis中查询出来的缓存数据
String jsonStr = null;
// 声明变量,用于处理返回的维度对象
JSONObject dimInfoJsonObj = null;
try {
jedis = RedisUtil.getJedis();
// 从redis中获取维度数据
jsonStr = jedis.get(redisKey.toString());
} catch (Exception e){
e.printStackTrace();
System.out.println("从redis中查询维度数据发生了异常...");
}
// 判断是否从redis中获取到了维度缓存数据
if (jsonStr != null && jsonStr.length() > 0){
// 从redis中查到了维度的缓存数据,将缓存的维度字符串转换为json对象
dimInfoJsonObj = JSON.parseObject(jsonStr);
} else {
// 从redis中没有查到维度的缓存数据,发送请求到phoenix库中去查询
System.out.println("查询sql的语句:" + selectDimSql);
// 底层调用的还是之前封装的查询phoenix表数据的方法
List<JSONObject> dimList = PhoenixUtil.queryList(selectDimSql.toString(), JSONObject.class);
if (dimList != null && dimList.size() > 0){
// 根据维度数据的主键去查询,所以只会返回一条数据
dimInfoJsonObj = dimList.get(0);
// 将从phoenix中查询出来的维度数据,写到redis缓存中
if (jedis != null){
jedis.setex(redisKey.toString(),3600*24,dimInfoJsonObj.toJSONString());
}
} else {
System.out.println("维度数据没找到:" + selectDimSql);
}
}
if (jedis != null){
jedis.close();
System.out.println("关闭redis连接。");
}
return dimInfoJsonObj;
}
测试:运行两次以观察提升性能。
public static void main(String[] args) {
// JSONObject dimInfo = DimUtil.getDimInfoNoCache("dim_base_trademark", Tuple2.of("id", "13"));
JSONObject dimInfo = DimUtil.getDimInfo("dim_base_trademark", Tuple2.of("id", "13"));
System.out.println(dimInfo);
}
f 针对主键只有一个的情况进行进一步的优化
public static JSONObject getDimInfo(String tableName, String id){
return getDimInfo(tableName,Tuple2.of("ID",id));
}
public static void main(String[] args) {
// JSONObject dimInfo = DimUtil.getDimInfoNoCache("dim_base_trademark", Tuple2.of("id", "13"));
// JSONObject dimInfo = DimUtil.getDimInfo("dim_base_trademark", Tuple2.of("id", "13"));
JSONObject dimInfo = DimUtil.getDimInfo("dim_base_trademark", "13");
System.out.println(dimInfo);
}
g 修改DimSink的invoke方法
如果维度数据发生了变化,同时失效该数据对应的Redis中的缓存,先写入数据库,再失效缓存。
正常数据如果在缓存中查到,直接返回结果,否则去Hbase的维度表中进行查询,查询到返回结果,并加入到redis缓存。如果数据已经缓存到redis中,且Hbase维度表数据发生变化,需要将缓存数据删除。业务系统对数据进行修改,maxwell将变化采集到ods层,Flink程序从ods层拿到数据,对维度数据进行处理后将数据放到维度侧输出流中。在DimSink的invoke方法进行实际的业务操作,完成之后即完成了对Hbase表中数据的修改,修改完成之后需要将已经缓存的原数据从redis中删除。
在invoke方法最后添加
public void invoke(JSONObject jsonObj, Context context) throws Exception {
// 上游传递过来的数据格式如下:
// {"database":"gmall2022",
// "data":{"tm_name":"a","id":13},
// "commit":true,
// "sink_table":"dim_base_trademark",
// "type":"insert",
// "table":"base_trademark","
// ts":1670131087}
// 获取维度表表名
String tableName = jsonObj.getString("sink_table");
// 获取数据
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
String upsertSql = genUpsertSql(tableName,dataJsonObj);
System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);
PreparedStatement ps = null;
try {
// 创建数据库操作对象
ps = conn.prepareStatement(upsertSql);
// 执行sql语句
ps.executeUpdate();
// 手动提交事务,phoenix的连接实现类不是自动提交事务
conn.commit();
}catch (SQLException e){
e.printStackTrace();
throw new RuntimeException("向phoenix维度表中插入数据失败了");
} finally {
// 释放资源
if (ps != null){
ps.close();
}
}
// 如果当前维度数据进行删除或者修改,清空redis缓存中的数据
if (jsonObj.getString("type").equals("update") || jsonObj.getString("type").equals("delete")){
DimUtil.deleteCached(tableName,dataJsonObj.getString("id"));
}
}
// 根据redis中的key删除redis中的记录
public static void deleteCached(String tableName, String id) {
String redisKey = "dim:"+tableName.toLowerCase()+":"+ id;
try {
Jedis jedis = RedisUtil.getJedis();
jedis.del(redisKey);
jedis.close();
}catch (Exception e){
e.printStackTrace();
System.out.println("删除redis缓存发生了异常");
}
}
测试:
启动BaseDBApp,增加配置表配置如下图:
修改base_trademark中已经缓存到redis中的数据信息,BaseDBApp执行upsert操作,输出以下信息
维度数据::2> {"database":"gmall2022","xid":94206,"data":{"tm_name":"d","id":13},"old":{"tm_name":"b"},"commit":true,"sink_table":"dim_base_trademark","type":"update","table":"base_trademark","ts":1670593664}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('d','13')
获取Redis连接...
查看phoenix中数据是否改变,redis缓存数据是否删除。