C++和Python实现SQL Server数据库导出数据到S3并导入Redshift数据仓库

news2025/1/30 20:05:05

用C++实现高性能数据处理,Python实现操作Redshift导入数据文件。
在Visual Studio 2022中用C++和ODBC API导出SQL Server数据库中张表中的所有表的数据为CSV文件格式的数据流,用逗号作为分隔符,用双引号包裹每个数据,字符串类型的数据去掉前后的空格,数据中如果包含双引号,则将一个双引号替换为两个双引号,创建gzip压缩文件,输出数据流写入到gzip压缩文件包中的唯一的csv文件中,一张表对应一个gzip压缩文件。用异步编程的方法优化数据库表的读取和写入文件的过程,每个gzip压缩文件包中的csv文件的第一行都是用逗号分割的表头,对应数据库表的字段,程序需要异常处理和日志文件输出,当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导出数据行数、成功或者失败的状态,日志文件不存在则创建。

以下是一个使用C++和ODBC API在Visual Studio 2022中实现将SQL Server数据库表数据导出为CSV文件并进行gzip压缩的示例代码。代码使用异步编程优化数据库读取和文件写入过程,并包含异常处理和日志记录功能。

#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <future>
#include <sql.h>
#include <sqlext.h>
#include <zlib.h>
#include <ctime>
#include <iomanip>

// 日志记录函数
void logMessage(const std::string& message) {
    std::ofstream logFile("export_log.txt", std::ios::app);
    if (logFile.is_open()) {
        auto now = std::chrono::system_clock::now();
        std::time_t now_c = std::chrono::system_clock::to_time_t(now);
        std::tm* now_tm = std::localtime(&now_c);
        std::ostringstream oss;
        oss << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S") << " " << message << std::endl;
        logFile << oss.str();
        logFile.close();
    }
}

// 处理字符串中的双引号
std::string escapeDoubleQuotes(const std::string& str) {
    std::string result = str;
    size_t pos = 0;
    while ((pos = result.find('"', pos))!= std::string::npos) {
        result.replace(pos, 1, 2, '"');
        pos += 2;
    }
    return result;
}

// 从数据库读取表数据
std::vector<std::vector<std::string>> readTableData(SQLHSTMT hstmt) {
    std::vector<std::vector<std::string>> data;
    SQLSMALLINT columnCount = 0;
    SQLNumResultCols(hstmt, &columnCount);

    std::vector<SQLCHAR*> columns(columnCount);
    std::vector<SQLINTEGER> lengths(columnCount);
    for (SQLSMALLINT i = 0; i < columnCount; ++i) {
        columns[i] = new SQLCHAR[SQL_MAX_MESSAGE_LENGTH];
        SQLBindCol(hstmt, i + 1, SQL_C_CHAR, columns[i], SQL_MAX_MESSAGE_LENGTH, &lengths[i]);
    }

    while (SQLFetch(hstmt) == SQL_SUCCESS) {
        std::vector<std::string> row;
        for (SQLSMALLINT i = 0; i < columnCount; ++i) {
            std::string value(reinterpret_cast<const char*>(columns[i]));
            value = escapeDoubleQuotes(value);
            row.push_back(value);
        }
        data.push_back(row);
    }

    for (SQLSMALLINT i = 0; i < columnCount; ++i) {
        delete[] columns[i];
    }

    return data;
}

// 将数据写入CSV文件
void writeToCSV(const std::vector<std::vector<std::string>>& data, const std::vector<std::string>& headers, const std::string& filename) {
    std::ofstream csvFile(filename);
    if (csvFile.is_open()) {
        // 写入表头
        for (size_t i = 0; i < headers.size(); ++i) {
            csvFile << '"' << headers[i] << '"';
            if (i < headers.size() - 1) csvFile << ',';
        }
        csvFile << std::endl;

        // 写入数据
        for (const auto& row : data) {
            for (size_t i = 0; i < row.size(); ++i) {
                csvFile << '"' << row[i] << '"';
                if (i < row.size() - 1) csvFile << ',';
            }
            csvFile << std::endl;
        }

        csvFile.close();
    } else {
        throw std::runtime_error("Failed to open CSV file for writing");
    }
}

