POSTGRESQL中ETL、fdw的平行替换

news2024/11/15 17:57:40

POSTGRESQL中ETL、fdw的平行替换

01、简介

“ 在我前两次的文章中,说到postgresql对于python的支持,其实很多功能也就可以封装进入的postgresql数据库中去。比如fdw、etl等,本文将以此为叙述点,进行演示展示”

d9de783ffe5e4224852626e3f2388609_0.png

在postgresql数据库中fdw的支持,在创建和使用上都不上太方便,特别是fdw在用表级别关联的时候,性能会大大折扣,因为fdw的数据并不会落地到本地​。所以我们可以利用postgresql对于python的支持,自行封装一个库对库的调度工具,将远端数据进行落地​后再次使用。对于使用的便利性,读者可自行​对比。

02、postgresql16.1的安装

安装依赖

yum install -y bison flex readline-devel zlib-devel zlib zlib-devel gcc  gcc-c++ openssl-devel python3  python3-devel libicu-devel ncurses-devel sqlite-devel tk-devel gcc make

添加用户

useradd postgres 
vim /etc/sudo

在101行以下添加以下内容


postgres ALL=(ALL)     NOPASSWD: ALL

进入官网找到链接,这里使用源码安装。

wget https://ftp.postgresql.org/pub/source/v16.1/postgresql-16.1.tar.gz

解压并进入解压目录


 mv postgresql-16.1.tar.gz /home/postgres
 su - postgres 
 tar -zxf postgresql-16.1.tar.gz
 cd postgresql-16.1

这里编译python支持还是很重要。–with-python 自行构建plpython3u插件


./configure --prefix=/home/postgres/pg --with-openssl  --with-python

make && make install

编辑环境变量


cd 
vim .bash_profile

加入以下环境变量

export PATH=/home/postgres/pg/bin:$PATH 
export PGDATA=/home/postgres/pg/data 

加载环境变量


source ~/.bash_profile

初始化数据库


initdb -D $PGDATA -U postgres -W 
(输入超级用户密码两次)
pg_ctl start 
pg_ctl status

进入数据库创建拓展


CREATE EXTENSION plpython3u CASCADE;

02、创建支持跨库访问的函数

首先下载python链接数据库所需module

postgres=# \! pip3 install -i https://mirrors.aliyun.com/pypi/simple/ cx_Oracle pyodbc pymysql --user 
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Requirement already satisfied: cx_Oracle in ./.local/lib/python3.6/site-packages (8.3.0)
Collecting pyodbc
  Downloading https://mirrors.aliyun.com/pypi/packages/27/5c/5e472d714dea2a634bd79df6b8ace55737a9f50c8fbb3b15521fceda4694/pyodbc-4.0.39-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (330 kB)
     |████████████████████████████████| 330 kB 2.8 MB/s            
Collecting pymysql
  Downloading https://mirrors.aliyun.com/pypi/packages/4f/52/a115fe175028b058df353c5a3d5290b71514a83f67078a6482cff24d6137/PyMySQL-1.0.2-py3-none-any.whl (43 kB)
     |████████████████████████████████| 43 kB 2.4 MB/s             
Installing collected packages: pyodbc, pymysql
Successfully installed pymysql-1.0.2 pyodbc-4.0.39

在链接远程Oracle数据库,需要下载指定的客户端,本文使用的是oracle 19C

wget https://download.oracle.com/otn_software/linux/instantclient/1921000/oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
sudo rpm -ivh oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm

编辑环境变量

vim /etc/profile

配置以下环境变量值

export LD_LIBRARY_PATH=/usr/lib/oracle/19.21/client64/lib:$LD_LIBRARY_PATH

加载环境变量

source /etc/profile

在postgresql数据库中创建具有跨库链接mysql\oracle\sqlserver功能的function。

CREATE OR REPLACE FUNCTION fdw_db(db_type varchar(100),host VARCHAR(100),port integer, username VARCHAR(100), password VARCHAR(100), db_name VARCHAR(100),tablename varchar(100))
RETURNS text AS $$

import cx_Oracle
import pyodbc
import pymysql

