基于postgresql传统数据仓库搭建

news2025/1/13 13:44:14

目录

  • 概述
    • 数仓选型对比
    • 当前数仓架构问题
    • 解决方案
  • 架构设计
    • 数据仓库设计
    • 命名规范
    • 模型设计
  • PostgreSQL的安装
  • 数据仓库的建立
    • 创建数据库
    • 创建用户组
    • 创建用户
    • 用户加入到用户组
    • 创建模式
    • 模式授权用户
    • 收回函数的执行权限
    • 公开表的select权限
    • 动态sql函数
    • 集中处理函数
  • fdw实现数据抽取
    • 安装mysql_fdw
    • 安装postgres_fdw
    • 授权tool用户fdw的使用
    • 创建连接信息表
    • 创建序列
    • 创建fdw_server和用户映射
    • 辅助函数
      • get_ddl
      • get_ddl_pg2mysql
      • get_ddl_remote_mysql2pg
      • get_ddl_remote_pg2pg
      • sp_extract_mapping
  • 等待与唤醒
    • 创建dblink插件
    • ETL任务通知表
    • 创建序列
    • 辅助函数
      • is_return_result
      • get_15_interval_time
      • wait_table
      • sp_wait
      • sp_notify
  • 海豚调度
    • 安装
    • 架构
    • 搭建
      • 创建租户root
      • 创建普通用户
      • 创建数据源
      • 创建项目
      • 创建告警实例(钉钉告警)
      • 创建告警组
      • 创建datax环境
      • 授权管理
  • ETL过程
    • 可回溯的etl过程——替代变量
    • 模型层工作流
      • ods层
        • fdw实现抽数(不建议)
        • datax实现抽数(建议)
      • dw层——sql脚本
      • dim层——sql脚本
      • dm层——sql脚本
      • 总控
    • 应用层工作流
  • Magic-API统一接口平台
  • 其他函数
    • array_position
    • replace_to_null
    • sp_jzdb
    • sp_sqlexec_efficient

概述

数仓选型对比

数据库存储过程性能可扩展性安全性成本支持度数据一致性数据压缩数据备份和恢复数据分析功能
PostgreSQL支持支持支持支持
Oracle支持支持支持支持
MySQL支持支持支持支持
MSSQL支持支持支持支持
Greenplum支持支持支持
Starrocks支持支持支持
GBase支持支持支持支持
Hive不支持支持支持
Impala不支持支持支持
GaussDB支持支持支持支持
  • 考虑数据规模、计算规模以及成本
  • 数据规模较小,计算能力要求不高,预算低,选型Postgresql主从
  • 数据规模较大,计算能力要求高,有一定预算,选型Greenplum

当前数仓架构问题

  • 无法保证数据一致性:没有严格的数据血缘依赖,导致数据在计算时可能出现不一致的情况
  • 不可见的任务调度:没有一站式界面管理调度
  • 无对外提供数据能力:数仓无法对外输出表
  • 无任务告警:任务报错对开发人员无感知
  • 长时间的锁等待:业务库直接查询物化视图,物化视图的刷新造成的长时间的锁等待
  • 重复工作较多:开发一套环境,生产一套环境,大量的重复开发工作
  • 大材小用:业务库Mysql,数仓Postgresql,使用kettle大材小用了
  • 任务调度不合理:配置调度任务需要java发版,不太合理;任务调度只能调度一条select语句,不合理;任务调度只能每15分钟跑一次,不合理
  • 任务调度不统一:宽表用java做任务调度,kettle用crontab做任务调度,其余还有的用pg_cron做任务调度
  • 数据安全性低:用户管理与权限控制不完善,敏感信息未做脱敏处理
  • 报表查询效率低:没有数仓分层,导致报表查询效率较低

解决方案

问题解决方案说明
无法保证数据一致性改用海豚调度/等待与唤醒海豚调度提供层级间的依赖,等待与唤醒提供表级别依赖
不可见的任务调度改用海豚调度海豚调度提供可视化的界面来管理任务
无对外提供数据能力Magic-APIMagic-API提供统一的接口平台,对外提供数据
无任务告警改用海豚调度海豚调度提供了钉钉、企业微信等告警接口
长时间的锁等待弃用物化视图,改变etl方式采用drop table和rename的方式做etl处理
重复工作较多弃用开发环境,改用pg的用户模式映射开发环境即在生产环境上,在生产环境中区分开发用户和集中用户,开发用户为个人开发(即开发环境),集中用户为集中跑批的用户(即生产环境)
大材小用弃用kettle,改用外部数据包装器或datax业务库只有Mysql,直接通过pg的插件mysql_fdw进行数据抽取;也可以通过海豚调度提供的datax组件进行数据抽取
调度不合理改用海豚调度海豚调度提供的sql组件可以执行sql脚本,海豚调度可以自定义调度时间、频率和并发度
任务调度不统一改用海豚调度海豚调度提供统一的任务调度平台
数据安全性低严格控制用户的权限用户模式一一对应,禁用public的create权限,回收函数的execute权限
报表查询效率低数仓分层ods、dw、dm、dim、app(ads)

架构设计

数据仓库设计

在这里插入图片描述

  • public:所有用户的根
  • dev:开发用户组
    • yuzhenchao和yuxiaotan:以开发人员的全拼命名,属于dev用户组,建立的所有的对象都在自己同名的模式下,对其他模式没有create权限,只有usage权限
  • pro:生产用户组
    • mdl:用来集中跑模型的用户,最终生成的模型在mdl模式下
    • apl:用来集中跑应用(报表)的用户,最终生成的结果表在apl模式下
  • tool:工具用户,etl相关的工具存储过程将存放在tool模式下
  • readonly:只读用户组,在该组下面的用户拥有只读权限
    • finebi:用于finebi连接数仓的用户
    • dbselect:用于dblink和fdw连接数仓的用户
    • jkfw:用于Magic-API连接数仓的用户

命名规范

对象类型对象格式例子
序列seq_seq_dblink_id
临时表tmp_tmp_jg624_rb1
ods层表ods_原库名_原表名ods_xkorder_order
dw层表dwd_/dws_/dw_dwd_order_stage/dws_order_stage/dw_order_stage
dm层表dm_dm_order_efficiency
dim层表dim_dim_dealer
app层表接口的实时表:jk_需求号_real
接口的日表:jk_需求号_rb
接口的周表 :jk_需求号_zb
接口的月表:jk_需求号_yb
finebi的实时表:bi_需求号_real
finebi的日表:bi_需求号_rb
finebi的周表:bi_需求号_zb
finebi的月表:bi_需求号_yb
接口的实时表:jk_jg624_real
接口的日表:jk_jg624_rb
接口的周表 :jk_jg624_zb
接口的月表:jk_jg624_yb
finebi的实时表:bi_jg624_real
finebi的日表:bi_jg624_rb
finebi的周表:bi_jg624_zb
finebi的月表:bi_jg624_yb
主键约束pk_表名_字段名pk_order_order_no
索引idx_表名_字段名idx_order_order_id

模型设计

  • ods层(数据贴源层):用于存储从各个业务系统中提取的原始数据,保留数据的完整性和一致性,做一些简单的处理。比如,空字符串处理成null,is_delete字段统一处理成0和1
  • dwd层(数据明细层):用于存储经过清洗、加工、转换后的数据,保留数据的历史变化,为后续的数据分析和决策提供基础数据。比如,宽带表、移动表、itv表
  • dws层(数据汇总层):用于业务层面汇合的数据,提供更高效的数据查询和分析。比如,dwd层的宽带表、移动表、itv表就会在dws层合并成一个全业务表
  • dm层(数据集市层):存储高度聚合的数据。比如,按月汇总的应收表和实收表
  • dim层(公共维度层):用于存储与业务相关的维度信息,如时间、地域、产品等,为数据分析和决策提供维度支持。比如:代理商表,产品表等
  • app层(应用层):用于存储各种业务应用所需的数据,如报表、分析、可视化等,为业务应用提供数据支持

PostgreSQL的安装

Centos7.6安装postgresql15

【postgresql 数据库运维文档】

数据仓库的建立

创建数据库

create database etl;
\c etl postgres

创建用户组

create role dev;
create role pro;
create role readonly;

创建用户

create role yuzhenchao with login password '${YZC_PWD}' connection limit 20;
create role yuxiaotan with login password '${YXT_PWD}' connection limit 20;
create role mdl with login password '${MDL_PWD}' connection limit 250;
create role apl with login password '${APL_PWD}' connection limit 250;
create role tool with login password '${TOOL_PWD}' connection limit 20;
create role finebi with login password '${FINEBI_PWD}' connection limit 100;
create role dbselect with login password '${DBSELECT_PWD}' connection limit 100;
create role jkfw with login password '${JKFW_PWD}' connection limit 100;

用户加入到用户组

alter group dev add user yuzhenchao;
alter group dev add user yuxiaotan;
alter group pro add user mdl;
alter group pro add user apl;
alter group readonly add user finebi;
alter group readonly add user dbselect;
alter group readonly add user jkfw;

创建模式

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool
create schema ${USERNAME};

模式授权用户

--用户同名模式授权所有权限
--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool
grant create,usage on schema ${USERNAME} to ${USERNAME};

--公开模式的usage权限
--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool
grant usage on schema ${USERNAME} to public;

--任何用户都拥有public模式的所有权限
--出于安全,回收任何用户在public的create权限
revoke create on schema public from public;

收回函数的执行权限

/*
 * pg中函数默认公开execute权限
 * 通过pg的基于schema和基于role的默认权限实现
 */
--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool
--在schema为yuzhenchao上创建的任何函数,除定义者外,其他人调用需要显式授权
alter default privileges for role ${USERNAME} revoke execute on functions from public;
--由yuzhenchao用户创建的任何函数,除定义者外,其他人调用需要显式授权
alter default privileges in schema ${USERNAME} revoke execute on functions from public;

