Spark写PGSQL分区表

news2025/1/18 6:48:56

这里写目录标题

    • 需求
    • 碰到的问题
      • 格式问题
      • 分区问题(重点)
    • 解决
      • 完整代码
      • 效果

需求

spark程序计算后的数据需要往PGSQL中的分区表进行写入。

碰到的问题

格式问题

使用了字符串格式,导致插入报错。

val frame = df.withColumn("insert_time",current_timestamp()))
Batch entry 0 INSERT INTO t ("a","insert_time") VALUES 
(1,'2023-08-01 10:00:00') was aborted: ERROR: column 
"insert_time" is of type timestamp without time zone but 
expression is of type character varying

分区问题(重点)

一直都是spark计算完后写单表或者hive的表,都需要去手动去维护分区。但是写PGSQL空表(只有表字段,还没有数据,没有创建分区),需要手动先创建分区,否则会报错。

报错信息

Partition key of the failing row contains (insert_time) = 
(2023-08-04 21:14:09.641).  Call getNextException to see other 
errors in the batch.

插入失败的行的分区键包含的时间戳值 2023-08-04 21:14:09.641 在分区表中找不到对应的分区范围。

解决

最终的解决方案是在插入数据之前,通过代码去添加分区,添加好分区后再写入数据即可。

object WritePgSQL {

  def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .appName("SparkPostgreSQLPartitionedTable")
          .config("spark.master", "local")
          .getOrCreate()

        // 设置PostgreSQL连接信息
        val postgresUrl = "jdbc:postgresql://192.168.160.123:5432/test"
        val connectionProperties = new java.util.Properties()
        connectionProperties.setProperty("user", "test")
        connectionProperties.setProperty("password", "123456")

        // 创建测试数据
        val data = Seq(
              (1, "2023-08-01 10:00:00"),
              (2, "2023-08-02 12:00:00"),
              (3, "2023-08-03 15:00:00")
        )

        val columns = Seq("a", "insert_time1")
        val df = spark.createDataFrame(data).toDF(columns: _*)



        val frame = df.drop("insert_time1")
          .withColumn("insert_time", current_timestamp().cast("timestamp"))

        
        // 动态创建分区范围
        // p1 可以换成p20230804这样的分区格式
        // t为表名
        // (TIMESTAMP '2023-08-04 00:00:00') 分区开始范围,一般通过代码生成,为计算时间的零点
        // (TIMESTAMP '2023-08-05 00:00:00') 分区结束范围,一般通过代码生成,为计算时间的下一天零点
        val createPartitionSql =
              s"""
          CREATE TABLE "p1" PARTITION OF t FOR VALUES FROM (TIMESTAMP '2023-08-04 00:00:00') TO (TIMESTAMP '2023-08-05 00:00:00') ;
          """

        println(createPartitionSql)

        // 执行创建分区 SQL
        val connection = java.sql.DriverManager.getConnection(postgresUrl, connectionProperties)
        val statement = connection.createStatement()
        statement.executeUpdate(createPartitionSql)
        connection.close()
        // 将数据写入PostgreSQL分区表
        frame.write
          .mode("append")
          .jdbc(postgresUrl, "t", connectionProperties)
  }
}



完整代码

自动生成当天日期和分区名称

object WritePgSQL {

  def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .appName("SparkPostgreSQLPartitionedTable")
          .config("spark.master", "local")
          .getOrCreate()

        // 设置PostgreSQL连接信息
        val postgresUrl = "jdbc:postgresql://192.168.160.123:5432/test"
        val connectionProperties = new java.util.Properties()
        connectionProperties.setProperty("user", "test")
        connectionProperties.setProperty("password", "123456")
        // 创建测试数据
        val data = Seq(
              (1, "2023-08-01 10:00:00"),
              (2, "2023-08-02 12:00:00"),
              (3, "2023-08-03 15:00:00")
        )

        val columns = Seq("a", "insert_time1")
        val df = spark.createDataFrame(data).toDF(columns: _*)

        val frame = df.drop("insert_time1")
          .withColumn("insert_time", current_timestamp().cast("timestamp"))