// 压缩CSV文件为gzip
void compressCSV(const std::string& csvFilename, const std::string& gzipFilename) {
    std::ifstream csvFile(csvFilename, std::ios::binary);
    std::ofstream gzipFile(gzipFilename, std::ios::binary);
    if (csvFile.is_open() && gzipFile.is_open()) {
        gzFile gzOut = gzopen(gzipFilename.c_str(), "wb");
        if (gzOut) {
            char buffer[1024];
            while (csvFile.read(buffer, sizeof(buffer))) {
                gzwrite(gzOut, buffer, sizeof(buffer));
            }
            gzwrite(gzOut, buffer, csvFile.gcount());
            gzclose(gzOut);
        } else {
            throw std::runtime_error("Failed to open gzip file for writing");
        }
        csvFile.close();
        gzipFile.close();
        std::remove(csvFilename.c_str());
    } else {
        throw std::runtime_error("Failed to open files for compression");
    }
}

// 导出单个表
void exportTable(const std::string& server, const std::string& database, const std::string& schema, const std::string& table) {
    SQLHENV henv = nullptr;
    SQLHDBC hdbc = nullptr;
    SQLHSTMT hstmt = nullptr;

    try {
        SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
        SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);
        SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);

        std::string connectionString = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ";DATABASE=" + database + ";UID=your_username;PWD=your_password";
        SQLRETURN ret = SQLDriverConnect(hdbc, nullptr, (SQLCHAR*)connectionString.c_str(), SQL_NTS, nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT);
        if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {
            throw std::runtime_error("Failed to connect to database");
        }

        std::string query = "SELECT * FROM " + schema + "." + table;
        SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
        ret = SQLExecDirect(hstmt, (SQLCHAR*)query.c_str(), SQL_NTS);
        if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {
            throw std::runtime_error("Failed to execute query");
        }

        std::vector<std::vector<std::string>> data = readTableData(hstmt);
        std::vector<std::string> headers;
        SQLSMALLINT columnCount = 0;
        SQLNumResultCols(hstmt, &columnCount);
        for (SQLSMALLINT i = 0; i < columnCount; ++i) {
            SQLCHAR columnName[SQL_MAX_COLUMN_NAME_LEN];
            SQLSMALLINT nameLen;
            SQLDescribeCol(hstmt, i + 1, columnName, SQL_MAX_COLUMN_NAME_LEN, &nameLen, nullptr, nullptr, nullptr, nullptr);
            headers.push_back(reinterpret_cast<const char*>(columnName));
        }

        std::string csvFilename = table + ".csv";
        writeToCSV(data, headers, csvFilename);
        std::string gzipFilename = table + ".gz";
        compressCSV(csvFilename, gzipFilename);

        std::ostringstream oss;
        oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Rows Exported: " << data.size() << ", Status: Success";
        logMessage(oss.str());
    } catch (const std::exception& e) {
        std::ostringstream oss;
        oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Status: Failed, Error: " << e.what();
        logMessage(oss.str());
    } finally {
        if (hstmt) SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
        if (hdbc) SQLDisconnect(hdbc); SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
        if (henv) SQLFreeHandle(SQL_HANDLE_ENV, henv);
    }
}

int main() {
    std::string server = "<实例名>";
    std::string database = "<数据库名>";
    std::string schema = "<Schema名>";
    std::vector<std::string> tables = {"<表名1>", "<表名2>", "<表名3>"};

    std::vector<std::future<void>> futures;
    for (const auto& table : tables) {
        futures.push_back(std::async(std::launch::async, exportTable, server, database, schema, table));
    }

    for (auto& fut : futures) {
        fut.get();
    }

    return 0;
}

代码说明:

日志记录: logMessage 函数用于记录操作日志,包括时间戳、数据库名、schema名、表名、导出数据行数和操作状态。
字符串处理: escapeDoubleQuotes 函数用于处理字符串中的双引号,将其替换为两个双引号。
数据库读取: readTableData 函数使用ODBC API从数据库中读取表数据,并将其存储在二维向量中。
CSV写入: writeToCSV 函数将数据写入CSV文件,包括表头和数据行,并用双引号包裹每个数据,使用逗号作为分隔符。
文件压缩: compressCSV 函数将生成的CSV文件压缩为gzip格式,并删除原始CSV文件。
表导出: exportTable 函数负责连接数据库、执行查询、读取数据、写入CSV文件并压缩。
主函数: main 函数定义了数据库服务器、数据库名、schema名和表名,并使用异步任务并行导出每个表的数据。