公开表的select权限

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool
--在schema为yuzhenchao上创建的任何表默认公开select权限
alter default privileges in schema ${USERNAME} grant select on tables to public;
--由yuzhenchao用户创建的任何表默认公开select权限
alter default privileges for role ${USERNAME} grant select on tables to public;

动态sql函数

/*
 * 为了方便各用户的管理
 * 需要用定义者权限创建动态sql函数
 * 最终由tool用户集中管理
 */
create or replace function tool.sp_exec(
    vsql character varying
)
    returns void
    language plpgsql
    security definer
as $function$ 
/*
 * 作者 : v-yuzhenc
 * 功能 : 以集定义者权限执行sql
 * vsql : 需要执行的sql语句
 * */
begin
    execute vsql;
end;
$function$
;
alter function tool.sp_exec(varchar) owner to tool;
grant all on function tool.sp_exec(varchar) to tool;

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl
create or replace function ${USERNAME}.sp_exec(
    vsql character varying
)
    returns void
    language plpgsql
    security definer
as $function$ 
/*
 * 作者 : v-yuzhenc
 * 功能 : 以集定义者权限执行sql
 * vsql : 需要执行的sql语句
 * */
begin
    execute vsql;
end;
$function$
;
alter function ${USERNAME}.sp_exec(varchar) owner to ${USERNAME};
grant all on function ${USERNAME}.sp_exec(varchar) to ${USERNAME};
grant execute on function ${USERNAME}.sp_exec(varchar) to tool;

集中处理函数

create or replace function tool.sp_execsql(
     exec_sql character varying
    ,exec_user character varying
)
    returns void
    language plpgsql
    security definer
as $function$ 
/* 作者 : v-yuzhenc
 * 功能 : 集中处理程序,以某用户的权限执行某条sql语句
 * exec_sql : 需要执行的sql语句
 * exec_user : 需要以哪个用户的权限执行该sql语句
 * */
declare 
    p_user varchar := exec_user;
    o_search_path varchar;
begin
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO '||p_user||',public,oracle';
    case p_user 
        when 'yuzhenchao' then perform yuzhenchao.sp_exec(exec_sql);
        when 'yuxiaotan' then perform yuxiaotan.sp_exec(exec_sql);
        when 'mdl' then perform mdl.sp_exec(exec_sql);
        when 'apl' then perform apl.sp_exec(exec_sql);
        when 'tool' then perform tool.sp_exec(exec_sql);
        else raise exception '未配置该用户:%',p_user;
    end case;
    --恢复模式搜索路径
    execute 'SET search_path TO '||o_search_path;

    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;

alter function tool.sp_execsql(varchar, varchar) owner to tool;
grant all on function tool.sp_execsql(varchar, varchar) to tool;

fdw实现数据抽取

安装mysql_fdw

mysql_fdw的安装与使用

安装postgres_fdw

create extension postgres_fdw;

授权tool用户fdw的使用

grant all on foreign data wrapper mysql_fdw to tool;
grant all on foreign data WRAPPER postgres_fdw to tool;

创建连接信息表

\c etl tool
create table tool.dblink_connection_info (
	 connname varchar(63) not null
	,conntype varchar(63) null
	,hostname varchar(15) null
	,port varchar(15) null
	,dbname varchar(63) null
	,username varchar(63) null
	,userpwd varchar(63) null
	,fdw_server varchar(63) null
	,createtime timestamp null default current_timestamp
	,constraint dblink_connection_info_pkey primary key (connname)
);

comment on table tool.dblink_connection_info is '存放用于dblink的连接信息,不公开';
comment on column tool.dblink_connection_info.connname is '连接名字,自定义即可,唯一';
comment on column tool.dblink_connection_info.conntype is '数据源类型';
comment on column tool.dblink_connection_info.hostname is '数据库所在主机ip地址';
comment on column tool.dblink_connection_info.port is '数据库所在端口';
comment on column tool.dblink_connection_info.dbname is '数据库名字';
comment on column tool.dblink_connection_info.username is '用于连接的用户名';
comment on column tool.dblink_connection_info.userpwd is '用户名对应的密码';
comment on column tool.dblink_connection_info.fdw_server is '对应的fdw_server的名字';
comment on column tool.dblink_connection_info.createtime is '创建时间';

alter table tool.dblink_connection_info owner to tool;
revoke select on tool.dblink_connection_info from public;
grant select(connname,conntype,hostname,port,dbname,username,fdw_server ,createtime) on tool.dblink_connection_info to public;

创建序列

create sequence tool.seq_tmp_fdw_id
	increment by 1
	minvalue 1
	maxvalue 9999
	start 1
	cache 1
	cycle;

创建fdw_server和用户映射

create server ${MYSQL_SERVER_NAME} foreign data wrapper mysql_fdw options (host '${MYSQL_HOSTNAME}', port '${MYSQL_PORT}');
create user mapping for public server ${MYSQL_SERVER_NAME} options (username '${MYSQL_USERNAME}', password '${MYSQL_USERPWD}');

create server ${PG_SERVER_NAME} foreign data wrapper postgres_fdw options (host '${PG_HOSTNAME}', port '${PG_PORT}',dbname '${PG_DATABASE}');
create user mapping for public server ${PG_SERVER_NAME} options (user '${PG_USERNAME}', password '${PG_USERPWD}');

辅助函数

get_ddl

create or replace function tool.get_ddl(
     schematable character varying
    ,getmode character varying default 'table'::character varying
    ,newtablename character varying default null::character varying
)
    returns text
    language plpgsql
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定表名(区分大小写),返回当前表名的建表语句,备注语句
 * 		默认当前模式,其他模式请加 模式.表名
 * schematable : schemaname.tablename或者tablename
 * getmode : 默认table(获取表的建表语句)
 * 		view(获取视图的建视图语句)
 * 		viewtable(获取视图对应的建表语句)
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * */
declare 
	p_tablename varchar;
	p_schemaname varchar := user::varchar(64);
	p_newtablename varchar := newtablename;
	p_result text := null;
	p_array varchar[];