        // 获取今天和明天的时间范围
        // 获取当前日期
        val currentDate = LocalDate.now()
        // 获取下一天的日期
        val nextDayDate = currentDate.plusDays(1)
        // 创建固定的时间部分(00:00:00)
        val startTime = LocalTime.of(0, 0, 0)
       // 组合日期和时间来得到完整的日期时间,并格式化为字符串
       val currentDateTimeString = LocalDateTime.of(currentDate, startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
       val nextDayDateTimeString = LocalDateTime.of(nextDayDate, startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))

       // 格式化为yyyyMMdd字符串
       val dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")
       val currentDateString = currentDate.format(dateFormatter)

       // 动态创建分区范围
        val createPartitionSql =
              s"""
          CREATE TABLE "p$currentDateString" PARTITION OF t
          FOR VALUES FROM (TIMESTAMP '$currentDateTimeString') TO (TIMESTAMP '$nextDayDateTimeString') ;
          """
        // 执行创建分区 SQL
        val connection = java.sql.DriverManager.getConnection(postgresUrl, connectionProperties)
        val statement = connection.createStatement()
        statement.executeUpdate(createPartitionSql)
        connection.close()
        // 将数据写入PostgreSQL分区表
        frame.write
          .mode("append")
          .jdbc(postgresUrl, "t", connectionProperties)
  }
}

效果

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/837889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一语道破 python 迭代器和生成器

简而言之:迭代器是一个抽象化的概念,在python中表示访问数据集合中元素的一种方式;生成器也是一个抽象化的概念,在python 中,边循环边生成所需数据,是一种时间换空间的方法。从访问数据方式上来看&#xff…

应急响应-主机后门webshell的排查思路(webshell,启动项,隐藏账户,映像劫持,rootkit后门)

0x00 windows主机后门排查思路 针对主机后门windows,linux,在对方植入webshell后,需要立即响应,排查出后门位置,以及排查对外连接,端口使用情况等等 排查对外连接状态: 借助工具:p…

T31开发笔记:librtmp拉流测试

若该文为原创文章,转载请注明原文出处。 T31使用librtmp拉流并保存成FLV文件或H264和AAC文件。 librtmp编译在前面有教程,自行编译。 实现的目的是想要获取获取rtmp的AAC流并播放,实时双向对讲功能。 一、硬件和开发环境 1、硬件&#xff1…

Linux6.31 Kubernetes 二进制部署

文章目录 计算机系统5G云计算第二章 LINUX Kubernetes 部署一、二进制搭建 Kubernetes v1.201.操作系统初始化配置2.部署 etcd 集群3.Kubernetes 集群架构与组件4.部署 Master 组件5.部署 Worker Node 组件6.部署 CNI 网络组件——部署 flannel1)K8S 中 Pod 网络通信…

Android 版本 对应的 API版本