用Python删除当前目录下所有功能扩展名为gz文件,接着运行export_sqlserver.exe程序,输出该程序的输出内容并等待它运行完成,然后连接SQL Server数据库和Amazon Redshift数据仓库,从数据库中获取所有表和它们的字段名,然后在Redshift中创建字段名全部相同的同名表,字段长度全部为最长的varchar类型,如果表已经存在则不创建表,自动上传当前目录下所有功能扩展名为gz文件到S3,默认覆盖同名的文件,然后使用COPY INTO将S3上包含csv文件的gz压缩包导入对应创建的Redshift表中,文件数据的第一行是表头,导入所有上传的文件到Redshift表,程序需要异常处理和日志文件输出,当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导入数据行数、成功或者失败的状态,日志文件不存在则创建。

import os
import subprocess
import pyodbc
import redshift_connector
import boto3
import logging
from datetime import datetime


# 配置日志记录
logging.basicConfig(filename='operation_log.log', level=logging.INFO,
                    format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')


def delete_gz_files():
    try:
        for file in os.listdir('.'):
            if file.endswith('.gz'):
                os.remove(file)
        logging.info('所有.gz文件已删除')
    except Exception as e:
        logging.error(f'删除.gz文件时出错: {e}')


def run_export_sqlserver():
    try:
        result = subprocess.run(['export_sqlserver.exe'], capture_output=True, text=True)
        print(result.stdout)
        logging.info('export_sqlserver.exe运行成功')
    except Exception as e:
        logging.error(f'运行export_sqlserver.exe时出错: {e}')


def create_redshift_tables():
    # SQL Server 连接配置
    sqlserver_conn_str = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_sqlserver_server;DATABASE=your_database;UID=your_username;PWD=your_password'
    try:
        sqlserver_conn = pyodbc.connect(sqlserver_conn_str)
        sqlserver_cursor = sqlserver_conn.cursor()

        # Redshift 连接配置
        redshift_conn = redshift_connector.connect(
            host='your_redshift_host',
            database='your_redshift_database',
            user='your_redshift_user',
            password='your_redshift_password',
            port=5439
        )
        redshift_cursor = redshift_conn.cursor()

        sqlserver_cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
        tables = sqlserver_cursor.fetchall()

        for table in tables:
            table_name = table[0]
            sqlserver_cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}'")
            columns = sqlserver_cursor.fetchall()
            column_definitions = ', '.join([f"{column[0]} VARCHAR(MAX)" for column in columns])

            try:
                redshift_cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_definitions})")
                redshift_conn.commit()
                logging.info(f'在Redshift中成功创建表 {table_name}')
            except Exception as e:
                logging.error(f'在Redshift中创建表 {table_name} 时出错: {e}')

        sqlserver_conn.close()
        redshift_conn.close()
    except Exception as e:
        logging.error(f'连接数据库或创建表时出错: {e}')


def upload_gz_files_to_s3():
    s3 = boto3.client('s3')
    bucket_name = 'your_bucket_name'
    try:
        for file in os.listdir('.'):
            if file.endswith('.gz'):
                s3.upload_file(file, bucket_name, file)
                logging.info(f'成功上传文件 {file} 到S3')
    except Exception as e:
        logging.error(f'上传文件到S3时出错: {e}')


def copy_data_to_redshift():
    redshift_conn = redshift_connector.connect(
        host='your_redshift_host',
        database='your_redshift_database',
        user='your_redshift_user',
        password='your_redshift_password',
        port=5439
    )
    redshift_cursor = redshift_conn.cursor()
    bucket_name = 'your_bucket_name'
    try:
        for file in os.listdir('.'):
            if file.endswith('.gz') and file.endswith('.csv.gz'):
                table_name = file.split('.')[0]
                s3_path = f's3://{bucket_name}/{file}'
                sql = f"COPY {table_name} FROM '{s3_path}' IAM_ROLE 'your_iam_role' CSV HEADER"
                try:
                    redshift_cursor.execute(sql)
                    redshift_conn.commit()
                    row_count = redshift_cursor.rowcount
                    logging.info(f'成功将数据导入表 {table_name},导入行数: {row_count}')
                except Exception as e:
                    logging.error(f'将数据导入表 {table_name} 时出错: {e}')
    except Exception as e:
        logging.error(f'连接Redshift或导入数据时出错: {e}')
    finally:
        redshift_conn.close()


if __name__ == "__main__":
    delete_gz_files()
    run_export_sqlserver()
    create_redshift_tables()
    upload_gz_files_to_s3()
    copy_data_to_redshift()

代码说明:

