基于spark的Hive2Pg数据同步组件

news2025/1/23 13:16:57

一、背景

        Hive中的数据需要同步到pg供在线使用,通常sqoop具有数据同步的功能,但是sqoop具有一定的问题,比如对数据的切分碰到数据字段存在异常的情况下,数据字段的空值率高、数据字段重复太多,影响sqoop的分区策略,特别是hash分区,调用hash函数容易使得cpu高产生报警同时sqoop的mapreduce任务对数据表的分割以及数据文件也会有一定的不均衡性。为了弥补这些问题,开发了基于spark的数据同步组件,利用spark处理大数据的强大能力及分布式并行性上的优势,通过执行sparksql将数据写入到pg数据库,但是在sparksql 中,保存数据到数据,只有 Append , Overwrite , ErrorIfExists, Ignore 四种模式,不满足特殊场景的需求,尝试利用spark save 源码改进, 批量保存数据,存在则更新不存在则插入。

二、关键设计方案

  1. 方案一

        利用DataFrame框架里自带的df.write.mode(“append”).jdbc(url,pg_table,prop)方法,尝试将df里的每一行row是org.apache.spark.sql.Row类型,结合schema类型转换成DataFrame,方法如下:

import org.apache.spark.sql.types._

  val map = Map("col1" -> 5, "col2" -> 6, "col3" -> 10)

  val (keys, values) = map.toList.sortBy(_._1).unzip

  val rows = spark.sparkContext.parallelize(Seq(Row(values: _*)))

  val schema = StructType(keys.map(

    k => StructField(k, IntegerType, nullable = false)))

  val df = spark.createDataFrame(rows, schema)

  df.show()

经过分析:转DataFrame的过程有重要的两步:首先是通过spark.sparkContext.parallelize将Row类型转成RDD,其次获取schema后利用spark.createDataFrame把RDD和schema变为DataFrame。然而为了取到DataFrame的每一行Row,需要调用DataFrame的foreach方法。

Spark的DataFrame的foreach的执行原理:

        Spark DataFrame 的 foreach() 方法将 DataFrame 的每一行作为 Row 对象进行循环,并将给定函数应用于该行。foreach() 的一些限制:Spark中的foreach()方法是在工作节点而不是Driver程序中调用的。这意味着,如果我们在函数内执行print()操作,将无法在会话或笔记本中看到打印结果,因为结果打印在工作节点中。行是只读的,因此您无法更新行的值。鉴于这些限制,foreach() 方法主要用于将每行的一些信息记录到本地计算机或外部数据库foreach方法无法改变原始的DataFrame数据,仅用于迭代处理每个分区的数据。

        foreach方法的处理是并行的,可以提高处理效率,但需要注意处理的顺序可能不同于原始数据的顺序。常规性能调优四:广播大变量默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复 本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的 时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作 线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变 量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。广播变量在每个 Executor 保存一个副本,此Executor 的所有 task 共用此广播变量,这让变 量产生的副本数量大大减少。在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变 量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如 果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的 复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的 BlockManager 中获取变量。

        一般spark.SparkContext是存在Driver进程里的,工作节点获取不到,每个jvm只能有一个SparkContext,再创建新的SparkContext之前需要先stop()当前活动的SparkContext。

综上分析,方案一不能实现,即DataFrame里不能创建DataFrame

2、方案二

        利用PrepareStatement执行插入更新的sql语句,将DataFrame里的每一行Row的字段数值解析出来封装成sql,每batch个执行一次并提交,碰到有重复的key执行update操作,新的数据执行insert操作。该方案的实现过程借鉴了spark的dataframe的save方法。tableSchema.fields.map(x => x.name).mkString(",")利用map和mkString方法进行字符串操作,同时利用spark的makeSetter方法实现PrepareStatement语句的填充。

三、碰到问题及解决

采用方案二在开发的过程碰到的问题:

  1. Spark error: value foreach is not a member of Object

当调用这样调用的时候:

升级2.12之后,DataFrameforeachPartition 里面不能处理 RowIterator

