【实时数仓】DWM层订单宽表之维表关联优化 -- 异步查询

news2025/1/15 19:56:03

文章目录

  • 一 DWM层-订单宽表
    • 1 维表关联代码实现
      • (1)优化2:异步查询
        • a 封装线程池工具类
        • b 封装维度异步查询的函数类DimAsyncFunction
        • c 自定义维度查询接口DimJoinFunction
        • d 使用DimAsyncFunction
          • 关联用户维度源码
          • 测试
            • 配置配置表
            • 历史数据同步
          • 总结

一 DWM层-订单宽表

1 维表关联代码实现

(1)优化2:异步查询

在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。

例如:在电商场景中,需要一个商品的sku id去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹id,需要去关联包裹的行业属性、发货信息、收货信息等等。

默认情况下,在Flink的MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。

Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。

Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

在这里插入图片描述

异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。

这种方式特别针对涉及网络IO的操作,减少因为请求等待带来的消耗。

**先决条件。**正确地实现数据库(或键/值存储)的异步 I/O 交互需要支持异步请求的数据库客户端。许多主流数据库都提供了这样的客户端。如果没有这样的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端。然而,这种方法通常比正规的异步客户端效率低。

a 封装线程池工具类

package com.hzy.gmall.realtime.utils;
/**
 * 线程池工具类
 */
public class ThreadPoolUtil {

    public static ThreadPoolExecutor pool;

//    int corePoolSize,         初始线程数量
//    int maximumPoolSize,      最大线程数
//    long keepAliveTime,       当线程池中空闲线程的数量超过corePoolSize,在keepAliveTime时间后进行销毁
//    TimeUnit unit,            时间单位
//    BlockingQueue<Runnable> workQueue     要执行的任务队列
    public static ThreadPoolExecutor getInstance(){
        // 双重锁实现懒汉式单例创建
        if (pool == null){
            synchronized(ThreadPoolExecutor.class){
                if (pool == null){
                    System.out.println("开辟线程池...");
                    pool = new ThreadPoolExecutor(
                            4,20,300, TimeUnit.SECONDS,
                            new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE)
                    );
                }
            }

        }
        return pool;
    }
}

b 封装维度异步查询的函数类DimAsyncFunction

该类继承异步方法类RichAsyncFunction,实现自定义维度查询接口。

其中RichAsyncFunction<IN,OUT>是Flink提供的异步方法类,此处因为是查询操作输入类和返回类一致,所以是<T,T>。

RichAsyncFunction这个类要实现两个方法:

open用于初始化异步连接池。

asyncInvoke方法是核心方法,里面的操作必须是异步的,如果查询的数据库有异步api也可以用线程的异步方法,如果没有异步方法,就要自己利用线程池等方式实现异步查询。

package com.hzy.gmall.realtime.app.fun;
/**
 * 实现维度的异步关联
 * 模板方法设计模式:在父类中定义实现某一个功能的核心算法骨架,将具体的实现延迟到子类中去完成
 *                  子类不改变父类核心算法骨架的前提下,每一个子类都可以有自己的独立实现。
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T,T>{

    private ExecutorService executorService;
    private String tableName;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 创建线程池对象
        executorService = ThreadPoolUtil.getInstance();
    }

    // 发送异步请求,完成维度关联
    // 通过创建多线程的方式发送异步请求
    // 此方法每处理流中的一条数据,都会执行一次
    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        // 通过线程池获取线程
        executorService.submit(new Runnable() {
            // run中的代码实现的就是异步维度关联操作
            @Override
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    // 从对象中获取维度关联的key
                    String key = getKey(obj);
                    // 根据key到维度表中获取维度对象
                    JSONObject dimJsonObj = DimUtil.getDimInfo(tableName, key);
                    // 把维度对象的属性赋值给流中对象属性(维度关联)
                    if (dimJsonObj != null){
                        join(obj,dimJsonObj);
                    }
                    long end = System.currentTimeMillis();
                    System.out.println("维度异步查询耗时:" + (end - start) + "毫秒");
                    resultFuture.complete(Collections.singleton(obj));
                }catch (Exception e){
                    e.printStackTrace();
                    System.out.println("维度异步查询发生异常...");
                }
            }
        });
    }
}

c 自定义维度查询接口DimJoinFunction

这个异步维表查询的方法适用于各种维表的查询,用什么条件查,查出来的结果如何合并到数据流对象中,需要使用者自己定义。

这就是自己定义了一个接口DimJoinFunction<T>包括两个方法。

package com.hzy.gmall.realtime.app.fun;

// 维度关联查询的接口
public interface DimJoinFunction<T> {

   /**
     * 需要实现如何把结果装配给数据流对象
     * @param obj  数据流对象
     * @param jsonObject   异步查询结果
     * @throws Exception
     */
    void join(T obj, JSONObject dimJsonObj) throws Exception;

     /**
     * 需要实现如何从流中对象获取主键
     * @param obj  数据流对象
     */
    String getKey(T obj);
}

