文章目录
- 一 DWM层-订单宽表
- 1 维表关联代码实现
- (1)优化2:异步查询
- a 封装线程池工具类
- b 封装维度异步查询的函数类DimAsyncFunction
- c 自定义维度查询接口DimJoinFunction
- d 使用DimAsyncFunction
- 关联用户维度源码
- 测试
- 配置配置表
- 历史数据同步
- 总结
一 DWM层-订单宽表
1 维表关联代码实现
(1)优化2:异步查询
在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。
例如:在电商场景中,需要一个商品的sku id去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹id,需要去关联包裹的行业属性、发货信息、收货信息等等。
默认情况下,在Flink的MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。
Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。
这种方式特别针对涉及网络IO的操作,减少因为请求等待带来的消耗。
**先决条件。**正确地实现数据库(或键/值存储)的异步 I/O 交互需要支持异步请求的数据库客户端。许多主流数据库都提供了这样的客户端。如果没有这样的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端。然而,这种方法通常比正规的异步客户端效率低。
a 封装线程池工具类
package com.hzy.gmall.realtime.utils;
/**
* 线程池工具类
*/
public class ThreadPoolUtil {
public static ThreadPoolExecutor pool;
// int corePoolSize, 初始线程数量
// int maximumPoolSize, 最大线程数
// long keepAliveTime, 当线程池中空闲线程的数量超过corePoolSize,在keepAliveTime时间后进行销毁
// TimeUnit unit, 时间单位
// BlockingQueue<Runnable> workQueue 要执行的任务队列
public static ThreadPoolExecutor getInstance(){
// 双重锁实现懒汉式单例创建
if (pool == null){
synchronized(ThreadPoolExecutor.class){
if (pool == null){
System.out.println("开辟线程池...");
pool = new ThreadPoolExecutor(
4,20,300, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE)
);
}
}
}
return pool;
}
}
b 封装维度异步查询的函数类DimAsyncFunction
该类继承异步方法类RichAsyncFunction,实现自定义维度查询接口。
其中RichAsyncFunction<IN,OUT>是Flink提供的异步方法类,此处因为是查询操作输入类和返回类一致,所以是<T,T>。
RichAsyncFunction这个类要实现两个方法:
open用于初始化异步连接池。
asyncInvoke方法是核心方法,里面的操作必须是异步的,如果查询的数据库有异步api也可以用线程的异步方法,如果没有异步方法,就要自己利用线程池等方式实现异步查询。
package com.hzy.gmall.realtime.app.fun;
/**
* 实现维度的异步关联
* 模板方法设计模式:在父类中定义实现某一个功能的核心算法骨架,将具体的实现延迟到子类中去完成
* 子类不改变父类核心算法骨架的前提下,每一个子类都可以有自己的独立实现。
*/
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T,T>{
private ExecutorService executorService;
private String tableName;
public DimAsyncFunction(String tableName) {
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
// 创建线程池对象
executorService = ThreadPoolUtil.getInstance();
}
// 发送异步请求,完成维度关联
// 通过创建多线程的方式发送异步请求
// 此方法每处理流中的一条数据,都会执行一次
@Override
public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
// 通过线程池获取线程
executorService.submit(new Runnable() {
// run中的代码实现的就是异步维度关联操作
@Override
public void run() {
try {
long start = System.currentTimeMillis();
// 从对象中获取维度关联的key
String key = getKey(obj);
// 根据key到维度表中获取维度对象
JSONObject dimJsonObj = DimUtil.getDimInfo(tableName, key);
// 把维度对象的属性赋值给流中对象属性(维度关联)
if (dimJsonObj != null){
join(obj,dimJsonObj);
}
long end = System.currentTimeMillis();
System.out.println("维度异步查询耗时:" + (end - start) + "毫秒");
resultFuture.complete(Collections.singleton(obj));
}catch (Exception e){
e.printStackTrace();
System.out.println("维度异步查询发生异常...");
}
}
});
}
}
c 自定义维度查询接口DimJoinFunction
这个异步维表查询的方法适用于各种维表的查询,用什么条件查,查出来的结果如何合并到数据流对象中,需要使用者自己定义。
这就是自己定义了一个接口DimJoinFunction<T>
包括两个方法。
package com.hzy.gmall.realtime.app.fun;
// 维度关联查询的接口
public interface DimJoinFunction<T> {
/**
* 需要实现如何把结果装配给数据流对象
* @param obj 数据流对象
* @param jsonObject 异步查询结果
* @throws Exception
*/
void join(T obj, JSONObject dimJsonObj) throws Exception;
/**
* 需要实现如何从流中对象获取主键
* @param obj 数据流对象
*/
String getKey(T obj);
}
d 使用DimAsyncFunction
核心的类是AsyncDataStream,这个类有两个方法一个是有序等待(orderedWait),一个是无序等待(unorderedWait)。
- 无序等待(unorderedWait)
后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会有乱序出现。
- 有序等待(orderedWait)
严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会差一些。
- 注意
- 这里实现了用户维表的查询,那么必须重写装配结果join方法和获取查询rowkey的getKey方法。
- 方法的最后两个参数60,
TimeUnit.SECONDS
,标识次异步查询最多执行60秒,否则会报超时异常。
关联用户维度源码
在OrderWideApp中关联不同维度数据。
// TODO 8 和用户维度进行关联
// 以下方式(同步)实现效率低
// orderWideDS.map(
// new MapFunction<OrderWide, OrderWide>() {
// @Override
// public OrderWide map(OrderWide orderWide) throws Exception {
// Long user_id = orderWide.getUser_id();
// JSONObject userDimInfo = DimUtil.getDimInfo("dim_user_info", user_id.toString());
// String gender = userDimInfo.getString("GENDER");
// orderWide.setUser_gender(gender);
// return null;
// }
// }
// );
// 异步操作,和用户表关联
SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
orderWideDS,
// 动态绑定
new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
@Override
public void join(OrderWide orderWide, JSONObject dimJsonObj) throws ParseException {
String gender = dimJsonObj.getString("GENDER");
orderWide.setUser_gender(gender);
// 2000-10-02
String birthday = dimJsonObj.getString("BIRTHDAY");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date birthdayDate = sdf.parse(birthday);
long diffMs = System.currentTimeMillis() - birthdayDate.getTime();
long ageLong = diffMs / 1000L / 60L / 60L / 24L / 365L;
orderWide.setUser_age((int) ageLong);
}
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getUser_id().toString();
}
},
60,
TimeUnit.SECONDS
);
orderWideWithUserDS.print(">>>");
测试
总体业务流程如下图:
启动zookeeper、kafka、hdfs、等待安全模式退出启动hbase、maxwell、phoenix。
配置配置表
加入用户表的配置到配置表,批量导入,删除原有的几条配置,执行以下语句
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'insert', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'update', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'insert', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'update', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'insert', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'update', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'insert', 'hbase', 'dim_base_category1', 'id,name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'update', 'hbase', 'dim_base_category1', 'id,name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'insert', 'hbase', 'dim_base_category2', 'id,name,category1_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'update', 'hbase', 'dim_base_category2', 'id,name,category1_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'insert', 'hbase', 'dim_base_category3', 'id,name,category2_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'update', 'hbase', 'dim_base_category3', 'id,name,category2_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'insert', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'update', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'insert', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'update', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'insert', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'update', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'insert', 'hbase', 'dim_base_trademark', 'id,tm_name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'update', 'hbase', 'dim_base_trademark', 'id,tm_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('cart_info', 'insert', 'kafka', 'dwd_cart_info', 'id,user_id,sku_id,cart_price,sku_num,img_url,sku_name,is_checked,create_time,operate_time,is_ordered,order_time,source_type,source_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('comment_info', 'insert', 'kafka', 'dwd_comment_info', 'id,user_id,nick_name,head_img,sku_id,spu_id,order_id,appraise,comment_txt,create_time,operate_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'insert', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'update', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'insert', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'update', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'insert', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', 'id', ' SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'update', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('favor_info', 'insert', 'kafka', 'dwd_favor_info', 'id,user_id,sku_id,spu_id,is_cancel,create_time,cancel_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'insert', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'update', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail', 'insert', 'kafka', 'dwd_order_detail', 'id,order_id,sku_id,sku_name,order_price,sku_num,create_time,source_type,source_id,split_activity_amount,split_coupon_amount,split_total_amount', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_activity', 'insert', 'kafka', 'dwd_order_detail_activity', 'id,order_id,order_detail_id,activity_id,activity_rule_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_coupon', 'insert', 'kafka', 'dwd_order_detail_coupon', 'id,order_id,order_detail_id,coupon_id,coupon_use_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'insert', 'kafka', 'dwd_order_info', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'update', 'kafka', 'dwd_order_info_update', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_refund_info', 'insert', 'kafka', 'dwd_order_refund_info', 'id,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,refund_reason_txt,refund_status,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'insert', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'update', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'insert', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'update', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'insert', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', 'id', ' SALT_BUCKETS = 4');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'update', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'insert', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', 'id', ' SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'update', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'insert', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', 'id', ' SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'update', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', NULL, NULL);
结果如下:
开启BaseDBApp,配置表中添加了许多配置信息,运行后,程序会在hbase中创建相应的维度表。
在phoenix中可以查看到对应的数据。
历史数据同步
业务系统的user_info表中存在数据,maxwell采集时需要对用户信息的历史数据进行同步。
# maxwell-bootstrap的作用同步历史数据,实现将user_info表中的数据全部读取出来,不具备封装JSON并发送到hbase中的能力,--client_id 中的id maxwell_1 在/opt/module/maxwell-1.25.0/config.properties文件中进行过配置,由maxwell进程完成将数据封装成json并发送到hbase
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2022 --table user_info --client_id maxwell_1
完成后,维度表中数据如下:
启动OrderWideApp,启动redis:redis-server /home/hzy/redis2022.conf
,生成业务数据。
结果如下图:
总结
- 需要启动的进程:
- zookeeper、kafka、maxwell、hdfs、hbase、redis
- BaseDBApp分流
- OrderWideApp订单宽表准备
- 程序运行流程:
- 当运行模拟生成业务数据的jar包
- 向业务数据库MySQL中插入生成的业务数据
- MySQL将变化的数据放到binlog中
- maxwell从binlog中获取数据,将数据封装成JSON字符串发送到kafka的ods主题中(ods_base_db_m)
- BaseDBApp从ods_base_db_m主题中读取数据,进行分流
- 事实数据:写回到kafka的dwd主题中
- 维度数据:保存到phoenix的维度表中
- OrderWideApp从dwd主题中获取订单和订单明细主题
- 使用intervalJoin对订单和订单明细进行双流join
- 将用户维度关联到订单宽表上
- 基本的维度关联
- 优化1:旁路缓存
- 优化2:异步IO – 模板方法设计模式