Parsing FineBI datasets with sqllineage and importing the lineage into DataHub

2025/1/8 11:50:58

Requirements

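The current data warehouse architecture is shown in the figure below. It does not support end-to-end data lineage, so troubleshooting data anomalies and analyzing the impact of changes are largely reactive. We need end-to-end data lineage and metadata management.
[Figure: current data warehouse architecture]
Business systems: various manufacturing business systems (undergoing rapid iteration, refactoring, or new development)
Data warehouse development platform: the Shuqi (数栖) platform, which provides DAG scheduling lineage graphs across every layer of the warehouse
[Figure: DAG scheduling lineage in the Shuqi platform]
Warehouse export database: PG (PostgreSQL)
BI visualization system: FineBI, which supports lineage between its internal datasets and charts
[Figure: dataset and chart lineage in FineBI]
After research and analysis, we adopted DataHub as the metadata management platform. The results are shown in the figures below.
[Figures: end-to-end lineage in DataHub]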

Solution

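Implement the following end-to-end lineage:
BI report/dashboard -> BI component (chart) -> BI dataset -> warehouse export database (PG) -> warehouse data assets (Shuqi platform) -> upstream business systems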
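To illustrate why such a chain helps with the impact analysis mentioned in the requirements, here is a minimal sketch that stores the chain in data-flow direction (source system to dashboard) and walks it breadth-first. All node names are hypothetical placeholders for the layers listed above, not real assets; in this project the traversal itself is left to DataHub's lineage views, so the sketch only shows the idea.

```python
from collections import deque

# Hypothetical lineage edges: upstream node -> its direct downstream nodes,
# one node per layer of the end-to-end chain (placeholder names).
EDGES = {
    "erp.orders": ["hive.bda_orders"],
    "hive.bda_orders": ["hive.sda_delivery_summary"],
    "hive.sda_delivery_summary": ["pg.delivery_summary"],
    "pg.delivery_summary": ["finebi.dataset.delivery"],
    "finebi.dataset.delivery": ["finebi.chart.on_time_rate", "finebi.chart.cycle_days"],
    "finebi.chart.on_time_rate": ["finebi.dashboard.delivery"],
    "finebi.chart.cycle_days": ["finebi.dashboard.delivery"],
}


def downstream_impact(edges, start):
    """Return every node reachable downstream of `start`, in breadth-first visit order."""
    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in edges.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


def upstream_sources(edges, target):
    """Invert the edges and walk upstream, e.g. to trace a wrong dashboard number back to its sources."""
    reverse = {}
    for parent, children in edges.items():
        for child in children:
            reverse.setdefault(child, []).append(parent)
    return downstream_impact(reverse, target)


if __name__ == "__main__":
    print(downstream_impact(EDGES, "hive.bda_orders"))
    print(upstream_sources(EDGES, "finebi.dashboard.delivery"))
```

Walking downstream answers "what breaks if this table changes"; walking upstream answers "where did this number come from".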
Work items:

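  • ✅Define custom FineBI and Shuqi platforms and their icons in DataHub.
  • ✅Parse the FineBI database to obtain the BI report/dashboard -> BI component (chart) -> BI dataset lineage, then call the DataHub REST emitter API to create that lineage in DataHub.
  • ✅Fetch the SQL of each BI dataset, parse the lineage between BI datasets and the warehouse export database (PG) with sqllineage, then call the DataHub REST emitter API to create that lineage in DataHub.
  • ✅Fetch the workflow-to-Hive-task relationships from the Shuqi platform database, fetch the SQL of each Hive task, parse its lineage with sqllineage, then call the DataHub REST emitter API to create that lineage in DataHub.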
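Each hop in these work items ends up as a DataHub entity identified by a URN. The sketch below builds those URN strings by hand so the mapping is visible without a DataHub installation; in real code the `datahub.emitter.mce_builder` helpers (`make_dataset_urn`, `make_chart_urn`, `make_dashboard_urn`) should produce the same shapes. Assumptions: the BI dataset is modeled as a DataHub dataset on the custom `fine_bi` platform, PG tables use DataHub's built-in `postgres` platform, and every identifier passed in is hypothetical. How the edges between these entities are emitted is outside this sketch.

```python
# Hand-rolled URN builders mirroring the string formats DataHub uses.
# In production code, prefer datahub.emitter.mce_builder.


def dataset_urn(platform, name, env="PROD"):
    """Dataset URN, e.g. a Hive table, a PG table or (by assumption) a FineBI dataset."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def chart_urn(platform, chart_id):
    return f"urn:li:chart:({platform},{chart_id})"


def dashboard_urn(platform, dashboard_id):
    return f"urn:li:dashboard:({platform},{dashboard_id})"


def chain_urns(dashboard_id, chart_id, bi_dataset, pg_table, hive_table):
    """One end-to-end path, listed from the BI dashboard back to the warehouse table.

    'fine_bi' is the custom platform registered with `datahub put platform`;
    'postgres' and 'hive' are DataHub's built-in platform names.
    """
    return [
        dashboard_urn("fine_bi", dashboard_id),
        chart_urn("fine_bi", chart_id),
        dataset_urn("fine_bi", bi_dataset),
        dataset_urn("postgres", pg_table),
        dataset_urn("hive", hive_table),
    ]


if __name__ == "__main__":
    # Hypothetical identifiers for illustration only
    for urn in chain_urns("delivery_dashboard", "on_time_rate", "delivery_ds",
                          "dw_export.public.sda_delivery_summary", "sda.sda_delivery_summary"):
        print(urn)
```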

This article covers:

  • ✅Defining custom FineBI and Shuqi platforms and their icons in DataHub
  • ✅Parsing SQL with sqllineage to generate lineage
  • ✅Calling the DataHub REST emitter API to create lineage in DataHub

Prerequisites

  • Install DataHub
  • Install sqllineage
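Both tools are Python packages. Assuming they were installed with pip (the DataHub CLI and Python SDK are published as `acryl-datahub`, sqllineage as `sqllineage`), a quick standard-library check confirms they are present before running the scripts below:

```python
from importlib import metadata

# Distribution names as published on PyPI
REQUIRED = ["acryl-datahub", "sqllineage"]


def installed_versions(names):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(REQUIRED).items():
        print(f"{name}: {version or 'not installed (pip install ' + name + ')'}")
```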

Custom platform icons in DataHub

Official documentation:


[cloud@dp-web-uic1 datahub_ingest]$ datahub put platform --name fine_bi --display_name "FineBI" --logo "https://www.finebi.com/images/logo-FineBI.png"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:fine_bi to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
[cloud@dp-web-uic1 ~]$ datahub put platform --name yuan_xiang --display_name "源象" --logo "https://www.dtwave.com/images/index/product/shuqi.svg"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:yuan_xiang to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
[cloud@dp-web-uic1 ~]$ datahub put platform --name dolphinscheduler --display_name "海豚调度" --logo "https://dolphinscheduler.apache.org/img/hlogo_white.svg"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:dolphinscheduler to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
[cloud@dp-web-uic1 datahub_ingest]$ datahub put platform --name statrocks --display_name "StarRocks" --logo "https://docs.starrocks.io/static/b660bcde69091ea56bd94cac0a907018/95f17/starrocks-logo_en-us.png"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:statrocks to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
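When more platforms need registering, the same `datahub put platform` calls can be driven from a list. This is a minimal sketch that uses only the flags shown in the session above (`--name`, `--display_name`, `--logo`). By default it only prints the commands; with `dry_run=False` it runs them, which assumes the `datahub` CLI is on PATH. The `--name` value becomes the URN suffix (`urn:li:dataPlatform:<name>`), so a misspelling such as `statrocks` in the session above is carried into the URN as-is.

```python
import shlex
import subprocess

# (name, display name, logo URL), taken from the session above
PLATFORMS = [
    ("fine_bi", "FineBI", "https://www.finebi.com/images/logo-FineBI.png"),
    ("yuan_xiang", "源象", "https://www.dtwave.com/images/index/product/shuqi.svg"),
    ("dolphinscheduler", "海豚调度", "https://dolphinscheduler.apache.org/img/hlogo_white.svg"),
]


def put_platform_cmd(name, display_name, logo):
    """argv for `datahub put platform`, using only the flags shown in the session."""
    return ["datahub", "put", "platform", "--name", name,
            "--display_name", display_name, "--logo", logo]


def platform_urn(name):
    # Same shape as the URNs printed by the CLI above
    return f"urn:li:dataPlatform:{name}"


def register_all(platforms, dry_run=True):
    for name, display_name, logo in platforms:
        cmd = put_platform_cmd(name, display_name, logo)
        if dry_run:
            print(shlex.join(cmd))
        else:
            # check=True raises CalledProcessError if the command exits non-zero
            subprocess.run(cmd, check=True)
        print("->", platform_urn(name))


if __name__ == "__main__":
    register_all(PLATFORMS)
```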

Parsing SQL with sqllineage to generate lineage

  • Test: parsing the lineage of a multi-statement SQL script with sqllineage
from sqllineage.runner import LineageRunner
def test_create_as():
    sql="""
-- From MES data, get the first online scan time of each lot
drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_00;
create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_00
as
 	select 
 		min(produce_date) min_produce_DATE,
 		mo_lot_no,
 		organization_id
 from  bda${db_para}.BDA_MES_PRODUCT_SUMMARY   
 	where factory_no ='CY-SR' 
 		and step_name in ('OC上线组装','整机组装1') 
	group by mo_lot_no,
 			 organization_id
;

-- Order promises
drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1;
create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1
as 
select   t1.version_id                
       , t1.promise_id                
       , t1.organization_id           
       , t1.order_id                  
       , t1.order_no                  
       , t1.order_stage               
       , t1.order_type                
       , t1.so_type                   
       , t1.order_status              
       , t1.order_priority            
       , t1.promise_status            
       , t1.product_id                
       , t1.product_no                
       , t1.product_model             
       , t1.order_qty                 
       , t1.bu_name                   
       , t1.rcv_client_name           
       , t1.prepared_client_name      
       , t1.order_source              
       , t1.om_user_name              
       , t1.term_cust                 
       , t1.to_pur_time               
       , t1.factory_no                
       , t1.mo_lot_no                 
       , t1.completed_qty             
       , t1.mo_audit_status           
       , t1.req_arrival_time          
       , t1.mtr_ready_time            
       , t1.plan_promise_time         
       , t1.promise_date_change_reason
       , t1.schedule_start_time       
       , t1.schedule_end_time         
       , t1.pps_type                  
       , t1.pps_exception_info        
       , t1.promise_diff_day          
       , t1.promise_delivery_cycle    
       , t1.change_reason             
       , t1.client_abbr               
       , t1.item_type_product         
       , t1.match_forecast            
       , t1.software_flag             
       , t1.risk_level                
       , t1.risk_reason               
       , t1.ckd_type                  
       , t1.crt_user                  
       , t1.crt_time                  
       , t1.upd_user                  
       , t1.upd_time                  
       , t1.crt_user_name             
       , t1.upd_user_name                                 
from   bda${db_para}.bda_whole_pto_order  t1
left join bda${db_para}.bda_promise_history_record  t2  on t1.promise_id = t2.promise_id  and coalesce(t2.afterchangereason,'') = 'AGAIN_PLAN'
where  t1.version_id like '%最新版本%' 
and    t2.promise_id is null
union all 
select    t1.version_id                
       ,  t1.promise_id                
       ,  t1.organization_id           
       ,  t1.order_id                  
       ,  t1.order_no                  
       ,  t1.order_stage               
       ,  t1.order_type                
       ,  t1.so_type                   
       ,  t1.order_status              
       ,  t1.order_priority            
       ,  t1.promise_status            
       ,  t1.product_id                
       ,  t1.product_no                
       ,  t1.product_model             
       ,  t1.order_qty                 
       ,  t1.bu_name                   
       ,  t1.rcv_client_name           
       ,  t1.prepared_client_name      
       ,  t1.order_source              
       ,  t1.om_user_name              
       ,  t1.term_cust                 
       ,  t1.to_pur_time               
       ,  t1.factory_no                
       ,  t1.mo_lot_no                 
       ,  t1.completed_qty             
       ,  t1.mo_audit_status           
       ,  t1.req_arrival_time          
       ,  t1.mtr_ready_time            
       ,  t1.plan_promise_time         
       ,  t1.promise_date_change_reason
       ,  t1.schedule_start_time       
       ,  t1.schedule_end_time         
       ,  t1.pps_type                  
       ,  t1.pps_exception_info        
       ,  t1.promise_diff_day          
       ,  t1.promise_delivery_cycle    
       ,  t1.change_reason             
       ,  t1.client_abbr               
       ,  t1.item_type_product         
       ,  t1.match_forecast            
       ,  t1.software_flag             
       ,  t1.risk_level                
       ,  t1.risk_reason               
       ,  t1.ckd_type                  
       ,  t1.crt_user                  
       ,  t1.crt_time                  
       ,  t1.upd_user                  
       ,  t1.upd_time                  
       ,  t1.crt_user_name             
       ,  t1.upd_user_name                           
from (
       select   t1.version_id                
             ,  t1.promise_id                
             ,  t1.organization_id           
             ,  t1.order_id                  
             ,  t1.order_no                  
             ,  t1.order_stage               
             ,  t1.order_type                
             ,  t1.so_type                   
             ,  t1.order_status              
             ,  t1.order_priority            
             ,  t1.promise_status            
             ,  t1.product_id                
             ,  t1.product_no                
             ,  t1.product_model             
             ,  t1.order_qty                 
             ,  t1.bu_name                   
             ,  t1.rcv_client_name           
             ,  t1.prepared_client_name      
             ,  t1.order_source              
             ,  t1.om_user_name              
             ,  t1.term_cust                 
             ,  t1.to_pur_time               
             ,  t1.factory_no                
             ,  t1.mo_lot_no                 
             ,  t1.completed_qty             
             ,  t1.mo_audit_status           
             ,  t1.req_arrival_time          
             ,  t1.mtr_ready_time            
             ,  t1.plan_promise_time         
             ,  t1.promise_date_change_reason
             ,  t1.schedule_start_time       
             ,  t1.schedule_end_time         
             ,  t1.pps_type                  
             ,  t1.pps_exception_info        
             ,  t1.promise_diff_day          
             ,  t1.promise_delivery_cycle    
             ,  t1.change_reason             
             ,  t1.client_abbr               
             ,  t1.item_type_product         
             ,  t1.match_forecast            
             ,  t1.software_flag             
             ,  t1.risk_level                
             ,  t1.risk_reason               
             ,  t1.ckd_type                  
             ,  t1.crt_user                  
             ,  t1.crt_time                  
             ,  t1.upd_user                  
             ,  t1.upd_time                  
             ,  t1.crt_user_name             
             ,  t1.upd_user_name             
             ,  row_number() over (partition by t1.promise_id order by t1.version_id desc) rn
      from   bda${db_para}.bda_whole_pto_order  t1
      where  version_id not like '%最新版本%' 
      and not exists (select 1 from bda${db_para}.bda_whole_pto_order t2 where version_id like '%最新版本%' and t1.promise_id = t2.promise_id )
      ) t1 
left join bda${db_para}.bda_promise_history_record  t2  on t1.promise_id = t2.promise_id  and coalesce(t2.afterchangereason,'') = 'AGAIN_PLAN'
where     t2.promise_id is null
and       t1.rn = 1
;




-- Join CRM sales orders with work orders
drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01;
create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01
as
select      bu.dept_name bu_name
            ,t2.organization_id        -- 20220701 wyr
           --  ,'514' Organization_Id
            ,t1.item_code item_code
            ,cus.cus_name -- receiving customer
            ,t1.so_header_id
            ,t1.so_line_id so_line_id
            ,t1.so_code so_header_code
            ,t1.line_no so_line_code
            ,t2.wip_entity_name -- work order number
            ,t2.lot_number -- lot number
            ,t2.Project_Name
            ,t1.om_user_name Om_User_Name -- sales administrator
            ,t1.sale_name sales_user -- salesperson
            ,case when bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' 
                       and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 20)
                  when bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' 
                       and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 35)
                  when bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' 
                       and mig.min_class like'%PC模块%' then date_add(t1.pur_start_time, 25)
                  when bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' 
                       and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 45)
                  when bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' 
                       and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 20)
                  when bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' 
                       and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 30)
                  else t1.pur_start_time
             end stat_date -- statistics date: purchase submission date + corresponding number of days
            ,substr(t1.expected_delivery_date, 1, 10) delivety_time -- planned shipping date
            ,substr(t1.crt_time, 1, 10) crm_create_time -- sales order creation time
            ,substr(t1.pur_start_time, 1, 10) purchase_date -- purchase submission time
            ,substr(t1.produce_start_time, 1, 10) produce_date -- production release time
            ,substr(t2.Xwh_Creation_Date, 1, 10) wip_create_date -- outsourced work order creation date
            ,substr(t2.Scheduled_Start_Date, 1, 10) Scheduled_Start_Date -- work order material-kitting date
            ,substr(t2.Mc_Creation_Date, 1, 10)  Mc_Creation_Date -- production control confirmation time
            ,substr(t2.first_trx_date, 1, 10) first_finish_date -- first completion-and-receipt date
            ,substr(t2.last_trx_date, 1, 10) last_finish_date -- full completion-and-receipt date
            ,t1.so_type_name order_type -- order type
            ,t2.wip_job_status -- work order status
            ,t2.Job_Type -- work order type
            ,t2.Class_Code -- work order class
            ,t2.Quantity_Completed -- completed quantity of the work order
            ,t1.qty -- order quantity
            ,case when t6.order_no is not null then t6.match_forecast else bsse.is_source_forecast end as is_source_forecast  -- whether the order has a forecast
            ,mio.planning_make_buy_code -- finished-unit production mode: make/buy
            ,case when mig.min_class like '%PC模块%' then 'PC模块' else '其他' end prod_type
            ,datediff(t2.last_trx_date, t1.pur_start_time) supply_cycle -- supply chain cycle (computed from the earliest completion receipt time among multiple work orders)
            ,case when t1.so_type_name <> '备品订单' and t2.first_trx_date is not null then 'Y' else 'N' end supply_cycle_flag -- supply chain cycle flag
            ,case when t1.so_type_name = '客户订单' and t2.Job_Type = '标准'
                       and (
                            (bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' 
                             and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 20)
                            or 
                            (bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' 
                             and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 35)
                            or
                            (bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' 
                             and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 25)
                            or
                            (bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' 
                             and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 45)
                            or
                            (bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' 
                             and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 20)
                            or
                            (bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' 
                             and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 35)
                           ) and t2.first_trx_date is not null then 'Y'
                  else 'N' end delivety_complete_flag -- delivery achievement flag
            ,case when t1.so_type_name in  ('客户订单','销售订单') and t2.Job_Type = '标准' then 'Y' else 'N' end is_delivety_complete_flag -- delivery achievement flag
            ,t1.expected_delivery_date overseas_stat_date -- attribution date for overseas order delivery achievement
            ,case when t1.so_type_name in  ('客户订单','销售订单')  -- and bsse.is_source_forecast is not null 
                       and datediff(t2.last_trx_date,  t1.expected_delivery_date) <= 0 and t2.last_trx_date is not null then 'Y'
                  else 'N' end overseas_is_delivety_complete_flag -- overseas order delivery achievement flag
            ,case when t1.so_type_name in  ('客户订单','销售订单') -- and bsse.is_source_forecast is not null
                       and (datediff('${bizDate}', t1.expected_delivery_date) >= 0 
                            or (datediff('${bizDate}', t1.expected_delivery_date) < 0 and datediff(t2.last_trx_date, t1.expected_delivery_date) <= 0)
                           ) then 'Y' 
                  else 'N' end overseas_delivety_complete_flag -- overseas order delivery achievement data scope
            ,row_number() over(partition by t2.Lot_Number order by t1.pur_start_time) rn
            ,t2.Start_Quantity wip_qty
            ,t2.fisrt_picking_date -- first material picking time
            ,t3.first_ship_date
            ,t3.last_ship_date
            ,-1*trx33.shipped_qty shipped_qty -- shipped quantity
            ,t2.Quantity_Completed + trx33.shipped_qty as difference_qty -- difference
            ,dmpm.screen_size -- screen size
            ,t2.Created_By as pm_user -- production control owner
            ,substr(t3.min_scheduled_date, 1, 10) as min_scheduled_date -- actual material-kitting date
            ,substr(t5.min_produce_DATE, 1, 10)  min_produce_date
            ,t1.bt_name             -- add by tjl 2022.07.21 
            ,bsse.so_line_group_id  -- 
            ,substr(t3.online_date, 1, 10)  as online_date
            ,datediff(substr(t1.expected_delivery_date, 1, 10),substr(t1.pur_start_time, 1, 10)) as cus_expect_cycle  -- customer expected cycle
            ,case when t6.order_no is not null and t6.plan_promise_time is not null then datediff(substr(t6.plan_promise_time,1,10),substr(t1.pur_start_time, 1, 10))  -- with a promise date: estimated supply cycle = promise date - purchase submission date
                  when t6.order_no is not null and t6.plan_promise_time is null and t2.wip_entity_name is null then datediff(date_add(substr(t6.mtr_ready_time, 1, 10),6),substr(t1.pur_start_time, 1, 10))  -- no promise date and no work order yet: = kitting date + 6
                  when t2.wip_entity_name is not null and  t3.online_date is not null then datediff(date_add(substr(t3.online_date, 1, 10),4),substr(t1.pur_start_time, 1, 10))  -- work order opened, online date known: = online date + 4
                  when t2.wip_entity_name is not null and  t3.online_date is  null then datediff(date_add(substr(t2.Scheduled_Start_Date, 1, 10),6),substr(t1.pur_start_time, 1, 10))  -- work order opened, no online date yet: = kitting date + 6
              end as  estimate_supply_cycle   -- estimated supply chain cycle
             ,t8.cus_level
-- from        bda${db_para}.bda_oms_so_lines t1
FROM        bda${db_para}.bda_sd_so t1
left join  bda${db_para}.bda_sd_so_ext bsse 
on         t1.so_line_id = bsse.so_line_id
and        bsse.part_dt IN ('crm_so', 'oms_so') 
join        bda${db_para}.bda_job_inv_trx_zj_dtl t2
on           bsse.so_line_group_id = t2.source_line_id
-- and    t1.so_header_id = t2.source_header_id
left join   dim${db_para}.dim_hcm_orgunit bu
on          t1.bill_bu_id = bu.dept_oid
left join   bda${db_para}.comm_market_cus cus
on          t1.rec_cus_code = cus.id
-- join        (select item_value, fullname 
--              from o_crm${db_para}.comm_dictionary_detail
--              where parentcode = '$CRM_DELIVERY_SO_TYPE') cdd
-- on          cdd.item_value = t1.so_type
left join   dim${db_para}.md_item_group mig
on          t2.item_code = mig.item_code
left join   dim${db_para}.md_item_org mio
on          t1.item_code = mio.item_code
and         mio.Organization_Id = '514'
left join   dim${db_para}.dim_md_prod_model dmpm
on          mig.product_model = dmpm.prod_model
left join   bda${db_para}.bda_job_dtl t3
on          t2.wip_entity_name = t3.wip_entity_name
left join   o_md${db_para}.md_prod_model t4
on          mig.product_model = t4.product_model
left join   (select sum(trx_so.trx_qty) shipped_qty
                    ,trx_so.bch_nbr
                from bda${db_para}.bda_inv_item_trx_bach_dtl trx_so 
               where trx_so.trx_type_id = 33 
               group by trx_so.bch_nbr) trx33 
on          trx33.bch_nbr = t2.lot_number
left join  sda${db_para}.tmp_sda_delivety_complete_sr_sum_00 t5 on t5.mo_lot_no = t2.lot_number
left join  sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1 t6 
on         t1.line_code = t6.order_no
left join   bda${db_para}.bda_wip_mo_header t7 on t3.wip_entity_name = t7.ebs_mo_code
left join  (select  t.cus_code
     , t2.hcm_dept_oid    as dept_oid
     , max(t.cus_level)   as cus_level_id
     , max(t1.fullname)   as cus_level
     , t2.hcm_dept_name   as dept_name
from      o_crm${db_para}.cus_bu_ext_info t 
left join o_crm${db_para}.comm_dictionary_detail t1
on        t.cus_level = t1.item_value
and       t1.parentcode = '$CRM_CUS_LEVEL'
inner join dim${db_para}.dim_hcm_crm_org_map t2
on         t.bu_code = t2.dept_code
where      t2.dept_name not like '%失效%'
and        t.is_deleted = '0'
and        t2.hcm_dept_oid is not null
group by  t.cus_code,t2.hcm_dept_oid,t2.hcm_dept_name)  t8 
on         t1.rec_cus_code = t8.cus_code
and        bu.dept_oid = t8.dept_oid
where       t1.pur_start_time is not null
and         t1.is_onhand_out in ('0','否')
and         t4.finished_or_semi_finished_prod = '成品'
AND         t1.part_dt IN ('crm_so', 'oms_so') 
and         t3.wip_job_status<>'已取消' and (t3.wip_job_status<>'已关闭' or t3.quantity_completed >0)
and         coalesce(t7.source_demand_max,'')<>'相关需求'
;






insert overwrite table sda${db_para}.sda_delivety_complete_sr_sum
select       t.bu_name
             ,t.Organization_Id
             ,t.item_code
             ,t.cus_name -- receiving customer
             ,t.so_header_code
             ,t.so_line_code
             ,t.wip_entity_name
             ,t.lot_number
             ,t.Project_Name
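             ,t.Om_User_Name -- sales administrator
             ,t.sales_user -- salesperson
             ,t.delivety_time -- planned shipping date
             ,t.crm_create_time -- sales order creation time
             ,t.purchase_date -- purchase submission time
             ,t.produce_date -- production release time
             ,t.stat_date -- statistics date: purchase submission date + corresponding number of days
             ,t.wip_create_date -- outsourced work order creation date
             ,t.Scheduled_Start_Date -- work order material-kitting date
             ,t.Mc_Creation_Date -- production control confirmation time
             ,t.first_finish_date -- first completion-and-receipt date
             ,t.last_finish_date -- full completion-and-receipt date
             ,t.order_type -- order type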
             ,t.job_type 
             ,t.supply_cycle -- supply chain cycle
             ,t.supply_cycle_flag -- supply chain cycle flag
             ,t.delivety_complete_flag -- delivery achievement flag
             ,t.is_delivety_complete_flag
             ,t.overseas_stat_date
             ,t.overseas_is_delivety_complete_flag
             ,t.overseas_delivety_complete_flag
             ,t.is_source_forecast is_source_forecast
             ,t.wip_qty
             ,t.fisrt_picking_date
             ,t.first_ship_date
             ,t.last_ship_date
             ,'MTO' order_mode
             ,current_timestamp()
             ,'${bizDate}'
             ,t.shipped_qty -- shipped quantity
             ,t.difference_qty -- difference
             ,t.screen_size -- screen size
             ,t.pm_user -- production control owner
             ,t.min_scheduled_date
             ,t.min_produce_date
             ,t.bt_name   -- add by tjl 2022.07.21 
             ,t.so_line_group_id
             ,t.Class_Code    -- add by wyr 2022.09.23
             ,t.cus_level   as cus_level   --  tjl 2022.11.02
             ,t.cus_expect_cycle       as cus_expect_cycle      -- customer expected cycle    -- add by tjl 2022.11.02
             ,t.estimate_supply_cycle  as estimate_supply_cycle -- estimated supply chain cycle  -- add by tjl 2022.11.02
from         sda${db_para}.tmp_sda_delivety_complete_sr_sum_01 t
where        t.rn = 1
;
    """

    result = LineageRunner(sql.replace("${db_para}",''))
    print(result.source_tables)
    print(result.target_tables)

if __name__ == "__main__":
    test_create_as()
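The test above strips `${db_para}` with `str.replace` and leaves `${bizDate}` untouched inside its string literal. If more scheduler parameters need filling in before parsing, Python's `string.Template` happens to use the same `${name}` syntax, and `safe_substitute` fills only the names it is given while leaving every other `$...` sequence (such as `'$CRM_CUS_LEVEL'`) as-is. A minimal sketch; the sample SQL is trimmed from the script above and the parameter values are examples only:

```python
from string import Template


def render_sql(sql, **params):
    """Fill the ${name} placeholders that are given; leave all others untouched."""
    return Template(sql).safe_substitute(**params)


SAMPLE = """insert overwrite table sda${db_para}.sda_delivety_complete_sr_sum
select t.bu_name, '${bizDate}'
from sda${db_para}.tmp_sda_delivety_complete_sr_sum_01 t
where t.flag = '$CRM_CUS_LEVEL'"""

if __name__ == "__main__":
    # Empty db_para matches the replace() call in the test above
    print(render_sql(SAMPLE, db_para=""))
    # Hypothetical values for a different schema suffix and business date
    print(render_sql(SAMPLE, db_para="_dev", bizDate="2022-08-30"))
```

The rendered string can then be passed to `LineageRunner` exactly as the test does.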

Calling the DataHub REST emitter API to create lineage in DataHub

#!/usr/bin/python3
# coding=utf8
# -----------------------------------------------------------------------------------
# Date:    2022.08.30
# Author:  zds
# Purpose: Hive lineage for the data warehouse
#        1. Query the database through Trino to get the scheduling DAG lineage of the Shuqi platform.
#        2. Note: when permissions are changed directly in the database, BI caches them for a few minutes; wait for the data to refresh.
#        3. Note: create_type=3 in fine_pack_filter is a user role; the rowid used is the id in fine_user, configured on the end-user permission.
#        4. "AND" = 34; "OR" = 35
#        5. Depends on warehouse tables whose names start with manual; these are collected by a crawler and lag by one day.
# -----------------------------------------------------------------------------------
import base64
import pandas as pd
from sqlalchemy import create_engine
from sqllineage.runner import LineageRunner
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

class DWHiveLineage:
    def __init__(self):
        self.shuxi_db = create_engine("mysql+pymysql://xxxx@p-dbsec-mysql.gz.cvte.cn:10006/uic")

    def get_task_sql(self):
        # tasktype_id in (4,8,11,12,16) covers all tasks that have source code
        sql = """
select cata_id,flow_id,task_id,task_name,task_type_name,source, parameter from (
    select rtc.task_id ,rtc.source,rtc.parameter,bt.task_name,bt.tasktype_id,btt.task_type_name,bc.cata_id,bc.flow_id
    from dipper.rel_task_config rtc
    left join  (
	    select task_name,tasktype_id,task_id,flow_id from dipper.bas_task where tasktype_id in (12,16) and tasktype_id is not null
	and ws_id = 11 and invalid = 0
    )bt on rtc.task_id = bt.task_id 
    left join dipper.bas_tasktype btt on btt.tasktype_id = bt.tasktype_id
    left join (select * from dipper.bas_cata where invalid = 0 and ws_id = 11) bc on bc.flow_id = bt.flow_id
    )t where t.source is not null and t.task_name is not null
order by flow_id  
        """
        df = pd.read_sql(sql=sql, con=self.shuxi_db)
        return df

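    def list_lineages(self, max_tasks=None):
        """Parse each task's SQL and return {target_table: [source_tables]}.

        max_tasks caps the number of collected lineages for a trial run;
        None (the default) processes every task.
        """
        df = self.get_task_sql()
        dataset_lineages = {}
        for row in df.to_dict(orient="records"):
            try:
                # Task source code is stored Base64-encoded
                sql = base64.b64decode(row['source']).decode('utf-8')
                print("============" + row['task_name'] + "========")
                result = LineageRunner(sql.replace("${db_para}", ''))
                target_tables = result.target_tables
                if not target_tables:
                    # No INSERT/CREATE target (e.g. only SELECTs): nothing to record
                    continue
                # One file may hold several SQL statements; with more than one
                # target table the SQL must be split before computing lineage
                if len(target_tables) > 1:
                    print("Multiple target tables, split the SQL before computing lineage: [{}]".format(target_tables))
                    continue
                dataset_lineages[str(target_tables[0])] = [str(t) for t in result.source_tables]
            except Exception as e:
                # Skip tasks whose SQL cannot be parsed instead of aborting the whole run
                print("Failed to parse the SQL of task [{}].".format(row['task_name']))
                print(e)
                continue
            if max_tasks is not None and len(dataset_lineages) >= max_tasks:
                break
        return dataset_lineages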

    def generate_lineages(self):
        result_tables = self.list_lineages()
        # Create one emitter for the GMS REST API and reuse it for every table
        emitter = DatahubRestEmitter("http://xx.xx.xx.xx:8080")
        for target_table, source_tables in result_tables.items():
            input_tables_urn = [builder.make_dataset_urn("hive", t) for t in source_tables]

            # Construct a lineage object: upstream Hive tables -> target Hive table
            lineage_mce = builder.make_lineage_mce(
                input_tables_urn,
                builder.make_dataset_urn("hive", target_table),
            )

            # Emit metadata once, inside try, so a failure is reported instead of crashing
            try:
                emitter.emit_mce(lineage_mce)
                print("Added lineage for warehouse table [{}]".format(target_table))
            except Exception as e:
                print("Failed to add lineage for warehouse table [{}]".format(target_table))
                print(e)
                break
    
if __name__ == "__main__":
    dw = DWHiveLineage()
    dw.generate_lineages()
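The script above only prints a message when a task's SQL yields several target tables. One way to handle such tasks, sketched here under the assumption that each statement has already been parsed on its own into a `(target, sources)` pair (for example by running `LineageRunner` once per statement), is to fold away the temporary tables that a script both writes and reads, keeping only final targets mapped to their original sources. The function is illustrative and not part of sqllineage or DataHub; the table names come from the sample SQL in the test earlier, with one source list shortened.

```python
def collapse_intermediates(statement_lineage):
    """Turn per-statement lineage into {final_target: [original sources]}.

    statement_lineage: list of (target_table, [source_tables]) in script order.
    A table written by one statement and read by another is treated as
    intermediate: it is replaced by its own sources and not reported as a target.
    """
    produced = {}
    for target, sources in statement_lineage:
        # A table written more than once accumulates the sources of every write
        produced.setdefault(target, set()).update(sources)

    # Tables read by a statement that writes a different table (self-reads don't count)
    read_elsewhere = {s for target, sources in statement_lineage for s in sources if s != target}

    def resolve(table, stack=()):
        # Leaf source, or a table already on the current path (guards against cycles)
        if table not in produced or table in stack:
            return {table}
        leaves = set()
        for src in produced[table]:
            leaves |= resolve(src, stack + (table,))
        return leaves

    return {
        target: sorted(resolve(target) - {target})
        for target in produced
        if target not in read_elsewhere
    }


# Statement-level lineage of the sample script (third source list shortened)
STATEMENTS = [
    ("sda.tmp_sda_delivety_complete_sr_sum_00", ["bda.bda_mes_product_summary"]),
    ("sda.tmp_sda_delivety_complete_sr_sum_01_1", ["bda.bda_whole_pto_order", "bda.bda_promise_history_record"]),
    ("sda.tmp_sda_delivety_complete_sr_sum_01", ["bda.bda_sd_so", "bda.bda_sd_so_ext",
                                                 "sda.tmp_sda_delivety_complete_sr_sum_00",
                                                 "sda.tmp_sda_delivety_complete_sr_sum_01_1"]),
    ("sda.sda_delivety_complete_sr_sum", ["sda.tmp_sda_delivety_complete_sr_sum_01"]),
]

if __name__ == "__main__":
    for target, sources in collapse_intermediates(STATEMENTS).items():
        print(target, "<-", sources)
```

The result has the same `{target: [sources]}` shape that `list_lineages` returns, so it could feed the emitting loop unchanged.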

Result

[Figure: resulting lineage in DataHub]
Questions and discussion are welcome in the comments.