d 使用DimAsyncFunction

核心的类是AsyncDataStream,这个类有两个方法一个是有序等待(orderedWait),一个是无序等待(unorderedWait)。

  • 无序等待(unorderedWait)

后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会有乱序出现。

  • 有序等待(orderedWait)

严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会差一些。

  • 注意
    • 这里实现了用户维表的查询,那么必须重写装配结果join方法和获取查询rowkey的getKey方法。
    • 方法的最后两个参数60, TimeUnit.SECONDS,标识次异步查询最多执行60秒,否则会报超时异常。
关联用户维度源码

在OrderWideApp中关联不同维度数据。

        // TODO 8 和用户维度进行关联
        // 以下方式(同步)实现效率低
//        orderWideDS.map(
//                new MapFunction<OrderWide, OrderWide>() {
//                    @Override
//                    public OrderWide map(OrderWide orderWide) throws Exception {
//                        Long user_id = orderWide.getUser_id();
//                        JSONObject userDimInfo = DimUtil.getDimInfo("dim_user_info", user_id.toString());
//                        String gender = userDimInfo.getString("GENDER");
//                        orderWide.setUser_gender(gender);
//                        return null;
//                    }
//                }
//        );
        // 异步操作,和用户表关联
        SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
                orderWideDS,
                // 动态绑定
                new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject dimJsonObj) throws ParseException {
                        String gender = dimJsonObj.getString("GENDER");
                        orderWide.setUser_gender(gender);

                        // 2000-10-02
                        String birthday = dimJsonObj.getString("BIRTHDAY");
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                        Date birthdayDate = sdf.parse(birthday);
                        long diffMs = System.currentTimeMillis() - birthdayDate.getTime();
                        long ageLong = diffMs / 1000L / 60L / 60L / 24L / 365L;
                        orderWide.setUser_age((int) ageLong);
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getUser_id().toString();
                    }
                },
                60,
                TimeUnit.SECONDS
        );
        
        orderWideWithUserDS.print(">>>");
测试

总体业务流程如下图:

在这里插入图片描述

启动zookeeper、kafka、hdfs、等待安全模式退出启动hbase、maxwell、phoenix。

配置配置表

加入用户表的配置到配置表,批量导入,删除原有的几条配置,执行以下语句

INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'insert', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'update', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'insert', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'update', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'insert', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'update', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'insert', 'hbase', 'dim_base_category1', 'id,name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'update', 'hbase', 'dim_base_category1', 'id,name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'insert', 'hbase', 'dim_base_category2', 'id,name,category1_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'update', 'hbase', 'dim_base_category2', 'id,name,category1_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'insert', 'hbase', 'dim_base_category3', 'id,name,category2_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'update', 'hbase', 'dim_base_category3', 'id,name,category2_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'insert', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'update', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'insert', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'update', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'insert', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'update', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'insert', 'hbase', 'dim_base_trademark', 'id,tm_name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'update', 'hbase', 'dim_base_trademark', 'id,tm_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('cart_info', 'insert', 'kafka', 'dwd_cart_info', 'id,user_id,sku_id,cart_price,sku_num,img_url,sku_name,is_checked,create_time,operate_time,is_ordered,order_time,source_type,source_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('comment_info', 'insert', 'kafka', 'dwd_comment_info', 'id,user_id,nick_name,head_img,sku_id,spu_id,order_id,appraise,comment_txt,create_time,operate_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'insert', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'update', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'insert', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'update', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'insert', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', 'id', ' SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'update', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('favor_info', 'insert', 'kafka', 'dwd_favor_info', 'id,user_id,sku_id,spu_id,is_cancel,create_time,cancel_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'insert', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'update', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail', 'insert', 'kafka', 'dwd_order_detail', 'id,order_id,sku_id,sku_name,order_price,sku_num,create_time,source_type,source_id,split_activity_amount,split_coupon_amount,split_total_amount', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_activity', 'insert', 'kafka', 'dwd_order_detail_activity', 'id,order_id,order_detail_id,activity_id,activity_rule_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_coupon', 'insert', 'kafka', 'dwd_order_detail_coupon', 'id,order_id,order_detail_id,coupon_id,coupon_use_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'insert', 'kafka', 'dwd_order_info', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'update', 'kafka', 'dwd_order_info_update', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_refund_info', 'insert', 'kafka', 'dwd_order_refund_info', 'id,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,refund_reason_txt,refund_status,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'insert', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'update', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'insert', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'update', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'insert', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', 'id', ' SALT_BUCKETS = 4');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'update', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'insert', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', 'id', ' SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'update', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'insert', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', 'id', ' SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'update', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', NULL, NULL);

结果如下:

在这里插入图片描述

开启BaseDBApp,配置表中添加了许多配置信息,运行后,程序会在hbase中创建相应的维度表。

在phoenix中可以查看到对应的数据。

在这里插入图片描述

历史数据同步

业务系统的user_info表中存在数据,maxwell采集时需要对用户信息的历史数据进行同步。

# maxwell-bootstrap的作用同步历史数据,实现将user_info表中的数据全部读取出来,不具备封装JSON并发送到hbase中的能力,--client_id 中的id maxwell_1 在/opt/module/maxwell-1.25.0/config.properties文件中进行过配置,由maxwell进程完成将数据封装成json并发送到hbase
bin/maxwell-bootstrap --user maxwell  --password 123456 --host hadoop101  --database gmall2022 --table user_info --client_id maxwell_1

完成后,维度表中数据如下:

在这里插入图片描述

启动OrderWideApp,启动redis:redis-server /home/hzy/redis2022.conf,生成业务数据。

结果如下图:

在这里插入图片描述

总结
  • 需要启动的进程:
    • zookeeper、kafka、maxwell、hdfs、hbase、redis
    • BaseDBApp分流
    • OrderWideApp订单宽表准备
  • 程序运行流程:
    • 当运行模拟生成业务数据的jar包
    • 向业务数据库MySQL中插入生成的业务数据
    • MySQL将变化的数据放到binlog中
    • maxwell从binlog中获取数据,将数据封装成JSON字符串发送到kafka的ods主题中(ods_base_db_m)
    • BaseDBApp从ods_base_db_m主题中读取数据,进行分流
      • 事实数据:写回到kafka的dwd主题中
      • 维度数据:保存到phoenix的维度表中
    • OrderWideApp从dwd主题中获取订单和订单明细主题
    • 使用intervalJoin对订单和订单明细进行双流join
    • 将用户维度关联到订单宽表上
      • 基本的维度关联
      • 优化1:旁路缓存
      • 优化2:异步IO – 模板方法设计模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/107071.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

构建Web应用程序哪个最好:PHP,Python还是Ruby?

在本文中&#xff0c;我们将了解哪种PHP&#xff0c;Python和Ruby最适合构建Web应用程序。 什么是 Python&#xff1f; Python 是一种具有动态语义的解释型、面向对象的高级编程语言。其高级内置数据结构&#xff0c;以及动态绑定和动态类型&#xff0c;使其特别适合用作脚本…

如何计算并有效降低独立站的客户流失率?

关键词&#xff1a;客户流失率 独立站 对于跨境电商的独立站运营来说&#xff0c;现在获取一个新用户的成本越来越高&#xff0c;成本可能是维护现有客户的5倍;品牌有大约60%–70% 的机会向现有客户销售产品&#xff0c;而仅有大约 20% 的机会向新客户销售产品。 所以&#xff…

有奖征文活动:从 RTC 到 RTE,从音视频到「实时万象」!

Hi 小伙伴们&#xff0c; 社区已经更名 RTE 两个月辣&#xff5e;大家对于 RTC 和 RTE 的区分&#xff0c;是否还有疑惑呢&#xff1f;&#x1f914; 关于这两者的区别&#xff0c;我们创始人兼 CEO 赵斌老师说&#xff1a; “RTC&#xff08;实时音视频&#xff09;从 Commu…

k8s-Service简单学习

目录 Service介绍 Service类型 Service使用 实验环境准备 ClusterIP类型的Service HeadLiness类型的Service NodePort类型的Service LoadBalancer类型的Service ExternalName类型的Service Ingress介绍&#xff08;最好&#xff09; Ingress使用 环境准备 Http代理 …

网络安全实战之植入后门程序

在VMware上建立两个虚拟机&#xff1a;win7和kali。 Kali&#xff1a;它是Linux发行版的操作系统&#xff0c;它拥有超过300个渗透测试工具&#xff0c;就不用自己再去找安装包&#xff0c;去安装到我们自己的电脑上了&#xff0c;毕竟自己从网上找到&#xff0c;也不安全。它…

搞懂 Dubbo 入门理论,这一篇就够

RPC简介 ● RPC&#xff0c; Remote Procedure Call &#xff0c;远程过程调用&#xff0c;是一种跨系统间服务调用的协议或框架 ● 在很多企业&#xff0c;在内部存在大量的业务子系统&#xff0c;这些子系统都承担独立的业务功能&#xff0c;并相互协作&#xff0c;实现企业…

[思维模式-10]:《如何系统思考》-6- 认识篇 - 结构决定功能,如何进行深度思考

目录 第1章 深度思考概述 1.1 深度思考的本质 1.2 深度思考的冰山模型 1.3 行为模式&#xff1a;结构与现象之间的桥梁 第2章 复杂多变的现象背后的六种基本的行为模式 第3章 透过模式/趋势看清内在的结构 第1章 深度思考概述 1.1 深度思考的本质 结构决定功能&#x…

超标量处理器设计——第三章_虚拟存储器

超标量处理器设计——第三章_虚拟存储器 参考《超标量处理器》姚永斌著 文章目录超标量处理器设计——第三章_虚拟存储器3.2 地址转换3.2.1 单级页表3.2.2 多级页表3.2.3 Page Fault3.3 程序保护3.4 加入TLB和Cache3.4.1 TLB的设计TLB缺失TLB的写入对TLB进行控制3.4.2 Cache的设…

时变电磁场 工程电磁场 P24

两个重要定律 首先是两个非常重要的定理 法拉第电磁感应定律完整形式&#xff1a; 应用斯托克斯定律&#xff0c;我们可以得到 在静止媒质钟我们有 全电流定律 对于非恒定的电流&#xff0c;我们可以写成 相应的微分形式 电磁感应定律与全电流定律构成了电磁场理论的基石 这两…

python自学难吗?零基础学python难吗?

作为一名小白&#xff0c;学习任务新东西的时候都会关系&#xff0c;要学的东西难吗&#xff1f;学习Python时自然也会关心Python难吗&#xff1f;能学会吗&#xff1f; 从编程语言的角度来看&#xff0c;Python相比于其他语言真的是不难&#xff0c;Python本身极简的语法&…

正点原子-Linux嵌入式开发学习-第二期03

第九讲&#xff1a;模仿STM32驱动开发实验 前言&#xff1a;在02中我们学习的如何使用C语言的知识去编写代码&#xff0c;并且是直接定义寄存器地址的。你自己回想一下&#xff0c;stm32的库文件是这样的吗&#xff1f;当然不是&#xff0c;它是继续封装了地址&#xff0c;把寄…

ACL介绍

ACL 中文&#xff1a;访问控制列表 介绍&#xff1a; ACL 是网络当中策略的一种&#xff0c;策略&#xff1a;我们之前学的内容只够我们把网络连通&#xff0c;但网络不仅仅是能连通那么简单&#xff0c;在保证网络能连通的基础上&#xff0c;应该还有更高一层的追求&#xf…

地统计插值学习心得(二)

简介 交叉验证是一种“留一”法,可用于确定插值模型与数据的拟合程度。交叉验证从数据集中移除一个点,并使用剩余的所有其他点来预测被移除点的位置。预测值随后与测量值进行比较,并生成大量统计数据 来确定预测的准确性。 交叉验证窗格可以用于展示并评价插值模型的执行效…

Redis持久化之RDB

Redis持久化之RDB1.RDB&#xff08;Redis DataBase&#xff09;1.1 RDB是什么1.2 dump.rdb文件1.3Redis启动方式1.4 配置文件中默认快照配置1.5 命令save VS bgsave1.6 RDB举例1.7 stop-writes-on-bgsave-error1.8 rdbchecksum 检查完整性2.持久化如何执行的2.1 流程优势劣势1.…

【小学信息技术教资面试】《生活在信息中》教案

题目&#xff1a;生活在信息中内容&#xff1a; 基本要求&#xff1a; &#xff08;1&#xff09;说出信息的不同表现形式及信息在生活中的应用。 &#xff08;2&#xff09;教学中注意师生间的交流互动。 &#xff08;3&#xff09;十分钟之内完成试讲。 《生活在信息中》教…

【学习笔记04】生命周期的钩子函数

目录一、生命周期的钩子函数二、创建阶段三、挂载阶段四、父子组件创建和挂载阶段钩子函数的执行次序五、更新阶段六、销毁阶段七、复习和补充1、MVVM2、v-for中的key值3、$nextTick一、生命周期的钩子函数 在组件的生命周期的过程中自动的调用的函数&#xff0c;叫做生命周期…

C++进阶 多态讲解

作者&#xff1a;小萌新 专栏&#xff1a;C进阶 作者简介&#xff1a;大二学生 希望能和大家一起进步&#xff01; 本篇博客简介&#xff1a;简单介绍C中多态的概念 多态多态的概念多态的定义及实现多态的构成条件虚函数虚函数的重写虚函数重写的两个例外协变析构函数的重写C11…

【ML】numpy meshgrid函数使用说明(全网最简单版)

【ML】numpy meshgrid函数使用说明meshgrid的作用&#xff1f;怎么使用&#xff08;举例说明&#xff09;手工描点&#xff08;帮助理解&#xff09;怎么画三维&#xff1f;附画图代码meshgrid的作用&#xff1f; 首先要明白numpy.meshgrid()函数是为了画网格&#xff0c;&…

Systemverilog实现参数化的Round-Robin Arbiter Tree

本篇内容涉及的rtl代码为开源组织PLUP的common cell仓库中的源代码&#xff0c;本文只是对其进行些许解读。源码链接如下&#xff1a;[https://github.com/pulp-platform/common_cells/blob/dc555643226419b7a602f0aa39d449545ea4c1f2/src/rr_arb_tree.sv] “想要快速提升编程能…

基于springboot的公司人事管理系统

1 简介 今天向大家介绍一个帮助往届学生完成的毕业设计项目&#xff0c;公司人事管理系统。 计算机毕业生设计,课程设计需要帮助的可以找我 源码获取------》 链接&#xff1a;https://pan.baidu.com/s/1CdxrlV7GeRRmsT9UWEMtJg 提取码&#xff1a;cygy 2 设计概要 21世纪…