解决方法:(1

2)就是使用foreach替代foreachPartition

2、Caused by: java.io.NotSerializableException: org.postgreslq.jdbc.PgPrepareStatement

原因: prep是一个PrepareStatement对象,这个对象无法序列化,在标1的地方执行,而传入map中的对象是需要分布式传送到各个节点上,传送前先序列化,到达相应机器上后再反序列化,PrepareStatement是个Java类,如果一个java类想(反)序列化,必须实现Serialize接口,PrepareStatement并没有实现这个接口,对象prep在driver端,collect后的数据也在driver端,就不需prep序列化传到各个节点了。但这样其实会有collect的性能问题。

解决方案:使用mappartition在每一个分区内维持一个pgsql连接进行插入

3、在insert on conflict的语句上报Postgre SQL ERROR:there is no unique or exclusion constraint matching the ON CONFLICT specification。

        执行insert into test values('a','b') on conflict(a,b) do update set c='1';由于建表时没有建关于a,bCONSTRAINT,于是就会报错,为表添加CONSTRAINT

4、java jar 后面传参数,参数中含有空格的处理方法将含有空格的参数加上双引号。

四、总结

        该组件实现了对于数据规整规范的情况下,直接调用DataFrame的write.mode()方法批量写入,对于特殊的情况hive表里对于pg表的主键字段有重复的情况,进行了重新的封装,通过执行s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set name1=? ,name2=?"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1360109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

飞塔FortiGate-1000C设备引进助力易天构建网络安全新防线

在当今数字化浪潮的推动下,企业对网络安全的需求日益迫切。为了应对不断升级的网络威胁,给客户提供最为优质的产品,易天引进了最新兼容性测试设备飞塔FortiGate-1000C,为光模块产品交付提供了更强劲的性能保障。 FortiGate-1000C是…

filecoin通过filutils 区块浏览器获取历史收益数据

filecoin 历史收益数据 每天每T平均收益 导出历史每日收益为文档 filutils 区块浏览器 导出历史每日收益为文档 #!/bin/bashfor i in {1..10} doecho $iresult$(curl --location --request POST https://api.filutils.com/api/v2/powerreward \--header User-Agent: Apifox/1.…

fmincon函数求解非线性超越方程的学习记录

最近的算法中用到了fmincon函数,寻找多变量非线性方程最小值的函数;因此学习一下; fmincon函数的基础语法如下所示: fmincon函数是为了求解下列方程的最小值; b 和 beq 是向量,A 和 Aeq 是矩阵&#xff0c…

企业级大数据安全架构(二)安全方案

作者:楼高 1 Knox访问控制 Apache Knox是一个为Apache Hadoop部署提供交互的应用网关,通过其REST API和用户友好的UI,为所有与Hadoop集群的REST和HTTP交互提供了统一的访问点。Knox不仅仅是一个访问网关,它还具备强大的访问控制…

(2024,少样本微调自适应,泛化误差界限,减小泛化误差的措施)多模态基础模型的少样本自适应:综述

Few-shot Adaptation of Multi-modal Foundation Models: A Survey 公和众和号:EDPJ(添加 VX:CV_EDPJ 或直接进 Q 交流群:922230617 获取资料) 目录 0. 摘要 1. 简介 2. 多模态基础模型的预训练 3. 多模态基础模…

关于kthread_stop的疑问(linux3.16)

线程一旦启动起来后,会一直运行,除非该线程主动调用do_exit函数,或者其他的进程调用kthread_stop函数,结束线程的运行。 之前找销毁内核线程的接口时,发现了kthread_stop这个接口。网上说这个函数能够销毁一个内核线程…

JavaScript:构造函数面向对象

JavaScript:构造函数&面向对象 构造函数实例化静态成员实例成员 内置构造函数引用类型基本含义常用属性方法ObjectArray 包装类型基本含义常用属性方法StringNumber 面向对象原型对象constructor对象原型原型链原型继承 构造函数 在讲解构造函数之前&#xff0…

[NISACTF 2022]bingdundun~

