Spark_SQL-DataFrame数据写出以及读写数据库(以MySQl为例)

news2025/1/22 18:59:12

一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

        统一API写法:

       常见源写出:

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName('write').\
        master('local[*]').\
        getOrCreate()

    sc = spark.sparkContext

    # 1.读取文件
    schema = StructType().add('user_id', StringType(), nullable=True).\
        add('movie_id', IntegerType(), nullable=True).\
        add('rank', IntegerType(), nullable=True).\
        add('ts', StringType(), nullable=True)

    df = spark.read.format('csv').\
        option('sep', '\t').\
        option('header', False).\
        option('encoding', 'utf-8').\
        schema(schema=schema).\
        load('../input/u.data')

    # write text 写出,只能写出一个列的数据,需要将df转换为单列df
    df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')).\
        write.\
        mode('overwrite').\
        format('text').\
        save('../output/sql/text')

    # write csv
    df.write.mode('overwrite').\
        format('csv').\
        option('sep',';').\
        option('header', True).\
        save('../output/sql/csv')

    # write json
    df.write.mode('overwrite').\
        format('json').\
        save('../output/sql/json')

    # write parquet
    df.write.mode('overwrite').\
        format('parquet').\
        save('../output/sql/parquet')

二、写出MySQL数据库

        API写法:

        注意:

        ①jdbc连接字符串中,建议使用useSSL=false 确保连接可以正常连接( 不使用SSL安全协议进行连接)

        ②jdbc连接字符串中,建议使用useUnicode=true 来确保传输中不出现乱码

        ③save()不要填参数,没有路径,是写出数据库

        ④dbtable属性:指定写出的表名

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName('write').\
        master('local[*]').\
        getOrCreate()

    sc = spark.sparkContext

    # 1.读取文件
    schema = StructType().add('user_id', StringType(), nullable=True).\
        add('movie_id', IntegerType(), nullable=True).\
        add('rank', IntegerType(), nullable=True).\
        add('ts', StringType(), nullable=True)

    df = spark.read.format('csv').\
        option('sep', '\t').\
        option('header', False).\
        option('encoding', 'utf-8').\
        schema(schema=schema).\
        load('../input/u.data')

    # 2.写出df到MySQL数据库
    df.write.mode('overwrite').\
        format('jdbc').\
        option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8').\
        option('dbtable', 'movie_data').\
        option('user', 'root').\
        option('password', '123456').\
        save()
    
    # 读取
    df.read.mode('overwrite'). \
        format('jdbc'). \
        option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8'). \
        option('dbtable', 'movie_data'). \
        option('user', 'root'). \
        option('password', '123456'). \
        load()
    '''
    JDBC写出,会自动创建表的
    因为DataFrame中的有表结构信息,StructType记录的 各个字段的名称 类型 和是否运行为空
    '''

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1130226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android 下载文件后,调用系统文件管理器打开方式

如果是定制的系统,可能需要注意下有没有内置播放器或者浏览软件!!! 看效果: 第一先上个文件类型判断的方法: //建立一个文件类型与文件后缀名的匹配表public static final String[][] MATCH_ARRAY{//{后缀…

CICD 流程学习(五)Jenkins后端工程构建

案例1:数据库服务部署 MySQL部署 #安装MySQL服务 [rootServices ~]# yum clean all; yum repolist -v ... Total packages: 8,265 [rootServices ~]# yum -y install mysql.x86_64 mysql-server.x86_64 mysql-devel.x86_64 ... Complete! [rootServices ~]# #启动…

【MySQL架构篇】SQL执行流程与缓冲池

文章目录 1. SQL执行流程2. 数据库缓冲池(Buffer Pool)2.1 缓冲池概述2.2 缓冲池如何读取数据2.3 查看和设置缓冲池的大小2.4 多个Buffer Pool实例2.5 引申问题 1. SQL执行流程 查询缓存:因为查询效率往往不高,所以在MySQL8.0之后就抛弃了这个功能解析器…

永中office电子表格使用函数求和

下载安装一个永中office个人版; 基本没用过这软件;看一下有没有电子表格; 有的; 再看一下电子表格有没有类似excel的函数功能; 看一下能不能sum()求和; 可以的;

百度Comate SaaS版本正式发布,助力开发者加速研发过程

百度Comate是基于文心大模型的智能代码助手,让开发者的编码更快、更好、更简单,为开发者自动生成完整的、且更符合实际研发场景的代码行或整个代码块,帮助每一位开发者轻松完成研发任务。10月17日召开的百度世界大会上,百度CTO王海…

CVE-2023-46227 Apache inlong JDBC URL反序列化漏洞

