使用 Python 和 SQL 自动将 ETL 传输到 SFTP 服务器

news2025/1/15 21:39:08

了解如何在 Windows 上自动执行从 PostgreSQL 数据库到远程服务器的日常数据传输过程

       欢迎来到雲闪世界。将文件从一个位置传输到另一个位置的过程显然是自动化的完美选择。重复执行这项工作可能令人望而生畏,尤其是当您必须对几组数据执行整个 ETL(提取、转换、加载)过程时。

假设您的公司将数据存放在数据仓库中,然后他们决定将部分分析工作外包给外部数据分析供应商。该供应商提供定制的分析软件,该软件将为您公司的核心生产团队显示仪表板和报告。

这意味着,作为数据工程师,您必须每天、每小时、每 30 分钟或按照外包合同决定的任何其他频率向该供应商传输数据。

本文详细介绍了包含 SFTP 上传的 ETL 流程。我们将采用安全文件传输协议 (SFTP),这是一种在两个远程服务器之间传输文件的安全方式,通过使用所谓的安全外壳 (SSH) 协议对文件进行加密。

我们将通过以下详细步骤了解获取和传输此类数据文件的过程:

  1. 数据提取:编写 SQL 脚本,从数据仓库中的 PostgreSQL 数据库中提取特定数据(本例中为销售、捐赠和劳动力数据)。然后,我们将使用 psycopg2 库将这些 SQL 脚本嵌入到我们的 Python 代码中。
  2. 数据转换:根据外部供应商的期望,使用 Python 转换数据。
  3. 数据加载:将数据上传到外部供应商的远程服务器,仍然使用 Python。
  4. 使用 Windows 任务计划程序安排每日运行。

首先,让我们看看外部供应商的系统期望接收的文件的性质。

外部分析供应商要求的数据规范

在我当前的示例中,文件要求如下:

  • 每天早上 6:00 将销售额、商店捐赠数量、“ADC”捐赠数量和工时数据分别传输给他们一次单独的 CSV 文件。传输的文件应涵盖前一天生成的完整数据。
  • 提供了这些 CSV 文件的模板,以确保提取正确的列并使用 UTf-8 编码。
  • 每个文件的名称应包含特定的字符串加上前一天日期,格式如下:

图片来自作者

图片来自作者

1.数据提取

 

第一步是进入 PostgresSQL 数据库并编写将保存在项目文件夹中的 SQL 脚本。在当前情况下,我们将脚本命名为:wesa_sales.sql、wesa_donors_stores.sql、wesa_donors_adc.sql 和 wesa_labor.sql。这些脚本将分别提取所需的销售额、商店捐赠、“ADC”捐赠和劳动力数据。在此示例中,“商店”和“ADC”是业务的两种位置类型。

SQL 脚本的复杂性取决于要提取的数据类型、它们在数据库中的存储方式以及预期 CSV 文件的要求。在我当前的示例中,以下是我的 SQL 脚本:

-- Extract sales data for previous day
DROP TABLE IF EXISTS public.sales1;
DROP TABLE IF EXISTS public.sales2;