日志记录:使用 logging 模块配置日志记录,记录操作的时间戳和操作信息到 operation_log.log 文件。
删除.gz文件: delete_gz_files 函数删除当前目录下所有扩展名为 .gz 的文件。
运行export_sqlserver.exe: run_export_sqlserver 函数运行 export_sqlserver.exe 程序并输出其内容。
创建Redshift表: create_redshift_tables 函数连接SQL Server和Redshift数据库,获取SQL Server中所有表和字段名,在Redshift中创建同名表,字段类型为 VARCHAR(MAX) 。
上传.gz文件到S3: upload_gz_files_to_s3 函数上传当前目录下所有扩展名为 .gz 的文件到S3。
将数据从S3导入Redshift: copy_data_to_redshift 函数使用 COPY INTO 语句将S3上的CSV压缩包数据导入对应的Redshift表中。

请根据实际的数据库配置、S3桶名和IAM角色等信息修改代码中的相关参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2286579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分享| RL-GPT 框架通过慢agent和快agent结合提高AI解决复杂任务的能力-Arxiv

结论 “RL-GPT: Integrating Reinforcement Learning and Code-as-policy” RL-GPT 框架为解决大语言模型在复杂任务处理中的难题提供了创新有效的途径&#xff0c; 旨在将强化学习&#xff08;RL&#xff09;和代码即策略相结合&#xff0c; 以解决大语言模型&#xff08…

Prompt提示词完整案例:让chatGPT成为“书单推荐”的高手

大家好&#xff0c;我是老六哥&#xff0c;我正在共享使用AI提高工作效率的技巧。欢迎关注我&#xff0c;共同提高使用AI的技能&#xff0c;让AI成功你的个人助理。 许多人可能会跟老六哥一样&#xff0c;有过这样的体验&#xff1a;当我们遇到一个能力出众或对事物有独到见解的…

【开源免费】基于SpringBoot+Vue.JS在线考试学习交流网页平台(JAVA毕业设计)

本文项目编号 T 158 &#xff0c;文末自助获取源码 \color{red}{T158&#xff0c;文末自助获取源码} T158&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

如何解压rar格式文件?8种方法(Win/Mac/手机/网页端)

RAR 文件是一种常见的压缩文件格式&#xff0c;由尤金・罗谢尔&#xff08;Eugene Roshal&#xff09;开发&#xff0c;因其扩展名 “rar” 而得名。它通过特定算法将一个或多个文件、文件夹进行压缩&#xff0c;大幅减小存储空间&#xff0c;方便数据传输与备份。然而&#xf…

Kafka 副本机制(包含AR、ISR、OSR、HW 和 LEO 介绍)

文章目录 Kafka 副本机制&#xff08;包含AR、ISR、OSR、HW 和 LEO 介绍&#xff09;1. 副本的基本概念2. 副本同步和一致性2.1 AR&#xff08;Assigned Replicas&#xff09;2.2 ISR&#xff08;In-Sync Replicas&#xff09;2.3 OSR&#xff08;Out-of-Sync Replicas&#xf…

网关登录校验

网关登录校验 单体架构时我们只需要完成一次用户登录、身份校验&#xff0c;就可以在所有业务中获取到用户信息。而微服务拆分后&#xff0c;每个微服务都独立部署&#xff0c;不再共享数据。也就意味着每个微服务都需要做登录校验&#xff0c;这显然不可取。 鉴权思路分析 …

【C语言】在Windows上为可执行文件.exe添加自定义图标

本文详细介绍了在 Windows 环境下,如何为使用 GCC 编译器编译的 C程序 添加自定义图标,从而生成带有图标的 .exe 可执行文件。通过本文的指导,读者可以了解到所需的条件以及具体的操作步骤,使生成的程序更具专业性和个性化。 目录 1. 准备条件2. 具体步骤步骤 1: 准备资源文…

计算机毕业设计Python+知识图谱大模型AI医疗问答系统 健康膳食推荐系统 食谱推荐系统 医疗大数据 机器学习 深度学习 人工智能 爬虫 大数据毕业设计

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

商品信息管理自动化测试

目录 前言 一、思维导图 二、代码编写 1.在pom.xml文件中添加相关依赖 2.自动化代码编写 三、代码测试 小结 前言 1. 针对商品信息管理项目进行测试&#xff0c;商品信息管理项目主要有商品列表页、部门列表页、员工列表页&#xff0c;主要功能&#xff1a;对商品信息的…

【实践】基于SakuraLLM的离线日文漫画及视频汉化