项目介绍 Apache InLong(应龙)是一站式、全场景的海量数据集成框架,同时支持数据接入、数据同步和数据订阅,提供自动、安全、可靠和高性能的数据传输能力,方便业务构建基于流式的数据分析、建模和应用。 项目地址 h…

SpringCloud微服务 【实用篇】| 认识微服务

目录 一:认识微服务 1. 微服务框架介绍 2. 服务架构演变 3. 微服务技术对比 4. SpringCloud 图书推荐:《巧用ChatGPT快速提高职场晋升力》 一:认识微服务 本课程学习于黑马,会通过分层次学习,分为三部分去讲解微…

BUUCTF刷题记录

[BJDCTF2020]Easy MD51 进入题目页面,题目提示有一个链接,应该是题目源码 进入环境,是一个查询框,无论输入什么都没有回显,查看源码也没什么用 利用bp抓包查看有没有什么有用的东西 发现响应的Hint那里有一个sql语句&…

【高效开发工具系列】Postman

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

人工智能:CNN(卷积神经网络)、RNN(循环神经网络)、DNN(深度神经网络)的知识梳理

卷积神经网络(CNN) 卷积神经网络(CNN),也被称为ConvNets或Convolutional Neural Networks,是一种深度学习神经网络架构,主要用于处理和分析具有网格状结构的数据,特别是图像和视频数…

学习笔记二十三:Deployment入门到企业实战应用

Deployment入门到企业实战应用 Deployment控制器:概念、原理解读Deployment概述Deployment工作原理:如何管理rs和Pod?什么叫做更新节奏和更新逻辑呢 Deployment使用案例:创建一个web站点,2个副本deploy-demo详细解读 通过k8s实现滚…

[100天算法】-最长有效括号(day 38)

题目描述 给定一个只包含 ( 和 ) 的字符串,找出最长的包含有效括号的子串的长度。示例 1:输入: "(()" 输出: 2 解释: 最长有效括号子串为 "()" 示例 2:输入: ")()())" 输出: 4 解释: 最长有效括号子串为 "()()"来源&#…

Windows与Linux服务器互传文件

使用winscp实现图形化拖动的方式互传文件. 1.下载winscp软件并安装,官方地址: https://winscp.net/eng/index.php 2.打开软件: 文件协议选择scp,输入linux服务器的IP和端口号,然后输入你的用户名和密码就可以登陆了。…

在nodejs中实现实时通信的几种方式

在nodejs中实现实时通信的几种方式 在当今世界中,实时通信至关重要。无论是聊天应用程序还是实时体育更新,实时通信都是保持用户活跃度所必需的。Node.js 因其速度、可扩展性和可靠性而成为开发实时应用程序的流行工具。在本文中,我们将探讨…

【学习笔记】记录一个win 11 操作文件卡顿,Windows 资源管理器CPU占用飙升问题

【学习笔记】记录一个win 11 操作文件卡顿,Windows 资源管理器CPU占用飙升问题 前段时间忽然发现电脑操作文件都会特别的卡,例如复制粘贴文件,写入文件等操作,卡的怀疑人生,原本以为是电脑太久没重启,重启…

如何将本地 PDF 文件进行翻译

在日常工作和学习中,我们经常会遇到需要翻译 PDF 文件的情况。比如,我们需要将一份英文的技术文档翻译成中文,或者将一份中文的法律文件翻译成英文。 传统上,我们可以使用专业翻译软件或服务来翻译 PDF 文件。但是,这…

【产品经理】APP备案(阿里云)

工信部《关于开展移动互联网应用程序备案工作的通知》 工业和信息化部印发了《关于开展移动互联网应用程序备案工作的通知》,“在中华人民共和国境内从事互联网信息服务的App主办者,应当依照相关法律法规等规定履行备案手续,未履行备案手续的…

Linux shell编程学习笔记15:定义数组、获取数组元素值和长度

一、 Linux shell 脚本编程中的数组概述 数组是一种常见的数据结构。跟大多数编程语言一样,大多数Linux shell脚本支持数组,但对数组的支持程度各不相同,比如数组的维度,是支持一维数组还是多维数组?再如,…

MongoDB 的集群架构与设计

一、前言 MongoDB 有三种集群架构模式,分别为主从复制(Master-Slaver)、副本集(Replica Set)和分片(Sharding)模式。 Master-Slaver 是一种主从复制的模式,目前已经不推荐使用。Re…

threejs(3)-详解材质与纹理

一、Matcap(MeshMatcapMaterial)材质原理与应用 Matcap是一张含有光照信息的贴图,通常是直接截取材质球截图来使用。因此Matcap可以很好的模拟静止光源下的光照效果。 最直接的方式就是直接使用在View空间下的模型法向量的xy分量去采样Matcap。 另外还有一种常见…