PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

news2025/2/3 8:56:26

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

import json
from pyspark.sql import SparkSession

def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)

def main(config_path, base_s3_path):
    # 初始化SparkSession,配置S3和Excel支持
    spark = SparkSession.builder \
        .appName("DataExportJob") \
        .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \
        .getOrCreate()

    # 配置S3访问(根据实际环境配置)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")

    config = load_config(config_path)

    for template in config['templates']:
        label = template['label']
        sql_template = template['sql_template']
        parameters_list = template['parameters']

        for params in parameters_list:
            # 验证参数数量是否匹配
            placeholders = sql_template.count('{')
            if len(params) != placeholders:
                raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")

            # 替换SQL中的占位符
            formatted_sql = sql_template.format(*params)
            df = spark.sql(formatted_sql)

            # 生成文件名参数部分
            param_str = "_".join(params)
            base_filename = f"{label}_{param_str}"

            # 定义输出路径
            output_paths = {
                'parquet': f"{base_s3_path}/parquet/{base_filename}",
                'csv': f"{base_s3_path}/csv/{base_filename}",
                'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"
            }

            # 写入Parquet
            df.write.mode('overwrite').parquet(output_paths['parquet'])

            # 写入CSV(自动生成header)
            df.write.mode('overwrite') \
                .option("header", "true") \
                .csv(output_paths['csv'])

            # 写入Excel(使用spark-excel包)
            df.write.format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .mode("overwrite") \
                .save(output_paths['excel'])

    spark.stop()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')
    parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')
    args = parser.parse_args()

    main(args.config, args.s3_path)

配置文件示例(config.json)

