使用Scala二次开发Spark实现对MySQL的upsert操作
背景
在我们的数仓升级项目中,遇到了这样的场景:古人开发的任务是使用DataStage运算后,按照主键【或者多个字段拼接的唯一键】来做insert then update
,顾名思义,也就是无则插入,有则后一条数据会覆盖前一条。这其实类似于MySQL的upsert,当然Oracle也有类似的Merge操作,这部分主要是数据库开发攻城狮关注的。
古人这么做,目的不外乎2个:重跑后数据不变【幂等性】、不会有主键冲突问题【根据主键或者拼接的唯一键去重】
之前有写过一篇拉链表翻写HQL任务的案例:https://lizhiyong.blog.csdn.net/article/details/129679071
这种场景纯HQL可以写出来,也就是最终的数据=保留不变的历史数据+之前没有记录所以要插入的数据+之前有记录所以直接更新的数据,比拉链表还少了将之前有记录的历史数据置为无效的那部分数据,所以我们习惯称之为“假拉链”。
当字段个数多、上游来源表多、上游运算复杂的情况,HQL写出来其实并不短,这么一大坨做完union all再按照2PC的方式先落盘到tmp的中间表,再回灌到结果表,然后跑数据集成任务再推送给Oracle。。。这一系列骚操作,智商要求不高但是工作量可一点都不少。纯HQL的方式能跑,但2023年了还用这种方式模拟DataStage2015年的功能,不算明智。
离线跑批首选的Spark原生却不支持这么玩,但是想想办法总还是可以实现这种功能。。。
原理分析
Spark的save
参照这一篇:https://lizhiyong.blog.csdn.net/article/details/128090026
可以抽象出最简易的Spark写数据操作:
package com.zhiyong.day20230425upDate
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.util
object UpDateDemo1 {
def main(args: Array[String]): Unit = {
val sc: SparkSession = SparkSession
.builder
.appName("sparkUpdateZhiyong")
.master("local[8]").
enableHiveSupport
.getOrCreate
import org.apache.spark.sql._
import sc.implicits._
var df1: DataFrame = sc.range(1000).toDF("id")
df1.show()
df1 = df1.withColumn("comment",functions.lit("CSDN@虎鲸不是鱼"))
df1 = df1.withColumn("comment1",functions.concat(df1.col("id"),df1.col("comment")))
.drop("comment")
df1.write
.format("jdbc")
.mode(SaveMode.Append)
.options(
Map(
"url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
"dbtable" -> "test_origin_20001128",
"user" -> "root",
"password" -> "123456",
"driver" -> "com.mysql.cj.jdbc.Driver"
)
)
.save()
}
}
从这个最常见的save算子入手。点进去可以发现:
package org.apache.spark.sql
/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
* key-value stores, etc). Use `Dataset.write` to access this.
*
* @since 1.4.0
*/
@Stable
final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Saves the content of the `DataFrame` as the specified table.
*
* @since 1.4.0
*/
def save(): Unit = saveInternal(None)
private val df = ds.toDF()
}
还是这个类里:
/**
* Specifies the behavior when data or table already exists. Options include:
* <ul>
* <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
* <li>`SaveMode.Append`: append the data.</li>
* <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
* <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
* </ul>
* <p>
* The default option is `ErrorIfExists`.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter[T] = {
this.mode = saveMode
this
}
private def saveInternal(path: Option[String]): Unit = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("write")
}
assertNotBucketed("save")
val maybeV2Provider = lookupV2Provider()
if (maybeV2Provider.isDefined) {
val provider = maybeV2Provider.get
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, df.sparkSession.sessionState.conf)
val optionsWithPath = getOptionsWithPath(path)
val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
def getTable: Table = {
// If the source accepts external table metadata, here we pass the schema of input query
// and the user-specified partitioning to `getTable`. This is for avoiding
// schema/partitioning inference, which can be very expensive.
// If the query schema is not compatible with the existing data, the behavior is undefined.
// For example, writing file source will success but the following reads will fail.
if (provider.supportsExternalMetadata()) {
provider.getTable(
df.schema.asNullable,
partitioningAsV2.toArray,
dsOptions.asCaseSensitiveMap())
} else {
DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
}
}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val catalogManager = df.sparkSession.sessionState.catalogManager
mode match {
case SaveMode.Append | SaveMode.Overwrite =>
val (table, catalog, ident) = provider match {
case supportsExtract: SupportsCatalogOptions =>
val ident = supportsExtract.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
supportsExtract, catalogManager, dsOptions)
(catalog.loadTable(ident), Some(catalog), Some(ident))
case _: TableProvider =>
val t = getTable
if (t.supports(BATCH_WRITE)) {
(t, None, None)
} else {
// Streaming also uses the data source V2 API. So it may be that the data source
// implements v2, but has no v2 implementation for batch writes. In that case, we
// fall back to saving as though it's a V1 source.
return saveToV1Source(path)
}
}
val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
checkPartitioningMatchesV2Table(table)
if (mode == SaveMode.Append) {
runCommand(df.sparkSession) {
AppendData.byName(relation, df.logicalPlan, finalOptions)
}
} else {
// Truncate the table. TableCapabilityCheck will throw a nice exception if this
// isn't supported
runCommand(df.sparkSession) {
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
}
}
case createMode =>
provider match {
case supportsExtract: SupportsCatalogOptions =>
val ident = supportsExtract.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
supportsExtract, catalogManager, dsOptions)
val tableSpec = TableSpec(
properties = Map.empty,
provider = Some(source),
options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
external = false)
runCommand(df.sparkSession) {
CreateTableAsSelect(
UnresolvedDBObjectName(
catalog.name +: ident.namespace.toSeq :+ ident.name,
isNamespace = false
),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
finalOptions,
ignoreIfExists = createMode == SaveMode.Ignore)
}
case _: TableProvider =>
if (getTable.supports(BATCH_WRITE)) {
throw QueryCompilationErrors.writeWithSaveModeUnsupportedBySourceError(
source, createMode.name())
} else {
// Streaming also uses the data source V2 API. So it may be that the data source
// implements v2, but has no v2 implementation for batch writes. In that case, we
// fallback to saving as though it's a V1 source.
saveToV1Source(path)
}
}
}
} else {
saveToV1Source(path)
}
}
这就是Spark的save算子触发的一系列骚操作。。。
由于SaveMode原生只有这么4种:
package org.apache.spark.sql;
import org.apache.spark.annotation.Stable;
/**
* SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
*
* @since 1.3.0
*/
@Stable
public enum SaveMode {
/**
* Append mode means that when saving a DataFrame to a data source, if data/table already exists,
* contents of the DataFrame are expected to be appended to existing data.
*
* @since 1.3.0
*/
Append,
/**
* Overwrite mode means that when saving a DataFrame to a data source,
* if data/table already exists, existing data is expected to be overwritten by the contents of
* the DataFrame.
*
* @since 1.3.0
*/
Overwrite,
/**
* ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
* an exception is expected to be thrown.
*
* @since 1.3.0
*/
ErrorIfExists,
/**
* Ignore mode means that when saving a DataFrame to a data source, if data already exists,
* the save operation is expected to not save the contents of the DataFrame and to not
* change the existing data.
*
* @since 1.3.0
*/
Ignore
}
链式编程的.mode(SaveMode.Append)
这一步已经修改了保存的模式【正常人都是使用Append
和Overwrite
】,所以走读源码应该注重的就是append或者overwrite的内容。根据注释可知如果结果表不支持truncate
这种骚操作,就会爆一个灰常nice的异常,类似这样:https://lizhiyong.blog.csdn.net/article/details/124575115
但是不要忽视了还有saveToV1Source(path)
这个分支。。。
分支
点val maybeV2Provider = lookupV2Provider()
进去:
private def lookupV2Provider(): Option[TableProvider] = {
DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
// TODO(SPARK-28396): File source v2 write path is currently broken.
case Some(_: FileDataSourceV2) => None
case other => other
}
}
单纯从返回值的TableProvider
泛型就可以看出点眉目。
package org.apache.spark.sql.connector.catalog;
/**
* The base interface for v2 data sources which don't have a real catalog. Implementations must
* have a public, 0-arg constructor.
* <p>
* Note that, TableProvider can only apply data operations to existing tables, like read, append,
* delete, and overwrite. It does not support the operations that require metadata changes, like
* create/drop tables.
* <p>
* The major responsibility of this interface is to return a {@link Table} for read/write.
* </p>
*
* @since 3.0.0
*/
@Evolving
public interface TableProvider {
}
继承关系:
显然其中并没有JDBC。那么执行maybeV2Provider.isDefined
:
反编译后:
public boolean isDefined() {
return !this.isEmpty();
}
当然不可能有内容。所以JDBC方式会get带empty,当然就是走老的saveToV1Source(path)
分支。
不服可以打断点debug一下。。。
打断点找到调用的堆栈
所以下一步会到达这里:
也就是:
private def saveToV1Source(path: Option[String]): Unit = {
partitioningColumns.foreach { columns =>
extraOptions = extraOptions + (
DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
DataSourceUtils.encodePartitioningColumns(columns))
}
val optionsWithPath = getOptionsWithPath(path)
// Code path for data source v1.
runCommand(df.sparkSession) {
DataSource(
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan)
}
}
这年头当然不会没事找事给MySQL或者Oracle表搞partition,所以需要关注的就是执行了命令的这个方法。有用的就是options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan)
:
package org.apache.spark.sql.execution.datasources
case class DataSource(
/**
* Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
*/
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
providingInstance() match {
case dataSource: CreatableRelationProvider =>
disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = true)
SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
case format: FileFormat =>
disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
DataSource.validateSchema(data.schema)
planForWritingFileFormat(format, mode, data)
case _ => throw new IllegalStateException(
s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
)
这玩意儿就是生成把给定的逻辑计划写入数据源
的逻辑计划。。。好像很拗口。。。
既然返回值是逻辑计划类,显然下一步调用的就是:
/**
* Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
*/
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
val qe = session.sessionState.executePlan(command)
qe.assertCommandExecuted()
}
继续跳转:
def assertCommandExecuted(): Unit = commandExecuted
继续跳转:
lazy val commandExecuted: LogicalPlan = mode match {
case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
case CommandExecutionMode.SKIP => analyzed
}
后续的跳转就是具体执行逻辑计划【可能catalyst要做语义分析之类的操作】,没啥必要看下去。好奇的话可以打个断点debug:
Exception in thread "main" com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:829)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:122)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:118)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at com.zhiyong.day20230425upDate.UpDateDemo1$.main(UpDateDemo1.scala:41)
at com.zhiyong.day20230425upDate.UpDateDemo1.main(UpDateDemo1.scala)
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
报错的堆栈就是调用过程。
从planForWriting方法找数据源
这个方法会返回实际执行的逻辑计划,所以一定要仔细研究。
走JDBC协议一定是操作表而非直接操作文件,所以重点还是CreatableRelationProvider
。
/**
* @since 1.3.0
*/
@Stable
trait CreatableRelationProvider {
/**
* Saves a DataFrame to a destination (using data source-specific parameters)
*
* @param sqlContext SQLContext
* @param mode specifies what happens when the destination already exists
* @param parameters data source-specific parameters
* @param data DataFrame to save (i.e. the rows after executing the query)
* @return Relation with a known schema
*
* @since 1.3.0
*/
def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation
}
其实Scala的trait就相当于Java的接口。。。所以要找继承关系:
毫无疑问3选1,就是JdbcRelationProvider
。
package org.apache.spark.sql.execution.datasources.jdbc
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
class JdbcRelationProvider extends CreatableRelationProvider
with RelationProvider with DataSourceRegister {
override def shortName(): String = "jdbc"
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val jdbcOptions = new JDBCOptions(parameters)
val resolver = sqlContext.conf.resolver
val timeZoneId = sqlContext.conf.sessionLocalTimeZone
val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
}
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)
try {
val tableExists = JdbcUtils.tableExists(conn, options)
if (tableExists) {
mode match {
case SaveMode.Overwrite =>
if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
// In this case, we should truncate table and then load.
truncateTable(conn, options)
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)
} else {
// Otherwise, do not truncate the table, instead drop and recreate it
dropTable(conn, options.table, options)
createTable(conn, options.table, df.schema, isCaseSensitive, options)
saveTable(df, Some(df.schema), isCaseSensitive, options)
}
case SaveMode.Append =>
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)
case SaveMode.ErrorIfExists =>
throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
// to not save the contents of the DataFrame and to not change the existing data.
// Therefore, it is okay to do nothing here and then just return the relation below.
}
} else {
createTable(conn, options.table, df.schema, isCaseSensitive, options)
saveTable(df, Some(df.schema), isCaseSensitive, options)
}
} finally {
conn.close()
}
createRelation(sqlContext, parameters)
}
}
从shortName=“jdbc”,和写入数据时指定的格式一致:
df1.write
.format("jdbc")
显然JdbcRelationProvider
就是要找的数据源。createRelation
就是实际写入数据的方法。
方案
直接去修改源码createRelation
方法当然是可以的,但是这么玩还得重新编译,如果单位租了1w美刀/年/机的CDP,这么玩没有人提供售后服务。。。所以最合适的方案当然还是类似笔者之前的做法:https://lizhiyong.blog.csdn.net/article/details/124575115
能够在不动源码、不重新编译和部署的前提下做二次开发满足需求当然再好不过了。编译和部署搞不好把机器折腾down了又是5w字检讨。。。
那么接下来要做的事情就是重写一个数据源,再想办法让Spark的Catalyst生成逻辑计划时使用该数据源。
具体实现
重写数据源
先找猫画虎写个ZhiyongMysqlUpsertRelationProvider
:
报错:Symbol conf is inaccessible from this place
还只能和Spark该对象的包名一致。。。坑,是真的坑。。。
平台开发人员拥有root
或者hdfs.keytab
是再正常不过的事情了,但是SQL Boy们操作Linux服务器的水平不敢恭维。。。安全起见,只保留追加模式,且有表的前提下才能upsert。其余情况一律抛异常:
package org.apache.spark.sql
import org.apache.spark.sql.ZhiyongMysqlUpsertJdbcUtils.upsertTable
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{createTable, dropTable, isCascadingTruncateTable, saveTable, truncateTable}
import org.apache.spark.sql.execution.datasources.jdbc.{JdbcOptionsInWrite, JdbcRelationProvider, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources.BaseRelation
class ZhiyongMysqlUpsertRelationProvider extends JdbcRelationProvider {
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)
try {
val tableExists = JdbcUtils.tableExists(conn, options)
if (tableExists) {
mode match {
case SaveMode.Overwrite =>
if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
// In this case, we should truncate table and then load.
throw new RuntimeException("【CSDN@虎鲸不是鱼】:只允许使用Append模式")
// truncateTable(conn, options)
// val tableSchema = JdbcUtils.getSchemaOption(conn, options)
// saveTable(df, tableSchema, isCaseSensitive, options)
} else {
// Otherwise, do not truncate the table, instead drop and recreate it
throw new RuntimeException("【CSDN@虎鲸不是鱼】:只允许使用Append模式")
// dropTable(conn, options.table, options)
// createTable(conn, options.table, df.schema, isCaseSensitive, options)
// saveTable(df, Some(df.schema), isCaseSensitive, options)
}
case SaveMode.Append =>
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
// saveTable(df, tableSchema, isCaseSensitive, options)
upsertTable(df, tableSchema, isCaseSensitive, options)
// TODO: 需要写upsert的具体方法
case SaveMode.ErrorIfExists =>
throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)
case SaveMode.Ignore =>
println("【CSDN@虎鲸不是鱼】:Ignore模式无操作")
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
// to not save the contents of the DataFrame and to not change the existing data.
// Therefore, it is okay to do nothing here and then just return the relation below.
}
} else {
throw new RuntimeException("【CSDN@虎鲸不是鱼】:结果表不存在,请租户自行确认表是否正确或手动建表")
// createTable(conn, options.table, df.schema, isCaseSensitive, options)
// saveTable(df, Some(df.schema), isCaseSensitive, options)
}
} finally {
conn.close()
}
createRelation(sqlContext, parameters)
}
}
先这么写。
重写JDBC工具类
由于saveTable
这种方法是调用了工具类:
package org.apache.spark.sql.execution.datasources.jdbc
/**
* Util functions for JDBC tables.
*/
object JdbcUtils extends Logging with SQLConfHelper {
/**
* Saves the RDD to the database in a single transaction.
*/
def saveTable(
df: DataFrame,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
options: JdbcOptionsInWrite): Unit = {
val url = options.url
val table = options.table
val dialect = JdbcDialects.get(url)
val rddSchema = df.schema
val batchSize = options.batchSize
val isolationLevel = options.isolationLevel
val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
val repartitionedDF = options.numPartitions match {
case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
n, JDBCOptions.JDBC_NUM_PARTITIONS)
case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
case _ => df
}
repartitionedDF.rdd.foreachPartition { iterator => savePartition(
table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel, options)
}
/**
* Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
*/
def getInsertStatement(
table: String,
rddSchema: StructType,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
dialect: JdbcDialect): String = {
val columns = if (tableSchema.isEmpty) {
rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
} else {
// The generated insert statement needs to follow rddSchema's column sequence and
// tableSchema's column names. When appending data into some case-sensitive DBMSs like
// PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
// RDD column names for user convenience.
val tableColumnNames = tableSchema.get.fieldNames
rddSchema.fields.map { col =>
val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
}
dialect.quoteIdentifier(normalizedName)
}.mkString(",")
}
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}
}
}
该工具类内部还调用了别的方法。
所以还需要先临摹一个类似的工具类以及一个类似的获取状态的方法。
通过options
对象的JdbcOptionsInWrite
可以看到:
package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager}
import java.util.{Locale, Properties}
import org.apache.commons.io.FilenameUtils
import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* Options for the JDBC data source.
*/
class JDBCOptions(
val parameters: CaseInsensitiveMap[String])
extends Serializable with Logging {
import JDBCOptions._
def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
def this(url: String, table: String, parameters: Map[String, String]) = {
this(CaseInsensitiveMap(parameters ++ Map(
JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table)))
}
//此处省略一大坨内容
class JdbcOptionsInWrite(
override val parameters: CaseInsensitiveMap[String])
extends JDBCOptions(parameters) {
import JDBCOptions._
def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
def this(url: String, table: String, parameters: Map[String, String]) = {
this(CaseInsensitiveMap(parameters ++ Map(
JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table)))
}
require(
parameters.get(JDBC_TABLE_NAME).isDefined,
s"Option '$JDBC_TABLE_NAME' is required. " +
s"Option '$JDBC_QUERY_STRING' is not applicable while writing.")
val table = parameters(JDBC_TABLE_NAME)
}
object JDBCOptions {
private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
private val jdbcOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
jdbcOptionNames += name.toLowerCase(Locale.ROOT)
name
}
val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
val JDBC_QUERY_STRING = newOption("query")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
val JDBC_UPPER_BOUND = newOption("upperBound")
val JDBC_NUM_PARTITIONS = newOption("numPartitions")
val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
val JDBC_TRUNCATE = newOption("truncate")
val JDBC_CASCADE_TRUNCATE = newOption("cascadeTruncate")
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
val JDBC_PUSHDOWN_AGGREGATE = newOption("pushDownAggregate")
val JDBC_PUSHDOWN_LIMIT = newOption("pushDownLimit")
val JDBC_PUSHDOWN_TABLESAMPLE = newOption("pushDownTableSample")
val JDBC_KEYTAB = newOption("keytab")
val JDBC_PRINCIPAL = newOption("principal")
val JDBC_TABLE_COMMENT = newOption("tableComment")
val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
val JDBC_CONNECTION_PROVIDER = newOption("connectionProvider")
}
可以发现这里有很多平时写jdbc时的option。
工具类暂时先这么写出来:
package org.apache.spark.sql
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{getInsertStatement, savePartition}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.StructType
object ZhiyongMysqlUpsertJdbcUtils {
//需要重写方法
def getUpsertStatement(table: String,
rddSchema: StructType,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
dialect: JdbcDialect): String = {
println("【CSDN@虎鲸不是鱼】:开始获取UpSert状态")
s"""
|
|""".stripMargin
}
def upsertTable(
df: DataFrame,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
options: JdbcOptionsInWrite): Unit = {
val url = options.url
val table = options.table
val dialect = JdbcDialects.get(url)
val rddSchema: StructType = df.schema
val batchSize: Int = options.batchSize
val isolationLevel = options.isolationLevel
println("dialect=" + dialect)
println("rddSchema=" + rddSchema)
// val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
// 这里需要改
val upsertStmt = getUpsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
val repartitionedDF = options.numPartitions match {
case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
n, JDBCOptions.JDBC_NUM_PARTITIONS)
case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
case _ => df
}
repartitionedDF.rdd.foreachPartition { iterator =>
savePartition(
table, iterator, rddSchema, upsertStmt, batchSize, dialect, isolationLevel, options)
}
}
}
获取upsert状态的方法稍后写。
重写隐式类
参照:https://docs.scala-lang.org/zh-cn/overviews/core/implicit-classes.html
Scala 2.10引入了一种叫做隐式类的新特性。隐式类指的是用implicit关键字修饰的类。在对应的作用域内,带有这个关键字的类的主构造函数可用于隐式转换。
隐式类型是在SIP-13中提出的。参照着可以这么写:
package org.apache.spark.sql
object ZhiyongMysqlDataFrameWriter {
implicit class ZhiyongMysqlUpsertDataFrameWriter(writer: DataFrameWriter[Row]){
def upsert():Unit={
println("反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法")
}
}
}
将启动类写数据部分简单修改和运行:
df1.write
.format("jdbc")
.mode(SaveMode.Append)
.options(
Map(
"url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
"dbtable" -> "test_origin_20001128",
"user" -> "root",
"password" -> "123456",
"driver" -> "com.mysql.cj.jdbc.Driver"
)
)
//.save()
.upsert //Scala可以反射
根据报错的log:
反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error, unexpected empty statement(line 1, pos 0)
== SQL ==
^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:304)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89)
at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
at com.zhiyong.day20230425upDate.UpDateDemo1$.main(UpDateDemo1.scala:48)
at com.zhiyong.day20230425upDate.UpDateDemo1.main(UpDateDemo1.scala)
说明可以反射调用成功。但是Java就没办法这样子使用了,只能改源码和重新编译、部署。。。同样是JVM语言,不得不佩服Martin Odersky的Scala,太强大了。。。但是Scala的学习难度也不小,野生程序猿如果胆敢不去系统学习,就会变成笔者这样的学徒工。。。
接下来就是补充upsert方法以及上一步的getUpsertStatement方法。
补充getUpsertStatement方法
参照:
/**
* Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
*/
def getInsertStatement(
table: String,
rddSchema: StructType,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
dialect: JdbcDialect): String = {
val columns = if (tableSchema.isEmpty) {
rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
} else {
// The generated insert statement needs to follow rddSchema's column sequence and
// tableSchema's column names. When appending data into some case-sensitive DBMSs like
// PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
// RDD column names for user convenience.
val tableColumnNames = tableSchema.get.fieldNames
rddSchema.fields.map { col =>
val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
}
dialect.quoteIdentifier(normalizedName)
}.mkString(",")
}
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}
显然这玩意儿就是拼接了一个类似:
insert into db_name.tb_name (col1,col2,col3) values ("value1","value2","value3")
的玩意儿。。。之后走遍历分区、按1000条一个批次去灌数据。
所以接下来要做的事情就是拼接出一个类似:
insert into db_name.tb_name (pk1,col1,col2,col3) values ("pk1_value","value1","value2","value3") on duplicate key update col1="value1",col2=value2,col3=value3
的玩意儿:
def getUpsertStatement(table: String,
rddSchema: StructType,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
dialect: JdbcDialect): String = {
println("【CSDN@虎鲸不是鱼】:开始获取UpSert状态")
val columns = if (tableSchema.isEmpty) {
rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
} else {
// The generated insert statement needs to follow rddSchema's column sequence and
// tableSchema's column names. When appending data into some case-sensitive DBMSs like
// PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
// RDD column names for user convenience.
val tableColumnNames = tableSchema.get.fieldNames
rddSchema.fields.map { col =>
val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
}
dialect.quoteIdentifier(normalizedName)
}.mkString(",")
}
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
s"""
|INSERT INTO $table ($columns) VALUES ($placeholders)
|ON DUPLICATE KEY UPDATE
|${columns.split(",").map(col=>s"$col=VALUES($col)").mkString(",")}
|""".stripMargin
}
使用Scala还是要比Java简洁不少。
补充upsert方法
由于
这些玩意儿都是private的,不想修改源码重新编译和部署就得用反射的方式操作它们:
def upsert():Unit={
println("反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法")
writer
val extraOptionsField: Field = writer.getClass.getDeclaredField("extraOptions")
val dfField = writer.getClass.getDeclaredField("df")
val sourceField = writer.getClass.getDeclaredField("source")
val partitioningColumnsField = writer.getClass.getDeclaredField("partitioningColumns")
extraOptionsField.setAccessible(true) //关闭安全检查就可以提升反射速度
dfField.setAccessible(true) //关闭安全检查就可以提升反射速度
sourceField.setAccessible(true) //关闭安全检查就可以提升反射速度
partitioningColumnsField.setAccessible(true) //关闭安全检查就可以提升反射速度
val extraOptions = extraOptionsField.get(writer).asInstanceOf[CaseInsensitiveMap[String]]
val df: DataFrame = dfField.get(writer).asInstanceOf[DataFrame]
val partitioningColumns = partitioningColumnsField.get(writer).asInstanceOf[Option[Seq[String]]]
val logicalPlanField = df.getClass.getDeclaredField("logicalPlan")
logicalPlanField.setAccessible(true) //关闭安全检查就可以提升反射速度
var logicalPlan = logicalPlanField.get(df).asInstanceOf[LogicalPlan]
val session = df.sparkSession
logicalPlan =
DataSource(
sparkSession = session,
className = "org.apache.spark.sql.ZhiyongMysqlUpsertRelationProvider",
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap).planForWriting(SaveMode.Append, logicalPlan)
val qe: QueryExecution = session.sessionState.executePlan(logicalPlan)
SQLExecution.withNewExecutionId(qe)(qe.toRdd)
}
此时基本完工。
验证
准备MySQL表和数据
[root@zhiyong1 ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.30 MySQL Community Server (GPL)
mysql> use db_lzy;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
然后建表:
create table if not exists test_upsert_20230429_res(
id int comment '主键',
col1 varchar(2000) comment '字段1',
col2 varchar(2000) comment '字段2',
col3 varchar(2000) comment '字段3',
primary key (id)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;
;
create table if not exists test_upsert_20230429_src(
id int comment '主键',
col1 varchar(2000) comment '字段1',
col2 varchar(2000) comment '字段2',
col3 varchar(2000) comment '字段3',
primary key (id)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;
;
然后准备数据:
insert into test_upsert_20230429_src values
(1,'a1','b1','c1'),
(2,'a2','b2','c2'),
(3,'a3','b3','c3'),
(5,'a55','b555','c5555'),
(6,'a66','b666','c6666')
;
查数据:
mysql> select * from test_upsert_20230429_src;
+----+------+------+-------+
| id | col1 | col2 | col3 |
+----+------+------+-------+
| 1 | a1 | b1 | c1 |
| 2 | a2 | b2 | c2 |
| 3 | a3 | b3 | c3 |
| 5 | a55 | b555 | c5555 |
| 6 | a66 | b666 | c6666 |
+----+------+------+-------+
5 rows in set (0.00 sec)
这个表就是原始数据。
初始化数据
truncate table test_upsert_20230429_res;
insert into test_upsert_20230429_res
select * from test_upsert_20230429_src;
查数据:
mysql> select * from test_upsert_20230429_res;
+----+------+------+-------+
| id | col1 | col2 | col3 |
+----+------+------+-------+
| 1 | a1 | b1 | c1 |
| 2 | a2 | b2 | c2 |
| 3 | a3 | b3 | c3 |
| 5 | a55 | b555 | c5555 |
| 6 | a66 | b666 | c6666 |
+----+------+------+-------+
5 rows in set (0.00 sec)
准备DataFrame
val df2: DataFrame = sc.sparkContext.parallelize(
Seq(
(1, "a1", "b1", "c1"),
(2, "a2", "b2", "c2"),
(3, "a3", "b3", "c3"),
(4, "a4", "b4", "c4"),
(5, "a5", "b5", "c5"),
(6, "a6", "b6", "c6"),
(7, "a7", "b7", "c7"),
(8, "a8", "b8", "c8"),
(9, "a9", "b9", "c9"),
(5, "a10", "b10", "c10")
)
).toDF("id", "col1", "col2", "col3")
df2.show()
结果:
+---+----+----+----+
| id|col1|col2|col3|
+---+----+----+----+
| 1| a1| b1| c1|
| 2| a2| b2| c2|
| 3| a3| b3| c3|
| 4| a4| b4| c4|
| 5| a5| b5| c5|
| 6| a6| b6| c6|
| 7| a7| b7| c7|
| 8| a8| b8| c8|
| 9| a9| b9| c9|
| 5| a10| b10| c10|
+---+----+----+----+
执行upsert
df2.write
.format("jdbc")
.mode(SaveMode.Append)
.options(
Map(
"url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
"dbtable" -> "test_upsert_20230429_res",
"user" -> "root",
"password" -> "123456",
"driver" -> "com.mysql.cj.jdbc.Driver"
)
)
.upsert()
结果:
反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法
dialect=MySQLDialect
rddSchema=StructType(StructField(id,IntegerType,false),StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true))
【CSDN@虎鲸不是鱼】:开始获取UpSert状态
23/04/29 17:10:21 INFO CodeGenerator: Code generated in 20.5899 ms
23/04/29 17:10:21 INFO SparkContext: Starting job: upsert at UpDateDemo1.scala:57
23/04/29 17:10:21 INFO DAGScheduler: Got job 4 (upsert at UpDateDemo1.scala:57) with 8 output partitions
23/04/29 17:10:21 INFO DAGScheduler: Final stage: ResultStage 4 (upsert at UpDateDemo1.scala:57)
23/04/29 17:10:21 INFO DAGScheduler: Parents of final stage: List()
23/04/29 17:10:21 INFO DAGScheduler: Missing parents: List()
23/04/29 17:10:21 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[12] at upsert at UpDateDemo1.scala:57), which has no missing parents
23/04/29 17:10:21 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 32.2 KiB, free 15.8 GiB)
23/04/29 17:10:21 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 13.3 KiB, free 15.8 GiB)
23/04/29 17:10:21 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on DESKTOP-VRV0NDO:54939 (size: 13.3 KiB, free: 15.8 GiB)
23/04/29 17:10:21 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1513
23/04/29 17:10:21 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 4 (MapPartitionsRDD[12] at upsert at UpDateDemo1.scala:57) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
23/04/29 17:10:21 INFO TaskSchedulerImpl: Adding task set 4.0 with 8 tasks resource profile 0
23/04/29 17:10:21 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 9) (DESKTOP-VRV0NDO, executor driver, partition 0, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 10) (DESKTOP-VRV0NDO, executor driver, partition 1, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID 11) (DESKTOP-VRV0NDO, executor driver, partition 2, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 3.0 in stage 4.0 (TID 12) (DESKTOP-VRV0NDO, executor driver, partition 3, PROCESS_LOCAL, 4565 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 4.0 in stage 4.0 (TID 13) (DESKTOP-VRV0NDO, executor driver, partition 4, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 5.0 in stage 4.0 (TID 14) (DESKTOP-VRV0NDO, executor driver, partition 5, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 6.0 in stage 4.0 (TID 15) (DESKTOP-VRV0NDO, executor driver, partition 6, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 7.0 in stage 4.0 (TID 16) (DESKTOP-VRV0NDO, executor driver, partition 7, PROCESS_LOCAL, 4573 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO Executor: Running task 0.0 in stage 4.0 (TID 9)
23/04/29 17:10:21 INFO Executor: Running task 1.0 in stage 4.0 (TID 10)
23/04/29 17:10:21 INFO Executor: Running task 3.0 in stage 4.0 (TID 12)
23/04/29 17:10:21 INFO Executor: Running task 2.0 in stage 4.0 (TID 11)
23/04/29 17:10:21 INFO Executor: Running task 4.0 in stage 4.0 (TID 13)
23/04/29 17:10:21 INFO Executor: Running task 5.0 in stage 4.0 (TID 14)
23/04/29 17:10:21 INFO Executor: Running task 6.0 in stage 4.0 (TID 15)
23/04/29 17:10:21 INFO Executor: Running task 7.0 in stage 4.0 (TID 16)
23/04/29 17:10:21 INFO CodeGenerator: Code generated in 12.8791 ms
23/04/29 17:10:21 INFO Executor: Finished task 0.0 in stage 4.0 (TID 9). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 1.0 in stage 4.0 (TID 10). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 3.0 in stage 4.0 (TID 12). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 4.0 in stage 4.0 (TID 13). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 2.0 in stage 4.0 (TID 11). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 5.0 in stage 4.0 (TID 14). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 6.0 in stage 4.0 (TID 15). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 7.0 in stage 4.0 (TID 16). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 12) in 208 ms on DESKTOP-VRV0NDO (executor driver) (1/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 11) in 208 ms on DESKTOP-VRV0NDO (executor driver) (2/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 5.0 in stage 4.0 (TID 14) in 207 ms on DESKTOP-VRV0NDO (executor driver) (3/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 6.0 in stage 4.0 (TID 15) in 207 ms on DESKTOP-VRV0NDO (executor driver) (4/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 4.0 in stage 4.0 (TID 13) in 208 ms on DESKTOP-VRV0NDO (executor driver) (5/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 10) in 211 ms on DESKTOP-VRV0NDO (executor driver) (6/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 9) in 212 ms on DESKTOP-VRV0NDO (executor driver) (7/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 7.0 in stage 4.0 (TID 16) in 206 ms on DESKTOP-VRV0NDO (executor driver) (8/8)
23/04/29 17:10:21 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
23/04/29 17:10:21 INFO DAGScheduler: ResultStage 4 (upsert at UpDateDemo1.scala:57) finished in 0.269 s
23/04/29 17:10:21 INFO DAGScheduler: Job 4 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/29 17:10:21 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
23/04/29 17:10:21 INFO DAGScheduler: Job 4 finished: upsert at UpDateDemo1.scala:57, took 0.277371 s
23/04/29 17:10:21 INFO SparkContext: Invoking stop() from shutdown hook
23/04/29 17:10:22 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-VRV0NDO:4040
23/04/29 17:10:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/04/29 17:10:22 INFO MemoryStore: MemoryStore cleared
23/04/29 17:10:22 INFO BlockManager: BlockManager stopped
23/04/29 17:10:22 INFO BlockManagerMaster: BlockManagerMaster stopped
23/04/29 17:10:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/04/29 17:10:22 INFO SparkContext: Successfully stopped SparkContext
23/04/29 17:10:22 INFO ShutdownHookManager: Shutdown hook called
23/04/29 17:10:22 INFO ShutdownHookManager: Deleting directory C:\Users\zhiyong\AppData\Local\Temp\spark-2ae1f427-ec0e-4e23-9b59-31b796d980d2
Process finished with exit code 0
貌似成功吊起了重写的类和方法。
查看结果
mysql> select * from test_upsert_20230429_res;
+----+------+------+------+
| id | col1 | col2 | col3 |
+----+------+------+------+
| 1 | a1 | b1 | c1 |
| 2 | a2 | b2 | c2 |
| 3 | a3 | b3 | c3 |
| 4 | a4 | b4 | c4 |
| 5 | a5 | b5 | c5 |
| 6 | a6 | b6 | c6 |
| 7 | a7 | b7 | c7 |
| 8 | a8 | b8 | c8 |
| 9 | a9 | b9 | c9 |
+----+------+------+------+
9 rows in set (0.00 sec)
再次验证
显然upsert成功。但是重复数据保留的是第一条。按照预期应该是保留最后的几条数据,所以再次运行和查数:
mysql> select * from test_upsert_20230429_res;
+----+------+------+------+
| id | col1 | col2 | col3 |
+----+------+------+------+
| 1 | a1 | b1 | c1 |
| 2 | a2 | b2 | c2 |
| 3 | a3 | b3 | c3 |
| 4 | a4 | b4 | c4 |
| 5 | a10 | b10 | c10 |
| 6 | a6 | b6 | c6 |
| 7 | a7 | b7 | c7 |
| 8 | a8 | b8 | c8 |
| 9 | a9 | b9 | c9 |
+----+------+------+------+
9 rows in set (0.00 sec)
显然是分布式运算时顺序错乱导致的。貌似Spark中一个partition最后到达的数据具有随机性,不像DataStage单线程的那种模式遵循先来后到。
这样就实现了对MySQL的upsert操作。对Oracle等RDBMS也可以参照着搞。
尾言
古人用DataStage实现的效果,一定是可以写代码实现。对Spark做二次开发有时候需要用Scala,要求不低。
如果是用Flink流式处理:https://lizhiyong.blog.csdn.net/article/details/124161096
由于可以Java写Sink,不管是先select检测有无数据再分情况做insert和update还是直接拼upsert,难度都要低不少。
并且这么做不需要像纯HQL那样写很长的SQL,各种join,太长了解析AST难免报错,还得手动拆分SQL或者中间落盘。而且Tez的性能并不如Spark。。。这些常见的情况,都应该平台化以减少重复的工作量才能降本增效。。。
SQL Boy写SQL是因为他们只会写SQL,没有任何选择的余地。。。而真·大数据开发攻城狮
写SQL单纯是因为领导阶级规定了只允许写SQL,而且写SQL比Java/Scala下班早。。。必要的时候有多种方式实现。
愿天下不再有肤浅的SQL Boy。
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/130442316