SELECT 
   a.balance_start_dt AS "Date",
 CAST(a.location_id AS TEXT) AS "StoreNo",
 'All' as "Cashier",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Accessories' THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "AccessoriesUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Accessories'::text THEN a.value ELSE 0::numeric END) AS "AccessoriesSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Jewellery'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "JewelryUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Jewellery'::text THEN a.value ELSE 0::numeric END) AS "JewelrySales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Books'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "BookUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Books'::text THEN a.value ELSE 0::numeric END) AS "BookSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'AV'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "AVUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'AV'::text THEN a.value ELSE 0::numeric END) AS "AVSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Electrical'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ElectricalUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Electrical'::text THEN a.value ELSE 0::numeric END) AS "ElectricalSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Furniture'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "FurnitureUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Furniture'::text THEN a.value ELSE 0::numeric END) AS "FurnitureSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Outlet Furniture'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "OutletFurnitureUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Outlet Furniture'::text THEN a.value ELSE 0::numeric END) AS "OutletFurnitureSales",
 sum(CASE WHEN a.pos_dept_desc = 'Outlet'::text AND a.pos_sub_dept_desc != 'Outlet Furniture'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "OutletUnits",
    sum(CASE WHEN a.pos_dept_desc = 'Outlet'::text AND a.pos_sub_dept_desc != 'Outlet Furniture'::text THEN a.value ELSE 0::numeric END) AS "OutletSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Footwear'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ShoeUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Footwear'::text THEN a.value ELSE 0::numeric END) AS "ShoeSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Women'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "WomenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Women'::text THEN a.value ELSE 0::numeric END) AS "WomenSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Men'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "MenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Men'::text THEN a.value ELSE 0::numeric END) AS "MenSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Children'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ChildrenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Children'::text THEN a.value ELSE 0::numeric END) AS "ChildrenSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Boutique'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "BoutiqueUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Boutique'::text THEN a.value ELSE 0::numeric END) AS "BoutiqueSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Linens'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "LinenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Linens'::text THEN a.value ELSE 0::numeric END) AS "LinenSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Collectibles'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "CollectiblesUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Collectibles'::text THEN a.value ELSE 0::numeric END) AS "CollectiblesSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Sporting Goods'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "SportingGoodsUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Sporting Goods'::text THEN a.value ELSE 0::numeric END) AS "SportingGoodsSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Toys'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ToysUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Toys'::text THEN a.value ELSE 0::numeric END) AS "ToysSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Wares'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "WaresUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Wares'::text THEN a.value ELSE 0::numeric END) AS "WaresSales",
 sum(CASE WHEN a.pos_sub_dept_desc = ANY (ARRAY['Seasonal'::text, 'Christmas'::text, 'Halloween%'::text, 'Back to School'::text]) THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "SeasonalUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = ANY (ARRAY['Seasonal'::text, 'Christmas'::text, 'Halloween%'::text, 'Back to School'::text]) THEN a.value ELSE 0::numeric END) AS "SeasonalSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Share The Good'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ShareTheGoodUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Share The Good'::text THEN a.value ELSE 0::numeric END) AS "ShareTheGoodSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Events'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "EventsUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Events'::text THEN a.value ELSE 0::numeric END) AS "EventsSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Commercial Services'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "Commercial-ICUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Commercial Services'::text THEN a.value ELSE 0::numeric END) AS "Commercial-ICSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Gift Card'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "NoGiftCardsIssued",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Gift Card'::text THEN a.value ELSE 0::numeric END) AS "TotalGiftCardsIssued",
 sum(CASE WHEN a.pos_dept_desc = 'Donation'::text THEN CAST(a.qty AS INTEGER) ELSE 0 END) AS "ChangeRoundupUnits",
    sum(CASE WHEN a.pos_dept_desc = 'Donation'::text THEN a.value ELSE 0::numeric END) AS "ChangeRoundup",
 sum(CASE WHEN a.pos_sub_dept_desc ~~ ANY (ARRAY['20LB%'::text, 'Dept%'::text, 'Mask%'::text]) OR a.pos_dept_desc IS NULL THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "OtherUnits",
    sum(CASE WHEN a.pos_sub_dept_desc ~~ ANY (ARRAY['20LB%'::text, 'Dept%'::text, 'Mask%'::text]) OR a.pos_dept_desc IS NULL THEN a.value ELSE 0::numeric END) AS "OtherSales",
 0::numeric AS "TotalTax",
 sum(CASE WHEN CAST(a.location_id AS TEXT) != '' THEN a.value ELSE 0::numeric END) AS "DebugTotalSales"
 into sales1
   FROM dw.pos_upc_sales_bal_v  AS a
  WHERE a.balance_start_dt = (CURRENT_DATE - '1 day'::interval) 
  GROUP BY a.location_id, a.balance_start_dt;
 
SELECT 
 sum(CASE WHEN c.tally_id = '3102'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "TransactionCount",
 sum(CASE WHEN c.tally_id = '130'::text THEN CAST(c.qty AS INTEGER) ELSE 0 END) AS "NoVISATransactions",
    sum(CASE WHEN c.tally_id = '130'::text THEN c.value ELSE 0::numeric END) AS "TotalVISACharges",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['131'::text, '104'::text]) THEN CAST(c.qty AS INTEGER) ELSE 0 END) AS "NoMastercardTransactions",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['131'::text, '104'::text]) THEN c.value ELSE 0::numeric END) AS "TotalMastercardCharges",
 sum(CASE WHEN c.tally_id = '132'::text THEN CAST(c.qty AS INTEGER) ELSE 0 END) AS "NoAMEXTransactions",
    sum(CASE WHEN c.tally_id = '132'::text THEN c.value ELSE 0::numeric END) AS "TotalAMEXCharges",
 sum(CASE WHEN c.tally_id = '133'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoDiscoverTransactions",
    sum(CASE WHEN c.tally_id = '133'::text THEN c.value ELSE 0::numeric END) AS "TotalDiscoverCharges",
 sum(CASE WHEN c.tally_id = '103'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoDebitTransactions",
    sum(CASE WHEN c.tally_id = '103'::text THEN c.value ELSE 0::numeric END) AS "TotalDebitCharges",
 0 AS "NoE-CheckTransactions",
 0 AS "TotalofE-ChecksCollected",
 sum(CASE WHEN c.tally_id = '102'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoPaperCheckTransactions",
    sum(CASE WHEN c.tally_id = '102'::text THEN c.value ELSE 0::numeric END) AS "TotalPaperChecksCollected",
 sum(CASE WHEN c.tally_id = '135'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoStoreCreditsRedeemed",
    sum(CASE WHEN c.tally_id = '135'::text THEN c.value ELSE 0::numeric END) AS "TotalStoreCreditsRedeemed",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['124'::text, '134'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoGiftCardsRedeemed",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['124'::text, '134'::text]) THEN c.value ELSE 0::numeric END) AS "TotalGiftCardsRedeemed",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['117'::text, '129'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoGiftCertificatesRedeemed",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['117'::text, '129'::text]) THEN c.value ELSE 0::numeric END) AS "TotalGiftCertificatesRedeemed",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['153'::text, '125'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoVouchersRedeemed",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['153'::text, '125'::text]) THEN c.value ELSE 0::numeric END) AS "TotalVouchersRedeemed",
 sum(CASE WHEN c.tally_id = '3303'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoOtherCardTransactions", --same as GoodRewardsDiscount, but this is a tender
    sum(CASE WHEN c.tally_id = '3303'::text THEN c.value ELSE 0::numeric END) AS "TotalOtherCardCharges",
 sum(CASE WHEN c.tally_id = '101'::text THEN c.value ELSE 0::numeric END) AS "ExpectedCash",
 sum(CASE WHEN c.tally_id = '101'::text THEN c.value ELSE 0::numeric END) AS "TotalCashDeposit", --hand-counted cash
 0 AS "TotalChecksDeposit",
 sum(CASE WHEN c.tally_id = '1007'::text THEN c.value ELSE 0::numeric END) AS "PettyCash",
 sum(CASE WHEN c.tally_id = '3303'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoGoodRewardsDiscount",
    sum(CASE WHEN c.tally_id = '3303'::text THEN c.value*-1 ELSE 0::numeric END) AS "TotalGoodRewardsDiscount",
 sum(CASE WHEN c.tally_id = '3343'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoTMDiscount",
    sum(CASE WHEN c.tally_id = '3343'::text THEN c.value*-1 ELSE 0::numeric END) AS "TotalTMDiscount",
 sum(CASE WHEN c.tally_id = '3328'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoSeniorsDiscount",
    sum(CASE WHEN c.tally_id = '3328'::text THEN c.value*-1 ELSE 0::numeric END) AS "TotalSeniorsDiscount",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['3301'::text, '3302'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoOtherDiscount",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['3301'::text, '3302'::text]) THEN c.value*-1 ELSE 0::numeric END) AS "TotalOtherDiscount",
 c.location_id AS "StoreNo2",
    c.balance_start_dt AS "Date2"
 into sales2
   FROM dw.pos_clk_sales_bal AS c
  WHERE c.balance_start_dt = (CURRENT_DATE - '1 day'::interval) 
  GROUP BY c.location_id, c.balance_start_dt;
  
 SELECT * FROM
 sales1
 JOIN
 sales2
ON sales1."StoreNo" = sales2."StoreNo2" AND sales1."Date" = sales2."Date2"
;
-- Extract store donations data for previous day
SELECT DISTINCT gia_locations.location_name AS "StoreName",
    don_counts.don_dt AS "Date",
    sum(don_counts.count) AS "GGCDonors",
    0 AS "TextilesHung",
    0 AS "TextilesRotated",
   FROM dw.don_counts
     LEFT JOIN dw.gia_locations ON don_counts.location_id = gia_locations.location_id
  WHERE don_counts.dedupe_flg = 'Y'::text AND (don_counts.count_type <> ALL (ARRAY['Total'::text, 'Hr Sum'::text, 'Vol. Hrs'::text])) 
  AND gia_locations.location_type = 'R'::text 
  AND don_counts.don_dt = (CURRENT_DATE - '1 day'::interval)
  AND gia_locations.current_version_flg = 'Y'::text
  GROUP BY gia_locations.location_name, don_counts.don_dt;
-- Extract ADC donations data for previous day
SELECT DISTINCT 
 replace(replace(don_counts.location_id, '010', '1000'),'090','9000') AS "Store_no",
    don_counts.don_dt AS "PostingDate",
    sum(don_counts.count) AS "No_of_ADC_Donors"
FROM dw.don_counts
LEFT JOIN dw.gia_locations ON don_counts.location_id = gia_locations.location_id
WHERE don_counts.dedupe_flg = 'Y'::text 
 AND (don_counts.count_type <> ALL (ARRAY['Total'::text, 'Hr Sum'::text, 'Vol. Hrs'::text])) 
 AND (gia_locations.location_type = ANY (ARRAY['D'::text, 'W'::text, 'I'::text, 'B'::text])) 
 AND don_counts.don_dt = (CURRENT_DATE - '1 day'::interval)
 AND gia_locations.current_version_flg = 'Y'::text
GROUP BY don_counts.location_id, don_counts.don_dt;
-- Extract labor data for previous day
SELECT gia_locations.location_name AS "Department Description",
  EXTRACT(month FROM emp_hou_bal.balance_start_dt::date) ||
  '/' ||
  EXTRACT(day FROM emp_hou_bal.balance_start_dt::date) ||
  '/' ||
  EXTRACT(year FROM emp_hou_bal.balance_start_dt::date)
  AS "Date",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['REG'::text, 'TRNG'::text]) THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Regular Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['REG'::text, 'TRNG'::text]) THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Regular Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'OT'::text THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Overtime Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'OT'::text THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Overtime Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'BRVMT'::text THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Bereav Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'BRVMT'::text THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Bereav Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['STATH'::text, 'STATS'::text, 'STATW'::text]) THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Holiday Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['STATH'::text, 'STATS'::text, 'STATW'::text]) THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Holiday Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['VACH'::text, 'VAC'::text, 'WELL'::text, 'WELL-Covid'::text, 'PDSUS'::text, 'EBKTK'::text, 'PBKTK'::text,'PDLOA'::text]) THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "PTO Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['VACH'::text, 'VAC'::text, 'WELL'::text, 'WELL-Covid'::text, 'PDSUS'::text, 'EBKTK'::text, 'PBKTK'::text,'PDLOA'::text]) THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "PTO Earnings",
    0 AS "FMLA Hours",
    0 AS "FMLA Earnings"
   FROM dw.emp_hou_bal
     LEFT JOIN dw.gia_locations ON emp_hou_bal.location_id = gia_locations.location_id
  WHERE emp_hou_bal.balance_start_dt = (CURRENT_DATE - '1 day'::interval) AND gia_locations.current_version_flg = 'Y'::text
  AND emp_hou_bal.paycode_name !~~ 'Analytics%'::text AND emp_hou_bal.paycode_name != 'Worked Hours'::text 
  AND (emp_hou_bal.paycode_name <> ALL (ARRAY['$WKND'::text, 'LATE'::text, 'NPD S'::text, 'NPD V'::text, 'NPDLV'::text, 'NOSHW'::text, 'NPWCB'::text, 'NPSUS'::text, 
            'FLEX'::text, 'FLEX TK'::text, 'Un-Approved OT'::text, 'EDUC'::text, 'VACPO'::text, 'VACPT'::text, 'RETRO'::text, 'Worked Hours'::text]))
  AND emp_hou_bal.fin_dept_id = ANY (ARRAY['500'::text, '528'::text, '501'::text])
  AND emp_hou_bal.volunteer = 'N'
  GROUP BY gia_locations.location_name, emp_hou_bal.balance_start_dt;

SQL 脚本完成后,创建一个项目文件夹。在此文件夹中,创建一个名为 sql_queries 的子文件夹,然后将 SQL 脚本保存在其中。

2.数据转换

 

让我们进入 Python 来执行其余的 ETL 过程。

使用您选择的任何代码编辑器打开项目文件夹。我使用的是 VS Code。在这里,再创建 4 个文件:main.py、emailing.py、.env、requirements.txt。

您的项目文件夹结构现在应如下所示:

Project-Folder/
├── sql_queries/
│   ├── wesa_sales.sql
│   ├── wesa_donor_stores.sql
    ├── wesa_donor_adc.sql
    ├── wesa_labor.sql
├── .env
├── emailing.py
├── main.py
└── requirements.txt

安装依赖项

为了将此项目的库与您的计算机隔离并避免任何未来的干扰,建议创建一个虚拟环境,其中将安装与该项目相关的所有依赖项。

在代码编辑器中选择 Python 解释器,然后在终端中运行以下两行代码来创建并激活虚拟环境。我的环境名称是 wesa_env,我使用的是 Windows 命令提示符。

python -m venv wesa_env

wesa_env\Scripts\activate

将以下库输入到您的 requirenments.txt 文件中,然后返回到您的终端(命令提示符)并通过运行下一行将它们安装到您的虚拟环境中。

pandas==2.2.1
psycopg2==2.9.9
paramiko==3.4.0
python-dotenv
python -m pip install -r requirements.txt

辅助函数

在 emailing.py 中,编写一个代码,一旦 SFTP 上传失败,就会向您发送警报电子邮件。下面的代码会创建一封包含错误详细信息的电子邮件,并使用 Gmail 的 SMTP 服务器和您的凭据将其发送给您。本文后面将提供访问 Gmail 的 SMTP 服务器的指南。

# import libraries
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import os
from dotenv import load_dotenv

load_dotenv() # this loads all the environment variables
#define the send email function
def send_alert_email(error_message):
    # Email configuration
    sender_email = os.getenv("SENDER")
    receiver_email = os.getenv("RECEIVER")
    password = os.getenv("PWD_EMAIL")
    # Email content
    message = MIMEMultipart()
    message['From'] = sender_email
    message['To'] = receiver_email
    message['Subject'] = "SFTP Upload Failed"
    body = f"Error occurred during SFTP upload:\n{error_message}"
    message.attach(MIMEText(body, 'plain'))
    # Connect to SMTP server and send email
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(sender_email, password)
        server.sendmail(sender_email, receiver_email, message.as_string())
    print("Alert email sent")

将库导入到主 Python 脚本

在您的 main.py 文件中,使用以下代码块导入所需的库:

import csv
from datetime import datetime, date, timedelta
import os
import emailing # this is in the helper module will be created above
import psycopg2
import paramiko
import pandas as pd
from dotenv import load_dotenv

我将解释上述三个库的用途:

  • Psycopg2 是 Python 的 PostgreSQL 数据库适配器。它允许 Python 程序连接数据库并执行 SQL 查询。
  • Paramiko 用于通过 SSH 连接远程服务器。
  • dotenv 用于加载存储在.env 文件中的环境变量。

验证服务器

由于我们需要访问 PostgreSQL 服务器、供应商的远程 SFTP 服务器和 Gmail 的 SMTP 服务器以发送错误电子邮件警报,出于隐私目的,我们将所有凭据作为环境变量存储在我们的 .env 文件中,如下所示:

# Postgres database credentials
SERVER="enter server name"
DATABASE="enter database name"
UID_DB="enter your username"
PWD_DB="enter your password"

# supplier's FTP server credentials. These will be provided to you by the supplier
HOST_FTP ="enter host name"
USER_FTP="enter your username"
PWD_FTP="enter your password"
PORT_FTP="enter the port number"

# Your credentials for Gmail SMPT server
SENDER="enter your gmail address: example@gmail.com"
RECEIVER="enter the email address that will receive the alert"
PWD_EMAIL="enter your generated app password from your google account"

您可以点击此链接了解如何为 GMAIL 的 SMTP 服务器生成应用程序密码。

接下来,在 main.py 中,加载环境变量并获取数据库凭据和远程服务器的 SSH 凭据。

load_dotenv() # this loads all the environment variables

# Get your database credentials from environment variables
SERVER_NAME=os.getenv("SERVER")
DATABASE_NAME=os.getenv("DATABASE")
UID=os.getenv("UID_DB")
PWD=os.getenv("PWD_DB")
PORT="5432"

# Get remote server SSH credentials from environment variables
HostName = os.getenv("HOST_FTP")
UserName = os.getenv("USER_FTP")
PassWord = os.getenv("PWD_FTP")
Port = os.getenv("PORT_FTP")

以下代码块将创建与 PostgreSQL 数据库的连接,执行 sql_queries 文件夹中的每个 SQL 查询,将数据保存在 CSV 文件中,并将 CSV 文件转换为 Pandas 数据框进行清理。

清理后,数据框将被写回到 CSV 文件中。生成的每个 CSV 文件将按供应商要求命名,并保存在临时文件夹 — temp 中。

然后它将关闭与数据库的连接。

请注意每行代码所附的描述。

#Define path to local directory
WORK_SPACE = r"temp"

# Connect to the Postgresql database
    try:
        conn = psycopg2.connect(database=DATABASE_NAME, host=SERVER_NAME, user=UID, password=PWD, port=PORT)
        print("Database connected successfully")
    except:
        print("database not connected")

        # Create a cursor
    cur = conn.cursor()

    # Fetch the SQL queries from the sub-folder
    for filename in os.listdir("sql_queries"):
        sql_query_path = os.path.join("sql_queries", filename)
        # Read the SQL query from the file
        with open(sql_query_path, 'r') as file:
                sql_query = file.read()

        # Execute the SQL query
        cur.execute(sql_query)

        # Fetch all rows from the query result
        rows = cur.fetchall()
        
        # Specify the filename for the sales CSV file and dump the rows in it
        if "sales" in sql_query_path:
            csv_file_path = os.path.join(WORK_SPACE, ("WESA_SALE_NEW_" + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path, 'w', newline='') as f:
                # Create a CSV writer object
                writer = csv.writer(f)
                # Write the header (column names)
                writer.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer.writerows(rows)
            
            # Clean the sales csv file: delete last 2 columns, add leading zeros to 'StoreNo' column and create the 'Credits' column
            data = pd.read_csv(csv_file_path)
            data = data.iloc[:, :-2]
            data['StoreNo'] = data['StoreNo'].astype(str).apply(lambda x: x.zfill(3))
            data['Credits'] = -(data['ShareTheGoodSales'] + data['TotalGiftCardsIssued'] + data['ChangeRoundup'] + data['TotalStoreCreditsRedeemed']).round(2)
            data.to_csv(csv_file_path, sep=',', encoding='utf-8', index=False)

        # Specify the filename for the store donations CSV file and dump the rows in it
        if "stores" in sql_query_path:
            csv_file_path2 = os.path.join(WORK_SPACE, ("WESA_DONR_NEW_" + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path2, 'w', newline='') as f:
                # Create a CSV writer object
                writer2 = csv.writer(f)
                # Write the header (column names)
                writer2.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer2.writerows(rows)

            # Clean the store donations csv file: round the GGCDonors column
            data = pd.read_csv(csv_file_path2)
            data['GGCDonors'] = data['GGCDonors'].round(0).astype(int)
            data.to_csv(csv_file_path2, sep=',', encoding='utf-8', index=False)   

        # Specify the filename for the ADC donations CSV file and dump the rows in it
        if "donors_adc" in sql_query_path:
            csv_file_path3 = os.path.join(WORK_SPACE, ("WESA_ADC_" + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path3, 'w', newline='') as f:
                # Create a CSV writer object
                writer3 = csv.writer(f)
                # Write the header (column names)
                writer3.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer3.writerows(rows)

            # Clean the ADC donations CSV file: round the No_of_ADC_Donors column
            data = pd.read_csv(csv_file_path3)
            data['No_of_ADC_Donors'] = data['No_of_ADC_Donors'].round(0).astype(int)
            data.to_csv(csv_file_path3, sep=',', encoding='utf-8', index=False)   

        # Specify the filename for the labor CSV file and dump the rows in it
        if "labor" in sql_query_path:
            csv_file_path4 = os.path.join(WORK_SPACE, ("Labor_Payroll." + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d%H%M%S")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path4, 'w', newline='') as f:
                # Create a CSV writer object
                writer4 = csv.writer(f)
                # Write the header (column names)
                writer4.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer4.writerows(rows)
        
            # Clean the labor CSV file: Rename some values in the Department Description column
            data = pd.read_csv(csv_file_path4)
            data['Department Description'] = data['Department Description'].replace({'Impact Centre Eic':'Edmonton Outlet Store', 'Impact Centre Cic':'Calgary Outlet Store'})
            data.to_csv(csv_file_path4, sep=',', encoding='utf-8', index=False)
        
    print("All 4 files created")

    # Close the cursor and connection
    cur.close()
    conn.close()

3. 数据上传

 

在同一个main.py中,继续代码将文件传输到供应商的远程服务器,如下所示。

代码将指定目标目录,使用 SFTP 客户端与远程服务器建立安全外壳连接,循环遍历临时文件夹中的所有文件,将它们传输到远程目录,然后关闭连接。

# Data transfer from temp folder to WESA remote Server

    # Specify the remote directory
    remote_directory = "/RemoteFTP-NEW"

    # create ssh client and connect to remote server
    SSH_Client= paramiko.SSHClient()
    SSH_Client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    SSH_Client.connect( hostname=HostName, port=Port, username=UserName,
                    password= PassWord)

    # create an SFTP client object for the connection
    sftp_Client = SSH_Client.open_sftp()

    # transfer files to the remote server
    #loop through all files in the local directory and upload only files to the remote directory
    for file_name in os.listdir(WORK_SPACE):
        local_file_path = os.path.join(WORK_SPACE, file_name)
        if os.path.isfile(local_file_path):
            remote_file_path = os.path.join(remote_directory, file_name)
            sftp_Client.put(local_file_path, remote_file_path)

    print("All 4 files transferred")

    # close the connection
    sftp_Client.close()
    SSH_Client.close()

错误处理

最后两个代码块可以放在 try-catch 块中,这样一旦出现任何错误,就会通过电子邮件警报向您发送异常报告。发送此警报将通过本文中创建的电子邮件辅助函数来实现。

try:
  # insert the lines above from 'Connecting to the Postgresql database' to 'closing the SSH client'

except Exception as e:
    emailing.send_alert_email(str(e))  # Sending an email alert
    print("Upload failed:", str(e))

最后,在同一个 main.py 中,添加以下行以清空临时文件夹,以便准备好在第二天接收新文件。

#Empty the temporary local folder
for file_name in os.listdir(WORK_SPACE):
    local_file_path = os.path.join(WORK_SPACE, file_name)
    if os.path.isfile(local_file_path):
        os.remove(local_file_path)

4. 任务调度

 

使用 Windows 任务计划程序,安排 main.py Python 文件每天早上 6:00 运行。

必要时,您可以使用 WinSCP 查看并确保传输的文件确实位于远程服务器中。但这是可选的,并且不是此 ETL 过程的一部分。

警告

请注意,上述整个 ETL 流程仅适用于 Windows 用户。如果您是 Mac 或 Linux 用户,则需要采用其他方式来完成某些步骤。

结论

总而言之,本文展示了利用 SQL 和 Python 库简化数据传输流程的自动化功能。

从 PostgreSQL 数据库中提取数据并清理数据到通过 SFTP 安全地传输数据,自动化系统显著减少了人工工作量和人为错误。

通过 Gmail 的 SMTP 服务器集成可靠的电子邮件警报系统进一步确保及时解决文件传输过程中的任何问题。

安排脚本每天运行可确保数据更新的一致性,从而使该解决方案成为可靠的数据管道。

感谢关注雲闪世界。(Aws解决方案架构师vs开发人员&GCP解决方案架构师vs开发人员)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2075224.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

神经网络模型剪枝快速指南

模型剪枝&#xff08;Model Pruning&#xff09;是指从深度学习神经网络模型中删除不重要的参数&#xff0c;以减小模型大小并实现更高效的模型推理。通常&#xff0c;只剪枝参数的权重&#xff0c;而不影响偏差。偏差的剪枝往往有更明显的缺点。 非结构化剪枝期间权重如何归零…

书生.浦江大模型实战训练营——(十)Lagent 自定义你的 Agent 智能体

最近在学习书生.浦江大模型实战训练营&#xff0c;所有课程都免费&#xff0c;以关卡的形式学习&#xff0c;也比较有意思&#xff0c;提供免费的算力实战&#xff0c;真的很不错&#xff08;无广&#xff09;&#xff01;欢迎大家一起学习&#xff0c;打开LLM探索大门&#xf…

【9月持续更新】国内ChatGPT-4中文镜像网站整理~

以前我也是通过官网使用&#xff0c;但是经常被封号&#xff0c;就非常不方便&#xff0c;后来有朋友推荐国内工具&#xff0c;用了一阵之后&#xff0c;发现&#xff1a;稳定方便&#xff0c;用着也挺好的。 最新的 GPT-4o、4o mini&#xff0c;可搭配使用~ 1、 最新模型科普&…

遗传算法整合talib技术分析算子做因子挖掘,比如ADX, 阿隆指标等

“ 原创内容第631篇&#xff0c;专注量化投资、个人成长与财富自由” 七年实现财富自由 七年&#xff0c;经过十万小时刻意练习&#xff0c;足矣在任何领域成为专家。 七年&#xff0c;成为自己的财富管理专家。 七年&#xff0c;实现财富自由。 1512篇原创内容 公众号 星球…

怎样恢复微信聊天记录?4个巧妙方法,速来学习!

微信不仅是我们的通讯工具&#xff0c;更是情感的载体&#xff0c;每一句“早安”与“晚安”都藏着不为人知的温柔。但有时候这些珍贵的聊天记录却会离家出走。怎么恢复微信聊天记录&#xff1f;就成为我们需要解答的难题。 别担心&#xff0c;今天&#xff0c;小编我将化身为…

PostgresSQL--基于Kubernetes部署PostgresSQL

基于docker 拉取镜像&#xff0c;这个镜像是我自己的阿里云镜像&#xff0c;拉取的国外的镜像。 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/postgres:latest创建 dolphinscheduler 命名空间&#xff0c;本文命名空间是使用的dolphinscheduler 使用 kubectl…

基于元神操作系统编写(FPU)数学计算程序

1. 背景 数学计算已经成为计算机的主要工作之一&#xff0c;尤其是实数运算&#xff0c;在人工智能时代更是普遍存在&#xff0c;神经网络中的绝大部分参数都用的实数。 2. 方法 &#xff08;1&#xff09;FPU运算 计算机中的实数运算是通过数学协处理器FPU完成的&#xff…

黑神话悟空配置要求:CPU/内存/显卡/存储和系统最低限制

玩《黑神话&#xff1a;悟空》对电脑配置有什么要求&#xff1f;至少需要i5处理器、16G内存、GTX 1060显卡、130G空闲磁盘空间&#xff0c;没有高配电脑怎么办&#xff1f;码笔记整理详细配置如下&#xff1a; CPU处理器&#xff1a;64位处理器&#xff0c;CPU选择Intel Core …

数据防泄密知识集锦丨八个实用数据防泄露软件,你知道吗

数据已成为企业的核心资产。 然而&#xff0c;随着网络威胁的日益严峻&#xff0c;数据泄露事件频发&#xff0c;给企业带来了巨大的经济损失和声誉风险。 为了有效保护企业数据的安全性和保密性&#xff0c;各种数据防泄露软件应运而生。 本文将为您介绍八个实用的数据防泄露…

ROS机器人专用云台相机防抖摄像头

【告别模糊】机器人专用摄像头&#xff0c;为您的视觉算法保驾护航 产品概述 Autolabor C1专为机器人设计的高性能摄像头&#xff0c;即使在没有减震装置或不平坦的路面上&#xff0c;也能提供清晰稳定的图像。它拥有先进的主动式机械防抖和数字ISP防抖技术&#xff0c;图像效…

基于太阳能供电的水情监测站设计(论文+源码+图纸)

1.总体方案设计 根据水情监测站系统的实际应用需求&#xff0c;从硬件电路以及软件程序两个方面展开系统设计。按照系统设计功能以及功能选型的结果&#xff0c;制定了如图2.11所示的系统总体框图。系统采用STM32单片机作为控制器&#xff0c;在传感器检测模块中包括的DS18B20…

netty编程之使用ChannelOutboundHandler对write出去的消息做不同处理

写在前面 源码 。 在进行网络编程的时候&#xff0c;不可避免的需要对write出去的消息做一些处理&#xff0c;比如脱敏&#xff0c;增加统一数据等。而netty提供了ChannelOutboundHandler来允许我们拦截消息从而可以对消息进行处理。对应的接口是io.netty.channel.ChannelHand…

Python:win10下一种不用编译,直接下载二进制依赖的方法

python依赖的安装&#xff0c;在win环境下&#xff0c; 有些包还是比较麻烦&#xff0c; 经常编译失败&#xff0c; 我曾发帖讨论过多次&#xff0c;有帖为证&#xff01;点此进入&#xff01; https://blog.csdn.net/weixin_62598385/article/details/135945383 win下的Pyth…

基于vue.js和node.js的酒坊销售网站的设计与实现---附源码98047

目 录 摘要 1 绪论 1.1研究背景与意义 1.3研究内容 1.4论文结构与章节安排 2 酒坊销售网站分析 2.1 可行性分析 2.2系统流程分析 2.2.1 数据增加流程 2.2.2 数据修改流程 2.2.3 数据删除流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例…

实战分享:利用两大在线平台实现自动化数据采集的技巧

本文将深入探讨如何运用两大主流在线平台&#xff0c;通过实战案例分享&#xff0c;揭示自动化数据采集的高效技巧。无需编程基础&#xff0c;也能快速掌握跨平台数据抓取秘籍&#xff0c;助力企业和个人提升市场竞争力与决策效率。 正文 在大数据时代背景下&#xff0c;信息…

ESP8266通过WiFiManager实现Web配网

背景 一个项目中使用到了一款压力传感器,需要通过单片机实现数据的采集并发送到远程的服务器上,单片机采用的时ESP8266,通过WiFiManager实现局域网配置,以及远端服务器IP地址和服务端口的配置。发布此文章记录一下使用WiFiManager实现配网的方法。 程序流程图 示例代码 …

如何下载GB2312字体,免费

因为写文章需要用到&#xff0c;然后wps里面这个是收费的&#xff0c;所以我就去找了免费的&#xff0c;现在分享给大家。 因为我看网上很多都是给一个网址&#xff0c;有些网址已经坏了&#xff0c;所以我这里给一下我的链接 链接&#xff1a;https://pan.baidu.com/s/1wiyF…

如何用Java SpringBoot+Vue构建高效的产品订单管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

python 爬虫,东方网 上海新闻, 简单数据分析

起因: 本来想去市区玩玩&#xff0c;结果搜到一些相关的新闻&#xff0c;所以就想爬取新闻网站… 1. 爬虫部分 import os import csv import time import requests""" # home: https://sh.eastday.com/ # 1. 标题, url&#xff0c; 来源&#xff0c;时间 &qu…

SQL进阶技巧:近距离有效的缺失值填充问题【last_value实现版】

目录 0 场景描述 1 数据准备 2 问题分析 3 小结 0 场景描述 场景:现在有一张商品入库表,包括商品id、商品成本和入库日期3个字段,由于某些原因,导致部分商品的成本缺失(为0或者没有值都是缺失),这样不利于我们计算成本。所以现在要把缺失的商品进价补充完整,补充的…