介绍 LLM 大型语言模型&#xff08;英语&#xff1a;large language model&#xff0c;LLM&#xff09;&#xff0c;也称大语言模型&#xff0c;是由具有大量参数&#xff08;通常数十亿个权重或更多&#xff09;的人工神经网络组成的一类语言模型。在进行语言理解与分析&…

常见的同态加密算法收集

随着对crypten与密码学的了解&#xff0c;我们将逐渐深入学习相关知识。今天&#xff0c;我们将跟随同态加密的发展历程对相关算法进行简单的收集整理 。 目录 同态加密概念 RSA算法 ElGamal算法 ELGamal签名算法 Paillier算法 BGN方案 Gentry 方案 BGV 方案 BFV 方案…

SSM-MyBatis-总结

文章目录 一、Hello MyBatis1.1 流程1.2 总结 二、Crud 的一些注意点三、参数传递3.1 #{ } VS ${ }3.2 单、复参数传递&#xff08;1&#xff09;单参数&#xff08;2&#xff09;多参数 -- Param&#xff08;3&#xff09;总结 四、查询结果返回--结果封装4.1 ResultType 一般…

万字长文总结前端开发知识---JavaScriptVue3Axios

JavaScript学习目录 一、JavaScript1. 引入方式1.1 内部脚本 (Inline Script)1.2 外部脚本 (External Script) 2. 基础语法2.1 声明变量2.2 声明常量2.3 输出信息 3. 数据类型3.1 基本数据类型3.2 模板字符串 4. 函数4.1 具名函数 (Named Function)4.2 匿名函数 (Anonymous Fun…

Flutter android debug 编译报错问题。插件编译报错

下面相关内容 都以 Mac 电脑为例子。 一、问题 起因&#xff1a;&#xff08;更新 Android studio 2024.2.2.13、 Flutter SDK 3.27.2&#xff09; 最近 2025年 1 月 左右&#xff0c;我更新了 Android studio 和 Flutter SDK 再运行就会出现下面的问题。当然 下面的提示只是其…

【Proteus仿真】【51单片机】简易计算器系统设计

目录 一、主要功能 二、使用步骤 三、硬件资源 四、软件设计 五、实验现象 联系作者 一、主要功能 1、LCD1602液晶显示 2、矩阵按键​ 3、可以进行简单的加减乘除运算 4、最大 9999*9999 二、使用步骤 系统运行后&#xff0c;LCD1602显示数据&#xff0c;通过矩阵按键…

JavaScript函数中this的指向

总结&#xff1a;谁调用我&#xff0c;我就指向谁&#xff08;es6箭头函数不算&#xff09; 一、ES6之前 每一个函数内部都有一个关键字是 this &#xff0c;可以直接使用 重点&#xff1a; 函数内部的 this 只和函数的调用方式有关系&#xff0c;和函数的定义方式没有关系 …

51单片机入门_01_单片机(MCU)概述(使用STC89C52芯片;使用到的硬件及课程安排)

文章目录 1. 什么是单片机1.1 微型计算机的组成1.2 微型计算机的应用形态1.3 单板微型计算机1.4 单片机(MCU)1.4.1 单片机内部结构1.4.2 单片机应用系统的组成 1.5 80C51单片机系列1.5.1 STC公司的51单片机1.5.1 STC公司单片机的命名规则 2. 单片机的特点及应用领域2.1 单片机的…

51单片机入门_02_C语言基础0102

C语言基础部分可以参考我之前写的专栏C语言基础入门48篇 以及《从入门到就业C全栈班》中的C语言部分&#xff0c;本篇将会结合51单片机讲差异部分。 课程主要按照以下目录进行介绍。 文章目录 1. 进制转换2. C语言简介3. C语言中基本数据类型4. 标识符与关键字5. 变量与常量6.…

时间轮:XXL-JOB 高效、精准定时任务调度实现思路分析

大家好&#xff0c;我是此林。 定时任务是我们项目中经常会遇到的一个场景。那么如果让我们手动来实现一个定时任务框架&#xff0c;我们会怎么做呢&#xff1f; 1. 基础实现&#xff1a;简单的线程池时间轮询 最直接的方式是创建一个定时任务线程池&#xff0c;用户每提交一…

人工智能如何驱动SEO关键词优化策略的转型与效果提升

内容概要 随着数字化时代的到来&#xff0c;人工智能&#xff08;AI&#xff09;技术对各行各业的影响日益显著&#xff0c;在搜索引擎优化&#xff08;SEO&#xff09;领域尤为如此。AI的应用不仅改变了关键词研究的方法&#xff0c;而且提升了内容生成和搜索优化的效率&…