def read_data_from_database(db_type, host, port, username, password, db_name, table_name):
    result_values = []  # Initialize as an empty list

    # 读取Oracle数据库中指定表数据的函数
    if db_type.lower() == 'oracle':
        connection_string = f"{username}/{password}@{host}:{port}/{db_name}"
        connection = cx_Oracle.connect(connection_string)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取SQL Server数据库中指定表数据的函数
    elif db_type.lower() == 'sqlserver':
        connection = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={host};port={port};DATABASE={db_name};UID={username};PWD={password}")
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取MySQL数据库中指定表数据的函数
    elif db_type.lower() == 'mysql':
        connection = pymysql.connect(host=host, user=username, password=password, database=db_name, port=port)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    else:
        raise ValueError("Unsupported database type. Supported types: 'oracle', 'sqlserver', 'mysql'")

    # 返回拼接的VALUES子句
    return ', '.join(result_values)

insert_values = read_data_from_database(db_type, host, port, username, password, db_name, tablename)
return insert_values


$$ LANGUAGE plpython3u;

以Oracle作为测试 在Oracle 和PG中均创建测试表conn_fdw
postgresql

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id integer,
    name VARCHAR(50),
    age integer,
    city VARCHAR(50),
    salary integer
);

oracle中

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id NUMBER,
    name VARCHAR2(50),
    age NUMBER,
    city VARCHAR2(50),
    salary NUMBER
);

Oracle中插入数据

-- 插入20行数据
INSERT INTO conn_fdw VALUES (1, 'John', 30, 'New York', 50000);
INSERT INTO conn_fdw VALUES (2, 'Alice', 25, 'Los Angeles', 60000);
INSERT INTO conn_fdw VALUES (3, 'Bob', 35, 'Chicago', 70000);
INSERT INTO conn_fdw VALUES (4, 'Eva', 28, 'San Francisco', 55000);
INSERT INTO conn_fdw VALUES (5, 'Mike', 32, 'Seattle', 65000);
INSERT INTO conn_fdw VALUES (6, 'Sophia', 29, 'Boston', 75000);
INSERT INTO conn_fdw VALUES (7, 'David', 27, 'Denver', 52000);
INSERT INTO conn_fdw VALUES (8, 'Emily', 31, 'Austin', 68000);
INSERT INTO conn_fdw VALUES (9, 'Daniel', 26, 'Phoenix', 58000);
INSERT INTO conn_fdw VALUES (10, 'Olivia', 33, 'Houston', 72000);
INSERT INTO conn_fdw VALUES (11, 'Liam', 24, 'Portland', 49000);
INSERT INTO conn_fdw VALUES (12, 'Ava', 34, 'Atlanta', 71000);
INSERT INTO conn_fdw VALUES (13, 'Logan', 30, 'Miami', 62000);
INSERT INTO conn_fdw VALUES (14, 'Mia', 28, 'Dallas', 54000);
INSERT INTO conn_fdw VALUES (15, 'Jackson', 29, 'Minneapolis', 67000);
INSERT INTO conn_fdw VALUES (16, 'Sophie', 31, 'Detroit', 59000);
INSERT INTO conn_fdw VALUES (17, 'William', 27, 'Philadelphia', 70000);
INSERT INTO conn_fdw VALUES (18, 'Emma', 32, 'San Diego', 66000);
INSERT INTO conn_fdw VALUES (19, 'James', 26, 'Raleigh', 63000);
INSERT INTO conn_fdw VALUES (20, 'Avery', 35, 'Tampa', 71000);


此时再结合SQL语言进行处理远程连接传过来数据,再创建一个函数用于调用以上创建fdw_db

CREATE OR REPLACE FUNCTION inset_fdw_db(db_type varchar(100),host VARCHAR(100)
								  ,port integer, username VARCHAR(100), 
								  password VARCHAR(100), db_name VARCHAR(100),
								  tablename varchar(100),target_bale varchar(100))
RETURNS void AS $$
declare 
data_values text;
begin 
SELECT   fdw_db(db_type, host, port, username, password, db_name,tablename) into data_values;
 
EXECUTE 'insert into '||target_bale ||' values'||data_values;
end;

$$ LANGUAGE plpgsql;

进行调用

 SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 

进入数据库中查看
此时数据已经落地

postgres=# select *  from CONN_FDW;
 id | name | age | city | salary 
----+------+-----+------+--------
(0 rows)