{
  "templates": [
    {
      "label": "sales_report",
      "sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'",
      "parameters": [
        ["202301", "north"],
        ["202302", "south"]
      ]
    },
    {
      "label": "user_activity",
      "sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id",
      "parameters": [
        ["2023-01-01"],
        ["2023-01-02"]
      ]
    }
  ]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel
    ├── sales_report_202301_north.xlsx
    ├── sales_report_202302_south.xlsx
    └── user_activity_2023-01-01.xlsx

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2291187.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DeepSeek r1本地安装全指南

环境基本要求 硬件配置 需要本地跑模型,兼顾质量、性能、速度以及满足日常开发需要,我们需要准备以下硬件: CPU:I9内存:128GB硬盘:3-4TB 最新SSD,C盘确保有400GB,其它都可划成D盘…

基于VMware的ubuntu与vscode建立ssh连接

1.首先安装openssh服务 sudo apt update sudo apt install openssh-server -y 2.启动并检查ssh服务状态 到这里可以按q退出 之后输入命令 : ip a 红色挡住的部分就是我们要的地址,这里就不展示了哈 3.配置vscode 打开vscode 搜索并安装:…

【LLM-agent】(task2)用llama-index搭建AI Agent

note LlamaIndex 实现 Agent 需要导入 ReActAgent 和 Function Tool,循环执行:推理、行动、观察、优化推理、重复进行。可以在 arize_phoenix 中看到 agent 的具体提示词,工具被装换成了提示词ReActAgent 使得业务自动向代码转换成为可能&am…

【Redis】Redis 经典面试题解析:深入理解 Redis 的核心概念与应用

Redis 是一个高性能的键值存储系统,广泛应用于缓存、消息队列、排行榜等场景。在面试中,Redis 是一个高频话题,尤其是其核心概念、数据结构、持久化机制和高可用性方案。 1. Redis 是什么?它的主要特点是什么? 答案&a…

FastExcel使用详解

文章目录 FastExcel使用详解一、引言二、环境准备与依赖引入1、Maven 依赖引入2、实体类定义 三、核心操作:读写 Excel1、读取 Excel1.1 自定义监听器1.2 读取文件 2、写入 Excel2.1 简单写入2.2 模板写入 四、Spring Boot 集成示例1、文件上传(导入&…

图漾相机——C++语言属性设置

文章目录 前言1.SDK API功能介绍1.1 Device组件下的API测试1.1.1 相机工作模式设置(TY_TRIGGER_PARAM_EX)1.1.2 TY_INT_FRAME_PER_TRIGGER1.1.3 TY_INT_PACKET_DELAY1.1.4 TY_INT_PACKET_SIZE1.1.5 TY_BOOL_GVSP_RESEND1.1.6 TY_BOOL_TRIGGER_OUT_IO1.1.…

解决MacOS安装软件时提示“打不开xxx软件,因为Apple无法检查其是否包含恶意软件”的问题

macOS 系统中如何开启“任何来源”以解决安装报错问题? 大家好!今天我们来聊聊在使用 macOS 系统 时,遇到安装应用软件时出现报错的情况。这种情况常常发生在安装一些来自第三方开发者的应用时,因为 macOS 会默认阻止不明开发者的…

实验十 Servlet(一)

实验十 Servlet(一) 【实验目的】 1.了解Servlet运行原理 2.掌握Servlet实现方式 【实验内容】 1、参考课堂例子,客户端通过login.jsp发出登录请求,请求提交到loginServlet处理。如果用户名和密码相同则视为登录成功&#xff0c…

MyBatis-Plus笔记-快速入门

大家在日常开发中应该能发现,单表的CRUD功能代码重复度很高,也没有什么难度。而这部分代码量往往比较大,开发起来比较费时。 因此,目前企业中都会使用一些组件来简化或省略单表的CRUD开发工作。目前在国内使用较多的一个组件就是…

《超自然》:科学与灵性融合的自我转变之路

在现代社会中,许多人开始探寻自我成长、身心疗愈与灵性提升的可能性。Bestselling author Dr. Joe Dispenza 的《超自然:普通人如何创造非凡人生》正是在这样的大背景下问世的。书中既融合了量子物理、神经科学和表观遗传学的前沿理论,又吸收…

《Origin画百图》之脊线图

1.数据准备:将数据设置为y 2.选择绘图>统计图>脊线图 3.生成基础图形,并不好看,接下来对图形属性进行设置 4.双击图形>选择图案>颜色选择按点>Y值 5.这里发现颜色有色阶,过度并不平滑,需要对色阶进行更…

w189电商平台的设计与实现

🙊作者简介:多年一线开发工作经验,原创团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹赠送计算机毕业设计600个选题excel文…

让banner.txt可以自动读取项目版本

文章目录 1.sunrays-dependencies1.配置插件2.pluginManagement统一指定版本 2.common-log4j2-starter1.banner.txt使用$ 符号取出2.查看效果 1.sunrays-dependencies 1.配置插件 <!-- 为了让banner.txt自动获取版本号 --><plugin><groupId>org.apache.mave…

96,【4】 buuctf web [BJDCTF2020]EzPHP

进入靶场 查看源代码 GFXEIM3YFZYGQ4A 一看就是编码后的 1nD3x.php 访问 得到源代码 <?php // 高亮显示当前 PHP 文件的源代码&#xff0c;用于调试或展示代码结构 highlight_file(__FILE__); // 关闭所有 PHP 错误报告&#xff0c;防止错误信息泄露可能的安全漏洞 erro…

基于SpringBoot的智慧康老疗养院管理系统的设计与实现(源码+SQL脚本+LW+部署讲解等)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

音视频多媒体编解码器基础-codec

如果要从事编解码多媒体的工作&#xff0c;需要准备哪些更为基础的内容&#xff0c;这里帮你总结完。 因为数据类型不同所以编解码算法不同&#xff0c;分为图像、视频和音频三大类&#xff1b;因为流程不同&#xff0c;可以分为编码和解码两部分&#xff1b;因为编码器实现不…

Java线程认识和Object的一些方法ObjectMonitor

专栏系列文章地址&#xff1a;https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标&#xff1a; 要对Java线程有整体了解&#xff0c;深入认识到里面的一些方法和Object对象方法的区别。认识到Java对象的ObjectMonitor&#xff0c;这有助于后面的Synchron…

pytorch实现长短期记忆网络 (LSTM)

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 LSTM 通过 记忆单元&#xff08;cell&#xff09; 和 三个门控机制&#xff08;遗忘门、输入门、输出门&#xff09;来控制信息流&#xff1a; 记忆单元&#xff08;Cell State&#xff09; 负责存储长期信息&…

Games104——引擎工具链高级概念与应用

世界编辑器 其实是一个平台&#xff08;hub&#xff09;&#xff0c;集合了所有能够制作地形世界的逻辑 editor viewport&#xff1a;可以说是游戏引擎的特殊视角&#xff0c;会有部分editor only的代码&#xff08;不小心开放就会变成外挂入口&#xff09;Editable Object&…

消息队列应用示例MessageQueues-STM32CubeMX-FreeRTOS《嵌入式系统设计》P343-P347

消息队列 使用信号量、事件标志组和线标志进行任务同步时&#xff0c;只能提供同步的时刻信息&#xff0c;无法在任务之间进行数据传输。要实现任务间的数据传输&#xff0c;一般使用两种方式&#xff1a; 1. 全局变量 在 RTOS 中使用全局变量时&#xff0c;必须保证每个任务…