Android 14(开发者预览版) 如需详细了解平台变更,请参阅 Android 14 文档。 Android 13(API 级别 33) 如需详细了解平台变更,请参阅 Android 13 文档。 Android 12(API 级别 31、32&#xf…

《每天5分钟玩转kubernetes》读书笔记

笔记 概念 Pod是脆弱的,但应用是健壮的。 kubelet运行在Cluster所有节点上,负责启动Pod和容器。kubeadm用于初始化Cluster。kubectl是k8s命令行工具。通过kubectl可以部署和管理应用,查看各种资源,创建、删除和更新各种组件。 …

推荐一款非常简单实用的数据库连接工具Navicat Premium

Navicat Premium是一款非常实用的数据库连接工具,别再用HeidiSQL和idea自带的数据库连接了,看完这篇文章,赶紧把Navicat Premium用起来吧。 首先,需要获取Navicat Premium的安装包,可以通过以下网盘链接下载&#xff0…

谷歌联合CMU提出全新语义金字塔概念,无需额外训练使LLMs学会执行视觉任务

​ 论文链接:https://arxiv.org/abs/2306.17842 代码仓库:https://github.com/google-research/magvit/ 在目前的大模型社区中,发展较为成熟的当属以ChatGPT为代表的纯语言模型(LLMs),以GPT-4为代表的多模态…

【大数据】ELK最简入门案例(带你进入ELK世界)

文章目录 1. 前言2. 安装3. 启动ELK启动Elasticsearch启动Kibana启动Logstash 4. 测试ELK环境 本文通过最简单纯正的案例带你入门ELK世界。 1. 前言 ELK是Elasticsearch、Logstash、Kibana的缩写,如果对Elasticsearch、Logstash、Kibana不是很了解,可以…

2023华数杯C题完整模型代码

华数杯C题完整论文模型代码已经完成,文末获取! 母亲的心理健康状况对婴儿的成长和发展有重要的影响。本研究使用大数据分析方法,探索了母亲的心理健康状况、婴儿的行为特征以及婴儿的睡眠质量之间的相关性。我们采集了大量的数据,…

Python零基础入门(十一)——异常处理

系列文章目录 个人简介:机电专业在读研究生,CSDN内容合伙人,博主个人首页 Python入门专栏:《Python入门》欢迎阅读,一起进步!🌟🌟🌟 码字不易,如果觉得文章不…

MS5182N/MS5189N——16bit、4/8 通道、200KSPS、 SAR 型 ADC

产品简述 MS5182N/MS5189N 是 4/8 通道、 16bit 、电荷再分配逐次 逼近型模数转换器。采用单电源供电。 MS5182N/MS5189N 内 部集成无失码的 16 位 SAR ADC 、低串扰多路复用器、内部低 漂移基准电压源 ( 可以选择 2.5 或 4.096 V) 、温度传感器、可选 择的单极…

Java 之LocalDateTime的介绍和使用

LocalDateTime是Java的日期和时间类之一,用于表示不带时区信息的日期时间。 LocalDateTime 没有时区, 所以也就不能用来直接获取时间戳LocalDateTime 是一个基于值得类, 所以该类的示例不是通过构造函数的方式进行创建 以下是一些关于Loca…

华为推出手机系统云翻新服务:什么是云翻新?如何使用?

华为手机系统云翻新是华为推出的一项功能,旨在通过云服务提供系统翻新的服务。它可以帮助用户对手机的系统进行优化和更新,以提高手机的性能和流畅度。具体而言,华为手机系统云翻新功能提供了免费的云空间,用户可以将手机中的系统…

【学习笔记】生成式AI(ChatGPT原理,大型语言模型)

ChatGPT原理剖析 语言模型 文字接龙 ChatGPT在测试阶段是不联网的。 ChatGPT背后的关键技术:预训练(Pre-train) 又叫自监督式学习(Self-supervised Learning),得到的模型叫做基石模型(Founda…

JavaScript【静态方法、实例方法/to类、实例方法/get类、实例方法/set类、Math与Date实操、 JS时间戳、日期互相转换】(九)

目录 Math对象_静态方法三 Date对象 Date对象_静态方法 Date对象_实例方法/to类 Date对象_实例方法/get类 Date对象_实例方法/set类 Math与Date实操 JS时间戳、日期互相转换 Math对象_静态方法三 Math.random() Math.random() 返回0到1之间的一个伪随机数,可…

python中几个有趣的函数和推导式

前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 一、range()函数 1、range()通常用来做循环。 2、range()生成器的特性。 例子:假如range()中使用的数值特别大,为100000000000000000000000000000? python解释…

同比增长50%!W/AR HUD赛道持续向好背后的变化

在智能座舱进入域控制器时代的同时,带来人机交互体验升级的HUD赛道,同样持续火热。 高工智能汽车研究院监测数据显示,2023年1-6月中国市场(不含进出口)乘用车前装标配W/AR HUD交付90.49万台,潜在选装规模6…

【深度学习_TensorFlow】梯度下降

写在前面 一直不太理解梯度下降算法是什么意思,今天我们就解开它神秘的面纱 写在中间 线性回归方程 如果要求出一条直线,我们只需知道直线上的两个不重合的点,就可以通过解方程组来求出直线 但是,如果我们选取的这两个点不在直…

使用 Amazon ECS Anywhere 在边缘部署 Amazon IoT Greengrass

1.概述 亚马逊云科技提供了完备的IoT服务能力,涵盖设备服务、连接和控制服务以及云端分析服务,是快速构建安全可靠、可扩展的 IoT 平台的常见选择。Amazon IoT Greengrass 边缘运行时和云服务,可帮助您在设备上构建、部署和管理 IoT 应用。A…