postgres=#  SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 inset_fdw_db 
--------------
 
(1 row)


postgres=# select *  from CONN_FDW;
 id |  name   | age |     city      | salary 
----+---------+-----+---------------+--------
  1 | John    |  30 | New York      |  50000
  2 | Alice   |  25 | Los Angeles   |  60000
  3 | Bob     |  35 | Chicago       |  70000
  4 | Eva     |  28 | San Francisco |  55000
  5 | Mike    |  32 | Seattle       |  65000
  6 | Sophia  |  29 | Boston        |  75000
  7 | David   |  27 | Denver        |  52000
  8 | Emily   |  31 | Austin        |  68000
  9 | Daniel  |  26 | Phoenix       |  58000
 10 | Olivia  |  33 | Houston       |  72000
 11 | Liam    |  24 | Portland      |  49000
 12 | Ava     |  34 | Atlanta       |  71000
 13 | Logan   |  30 | Miami         |  62000
 14 | Mia     |  28 | Dallas        |  54000
 15 | Jackson |  29 | Minneapolis   |  67000
 16 | Sophie  |  31 | Detroit       |  59000
 17 | William |  27 | Philadelphia  |  70000
 18 | Emma    |  32 | San Diego     |  66000
 19 | James   |  26 | Raleigh       |  63000
 20 | Avery   |  35 | Tampa         |  71000
(20 rows)



总结

该方法不仅可以应用到数据库对数据库之间,也可以应到,数据库对文件路径下。在postgresql嵌入python代码 其实可以替换掉一些中间件的使用。可控性,定制性也会更强。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380761.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

好用的便签有哪些?windows便签工具在哪打开?

每当我8点准时上班,在等待电脑开机的过程,我都会习惯性地思考整理今天要晚上的任务,列出所要完成的待办事项。随着每一项任务的清晰呈现,我的心情也逐渐明朗起来。当然了,这个时候,我迫切需要一款好用的便签…

大数据赋能电竞出海企业发展

近几年电竞行业发展迅速,我国单2022年新增近4万家电竞相关企业,竞争十分激烈。中国电竞市场规模在全球占比19%左右,海外有巨大的增量市场,特别是东南亚、中南亚和拉丁美洲是电竞市场增长最快的地区,在2020至2025年期间…

【微信小程序独立开发2】授权登录 上

前言:这一节设想完成的功能为进入小程序后请求授权信息,用户授权登录后,弹出宠物登记页面,并根据宠物类型播放背景音乐 小程序昵称头像在之前的版本获取规则为触发后弹出用户授权弹窗,授权后可直接获取用户头像和昵称&…

DCP文件传输的重要性与应用

在数字时代,文件传输已成为商业运作中不可或缺的一环。随着企业越来越多地采用云基础设施和服务,有效地在云和团队之间传输大文件和数据集变得至关重要。在这一背景下,数据复制协议(DCP)文件传输应运而生,引…

Web实战丨基于django+html+css+js的电子商务网站

文章目录 写在前面实验目标需求分析实验内容安装依赖库1.登陆界面2.注册界面3.电子商城界面4.其他界面 运行结果写在后面 写在前面 本期内容:基于DjangoHTMLCSSJS的电子商务网站 实验环境: vscode或pycharmpython(3.11.4)django 代码下载地址&#x…

【分布式技术】监控平台zabbix介绍与部署

目录 一、为什么要做监控? 二、zabbix是什么? 三、zabbix有哪些组件? ​编辑Zabbix 6.0 功能组件: ●Zabbix Server ●数据库 ●Web 界面 ●Zabbix Agent ●Zabbix Proxy ●Java Gateway 四、zabbix的工作原理&#xf…

GNSS差分码偏差(DCB)原理学习与数据下载地址

