1. 背景
spark默认的jdbc只会用单task读取数据,读取大数据量时,效率低。
2. 解决方案
根据分区字段,如日期进行划分,增加task数量提升效率。
/**
* 返回每个task按时间段划分的过滤语句
* @param startDate
* @param endDate
* @param threadCount
* @return
*/
def getPredicateDates(startDate: String, endDate: String, threadCount: Int): Array[String] = {
getPredicates(startDate, endDate, threadCount).map(x=>s"recordDate>='${x._1}' and recordDate <='${x._2}'")
}
/**
* 将startDate到endDate间的日期,根据给定的threadCount参数,做时间段划分,例如:
* getPredicates("2017-01-01", "2017-01-31", 10)
* 返回:
* 2017-01-01 -> 2017-01-04
* 2017-01-05 -> 2017-01-08
* 2017-01-09 -> 2017-01-12
* 2017-01-13 -> 2017-01-16
* 2017-01-17 -> 2017-01-20
* 2017-01-21 -> 2017-01-24
* 2017-01-25 -> 2017-01-28
* 2017-01-29 -> 2017-01-31
*
* @param startDate 开始日期
* @param endDate 结束日期
* @param threadCount 线程数
* @return 包含各个连续时段的数组
*/
def getPredicates(startDate: String, endDate: String, threadCount: Int): Array[(String, String)] = {
val dayDiff = DateTimeUtils.rangeDay(startDate, endDate)
val buff = new ArrayBuffer[(String, String)]()
if (dayDiff <= threadCount) {
//天数差小于期望的线程数,则按照每天一个线程处理
var tempDate = startDate
while (tempDate <= endDate) {
buff += (tempDate -> tempDate)
tempDate = DateTimeUtils.dateAddOne(tempDate)
}
} else {
//天数差大于期望的线程数,则按照线程数对时间段切分
val offset = (dayDiff / threadCount).toInt
var tempDate = startDate
while (DateTimeUtils.dateAddN(tempDate, offset) <= endDate) {
buff += (tempDate -> DateTimeUtils.dateAddN(tempDate, offset))
tempDate = DateTimeUtils.dateAddOne(DateTimeUtils.dateAddN(tempDate, offset))
}
if (tempDate != endDate) {
buff += (tempDate -> endDate)
}
}
buff.toArray
}
DateTimeUtils工具类
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}
object DateTimeUtils {
def rangeDay(startDateStr: String, endDateStr: String): Long = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val startDate: Date = dateFormat.parse(startDateStr)
val endDate: Date = dateFormat.parse(endDateStr)
(endDate.getTime() - startDate.getTime()) / 1000 / 60 / 60 / 24
}
def dateAddOne(dateStr: String): String = {
var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var dateInfo: Date = dateFormat.parse(dateStr)
var cal: Calendar = Calendar.getInstance()
cal.setTime(dateInfo)
cal.add(Calendar.DATE, 1)
dateFormat.format(cal.getTime)
}
def dateAddN(dateStr: String, value: Int): String = {
var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var dateInfo: Date = dateFormat.parse(dateStr)
var cal: Calendar = Calendar.getInstance()
cal.setTime(dateInfo)
cal.add(Calendar.DATE, value)
dateFormat.format(cal.getTime)
}
}
举例
val startDate = DateTimeUtils.dateAddN(calcDate,-365) //获取计算日期一年前的日期作为开始时间
val predicates= getPredicateDates(startDate,calcDate,12) //分12个task读取,提高性能
val url = PropUtils.getProxyJdbc() //jdbc连接的代理(需按自己的项目实现)
val res = spark.read.jdbc(url, tableName, predicates,PropUtils.getProperties())
3. 实验及结论
使用1个节点 8核16G的Clickhouse数据库,spark从clickhouse读取近4亿行数据。
单Task运行时间:14min
按日期划分成12个Task,运行时间:1.6min
结论:性能提升88.6%