[NISACTF 2022]bingdundun~ wp 信息搜集 进入题目: 点一下 upload? : 注意看上面的 URL ,此时是 ?bingdundunupload 。 随便找个文件上传一下: 注意看上面的 URL ,此时变成:upload.php 。 那么我有理…

【力扣算法日记】无重复字符的最长子串

最近刷了很多算法题,这些解题过程也拓展了自己的思路,是个适合记录的素材。所以决定在继技术知识点详解的【一文系列】之后,开启新坑——【力扣算法系列】,来记录力扣刷题过程。 分享题目不确定,目前打算只分享我认为…

聊一聊 .NET高级调试 内核模式堆泄露

一:背景 1. 讲故事 前几天有位朋友找到我,说他的机器内存在不断的上涨,但在任务管理器中查不出是哪个进程吃的内存,特别奇怪,截图如下: 在我的分析旅程中都是用户态模式的内存泄漏,像上图中的…

【JVM】类加载器ClassLoader

一、简介 在Java中,类加载器(ClassLoader)是一个关键的组件,它负责将字节码文件加载到内存并转换成Java类。Java的类加载器主要可以分成两类:系统提供的和由Java应用开发人员编写的。Java开发者可以根据需要创建自己的…

ES集群分片数据的高可用

文章目录 ES集群分片数据的高可用1. 集群设置索引节点2. 集群新增文档数据3.查看集群中文档数据分片节点4. 让节点9201宕机,查看其分片变化5. 让节点9201,查看分片变化 ES集群分片数据的高可用 集群中的索引主分片和副分片在不同的计算机上,如…

揭开 JavaScript 作用域的神秘面纱(下)

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

pyqtgraph 教程

pyqtgraph 教程 简介 PyQtGraph 是一个用于科学和工程数据可视化的开源库,基于 PyQt 和 NumPy 构建而成。它提供了丰富的绘图工具和交互功能,可以用于创建高性能的实时数据图表、图像显示和信号处理应用。 以下是 PyQtGraph 的一些特点和功能&#xf…

.NET Standard 支持的 .NET Framework 和 .NET Core

.NET Standard 是针对多个 .NET 实现推出的一套正式的 .NET API 规范。 推出 .NET Standard 的背后动机是要提高 .NET 生态系统中的一致性。 .NET 5 及更高版本采用不同的方法来建立一致性,这种方法在大多数情况下都不需要 .NET Standard。 但如果要在 .NET Framewo…

基于sumo实现交通灯控制算法的模板

基于sumo实现交通灯控制算法的模板 目录 在windows安装run hello world networkroutesviewsettings & configurationsimulation 交通灯控制系统 介绍文件生成器类(FileGenerator)道路网络(Network)辅助函数生成道路网络&am…

从细菌基因组中提取噬菌体变异序列工具PhaseFinder的介绍、安装和使用方法

PhaseFinder ## 概览,不翻译了,大家自己看吧 The PhaseFinder algorithm is designed to detect DNA inversion mediated phase variation in bacterial genomes using genomic or metagenomic sequencing data. It works by identifying regions flank…

Java学习笔记(四)——正则表达式

文章目录 正则表达式基本规则字符类(只匹配一个字符)预定义字符(只匹配一个字符)数量词练习正则表达式插件 爬虫利用正则表达式获取想要的内容爬取网络信息练习有条件的爬取贪婪爬取非贪婪爬取正则表达式在字符串中的使用 分组捕获分组正则表达式外部使用非捕获分组正则表达式忽…

公共用例库计划--个人版(二)主体界面设计

1、任务概述 计划内容:完成公共用例库的开发实施工作,包括需求分析、系统设计、开发、测试、打包、运行维护等工作。 1.1、 已完成: 需求分析、数据库表的设计:公共用例库计划–个人版(一) 1.2、 本次待…

神经网络-卷积层

卷积 输入通道数, 输出通道数,核大小 参数具体含义 直观理解各个参数的网站(gif) https://github.com/vdumoulin/conv_arithmetic/blob/master/README.md大概长这样,cyan是青色的意思 channel数(终于理解论文里图片放好多层的原因…