一、DCB原理 GNSS差分码偏差(DCB,Differential Code Bias)是由不同类型的GNSS信号在卫星和接收机不同通道产生的时间延迟(硬件延迟/码偏差)差异,按照频率相同或者不同又可以细分为频内偏差(例如…

PADS9.5 : 元件库绘制

元件库绘制 1、打开PADS LOGIC 软件 2、先开始元件的电参数 这理面我们只需要先关注: 门 ,就是当前画的元件有几个部分 示例:两个门:A、B 3、再开始编辑图形 选择创建2D线,绘制PARTA 外框 添加端点,就是接…

生态茶园建设方案——福建蜂窝物联

一、项目背景 为了进一步提高茶产业集约化、产业化发展水平,充分运用物联网、互联网等高新技术为产业赋能,加速推动安溪茶产业转型升级,县政府决定在安溪县推进“安溪智慧生态茶园项目”,并以茶叶重镇感德镇实施“安溪智慧生态茶园…

CRM-如何做好客户管理

客户是企业最重要的资源,也是客户360视图管理的主数据,企业的运转都是围绕客户来开展的,如何做好客户数据的管理是一门学问,也需要企业动态的调整战略。 客户分为企业客户(Account)与个人客户(…

图解智慧:数据可视化如何助你高效洞悉信息?

在信息爆炸的时代,数据扮演着越来越重要的角色,而数据可视化则成为解读和理解海量数据的得力工具。那么,数据可视化是如何帮助我们高效了解数据的呢?下面我就以可视化从业者的角度来简单聊聊这个话题。 无需深奥的专业知识&#x…

环信服务端下载消息文件---菜鸟教程

前言 在服务端,下载消息文件是一个重要的功能。它允许您从服务器端获取并保存聊天消息、文件等数据,以便在本地进行进一步的处理和分析。本指南将指导您完成环信服务端下载消息文件的步骤。 环信服务端下载消息文件是指在环信服务端上,通过调…

实用编程调试技巧

目录 一、调试的基本步骤 二、Debug和Release的介绍 三、Windows环境调试介绍 1.调试环境的准备 2.学会快捷键 最常用的几个快捷键: 断点应用举例: 3.调试的时候查看程序当前信息 (1&#xff09…

橘子学Spring01之spring的那些工厂和门面使用

一、Spring的工厂体系 我们先来说一下spring的工厂体系(也称之为容器),得益于大佬们对于单一职责模式的坚决贯彻,在十几年以来spring的发展路上,扩展出来大量的工厂类,每一个工厂类都承担着自己的功能(其实就是有对应的方法实现)…

redis高级篇之单线程和多线程

目录 1、redis的发展史 2、redis为什么选择单线程? 3、主线程和Io线程是怎么协作完成请求处理的? 4、IO多路复用 5、开启redis多线程 1、redis的发展史 Redis4.0之前是用的单线程,4.0以后逐渐支持多线程 Redis4.0之前一直采用单线程的主…

智慧农业大棚建设方案——福建蜂窝物联

一、项目背景 温室大棚在不适宜植物生长的季节,能提供生育期和增加产量,多用于低温季节喜温蔬菜、花卉、林木等植物栽培或育苗等。因此对种植作物生长环境的要求要精确的多。 大多数农户加温、浇水、通风等,全凭感觉。人感觉冷了就加温&#…

Netfilter 是如何工作的(六):连接跟踪信息的入口创建(in)和出口确认(confirm)

Articles (gitee.io) IPtables-朱双印博客 (zsythink.net) 在 Netfilter 是如何工作的(五) 中连接跟踪信息使用的创建-确认机制的 Netfilter在报文进入系统的入口处,将连接跟踪信息记录在报文上,在出口进行confirm.确认后的连接信息 本文以一个本机上送…

【LeetCode每日一题】2182. 构造限制重复的字符串

2024-1-13 文章目录 [2182. 构造限制重复的字符串](https://leetcode.cn/problems/construct-string-with-repeat-limit/)思路: 2182. 构造限制重复的字符串 思路: 按照字符出现次数从高到低的顺序进行重复,通过维护一个指针 j 来寻找下一个…

如何在你的网站接入QQ登录?

文章目录 准备阶段申请QQ登录的权限创建应用最后上传qqlogin.php代码 准备阶段 国内服务器和备案域名需要你有张独一无二本人的身份证你正面手持身份证的图片一张100px*100px的网站图标 申请QQ登录的权限 首先访问qq互联,点击我直接访问 登陆完成后我们点击面的…

Java判断字符串当中是否有中文符号(不是中文名称,是符号)

public static void main(String[] args) throws ParseException, IOException, URISyntaxException {// 测试示例String testString1 "Hello,test!";String testString2 "This is a test.";boolean result1 containsChineseSymbols(testStr…