begin
	--校验getmode是否正确,不正确直接向外抛异常
	if getmode not in ('table','view','viewtable') then 
		raise exception '参数2必须为table、view或者viewtable!';
	end if;
	--如果传参为null直接抛出异常
	if schematable is null then 
		raise exception '表名或视图名不能为空!';
	end if;
	--含有多个点时,直接抛出异常
	--if instr(schematable,'.',1,2) <> 0 then 
	--	raise exception '表名或视图名输入不正确!';
	--end if;

	--解析schematable
	p_array := string_to_array(schematable,'.');
	if p_array[2] is null then
		p_tablename := p_array[1];
	else 
		p_tablename := trim(p_array[2]);
		p_schemaname := trim(p_array[1]);
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
	if getmode in ('table','viewtable') then
		if getmode = 'table' and not exists (select 1 from pg_tables where tablename = p_tablename and schemaname = p_schemaname) then 
			raise exception '%.%表不存在!',p_schemaname,p_tablename;
		elsif getmode = 'viewtable' and not exists (select 1 from pg_views where viewname = p_tablename and schemaname = p_schemaname) then
			raise exception '%.%视图不存在!',p_schemaname,p_tablename;
		end if;
		select 
			'drop table if exists "'||p_newtablename||'";'||
			chr(10)||'create table "'||p_newtablename||'" ('||
			chr(10)||
			string_agg(chr(9)||
				case when attnum = 1 then ' ' else ',' end||
				'"'||c.attname||'" '||  --字段名
				format_type(c.atttypid, c.atttypmod)||  --字段类型
				coalesce (' default '||substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid) for 128),'')||  --字段默认值
				case when c.attnotnull = true then ' not null' else ' null' end,chr(10) order by c.attnum
			)||
			--主键约束
			coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
			chr(10)||');'||
			--压缩信息
			--coalesce(' with ( '||chr(10)||chr(9)||' '||array_to_string(a.reloptions,chr(10)||chr(9)||',')||chr(10)||')','') ||
			--分布策略
			--case when e.policytype = 'r' then ' distributed replicated;' when e.policytype = 'p' then coalesce(' distributed by ('||
			--string_agg(case when array_position(string_to_array(array_to_string(e.distkey::int2[],','),',')::int[],c.attnum::int,1) <> 0 then '"'||c.attname||'"' end,',' order by string_to_array(array_to_string(e.distkey::int2[],','),',')::int[])||
			--');',' distributed randomly;') else ' distributed randomly;' end||
			--表备注(注释)
			coalesce(chr(10)||'comment on table "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||
			--字段备注(注释)
			coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
		into p_result
		from pg_class a 
		inner join pg_namespace b 
		on (a.relnamespace = b.oid)
		inner join pg_attribute c 
		on (a.oid = c.attrelid)
		left join pg_attrdef d 
		on (c.attrelid = d.adrelid and c.attnum = d.adnum)
		--left join gp_distribution_policy e 
		--on (a.oid = e.localoid)
		left join pg_description f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		left join (
		    select d.indrelid
		        ,string_agg('"'||c.attname||'"',',' order by c.attnum) prikey
		    from pg_class a, pg_namespace b, pg_attribute c, pg_index d 
		    where a.relnamespace = b.oid
		        and a.oid = c.attrelid
		        and a.oid = d.indrelid
		        and d.indisprimary = true
		        and c.attnum = any(d.indkey)
		        and a.relname = p_tablename
			    and b.nspname = p_schemaname  
			 group by d.indrelid
		) g 
		on (a.oid = g.indrelid)
		left join pg_description h 
		on (a.oid = h.objoid and h.objsubid = 0)
		where c.attnum > 0
			and not c.attisdropped
			and a.relname = p_tablename
			and b.nspname = p_schemaname
		group by b.nspname,a.relname,a.reloptions,h.description,g.prikey;
	else
		if getmode = 'view' and not exists (select 1 from pg_views where viewname = p_tablename and schemaname = p_schemaname) then
			raise exception '%.%视图不存在!',p_schemaname,p_tablename;
		end if;
		select 
			' CREATE OR REPLACE VIEW "'||p_newtablename||'" AS '||chr(10)||d.definition||
			--表备注(注释)
			coalesce(chr(10)||'comment on view "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||
			--字段备注(注释)
			coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
		into p_result
		from pg_class a 
		inner join pg_namespace b 
		on (a.relnamespace = b.oid)
		inner join pg_attribute c 
		on (a.oid = c.attrelid)
		left join pg_description f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		inner join pg_views d
		on (b.nspname = d.schemaname and a.relname = d.viewname)
		left join pg_description h 
		on (a.oid = h.objoid and h.objsubid = 0)
		where d.viewname = p_tablename
			and d.schemaname = p_schemaname
		group by a.relname,d.definition,h.description;
	end if;
	return p_result;
end;
$function$
;

grant execute on function tool.get_ddl(varchar, varchar, varchar) to public;

get_ddl_pg2mysql

create or replace function tool.get_ddl_pg2mysql(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying default null::character varying
)
    returns text
    language plpgsql
AS $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定本地pg数据库的表名、模式名,
 *        以mysql的语法返回指定模式下指定表的ddl语句
 * tablename : 指定pg的表名
 * schemaname : 指定pg的模式名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * */
declare 
	p_tablename varchar := tablename;
	p_schemaname varchar := schemaname;
	p_newtablename varchar := newtablename;
	p_result text := null;
    existbj int;
    v_sql varchar;
begin
	--如果传参为null直接抛出异常
	if p_tablename is null then 
		raise exception '表名或视图名不能为空!';
	end if;
    if p_schemaname is null then 
		raise exception '模式名不能为空!';
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
    
    --判断表或视图是否存在
    execute $v_sql$
        select 1 from pg_tables
        where tablename = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        union all 
        select 1 from pg_views
        where viewname = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        ;
    $v_sql$ into existbj; 
    
    if existbj is null then
        raise exception '表或视图不存在!';
    end if;
    v_sql := $v_sql$
        select 
			'drop table if exists `'||'$v_sql$||p_newtablename||$v_sql$'||'`;'||
			chr(10)||'create table `'||'$v_sql$||p_newtablename||$v_sql$'||'` ('||
			chr(10)||
			string_agg(chr(9)||
				case when c.attnum = 1 then ' ' else ',' end||
				'`'||c.attname||'` '||  --字段名
				case 
					when i.data_type = 'int' then 'int(11)'
			        when i.data_type = 'character varying' then case when regexp_substr(format_type(c.atttypid, c.atttypmod),'[1-9]+') is null or regexp_substr(format_type(c.atttypid, c.atttypmod),'[1-9]+')::int > 16341 then 'text' else replace (format_type(c.atttypid, c.atttypmod),'character varying','varchar') end
			        when i.data_type = 'character' then replace(format_type(c.atttypid, c.atttypmod),'character','char')
			        when i.data_type = 'date' then 'date'
			        when i.data_type = 'timestamp with time zone' then replace(replace (format_type(c.atttypid, c.atttypmod),' with time zone', ''),'timestamp','datetime')
			        when i.data_type = 'timestamp without time zone' then replace(replace (format_type(c.atttypid, c.atttypmod),' without time zone', ''),'timestamp','datetime')
			        when i.data_type = 'bigint' then 'bigint(20)'
			        when i.data_type = 'double precision' then 'double'
			        when i.data_type = 'smallint' then 'smallint(6)'
			        when i.data_type = 'text' then 'text'
			        when i.data_type = 'bytea' then 'blob'
			        when i.data_type = 'real' then 'float'
			        when i.data_type = 'numeric' then format_type(c.atttypid, c.atttypmod)
			        when i.data_type = 'time' then 'interval'
			        when i.data_type = 'json' then 'json'
			        else 'text'
			    end||  --字段类型
				case when c.attnotnull = true or ((d.typtype = 'd'::"char") AND d.typnotnull) then ' not null' else ' null' end||
				coalesce(' comment '''||replace(f.description,'''','''''')||'''','')
				,chr(10) order by c.attnum
				
			)||
			--主键约束
			coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
			chr(10)||')'||coalesce('comment '''||replace(h.description,'''','''''')||'''','')||
			';'
		from pg_class a 
		inner join pg_namespace b 
		on (a.relnamespace = b.oid)
		inner join pg_attribute c 
		on (a.oid = c.attrelid)
		left join pg_type d
		on (c.atttypid = d.oid)
		left join pg_description f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		left join (
		    select d.indrelid
		        ,string_agg('`'||c.attname||'`',',' order by c.attnum) prikey
		    from pg_class a, pg_namespace b, pg_attribute c, pg_index d 
		    where a.relnamespace = b.oid
		        and a.oid = c.attrelid
		        and a.oid = d.indrelid
		        and d.indisprimary = true
		        and c.attnum = any(d.indkey)
		        and a.relname = '$v_sql$||p_tablename||$v_sql$'
			    and b.nspname = '$v_sql$||p_schemaname||$v_sql$'  
			 group by d.indrelid
		) g 
		on (a.oid = g.indrelid)
		left join pg_description h 
		on (a.oid = h.objoid and h.objsubid = 0)
		left join information_schema.columns i
		on (a.relname = i.table_name and b.nspname = i.table_schema and c.attnum = i.ordinal_position)
		where c.attnum > 0
			and not c.attisdropped
			and a.relname = '$v_sql$||p_tablename||$v_sql$'
			and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
		group by b.nspname,a.relname,h.description,g.prikey;
    $v_sql$
    ;
    execute v_sql into p_result;
    
	return p_result;
end;
$function$
;
grant execute on function tool.get_ddl_pg2mysql(varchar, varchar, varchar) to public;

get_ddl_remote_mysql2pg

create or replace function tool.get_ddl_remote_mysql2pg(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying default null::character varying
    ,remote_connname character varying default '${CONNNAME}'::character varying
)
    returns text
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程mysql数据库的表名、库名和连接信息,
 *        以pg的语法返回指定库下指定表的ddl语句
 * tablename : 指定mysql的表名
 * schemaname : 指定mysql的库名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * remote_connname:远程连接名,有效值为 select connname from tool.dblink_connection_info;
 * */
declare 
	p_tablename varchar := tablename;
	p_schemaname varchar := schemaname;
	p_newtablename varchar := newtablename;
    p_remote_connname varchar := remote_connname;
	p_result text := null;
    tmp_fdw_id varchar := nextval('seq_tmp_fdw_id')::varchar;
    tbname_1 varchar := 'tmp_fdw_tables_'||tmp_fdw_id;
    tbname_2 varchar := 'tmp_fdw_views_'||tmp_fdw_id;
    tbname_3 varchar := 'tmp_fdw_columns_'||tmp_fdw_id;
    existbj int;
    v_sql varchar;
    o_search_path varchar;  --模式搜索路径
begin
	--如果传参为null直接抛出异常
	if p_tablename is null then 
		raise exception '表名或视图名不能为空!';
	end if;
    if p_schemaname is null then 
		raise exception '模式名(库名)不能为空!';
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO tool,'||o_search_path;

    --创建外部表
    select $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
        --创建tables映射表
        create foreign table $v_sql$||tbname_1||$v_sql$(
             table_name varchar(64)
			,table_schema varchar(64)
			,table_comment text
        ) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'tables');
        --创建views映射表
        create foreign table $v_sql$||tbname_2||$v_sql$(
             table_name varchar(64)
			,table_schema varchar(64)
        ) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'views');
        --创建columns映射表
        create foreign table $v_sql$||tbname_3||$v_sql$(
             table_schema varchar(64)
			,table_name varchar(64)
			,column_name varchar(64)
			,ordinal_position int
			,is_nullable varchar(3)
			,data_type text
			,column_type text
			,column_comment text
			,column_key varchar(3)
        ) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'columns');
    $v_sql$
    into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname
    ;
    execute v_sql;
    
    --判断表或视图是否存在
    execute $v_sql$
        select 1 from "$v_sql$||tbname_1||$v_sql$"
        where table_name = '$v_sql$||p_tablename||$v_sql$'
            and table_schema = '$v_sql$||p_schemaname||$v_sql$'
        union all 
        select 1 from "$v_sql$||tbname_2||$v_sql$"
        where table_name = '$v_sql$||p_tablename||$v_sql$'
            and table_schema = '$v_sql$||p_schemaname||$v_sql$'
        ;
    $v_sql$ into existbj; 
    
    if existbj is null then
        raise exception '表或视图不存在!';
    end if;
    v_sql := $v_sql$
        with tmp_a as (
		    (select
		         '$v_sql$||p_newtablename||$v_sql$' as table_name
		        ,table_schema
		        ,coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(case when table_comment = '' then null else table_comment end,'''','''''')||''';','') as table_comment
		    from $v_sql$||tbname_1||$v_sql$ 
		    where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
		        and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
		    limit 1)
		    union all 
		    (select
		         '$v_sql$||p_newtablename||$v_sql$' as table_name
		        ,table_schema
		        ,null as table_comment
		    from $v_sql$||tbname_2||$v_sql$ 
		    where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
		        and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
		    limit 1)
		), tmp_b as (
		    select 
		         '$v_sql$||p_newtablename||$v_sql$' as table_name
		        ,table_schema
		        ,string_agg(chr(9)||
		            case when ordinal_position = 1 then ' ' else ',' end||
		            --字段名
		            '"'||lower(column_name)||'"'||' '||
		            --数据类型
		            case 
			            when data_type = 'int' then data_type
				        when data_type = 'varchar' then replace (column_type,'varchar(0)','varchar(1)')
				        when data_type = 'char' then replace(replace(column_type,'char','varchar'),'varchar(0)','varchar(1)')
				        when data_type = 'date' then 'date'
				        when data_type = 'datetime' then replace (column_type, data_type, 'timestamp')
				        when data_type = 'timestamp' then 'timestamp'
				        when data_type = 'bigint' then 'bigint'
				        when data_type = 'double' then 'double precision'
				        when data_type = 'smallint' then 'smallint'
				        when data_type = 'decimal' then replace (column_type,'unsigned zerofill','')
				        when data_type = 'longtext' then 'text'
				        when data_type = 'text' then 'text'
				        when data_type = 'tinyint' then 'int'
				        when data_type = 'longblob' then 'bytea'
				        when data_type = 'blob' then 'bytea'
				        when data_type = 'float' then 'real'
				        when data_type = 'tinytext' then 'text'
				        when data_type = 'mediumtext' then 'text'
				        when data_type = 'numeric' then 'numeric'
				        when data_type = 'time' then 'interval'
				        when data_type = 'json' then 'json'
				        else 'varchar'
				    end||' '||
				    --空约束
				    case 
				    	when is_nullable = 'NO' then 'not null'
				    	when is_nullable = 'NO' then 'null'
				    	else 'null'
				    end
				    ,chr(10) order by ordinal_position
		         )||
		         --主键约束
		         coalesce(chr(10)||chr(9)||',primary key ('||string_agg(case when column_key = 'PRI' then '"'||lower(column_name)||'"' end,',' order by ordinal_position)||')','') 
		        as column_info
		        --字段备注
		        ,coalesce(chr(10)||string_agg('comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||lower(column_name)||'" is '''||replace(case when column_comment = '' then null else column_comment end,'''','''''')||''';',chr(10) order by ordinal_position),'') column_comment
		    from $v_sql$||tbname_3||$v_sql$
		    where upper(table_name) = upper('$v_sql$||p_tablename||$v_sql$')
		        and upper(table_schema) = upper('$v_sql$||p_schemaname||$v_sql$')
		    group by 
		         table_schema
		    limit 1
		)
		select 
		    'drop table if exists "'||a.table_name||'";'||chr(10)||
			'create table "'||a.table_name||'" ('||chr(10)||
			b.column_info||chr(10)||
			');'||a.table_comment||b.column_comment
		from tmp_a a
		inner join tmp_b b 
		on (a.table_schema = b.table_schema and a.table_name = b.table_name)
		;
    $v_sql$
    ;
    execute v_sql into p_result;
    
    --删除临时外部表
    execute $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
    $v_sql$
    ;
    
    --恢复模式搜索路径
	execute 'SET search_path TO '||o_search_path;
	return p_result;
    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.get_ddl_remote_mysql2pg(varchar,varchar,varchar,varchar) to public;

get_ddl_remote_pg2pg

create or replace function tool.get_ddl_remote_pg2pg(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying default null::character varying
    ,remote_connname character varying default '${CONNNAME}'::character varying
)
    returns text
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程pg数据库的表名、模式名和连接信息,
 *        以pg的语法返回指定模式下指定表的ddl语句
 * tablename : 指定pg的表名
 * schemaname : 指定pg的模式名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * remote_connname:远程连接名,有效值为 select connname from tool.dblink_connection_info;
 * */
declare 
	p_tablename varchar := tablename;
	p_schemaname varchar := schemaname;
	p_newtablename varchar := newtablename;
    p_remote_connname varchar := remote_connname;
	p_result text := null;
    tmp_fdw_id varchar := nextval('seq_tmp_fdw_id')::varchar;
    tbname_1 varchar := 'tmp_fdw_pg_class_'||tmp_fdw_id;
    tbname_2 varchar := 'tmp_fdw_pg_namespace_'||tmp_fdw_id;
    tbname_3 varchar := 'tmp_fdw_pg_attribute_'||tmp_fdw_id;
    tbname_5 varchar := 'tmp_fdw_pg_description_'||tmp_fdw_id;
    tbname_6 varchar := 'tmp_fdw_pg_index_'||tmp_fdw_id;
    tbname_8 varchar := 'tmp_fdw_pg_tables_'||tmp_fdw_id;
    tbname_9 varchar := 'tmp_fdw_pg_views_'||tmp_fdw_id;
    existbj int;
    v_sql varchar;
    o_search_path varchar;  --模式搜索路径
begin
	--如果传参为null直接抛出异常
	if p_tablename is null then 
		raise exception '表名或视图名不能为空!';
	end if;
    if p_schemaname is null then 
		raise exception '模式名不能为空!';
	end if;
	p_newtablename := coalesce (p_newtablename,lower(p_tablename));
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO tool,'||o_search_path;

    --创建外部表
    select $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_5||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_6||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_8||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_9||$v_sql$;
        --创建pg_class映射表
        create foreign table $v_sql$||tbname_1||$v_sql$(
             "oid" oid not null
			,"relname" name not null
			,"relnamespace" oid not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_class');
        create foreign table $v_sql$||tbname_2||$v_sql$(
             "oid" oid not null
			,"nspname" name not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_namespace');
        create foreign table $v_sql$||tbname_3||$v_sql$(
             "attrelid" oid not null
			,"attname" name not null
			,"atttypid" oid not null
			,"attnum" smallint not null
			,"atttypmod" integer not null
			,"attnotnull" boolean not null
			,"attisdropped" boolean not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_attribute');
        create foreign table $v_sql$||tbname_5||$v_sql$(
             "objoid" oid not null
			,"classoid" oid not null
			,"objsubid" integer not null
			,"description" text not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_description');
        create foreign table $v_sql$||tbname_6||$v_sql$(
			 "indrelid" oid not null
			,"indisprimary" boolean not null
			,"indkey" int2vector not null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_index');
        create foreign table $v_sql$||tbname_8||$v_sql$(
             "schemaname" name null
			,"tablename" name null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_tables');
        create foreign table $v_sql$||tbname_9||$v_sql$(
             "schemaname" name null
			,"viewname" name null
        ) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_views');
    $v_sql$
    into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname
    ;
    execute v_sql;
    
    --判断表或视图是否存在
    execute $v_sql$
        select 1 from "$v_sql$||tbname_8||$v_sql$"
        where tablename = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        union all 
        select 1 from "$v_sql$||tbname_9||$v_sql$"
        where viewname = '$v_sql$||p_tablename||$v_sql$'
            and schemaname = '$v_sql$||p_schemaname||$v_sql$'
        ;
    $v_sql$ into existbj; 
    
    if existbj is null then
        raise exception '表或视图不存在!';
    end if;
    v_sql := $v_sql$
        select 
			'drop table if exists "'||'$v_sql$||p_newtablename||$v_sql$'||'";'||
			chr(10)||'create table "'||'$v_sql$||p_newtablename||$v_sql$'||'" ('||
			chr(10)||
			string_agg(chr(9)||
				case when c.attnum = 1 then ' ' else ',' end||
				'"'||c.attname||'" '||  --字段名
				format_type(c.atttypid, c.atttypmod)||  --字段类型
				case when c.attnotnull = true then ' not null' else ' null' end,chr(10) order by c.attnum
			)||
			--主键约束
			coalesce (chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
			chr(10)||');'||
			--表备注(注释)
			coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(h.description,'''','''''')||''';','')||
			--字段备注(注释)
			coalesce(chr(10)||string_agg(case when f.description is not null then 'comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';' end,chr(10) order by c.attnum),'')
		from $v_sql$||tbname_1||$v_sql$ a 
		inner join $v_sql$||tbname_2||$v_sql$ b 
		on (a.relnamespace = b.oid)
		inner join $v_sql$||tbname_3||$v_sql$ c 
		on (a.oid = c.attrelid)
		left join $v_sql$||tbname_5||$v_sql$ f 
		on (a.oid = f.objoid and c.attnum = f.objsubid)
		left join (
		    select d.indrelid
		        ,string_agg('"'||c.attname||'"',',' order by c.attnum) prikey
		    from $v_sql$||tbname_1||$v_sql$ a, $v_sql$||tbname_2||$v_sql$ b, $v_sql$||tbname_3||$v_sql$ c, $v_sql$||tbname_6||$v_sql$ d 
		    where a.relnamespace = b.oid
		        and a.oid = c.attrelid
		        and a.oid = d.indrelid
		        and d.indisprimary = true
		        and c.attnum = any(d.indkey)
		        and a.relname = '$v_sql$||p_tablename||$v_sql$'
			    and b.nspname = '$v_sql$||p_schemaname||$v_sql$'  
			 group by d.indrelid
		) g 
		on (a.oid = g.indrelid)
		left join $v_sql$||tbname_5||$v_sql$ h 
		on (a.oid = h.objoid and h.objsubid = 0)
		where c.attnum > 0
			and not c.attisdropped
			and a.relname = '$v_sql$||p_tablename||$v_sql$'
			and b.nspname = '$v_sql$||p_schemaname||$v_sql$'
		group by b.nspname,a.relname,h.description,g.prikey;
    $v_sql$
    ;
    execute v_sql into p_result;
    
    --删除临时外部表
    execute $v_sql$
        --存在临时的外部表时,直接删除
        drop foreign table if exists $v_sql$||tbname_1||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_2||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_3||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_5||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_6||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_8||$v_sql$;
        drop foreign table if exists $v_sql$||tbname_9||$v_sql$;
    $v_sql$
    ;
    
    --恢复模式搜索路径
    execute 'SET search_path TO '||o_search_path;
    return p_result;
    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.get_ddl_remote_pg2pg(varchar, varchar, varchar, varchar) to public;

sp_extract_mapping

create or replace function tool.sp_extract_mapping(
     tablename character varying
    ,schemaname character varying
    ,newtablename character varying
    ,remote_connname character varying default '${CONNNAME}'::character varying
)
    returns void
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程(远程连接)表的表名,模式名(mysql为库名),
 *        在tool用户下创建一个指定新表名的外部映射表,
 *        访问该映射表,相当于直接访问远程表。
 *        select * from tool.newtablename;
 * tablename : 指定远程表的表名
 * schemaname : 指定远程表的模式名(mysql为库名)
 * newtablename : 在tool用户下创建的外部表表名
 * remote_connname : 远程连接名,来自tool.dblink_connection_info.connname
 * */
declare 
    p_tablename varchar := tablename;
    p_schemaname varchar := schemaname;
    p_newtablename varchar := newtablename;
    p_remote_connname tool.dblink_connection_info.connname%type := remote_connname;
    v_datasource_type tool.dblink_connection_info.conntype%type;
    v_sql varchar;
    o_search_path varchar;  --模式搜索路径
begin 
    --判断表是否为空
	if p_tablename is null then 
	    raise exception '参数tablename不能为空!';
	end if;

    if p_schemaname is null then 
        raise exception '参数schemaname不能为空!';
    end if;
   
    if p_newtablename not like 'tmp\_%' then 
        raise exception '参数newtablename必须以tmp_开头!';
    end if;
   
    --记录原来的模式搜索路径
    execute 'show search_path;' into o_search_path;
    --临时切换模式搜索路径
    execute 'SET search_path TO tool,'||o_search_path;
    
    v_datasource_type := conntype from tool.dblink_connection_info where connname = p_remote_connname limit 1;
    case v_datasource_type 
        when 'pg' then v_sql := tool.get_ddl_remote_pg2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);
        when 'mysql' then v_sql := tool.get_ddl_remote_mysql2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);
        else v_sql := null;
    end case;
    
    if v_sql is null then 
        raise exception '不支持的数据源类型!目前只支持mysql和pg!';
    end if;
    
    --拼接外部表ddl
    select replace(replace(replace(replace(replace(regexp_replace(v_sql
		,' not null| null','','g'),
		'drop table ','drop foreign table '
		),
		'create table ','create foreign table '
		),
		'comment on ','--comment on '
		),
		',primary key ','--,primary key '
		),
		');',') server '||fdw_server||' options('||case conntype when 'pg' then 'schema_name ' when 'mysql' then 'dbname' end||''''||p_schemaname||''''||',table_name '||''''||p_tablename||''''||');'
		)
	into v_sql
	from tool.dblink_connection_info
	where connname = p_remote_connname;
    execute v_sql;
    --execute 'show search_path;' into o_search_path;
    --raise notice '%',v_sql;
    --raise notice '%',o_search_path;
    --恢复模式搜索路径
	execute 'SET search_path TO '||o_search_path;
    exception when others then
        --恢复模式搜索路径
        execute 'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;
end;
$function$
;
grant execute on function tool.sp_extract_mapping(varchar, varchar, varchar, varchar) to public;

等待与唤醒

创建dblink插件

\c etl postgres
create extension dblink;

ETL任务通知表

\c etl tool
CREATE TABLE tool.etl_task_notice (
	table_name varchar(64) NULL, -- 表名
	schema_name varchar(64) NULL, -- 模式名
	update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP -- 表的更新时间
);
COMMENT ON TABLE tool.etl_task_notice IS 'etl任务通知,etl任务完成时,得到结果表后向该表中插入一条数据';
COMMENT ON COLUMN tool.etl_task_notice.table_name IS '表名';
COMMENT ON COLUMN tool.etl_task_notice.schema_name IS '模式名';
COMMENT ON COLUMN tool.etl_task_notice.update_time IS '表的更新时间';
ALTER TABLE tool.etl_task_notice OWNER TO tool;

创建序列

create sequence tool.seq_dblink_sessionid
	increment by 1
	minvalue 1
	maxvalue 9999
	start 1
	cache 1
	cycle;

辅助函数

is_return_result

create or replace function tool.is_return_result(
     select_statement character varying
    ,retrytimes integer default 60
)
    returns integer
    language plpgsql
    security definer
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:执行动态select语句,并且该执行过程是自治的,
 *      判断是否有结果返回,有则返回1,否则返回0
 * vsql:执行的动态sql
 * retrytimes:拿不到连接时拿连接重试次数,默认重试60次
 * */
declare
    p_select_statement varchar := select_statement;
    v_sql varchar; --动态sql
    p_retrytimes int := retrytimes;
    p_count int := 0;
    p_session_id varchar := nextval('seq_dblink_sessionid')::varchar;
    p_session_name varchar := 'dblink_'||p_session_id;
    p_result int := 0;
    dblink_conn varchar;
begin
	
    --尝试拿连接
    while true loop
    begin
	    --获取dblink连接
	    select 'host='||hostname||' port='||port||' dbname='||dbname||' user='||username||' password='||userpwd 
	    into dblink_conn
	    from tool.dblink_connection_info
	    where connname = '101.34.75.200-pg-etl';
	    perform dblink_connect(p_session_name,dblink_conn);
	    exit;
	    exception when others then 
	        if p_count >= p_retrytimes then 
	            exit;
	        end if;
	    p_count := p_count + 1;
	    --睡眠1s再拿连接
	    perform pg_sleep(1);
	    continue;
    end;
    end loop;
    v_sql :=  'select 1 from (' || p_select_statement || ') a limit 1';
    --执行动态sql语句
    select count(1) into p_result from dblink(p_session_name,v_sql) as (id int);
    --关闭dblink连接
    perform dblink_disconnect(p_session_name);
    return p_result;
    
    --报错时得先把连接关掉再把错误抛出来
    exception when others then 
	    begin
	        perform dblink_disconnect(p_session_name);
	        exception when others then
	            null;
	    end;
	    raise exception '%',SQLERRM;
end;
$function$
;
grant all on function tool.is_return_result(varchar, int4) to public;

get_15_interval_time

create or replace function tool.get_15_interval_time(
     v_time timestamp with time zone default current_timestamp
    ,time_type character varying default 'before'::character varying
)
    returns timestamp with time zone
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:获取某个时间所在分钟0-15,15-30,30-45,45-60的先后时间
 * v_time:指定时间
 * time_type:时间类型,时间段的前一段则BEFORE(默认),时间段的后一段则AFTER
 * */
declare
    p_v_time timestamptz := v_time;
    p_time_type varchar := upper(time_type);
    v_result timestamptz;
begin
	if p_time_type not in ('BEFORE','AFTER') then 
	    raise exception 'p_time_type必须是BEFORE或者AFTER!';
	end if;
    if p_time_type = 'BEFORE' then 
        v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||
		    case 
		        when to_char(p_v_time,'mi')::numeric-0 >= 0 and to_char(p_v_time,'mi')::numeric-0 < 15 then '00'
		        when to_char(p_v_time,'mi')::numeric-15 >= 0 and to_char(p_v_time,'mi')::numeric-15 < 15 then '15'
		        when to_char(p_v_time,'mi')::numeric-30 >= 0 and to_char(p_v_time,'mi')::numeric-30 < 15 then '30'
		        when to_char(p_v_time,'mi')::numeric-45 >= 0 and  to_char(p_v_time,'mi')::numeric-45 < 15 then '45'
		    end||'00','yyyymmddhh24miss');
    elsif p_time_type = 'AFTER' then 
        v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||
		    case 
		        when to_char(p_v_time,'mi')::numeric-0 >= 0 and to_char(p_v_time,'mi')::numeric-0 < 15 then '00'
		        when to_char(p_v_time,'mi')::numeric-15 >= 0 and to_char(p_v_time,'mi')::numeric-15 < 15 then '15'
		        when to_char(p_v_time,'mi')::numeric-30 >= 0 and to_char(p_v_time,'mi')::numeric-30 < 15 then '30'
		        when to_char(p_v_time,'mi')::numeric-45 >= 0 and  to_char(p_v_time,'mi')::numeric-45 < 15 then '45'
		    end||'00','yyyymmddhh24miss') + interval '15 minutes';
    end if; 
    return v_result;
end;
$function$
;
grant execute on function tool.get_15_interval_time(timestamptz, varchar) to public;

wait_table

create or replace function tool.wait_table(
     sql_statement character varying
    ,check_freq integer default 30
    ,check_time integer default 300
)
    returns integer
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:等待某个sql执行返回结果集,只能是select语句,等待成功返回1,等待失败返回0
 * sql_statement:sql语句
 * check_freq:检测频率,默认每30s检测一次
 * check_time:最多检测多少秒,默认5分钟
 * */
declare 
    p_sql_statement varchar := sql_statement;
    p_check_freq numeric := check_freq;
    p_check_time numeric := check_time;
    v_result int := 0;  --返回结果
    v_sql varchar(32767);  --动态sql
    v_id numeric;
    check_begin numeric := 0;
    v_hint varchar(400);
begin 
	--只能select语句
	if (trim(lower(p_sql_statement)) ~ '^select') = false then
	    raise exception 'sql_statement参数必须以select开头!';
	end if;
    
    --开始检测
    raise notice '----------开始检测----------';
    --动态sql
    v_sql :=  'select 1 from (' || p_sql_statement || ') a limit 1';
    raise notice '检测语句为:%',v_sql;
    loop
        v_hint := '当前检测时间为:'||to_char(clock_timestamp(),'yyyy-mm-dd hh24:mi:ss');
        raise notice '%',v_hint;
        begin
	        v_id := tool.is_return_result(v_sql);
	        --判断v_id是否为空
	        if v_id = 1 then
	            --等表标识置为1
	            v_result := 1;
	        else 
	            --当前检测没有检测通过,则初始时间后移
	            check_begin := check_begin + p_check_freq;
	        end if;
	        
            --若动态sql因为某表的不存在而产生异常,则不退出,继续下一次等表
            exception when others then
                check_begin := check_begin + p_check_freq;
	    end;
	    --退出循环的条件就是:等表超时或者等表标识为1
        exit when check_begin > p_check_time or v_result = 1; 
        perform pg_sleep(p_check_freq);
    end loop;
   
    if v_result = 1 then 
        raise notice '----------等表成功----------';
    else
        raise notice '----------等表超时----------';
        raise exception '----------等表超时----------';
    end if;
    return v_result;
end;
$function$
;
grant execute on function tool.wait_table(varchar, int4, int4) to public;

sp_wait

create or replace function tool.sp_wait(
     tablename character varying
    ,schemaname character varying
    ,waittype character varying default 'real'::character varying
    ,checkfreq integer default 20
    ,checktime integer default 600
)
    returns void
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:等待某张表被sp_notify存过唤醒。
 * tablename : 等待的表名
 * schemaname : 表名对应的模式名
 * waittype : 等待类型 year month day hour real
 *     year:当年被唤醒过
 *     month:当月被唤醒过
 *     day:当天被唤醒过
 *     hour:当前所在的小时被唤醒过
 *     real:伪实时,0-15,15-30,30-45,45-60,当前分钟所在的分钟范围被唤醒过
 * checkfreq:检测频率,默认每隔20s检测一次
 * checktime:最多检测多少秒,默认检测600s
 * */
declare 
    p_tablename varchar := lower(tablename);
    p_schemaname varchar := lower(schemaname);
    p_waittype varchar := lower(waittype);
    p_checkfreq int := checkfreq;
    p_checktime int := checktime;
    v_sql varchar;
begin 
	if p_tablename is null then 
	    raise exception '参数tablename不能为空!';
	end if;
    if p_schemaname is null then 
        raise exception '参数schemaname不能为空!';
    end if;
    if p_waittype not in ('year','month','day','hour','real') then 
        raise exception '参数waittype范围[''year'',''month'',''day'',''hour'',''real'']!';
    end if;
    if p_checkfreq >= 61 or p_checkfreq <= 0 then 
        raise exception '参数checkfreq范围 1 ~ 60 !';
    end if;
    if p_checktime >= 1801 or p_checktime <= 0 then 
        raise exception '参数checktime范围 1 ~ 1800 !';
    end if;
   
    v_sql := $v_sql$select 1 from tool.etl_task_notice
        where table_name = '$v_sql$||p_tablename||$v_sql$'
            and schema_name = '$v_sql$||p_schemaname||$v_sql$'
            and $v_sql$||
            case p_waittype
	            when 'year' then $v_sql$to_char(update_time,'yyyy') = to_char(current_timestamp,'yyyy')$v_sql$
	            when 'month' then $v_sql$to_char(update_time,'yyyymm') = to_char(current_timestamp,'yyyymm')$v_sql$
	            when 'hour' then $v_sql$to_char(update_time,'yyyymmddhh24') = to_char(current_timestamp,'yyyymmddhh24')$v_sql$
	            when 'real' then $v_sql$update_time >= tool.get_15_interval_time(current_timestamp,'BEFORE') and update_time <= tool.get_15_interval_time(current_timestamp,'AFTER')$v_sql$
	        end
    ;
    --raise notice '%',v_sql;
    perform tool.wait_table(v_sql,p_checkfreq,p_checktime);
end;
$function$
;
grant all on function tool.sp_wait(varchar, varchar, varchar, int4, int4) to public;

sp_notify

create or replace function tool.sp_notify(
     tablename character varying
    ,schemaname character varying
)
    returns void
    language plpgsql
    security definer
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:唤醒某个模式下的某个表,
 *       即通知该表已经etl完成
 * tablename:表名
 * schemaname:模式名
 * */
declare 
    p_tablename tool.etl_task_notice.table_name%type := lower(tablename);
    p_schemaname tool.etl_task_notice.schema_name%type := lower(schemaname);
begin 
    if p_tablename is null then 
        raise exception '参数tablename不能为空';
    end if;
   
    if p_schemaname is null then 
        raise exception '参数schemaname不能为空';
    end if;
   
    insert into tool.etl_task_notice(
         table_name 
        ,schema_name
    ) values (
         p_tablename
        ,p_schemaname
    );
end;
$function$
;
grant all on function tool.sp_notify(varchar, varchar) to public;

海豚调度

安装

Centos7.6集群部署海豚调度3.1.5

架构

在这里插入图片描述

搭建

对象对象实例说明
租户root操作系统执行用户
普通用户yuzhenchao姓名全拼
普通用户yuxiaotan姓名全拼
数据源dbselect@223.242.39.75:5432/dp海豚调度只有select权限的数据源
数据源mdl@101.34.75.200:5432/etletl数据库中模型开发的数据源
数据源apl@101.34.75.200:5432/etletl数据库中应用层开发的数据源
项目海豚调度元数据建模对海豚调度的元数据进行etl处理
项目海豚调度元数据应用开发——yuzhenchaoyuzhenchao的专属应用层项目,自己的报表或者应用自己维护,一个工作流一个应用一个调度
项目海豚调度元数据应用开发——yuxiaotanyuxiaotan的专属应用层项目,自己的报表或者应用自己维护,一个工作流一个应用一个调度
告警实例模型告警模型不管成功还是失败,都需要进行告警
告警实例应用告警-yuzhenchao应用告警通知指定的人yuzhenchao,一般只告警失败的任务
告警实例应用告警-yuxiaotan应用告警通知指定的人yuxiaotan,一般只告警失败的任务
告警组模型告警直接与告警实例一一对应
告警组应用告警-yuzhenchao直接与告警实例一一对应
告警组应用告警-yuxiaotan直接与告警实例一一对应
环境管理datax-execdatax的执行环境配置

创建租户root

在这里插入图片描述

创建普通用户

在这里插入图片描述
在这里插入图片描述

创建数据源

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建项目

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建告警实例(钉钉告警)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建告警组

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建datax环境

在这里插入图片描述

注:

  • 我这里选择的默认分组,所以需要在每台worker的机器上安装datax,并且让DATAX_HOME=/usr/local/datax
  • 数据库版本过高时,需要在对应的插件目录将旧版本的驱动删除,否则会连接失败

DataX / userGuid.md

授权管理

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

ETL过程

可回溯的etl过程——替代变量

变量名变量值变量说明
today$[yyyyMMdd]今天
yesterday$[yyyyMMdd-1]昨天
day2before$[yyyyMMdd-2]2天前
day3before$[yyyyMMdd-3]3天前
day4before$[yyyyMMdd-4]4天前
day5before$[yyyyMMdd-5]5天前
day6before$[yyyyMMdd-6]6天前
day7before$[yyyyMMdd-7]7天前

模型层工作流

ods层

  • ods层抽取数据,为了避免表产生死锁等待以及读脏数据,我们采用临时表方式进行数据抽取
  • 先创建临时表,将数据抽取到临时表,最终将临时表重命名为目标表
源库名源表名目标模式目标表名表说明
dpt_ds_projectmdlods_dp_t_ds_project项目表
dpt_ds_process_definitionmdlods_dp_t_ds_process_definition工作流定义表
dpt_ds_tenantmdlods_dp_t_ds_tenant租户表
dpt_ds_schedulesmdlods_dp_t_ds_schedules调度表
dpt_ds_process_task_relationmdlods_dp_t_ds_process_task_relation工作流任务关系表
dpt_ds_task_definitionmdlods_dp_t_ds_task_definition任务定义表
dpt_ds_process_instancemdlods_dp_t_ds_process_instance工作流实例表
dpt_ds_task_instancemdlods_dp_t_ds_task_instance任务实例表
dpt_ds_relation_process_instancemdlods_dp_t_ds_relation_process_instance存放的数据用于处理流程定义中含有子流程的情况
dpt_ds_sessionmdlods_dp_t_ds_session会话表
dpt_ds_usermdlods_dp_t_ds_user用户表
dpt_ds_datasourcemdlods_dp_t_ds_datasource数据源表
dpt_ds_access_tokenmdlods_dp_t_ds_access_token访问令牌表
dpt_ds_relation_datasource_usermdlods_dp_t_ds_relation_datasource_user用户数据源关系表
dpt_ds_queuemdlods_dp_t_ds_queue队列表

fdw实现抽数(不建议)

  • 先选择项目
    在这里插入图片描述
  • 选择工作流定义
    在这里插入图片描述
  • 选择创建工作流
    在这里插入图片描述
  • 拖拉拽sql图标
    在这里插入图片描述
  • 在开发用户下开发ods的脚本,然后在海豚调度上配置任务,我们以表 t_ds_project举例
--通过该函数获取建表语句
--select tool.get_ddl_remote_pg2pg('t_ds_project','public','tmp_ods_dp_t_ds_project');
drop table if exists "tmp_ods_dp_t_ds_project";
create table "tmp_ods_dp_t_ds_project" (
	 "id" integer not null
	,"name" character varying(100) null
	,"code" bigint not null
	,"description" character varying(255) null
	,"user_id" integer null
	,"flag" integer null
	,"create_time" timestamp without time zone null
	,"update_time" timestamp without time zone null
	,primary key ("id")
);
comment on table "tmp_ods_dp_t_ds_project" is '项目表';
comment on column "tmp_ods_dp_t_ds_project"."id" is '项目id';
comment on column "tmp_ods_dp_t_ds_project"."name" is '项目名称';
comment on column "tmp_ods_dp_t_ds_project"."code" is '项目编码';
comment on column "tmp_ods_dp_t_ds_project"."description" is '项目描述';
comment on column "tmp_ods_dp_t_ds_project"."user_id" is '用户id,对应t_ds_user.id';
comment on column "tmp_ods_dp_t_ds_project"."create_time" is '项目创建时间';
comment on column "tmp_ods_dp_t_ds_project"."update_time" is '项目最近更新时间';
--创建映射表
do $$
begin
    perform tool.sp_extract_mapping('t_ds_project','public','tmp_ods_dp_t_ds_project');
end$$;

--抽取数据
insert into tmp_ods_dp_t_ds_project
select * from tool.tmp_ods_dp_t_ds_project
;

do $$
begin
	--增加last_pg_time时间字段
	alter table tmp_ods_dp_t_ds_project add last_pg_time timestamp default current_timestamp;
    --空字符串处理成null
    perform tool.replace_to_null('tmp_ods_dp_t_ds_project');
    --如果ods表存在就删除
	drop table if exists ods_dp_t_ds_project;
    --表重命名
	alter table tmp_ods_dp_t_ds_project rename to ods_dp_t_ds_project;
    
    --ods_dp_t_ds_project
    perform tool.sp_notify('ods_dp_t_ds_project','mdl');
    
    --表分析
    analyze ods_dp_t_ds_project;
end$$;

在这里插入图片描述

  • 点击保存
    在这里插入图片描述
    在这里插入图片描述
  • 点击上线然后运行工作流
    在这里插入图片描述
    在这里插入图片描述
  • 查看日志
    在这里插入图片描述
  • 其他同理,连线可以控制并发度
    在这里插入图片描述

datax实现抽数(建议)

  • 创建工作流,拖拉拽sql组件和datax组件
    在这里插入图片描述
  • 编写脚本,任务配置
  • 前置sql组件
drop table if exists "tmp_ods_dp_t_ds_project";
create table "tmp_ods_dp_t_ds_project" (
	 "id" integer not null
	,"name" character varying(100) null
	,"code" bigint not null
	,"description" character varying(255) null
	,"user_id" integer null
	,"flag" integer null
	,"create_time" timestamp without time zone null
	,"update_time" timestamp without time zone null
	,primary key ("id")
);
comment on table "tmp_ods_dp_t_ds_project" is '项目表';
comment on column "tmp_ods_dp_t_ds_project"."id" is '项目id';
comment on column "tmp_ods_dp_t_ds_project"."name" is '项目名称';
comment on column "tmp_ods_dp_t_ds_project"."code" is '项目编码';
comment on column "tmp_ods_dp_t_ds_project"."description" is '项目描述';
comment on column "tmp_ods_dp_t_ds_project"."user_id" is '用户id,对应t_ds_user.id';
comment on column "tmp_ods_dp_t_ds_project"."create_time" is '项目创建时间';
comment on column "tmp_ods_dp_t_ds_project"."update_time" is '项目最近更新时间';

在这里插入图片描述

  • datax的select语句
--数据抽取select语句
select 
     "id"
	,"name"
	,"code"
	,"description"
	,"user_id"
	,"flag"
	,"create_time"
	,"update_time"
from t_ds_project
;

--后置操作
do $$
begin
	--增加last_pg_time时间字段
	alter table tmp_ods_dp_t_ds_project add last_pg_time timestamp default current_timestamp;
    --空字符串处理成null
    perform tool.replace_to_null('tmp_ods_dp_t_ds_project');
    --如果ods表存在就删除
	drop table if exists ods_dp_t_ds_project;
    --表重命名
	alter table tmp_ods_dp_t_ds_project rename to ods_dp_t_ds_project;
    
    --ods_dp_t_ds_project
    perform tool.sp_notify('ods_dp_t_ds_project','mdl');
    
    --表分析
    analyze ods_dp_t_ds_project;
end$$;

在这里插入图片描述

  • 其他同理

在这里插入图片描述

  • 效率对比
    在这里插入图片描述

dw层——sql脚本

dim层——sql脚本

dm层——sql脚本

总控

应用层工作流

Magic-API统一接口平台

其他函数

array_position

create or replace function tool.array_position(
     arrayint integer[]
    ,elementint integer
    ,times integer default 1
)
    returns integer
    language plpgsql
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 返回数组指定元素所在的位置,未匹配到返回0
 * arrayint : 数组
 * element : 指定元素
 * times  : 第几次出现的位置
 * */
declare 
	p_times int := 0;
	p_result int := 0;
begin
	if array_length(arrayint,1) is null then 
		return p_result;
	end if;
	for i in 1..array_length(arrayint,1) loop  
		if arrayint[i] = elementint then 
			p_times := p_times + 1;
		end if;
		if p_times = times then
			return i;
		end if;
	end loop;
	return p_result;
end;
$function$
;
grant execute on function tool.array_position(_int4, int4, int4) to public;

replace_to_null

create or replace function tool.replace_to_null(
     tablename character varying
    ,schemaname character varying default ("current_user"())::character varying(64)
    ,execuser varchar default current_user::varchar
)
    returns void
    language plpgsql
    security definer
as $function$
/* 作者 : v-yuzhenc
 * 功能:扫描指定表的所有varchar和text类型的字段,将字段值为''替换成null
 * tablename : 需要扫描的表名
 * schemaname : 需要扫描的模式名
 * */
declare 
	p_tablename varchar := lower(tablename);
	p_schemaname varchar := lower(schemaname);
	p_execuser varchar(64) := execuser;--调用者
	existbj int := 0;  --存在标记
	v_sql varchar;  --动态sql
begin
	if p_schemaname <> p_execuser then 
	    raise exception '你只有权限操作自己模式下的表!';
	end if;
	--扫描varchar和text字段
	select count(1)
	into existbj
	from pg_class a
	inner join pg_namespace b
	on (a.relnamespace = b.oid)
	inner join pg_attribute c
	on (a.oid = c.attrelid)
	inner join pg_type d
	on (c.atttypid = d.oid)
	where c.attnum > 0
		and d.typname in ('varchar','text')
		and a.relname = p_tablename
		and b.nspname = p_schemaname;
	--若不存在varchar或者text字段,则不做处理
	if existbj = 0 then
		raise notice '%.%表不存在或者不需要处理空字符串!',p_schemaname,p_tablename;
		return;
	end if;
	--拼接处理空字符串语句
	select 
		string_agg('update '||p_tablename||' 
	set '||c.attname||' = null where '||c.attname||' = '''';',chr(10))
	into v_sql
	from pg_class a
	inner join pg_namespace b
	on (a.relnamespace = b.oid)
	inner join pg_attribute c
	on (a.oid = c.attrelid)
	inner join pg_type d
	on (c.atttypid = d.oid)
	where c.attnum > 0
		and d.typname in ('varchar','text')
		and a.relname = p_tablename
		and b.nspname = p_schemaname;
	--通过集中处理程序执行动态sql
	perform tool.sp_execsql(v_sql,p_schemaname);
end;
$function$
;
grant execute on function tool.replace_to_null(varchar, varchar,varchar) to public;

sp_jzdb

create or replace function tool.sp_jzdb(
     tablename character varying
    ,oldschema character varying
    ,newschema character varying
    ,tablename_new character varying default null::character varying
    ,execuser varchar default current_user::varchar
)
    returns void
    language plpgsql
    security definer
as $function$ 
/* 
 * 作者 : v-yuzhenc
 * 功能:集中导表,将指定模式下的表以新的表名导入到新的模式下
 * tablename : 指定模式下的表,不区分大小写
 * oldschema : 指定模式,不区分大小写
 * newschema : 新的模式,不区分大小写
 * tablename_new : 新的表名,不区分大小写,默认与旧表名相同
 * execuser : 调用者用户名,无需传参,默认值即可
 * */
declare 
	p_tablename varchar(64) := lower(tablename);
	p_tablename_new varchar(64) := coalesce (lower(tablename_new),p_tablename);
	p_oldschema varchar(64) := lower(oldschema);
	p_newschema varchar(64) := lower(newschema);
    p_execuser varchar(64) := execuser;  --调用者用户名,无需人工传参
    jzdb_tname varchar(64) := 'jzdb_'||p_tablename_new;
    existbj numeric;
    v_sql varchar;
    bak_tname varchar;
begin
	
	--调用者可以将自己的表导入到其他模式
	--也可以将其他模式的表导入到自己模式
	if p_execuser <> p_oldschema and p_execuser <> p_newschema then 
	    raise exception 'oldschema和newschema参数之一必须与当前用户名一致,因为只允许操作与自己相关的表';
	end if;
	--建表
    v_sql := tool.get_ddl(p_oldschema||'.'||p_tablename,'table',jzdb_tname);
    perform tool.sp_execsql(v_sql,p_newschema);

	--插入数据
	perform tool.sp_execsql('insert into "'||jzdb_tname||'" select * from "'||p_oldschema||'"."'||p_tablename||'";',p_newschema);
	
	--判断新模式下的新表是否有重名的
	select count(1)
	into existbj
	from pg_tables a 
	where a.tablename = p_tablename_new
		and a.schemaname = p_newschema;
	if existbj <> 0 then 
	    --如果有重名的表存在
	    --则做备份,加前缀 o_模式名_表名_版本号
	    select 
	        count(1)+1 v_no  --版本号
	    into existbj
	    from pg_tables a
	    where substr(a.tablename,4+length(p_newschema),length(p_tablename_new)) = p_tablename_new
	        and a.schemaname = 'tool'
	    ;
	    --新表名建在tool下,加前缀 o_模式名_表名_版本号
	    bak_tname := 'o_'||p_newschema||'_'||p_tablename_new||'_'||existbj::varchar;
	    --建表
	    v_sql := tool.get_ddl(p_newschema||'.'||p_tablename_new,'table',bak_tname);
	    perform tool.sp_execsql(v_sql,'tool');
	    --插入数据
	    perform tool.sp_execsql('insert into "'||bak_tname||'" select * from "'||p_newschema||'"."'||p_tablename_new||'";','tool');
	    --分析表
	    perform tool.sp_execsql('analyze "'||bak_tname||'";','tool');
		perform tool.sp_execsql('drop table "'||p_tablename_new||'";',p_newschema);
	end if;

	--重命名表
	perform tool.sp_execsql('alter table "'||jzdb_tname||'" rename to '||p_tablename_new,p_newschema);

	--分析表
	perform tool.sp_execsql('analyze "'||p_tablename_new||'";',p_newschema);
    
end;
$function$
;
grant execute on function tool.sp_jzdb(varchar, varchar, varchar, varchar,varchar) to public;

sp_sqlexec_efficient

create or replace function tool.sp_sqlexec_efficient(
     sqlexec character varying
    ,exectimes integer default 1
)
    returns character varying
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:sql执行效率检测,事物不会提交,
 *       返回每次执行时间和平均执行时间
 * sqlexec:需要检测的sql
 * exectimes:需要检测的次数,范围1-10
 * */
declare 
    begin_time timestamp;
    end_time timestamp;
    exec_duration interval;
    v_result varchar := '';
    p_sqlexec varchar := sqlexec;
    p_exectimes int := exectimes;
    exec_begin int := 0;
    sum_sqlexec interval := '00:00:00.000000'::interval;
    sqlexec_avg interval;
begin
	if p_exectimes <= 0 or p_exectimes >= 11 then 
	    raise exception 'exectimes参数值范围为1~10';
	end if;
    while exec_begin >= 0 and exec_begin <= p_exectimes-1 loop
	    begin
		    begin_time := clock_timestamp();
		    execute p_sqlexec;
		    end_time = clock_timestamp();
		    raise exception '回滚' using errcode = '12345';
		    exception when sqlstate '12345' then 
		        null;
		end;
	    exec_duration = end_time-begin_time;
	    v_result := v_result||to_char(begin_time,'yyyy-mm-dd hh24:mi:ss.us')||'~'||to_char(end_time,'yyyy-mm-dd hh24:mi:ss.us')||':'||exec_duration::varchar||';'||chr(10);
	    sum_sqlexec := sum_sqlexec + exec_duration;
	    exec_begin := exec_begin + 1;
	end loop;
    sqlexec_avg := sum_sqlexec/p_exectimes;
    v_result := v_result||'avg:'||sqlexec_avg::varchar||';';
    return v_result;
end;
$function$
;
grant execute on function tool.sp_sqlexec_efficient(varchar, int4) to public;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/513174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT与Discord无缝接入指南,获取你的专属聊天机器人

ChatGPT与Discord无缝接入指南,获取你的专属聊天机器人 一、获取OpenAI API密钥。二、获取Discord Token三、注册GitHub,有账号的可以直接登录。四、在线开发环境搭建Replit五、注册cron-job.org本教程收集于: AIGC从入门到精通教程 ChatGPT和Discord都非常流行,而在Disco…

基于RK3588s人工智能大算力多网口工业网关交换机,可接5路千兆高清相机

RK3588/RK3588S AI BOX 功能接口说明 接口需求 功能定义与要求 备注 成本与目标 硬件设计争取全国产化&#xff0c; 632GB 版本 RK3588S PCBA 尺寸 146*102 丝印版本号 RK3588S AI BOX V1.0 EMMC 支持 SanDisk SDINBDA4-32/64/128G 4G-8GB 标配 8GB …

tensorboard如何使用

神经网络本身比较难理解&#xff0c;看起来很神秘&#xff0c;所以我们可以借助可视化根据tensorboard关注神经网络的运行过程&#xff0c;其中包括了各项数据指标以及神经网络自身的图结构。 TensorBoard 是 TensorFlow 的可视化工具&#xff0c;可以帮助用户更好地理解和调试…

【Linux】项目自动化构建工具make/makefile

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;Linux的学习之路 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录 前言一、make/makefile的背景二、…

00后卷起来,真没我们老油条什么事了···

都说00后躺平了&#xff0c;但是有一说一&#xff0c;该卷的还是卷。 这不&#xff0c;前段时间我们公司来了个00后&#xff0c;工作没两年&#xff0c;跳槽到我们公司起薪20K&#xff0c;都快接近我了。后来才知道人家是个卷王&#xff0c;从早干到晚就差搬张床到工位睡觉了。…

Android 中你碰不到但是很重要的类之ActivityThread

作者&#xff1a;Drummor 通过本文能了解一下内容 1、和系统进程打交道的桥头堡 应用进程起来之后ART(Android Runtime)第一站就是ActivityThread&#xff0c;代码层面上就是ActivityThread的main()方法&#xff0c;是不是很熟悉&#xff0c;爷青回啊&#xff0c;这不就是java…

基于深度学习的动物识别系统的实现

项目介绍 动物识别系统&#xff0c;使用Python作为主要开发语言&#xff0c;基于深度学习TensorFlow框架&#xff0c;搭建卷积神经网络算法。并通过对18种动物数据集进行训练&#xff0c;最后得到一个识别精度较高的模型。并基于Django框架&#xff0c;开发网页端操作平台&…

数据分析师 ---- SQL强化(3)

数据分析师 ---- SQL强化(3) 题目&#xff1a;每个月Top3的周杰伦歌曲 从听歌流水中找到18-25岁用户在2022年每个月播放次数top 3的周杰伦的歌曲 输入例子&#xff1a; drop table if exists play_log; create table play_log (fdate date,user_id int,song_id int ); inser…

前端的培训计划书

文章目录 导文模板一一、前言二、培训目标三、培训内容和计划 模板二模板三 导文 这里是导文 模板一 一、前言 随着互联网的快速发展&#xff0c;前端开发已经成为了现代软件开发中一个不可或缺的重要技能。本次培训旨在帮助学员快速掌握前端开发的核心知识和技能&#xff0c…

ChatGPT实现撰写邮件

撰写邮件 电子邮件是日常工作中很常用的工具&#xff0c;在相对正式的场合&#xff0c;一封格式美观、用语典雅的电子邮件正文会起到很好的作用。ChatGPT 可以较好的完成电子邮件的编写和格式美化工作。 下面让我们以产品销售的角度&#xff0c;写一封推销邮件。假定产品名称…

String类 [中]

目录 一、 string 的深浅拷贝 0x00 构造函数与析构函数的实现 0x01 拷贝构造 0x02 赋值 0x03 整体代码 二、 string的实现 0x01 引入 0x02 c_str 0x03 默认构造函数 三、size()与operator[]的实现 0x01 size()的实现 0x02 operator[]的实现 0x03 遍历实现 四、迭代器…

洛谷B2098 整数去重

整数去重 题目描述 给定含有 n n n 个整数的序列&#xff0c;要求对这个序列进行去重操作。所谓去重&#xff0c;是指对这个序列中每个重复出现的数&#xff0c;只保留该数第一次出现的位置&#xff0c;删除其余位置。 输入格式 输入包含两行&#xff1a; 第一行包含一个…

获取两个日期间时长 (XX天XX时XX分)

使用场景&#xff1a; 发货日期与到货日期 计算运输时长 代码&#xff1a; private String getMinuteTime(String startTime, String endTime) {String minuteTime null;if (StrUtil.isNotBlank(startTime) && StrUtil.isNotBlank(endTime)) {long minute DateUti…

【芯片设计- RTL 数字逻辑设计入门 2 - vcs 及 verdi 使用介绍】

文章目录 1.1 VCS 编译环境1.1.1 Complie Design1.1.2 simv 仿真 1.2 VCS 波形生成及查看1.2.1 verdi 命令介绍1.2.2 verdi 波形查看 1.1 VCS 编译环境 VCS 全称是 Verilog Compiler Simulator&#xff0c;是 Synopsys 公司的&#xff0c;类似于windows环境下的 questasim 或 …

Flink sql

1.创建表的执行环境 第一种 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Event> streamOperator env.addSource(new ClickSource()).assignTimestampsAndWatermarks(W…

深入理解C语言指针

目录 一、指针基础知识 二、野指针 三、指针运算 四、二级指针 五、指针数组与数组指针 六、函数指针与函数指针数组 一、指针基础知识 什么是指针&#xff1f; 指针其实就是个用来存放地址的变量&#xff0c;定义为type *。 指针大小&#xff1f; 32位平台(32个地…

【大数据-调度工具】dolphinscheduler安装和遇到的问题

1.安装 安装步骤按照官网安装即可 官网&#xff1a;DolphinScheduler | 文档中心 (apache.org) 版本&#xff1a;3.1.5 2.踩坑记录 Q1.大文件无法上传 问题描述&#xff1a; 在资源中心中上传文件选择完大文件夹之后&#xff0c;选择确认之后确认按钮转了几圈圈之后就没…

[Element]调整select样式

通过伪元素&#xff0c;实现这个和step长得差不多的样式 <template><el-selectv-model"value"placeholder"请选择提报单位"style"width: 430px"><el-optionv-for"(item, i) in officeList":class"el-option get…

hive的基本操作语句

背景&#xff1a;记录一下hive创建数据库&#xff0c;建表&#xff0c;添加数据&#xff0c;创建分区等的语句吧&#xff0c;省得总百度&#xff0c;&#x1f604; 第一步&#xff1a;hive的建库语句 create database pdata_dynamic;查看是否创建成功了 show databases;显示如…

Jenkins入门教程

一、开始使用 Jenkins 本导读将向您介绍使用 Jenkins、Jenkins 的主要特性和 Jenkins Pipeline 的基本知识。 本导读使用“独立”的 Jenkins 发行版&#xff0c;它可以在您自己本地的机器上运行。 准备工作 第一次使用 Jenkins&#xff0c;您需要&#xff1a; 机器要求&…