[May Day Writing] Extending Spark 3.3.0 in Scala to Implement Upsert for MySQL

news2025/1/11 15:51:59


Background

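In our data warehouse upgrade project we ran into this scenario: our predecessors built jobs that compute in DataStage and then write by primary key (or by a unique key concatenated from several fields) using "insert then update". As the name suggests, a row is inserted when its key does not exist yet, and when it does exist the later row overwrites the earlier one. This is essentially MySQL's upsert, and Oracle has the comparable MERGE statement. These details are mostly the concern of database developers.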

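Our predecessors did it this way for two reasons: rerunning the job leaves the data unchanged (idempotency), and primary key conflicts cannot occur (rows are deduplicated by the primary key or the concatenated unique key).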
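The two properties can be illustrated with a minimal sketch that models the table as an in-memory map. This is only an illustration of the semantics, not how DataStage or MySQL implement it; `UpsertSemantics`, `Record`, and `upsert` are hypothetical names introduced here.

```scala
// Hypothetical in-memory model of "insert then update" keyed by a primary key.
object UpsertSemantics {
  final case class Record(id: Long, comment: String)

  /** Inserts rows whose key is absent; for an existing key the later row overwrites the earlier one. */
  def upsert(table: Map[Long, Record], batch: Seq[Record]): Map[Long, Record] =
    batch.foldLeft(table)((acc, r) => acc.updated(r.id, r))
}
```

Applying the same batch twice yields the same table as applying it once, which is the idempotency the original jobs relied on, and duplicate keys inside a batch collapse to the last row instead of raising a conflict.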

I previously wrote up a case of rewriting a zipper-table (history-tracking table) job in HQL: https://lizhiyong.blog.csdn.net/article/details/129679071

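This scenario can be written in pure HQL: final data = historical rows left unchanged + rows with no prior record, which are inserted + rows with a prior record, which are updated directly. Compared with a zipper table, it omits the part that marks the superseded historical rows as invalid, so we usually call it a "fake zipper".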
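The three-part composition can be sketched on plain Scala collections. This is a simplified stand-in for the HQL, under the assumption that every row carries a single string key; `FakeZipper`, `Rec`, and `merge` are hypothetical names.

```scala
// Hypothetical model of the "fake zipper": unchanged history ++ inserted rows ++ updated rows.
object FakeZipper {
  final case class Rec(key: String, value: String)

  def merge(history: Seq[Rec], incoming: Seq[Rec]): Seq[Rec] = {
    // For duplicate keys inside the incoming batch, the later row wins.
    val incomingByKey: Map[String, Rec] = incoming.map(r => r.key -> r).toMap
    val historyKeys: Set[String] = history.map(_.key).toSet

    val unchanged = history.filterNot(r => incomingByKey.contains(r.key))
    val inserted = incomingByKey.values.filterNot(r => historyKeys.contains(r.key)).toSeq
    val updated = incomingByKey.values.filter(r => historyKeys.contains(r.key)).toSeq

    // Sorted only to make the result deterministic for inspection.
    (unchanged ++ inserted ++ updated).sortBy(_.key)
  }
}
```

Unlike a real zipper table, nothing here keeps the superseded version of an updated row.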

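When there are many fields, many upstream source tables, and complex upstream computation, the HQL turns out far from short. The whole thing is combined with union all, written to a tmp staging table first in a 2PC-like manner, loaded back into the result table, and then pushed to Oracle by a data integration job... None of this is intellectually demanding, but the workload is considerable. The pure HQL approach works, but in 2023 it is hardly wise to emulate a 2015 DataStage feature this way.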

Spark, the first choice for offline batch jobs, does not support this natively, but with some effort the feature can still be implemented...

Analysis of the Mechanism

Spark's save

See this article: https://lizhiyong.blog.csdn.net/article/details/128090026

The simplest Spark write operation can be abstracted as:

package com.zhiyong.day20230425upDate


import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.util

object UpDateDemo1 {
  def main(args: Array[String]): Unit = {
    val sc: SparkSession = SparkSession
      .builder
      .appName("sparkUpdateZhiyong")
      .master("local[8]")
      .enableHiveSupport
      .getOrCreate

    import org.apache.spark.sql._
    import sc.implicits._

    var df1: DataFrame = sc.range(1000).toDF("id")
    df1.show()

    df1 = df1.withColumn("comment",functions.lit("CSDN@虎鲸不是鱼"))

    df1 = df1.withColumn("comment1",functions.concat(df1.col("id"),df1.col("comment")))
      .drop("comment")

    df1.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .options(
        Map(
          "url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
          "dbtable" -> "test_origin_20001128",
          "user" -> "root",
          "password" -> "123456",
          "driver" -> "com.mysql.cj.jdbc.Driver"
        )
      )
      .save()

  }

}

Start from this most common save operator. Stepping into it shows:

package org.apache.spark.sql

/**
 * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use `Dataset.write` to access this.
 *
 * @since 1.4.0
 */
@Stable
final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  /**
   * Saves the content of the `DataFrame` as the specified table.
   *
   * @since 1.4.0
   */
  def save(): Unit = saveInternal(None)
    
  private val df = ds.toDF()
}

Still in the same class:

  /**
   * Specifies the behavior when data or table already exists. Options include:
   * <ul>
   * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
   * <li>`SaveMode.Append`: append the data.</li>
   * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
   * </ul>
   * <p>
   * The default option is `ErrorIfExists`.
   *
   * @since 1.4.0
   */
  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
    this.mode = saveMode
    this
  }

private def saveInternal(path: Option[String]): Unit = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("write")
  }

  assertNotBucketed("save")

  val maybeV2Provider = lookupV2Provider()
  if (maybeV2Provider.isDefined) {
    val provider = maybeV2Provider.get
    val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
      provider, df.sparkSession.sessionState.conf)

    val optionsWithPath = getOptionsWithPath(path)

    val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
      optionsWithPath.originalMap
    val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)

    def getTable: Table = {
      // If the source accepts external table metadata, here we pass the schema of input query
      // and the user-specified partitioning to `getTable`. This is for avoiding
      // schema/partitioning inference, which can be very expensive.
      // If the query schema is not compatible with the existing data, the behavior is undefined.
      // For example, writing file source will success but the following reads will fail.
      if (provider.supportsExternalMetadata()) {
        provider.getTable(
          df.schema.asNullable,
          partitioningAsV2.toArray,
          dsOptions.asCaseSensitiveMap())
      } else {
        DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
      }
    }

    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
    val catalogManager = df.sparkSession.sessionState.catalogManager
    mode match {
      case SaveMode.Append | SaveMode.Overwrite =>
        val (table, catalog, ident) = provider match {
          case supportsExtract: SupportsCatalogOptions =>
            val ident = supportsExtract.extractIdentifier(dsOptions)
            val catalog = CatalogV2Util.getTableProviderCatalog(
              supportsExtract, catalogManager, dsOptions)

            (catalog.loadTable(ident), Some(catalog), Some(ident))
          case _: TableProvider =>
            val t = getTable
            if (t.supports(BATCH_WRITE)) {
              (t, None, None)
            } else {
              // Streaming also uses the data source V2 API. So it may be that the data source
              // implements v2, but has no v2 implementation for batch writes. In that case, we
              // fall back to saving as though it's a V1 source.
              return saveToV1Source(path)
            }
        }

        val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
        checkPartitioningMatchesV2Table(table)
        if (mode == SaveMode.Append) {
          runCommand(df.sparkSession) {
            AppendData.byName(relation, df.logicalPlan, finalOptions)
          }
        } else {
          // Truncate the table. TableCapabilityCheck will throw a nice exception if this
          // isn't supported
          runCommand(df.sparkSession) {
            OverwriteByExpression.byName(
              relation, df.logicalPlan, Literal(true), finalOptions)
          }
        }

      case createMode =>
        provider match {
          case supportsExtract: SupportsCatalogOptions =>
            val ident = supportsExtract.extractIdentifier(dsOptions)
            val catalog = CatalogV2Util.getTableProviderCatalog(
              supportsExtract, catalogManager, dsOptions)

            val tableSpec = TableSpec(
              properties = Map.empty,
              provider = Some(source),
              options = Map.empty,
              location = extraOptions.get("path"),
              comment = extraOptions.get(TableCatalog.PROP_COMMENT),
              serde = None,
              external = false)
            runCommand(df.sparkSession) {
              CreateTableAsSelect(
                UnresolvedDBObjectName(
                  catalog.name +: ident.namespace.toSeq :+ ident.name,
                  isNamespace = false
                ),
                partitioningAsV2,
                df.queryExecution.analyzed,
                tableSpec,
                finalOptions,
                ignoreIfExists = createMode == SaveMode.Ignore)
            }
          case _: TableProvider =>
            if (getTable.supports(BATCH_WRITE)) {
              throw QueryCompilationErrors.writeWithSaveModeUnsupportedBySourceError(
                source, createMode.name())
            } else {
              // Streaming also uses the data source V2 API. So it may be that the data source
              // implements v2, but has no v2 implementation for batch writes. In that case, we
              // fallback to saving as though it's a V1 source.
              saveToV1Source(path)
            }
        }
    }

  } else {
    saveToV1Source(path)
  }
}

This is the whole chain of operations triggered by Spark's save operator...

SaveMode natively has only these four values:

package org.apache.spark.sql;

import org.apache.spark.annotation.Stable;

/**
 * SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
 *
 * @since 1.3.0
 */
@Stable
public enum SaveMode {
  /**
   * Append mode means that when saving a DataFrame to a data source, if data/table already exists,
   * contents of the DataFrame are expected to be appended to existing data.
   *
   * @since 1.3.0
   */
  Append,
  /**
   * Overwrite mode means that when saving a DataFrame to a data source,
   * if data/table already exists, existing data is expected to be overwritten by the contents of
   * the DataFrame.
   *
   * @since 1.3.0
   */
  Overwrite,
  /**
   * ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
   * an exception is expected to be thrown.
   *
   * @since 1.3.0
   */
  ErrorIfExists,
  /**
   * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
   * the save operation is expected to not save the contents of the DataFrame and to not
   * change the existing data.
   *
   * @since 1.3.0
   */
  Ignore
}

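In the chained call, .mode(SaveMode.Append) has already set the save mode (in practice most people use Append or Overwrite), so when reading the source the focus should be on the append and overwrite paths. According to the comment, if the result table does not support truncate, a "nice" exception is thrown, similar to this one: https://lizhiyong.blog.csdn.net/article/details/124575115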

But do not overlook the saveToV1Source(path) branch...

Branches

Step into val maybeV2Provider = lookupV2Provider():

private def lookupV2Provider(): Option[TableProvider] = {
  DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
    // TODO(SPARK-28396): File source v2 write path is currently broken.
    case Some(_: FileDataSourceV2) => None
    case other => other
  }
}

The TableProvider type parameter of the returned Option already gives a hint.

package org.apache.spark.sql.connector.catalog;
/**
 * The base interface for v2 data sources which don't have a real catalog. Implementations must
 * have a public, 0-arg constructor.
 * <p>
 * Note that, TableProvider can only apply data operations to existing tables, like read, append,
 * delete, and overwrite. It does not support the operations that require metadata changes, like
 * create/drop tables.
 * <p>
 * The major responsibility of this interface is to return a {@link Table} for read/write.
 * </p>
 *
 * @since 3.0.0
 */
@Evolving
public interface TableProvider {
}

Inheritance hierarchy:

[Image: TableProvider inheritance hierarchy]

JDBC is clearly not among them. Next, maybeV2Provider.isDefined is evaluated:

[Image]

After decompiling:

public boolean isDefined() {
   return !this.isEmpty();
}

Naturally there is nothing inside. For the JDBC format the lookup yields an empty Option, so execution takes the legacy saveToV1Source(path) branch.
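The routing decision can be modeled in a few lines. This is a deliberately simplified sketch: the real saveInternal also falls back to V1 when a V2 table lacks batch-write support, and the set of V2 sources below is a made-up placeholder. `WritePathRouting`, `route`, and `"hypothetical-v2-source"` are hypothetical names.

```scala
import java.util.Locale

// Simplified model of the V2-or-V1 decision made in saveInternal.
object WritePathRouting {
  sealed trait WritePath
  case object V2Path extends WritePath
  case object V1Path extends WritePath

  // Placeholder for "sources that resolve to a v2 TableProvider"; not Spark's real list.
  private val v2Sources: Set[String] = Set("hypothetical-v2-source")

  /** Mimics lookupV2Provider: Some(name) when a v2 provider is found, None otherwise. */
  def lookupV2Provider(source: String): Option[String] =
    Some(source.toLowerCase(Locale.ROOT)).filter(name => v2Sources.contains(name))

  /** An empty Option means isDefined is false, so the V1 path is taken. */
  def route(source: String): WritePath =
    if (lookupV2Provider(source).isDefined) V2Path else V1Path
}
```

With this model, `WritePathRouting.route("jdbc")` returns `V1Path`, which mirrors what the debugger shows for the JDBC format.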

If you are not convinced, set a breakpoint and debug it...

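Finding the call stack with a breakpoint

[Image: call stack at the breakpoint]

So the next step arrives here:

[Image: the saveToV1Source call]

That is: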

private def saveToV1Source(path: Option[String]): Unit = {
  partitioningColumns.foreach { columns =>
    extraOptions = extraOptions + (
      DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
      DataSourceUtils.encodePartitioningColumns(columns))
  }

  val optionsWithPath = getOptionsWithPath(path)

  // Code path for data source v1.
  runCommand(df.sparkSession) {
    DataSource(
      sparkSession = df.sparkSession,
      className = source,
      partitionColumns = partitioningColumns.getOrElse(Nil),
      options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan)
  }
}

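Nobody these days partitions a MySQL or Oracle table for no reason, so the partitioningColumns part can be skipped. What deserves attention is the method that runs the command. The useful bit is options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan)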

package org.apache.spark.sql.execution.datasources

case class DataSource(/* constructor parameters omitted */) {

  /**
   * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
   */
  def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
    providingInstance() match {
      case dataSource: CreatableRelationProvider =>
        disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = true)
        SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
      case format: FileFormat =>
        disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
        DataSource.validateSchema(data.schema)
        planForWritingFileFormat(format, mode, data)
      case _ => throw new IllegalStateException(
        s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }

  // other members omitted
}

It produces a logical plan that writes the given logical plan out to the data source... which is quite a mouthful...

Since the return value is a logical plan, the next call is clearly:

/**
 * Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
 * user-registered callback functions.
 */
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  qe.assertCommandExecuted()
}

Following the jump:

def assertCommandExecuted(): Unit = commandExecuted

Following the jump again:

lazy val commandExecuted: LogicalPlan = mode match {
  case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
  case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
  case CommandExecutionMode.SKIP => analyzed
}
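Why does a method whose body is just a field reference execute anything? Because commandExecuted is a lazy val: its initializer runs on first access and never again. A minimal sketch of that pattern, using hypothetical names (`LazyCommandDemo`, `Execution`) and a plain function in place of a logical plan:

```scala
// Hypothetical model of the lazy-val trick behind assertCommandExecuted.
object LazyCommandDemo {
  final class Execution(run: () => String) {
    // Evaluated at most once, on first access.
    lazy val commandExecuted: String = run()

    // Touching the lazy val is enough to force the side effect.
    def assertCommandExecuted(): Unit = { commandExecuted; () }
  }
}
```

Calling `assertCommandExecuted()` several times on the same instance triggers `run` only once.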

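The subsequent jumps execute the logical plan (Catalyst may perform semantic analysis and similar work along the way), so there is little need to read further. If you are curious, set a breakpoint and debug. A failed connection, for example, produces this stack trace: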

Exception in thread "main" com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
	at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:829)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:122)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:118)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at com.zhiyong.day20230425upDate.UpDateDemo1$.main(UpDateDemo1.scala:41)
	at com.zhiyong.day20230425upDate.UpDateDemo1.main(UpDateDemo1.scala)
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The stack trace of the error is exactly the call sequence.

Finding the data source from the planForWriting method

This method returns the logical plan that is actually executed, so it deserves careful study.

Going through the JDBC protocol always means operating on tables rather than directly on files, so the focus remains CreatableRelationProvider:

/**
 * @since 1.3.0
 */
@Stable
trait CreatableRelationProvider {
  /**
   * Saves a DataFrame to a destination (using data source-specific parameters)
   *
   * @param sqlContext SQLContext
   * @param mode specifies what happens when the destination already exists
   * @param parameters data source-specific parameters
   * @param data DataFrame to save (i.e. the rows after executing the query)
   * @return Relation with a known schema
   *
   * @since 1.3.0
   */
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}

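A Scala trait is roughly equivalent to a Java interface... so look for its implementations:

[Image: implementations of CreatableRelationProvider]

Of the three candidates, it is without doubt JdbcRelationProvider: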

package org.apache.spark.sql.execution.datasources.jdbc

import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}

class JdbcRelationProvider extends CreatableRelationProvider
  with RelationProvider with DataSourceRegister {

  override def shortName(): String = "jdbc"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val jdbcOptions = new JDBCOptions(parameters)
    val resolver = sqlContext.conf.resolver
    val timeZoneId = sqlContext.conf.sessionLocalTimeZone
    val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
    val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
    JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
  }

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    val options = new JdbcOptionsInWrite(parameters)
    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
    val dialect = JdbcDialects.get(options.url)
    val conn = dialect.createConnectionFactory(options)(-1)
    try {
      val tableExists = JdbcUtils.tableExists(conn, options)
      if (tableExists) {
        mode match {
          case SaveMode.Overwrite =>
            if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
              // In this case, we should truncate table and then load.
              truncateTable(conn, options)
              val tableSchema = JdbcUtils.getSchemaOption(conn, options)
              saveTable(df, tableSchema, isCaseSensitive, options)
            } else {
              // Otherwise, do not truncate the table, instead drop and recreate it
              dropTable(conn, options.table, options)
              createTable(conn, options.table, df.schema, isCaseSensitive, options)
              saveTable(df, Some(df.schema), isCaseSensitive, options)
            }

          case SaveMode.Append =>
            val tableSchema = JdbcUtils.getSchemaOption(conn, options)
            saveTable(df, tableSchema, isCaseSensitive, options)

          case SaveMode.ErrorIfExists =>
            throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)

          case SaveMode.Ignore =>
            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
            // to not save the contents of the DataFrame and to not change the existing data.
            // Therefore, it is okay to do nothing here and then just return the relation below.
        }
      } else {
        createTable(conn, options.table, df.schema, isCaseSensitive, options)
        saveTable(df, Some(df.schema), isCaseSensitive, options)
      }
    } finally {
      conn.close()
    }

    createRelation(sqlContext, parameters)
  }
}

Its shortName is "jdbc", which matches the format specified when writing the data:

df1.write
  .format("jdbc")

Clearly JdbcRelationProvider is the data source we were looking for, and createRelation is the method that actually writes the data.
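The way a format string maps to a provider can be pictured as a lookup by short name. The sketch below is a toy registry, not Spark's actual lookup code; `ProviderLookup`, `Register`, `JdbcLike`, and `CsvLike` are hypothetical stand-ins.

```scala
// Toy model: resolve a format string to a registered provider by its short name.
object ProviderLookup {
  trait Register { def shortName: String }
  final class JdbcLike extends Register { val shortName = "jdbc" }
  final class CsvLike extends Register { val shortName = "csv" }

  private val registered: Seq[Register] = Seq(new JdbcLike, new CsvLike)

  /** Case-insensitive match on shortName; None when nothing is registered under that name. */
  def lookup(format: String): Option[Register] =
    registered.find(_.shortName.equalsIgnoreCase(format))
}
```

In this model an unregistered name such as `"upsert"` resolves to nothing, which is why a custom provider has to be made reachable some other way.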

Approach

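Modifying the createRelation method directly in the source is certainly possible, but that requires recompiling, and if your company rents CDP at 10,000 USD per machine per year, nobody will provide support for a modified build... So the most suitable approach is still similar to what I did before: https://lizhiyong.blog.csdn.net/article/details/124575115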

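Meeting the requirement through secondary development without touching the source and without recompiling or redeploying is ideal. If compiling and deploying goes wrong and takes the machines down, that means a 50,000-character self-criticism report...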

So the next task is to rewrite a data source and then find a way to make Spark's Catalyst use it when generating the logical plan.

Implementation

Rewriting the data source

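First, imitate the original and write a ZhiyongMysqlUpsertRelationProvider:

[Image: IDE screenshot of the error]

Error: Symbol conf is inaccessible from this place

The conf member of SQLContext is private[sql], so the new class has to live in the same package as that Spark object... a real pitfall.

It is perfectly normal for platform developers to hold root or hdfs.keytab, but the Linux server skills of the SQL Boys are not to be relied on... To be safe, only Append mode is kept, and upsert is allowed only when the table already exists. Every other case throws an exception: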

package org.apache.spark.sql

import org.apache.spark.sql.ZhiyongMysqlUpsertJdbcUtils.upsertTable
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JdbcOptionsInWrite, JdbcRelationProvider, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources.BaseRelation

class ZhiyongMysqlUpsertRelationProvider extends JdbcRelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              df: DataFrame): BaseRelation = {
    val options = new JdbcOptionsInWrite(parameters)
    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
    val dialect = JdbcDialects.get(options.url)
    val conn = dialect.createConnectionFactory(options)(-1)

    try {
      val tableExists = JdbcUtils.tableExists(conn, options)
      if (tableExists) {
        mode match {
          case SaveMode.Overwrite =>
            // The stock truncate / drop-and-recreate logic is disabled on purpose.
            throw new RuntimeException("【CSDN@虎鲸不是鱼】: only Append mode is allowed")

          case SaveMode.Append =>
            val tableSchema = JdbcUtils.getSchemaOption(conn, options)
            // Replaces the stock saveTable(df, tableSchema, isCaseSensitive, options)
            upsertTable(df, tableSchema, isCaseSensitive, options)

          case SaveMode.ErrorIfExists =>
            throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)

          case SaveMode.Ignore =>
            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
            // to not save the contents of the DataFrame and to not change the existing data.
            // Therefore, it is okay to do nothing here and then just return the relation below.
            println("【CSDN@虎鲸不是鱼】: Ignore mode is a no-op")
        }
      } else {
        // The stock createTable + saveTable path is disabled: the table must already exist.
        throw new RuntimeException(
          "【CSDN@虎鲸不是鱼】: the result table does not exist; verify the table name or create the table manually")
      }
    } finally {
      conn.close()
    }

    createRelation(sqlContext, parameters)
  }

}

That will do for now.
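The policy encoded in the provider can be summarized as a small decision table. The sketch below restates it without any Spark dependency so it can be checked in isolation; `UpsertWritePolicy`, `Decision`, and the mode objects are hypothetical stand-ins for SaveMode and the provider's branches.

```scala
// Hypothetical restatement of the provider's policy: upsert only on Append into an existing table.
object UpsertWritePolicy {
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  sealed trait Decision
  case object DoUpsert extends Decision
  case object DoNothing extends Decision
  final case class Reject(reason: String) extends Decision

  def decide(mode: Mode, tableExists: Boolean): Decision =
    if (!tableExists) Reject("result table does not exist")
    else mode match {
      case Append        => DoUpsert
      case Overwrite     => Reject("only Append mode is allowed")
      case ErrorIfExists => Reject("table already exists")
      case Ignore        => DoNothing
    }
}
```

Keeping every destructive path (truncate, drop, create) behind a rejection means a mistaken mode can fail a job but cannot wipe a table.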

Rewriting the JDBC utility class

Methods such as saveTable are calls into a utility class:

package org.apache.spark.sql.execution.datasources.jdbc

/**
 * Util functions for JDBC tables.
 */
object JdbcUtils extends Logging with SQLConfHelper {
  /**
   * Saves the RDD to the database in a single transaction.
   */
  def saveTable(
      df: DataFrame,
      tableSchema: Option[StructType],
      isCaseSensitive: Boolean,
      options: JdbcOptionsInWrite): Unit = {
    val url = options.url
    val table = options.table
    val dialect = JdbcDialects.get(url)
    val rddSchema = df.schema
    val batchSize = options.batchSize
    val isolationLevel = options.isolationLevel

    val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
    val repartitionedDF = options.numPartitions match {
      case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
        n, JDBCOptions.JDBC_NUM_PARTITIONS)
      case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
      case _ => df
    }
    repartitionedDF.rdd.foreachPartition { iterator => savePartition(
      table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel, options)
    }
  }

  /**
   * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
   */
  def getInsertStatement(
      table: String,
      rddSchema: StructType,
      tableSchema: Option[StructType],
      isCaseSensitive: Boolean,
      dialect: JdbcDialect): String = {
    val columns = if (tableSchema.isEmpty) {
      rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
    } else {
      // The generated insert statement needs to follow rddSchema's column sequence and
      // tableSchema's column names. When appending data into some case-sensitive DBMSs like
      // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
      // RDD column names for user convenience.
      val tableColumnNames = tableSchema.get.fieldNames
      rddSchema.fields.map { col =>
        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
          throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
        }
        dialect.quoteIdentifier(normalizedName)
      }.mkString(",")
    }
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }
}

This utility class also calls other methods internally.
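One detail in saveTable worth noting is the numPartitions match: a non-positive value is rejected, a value below the current partition count triggers coalesce, and anything else leaves the DataFrame alone. A minimal sketch of just that decision, with hypothetical names (`PartitionPlanning`, `targetPartitions`) and a plain IllegalArgumentException standing in for Spark's own error:

```scala
// Hypothetical model of the numPartitions decision made before writing each partition.
object PartitionPlanning {
  /** Returns the number of partitions the write would end up using. */
  def targetPartitions(requested: Option[Int], current: Int): Int = requested match {
    case Some(n) if n <= 0 =>
      throw new IllegalArgumentException(s"numPartitions must be positive, got $n")
    case Some(n) if n < current => n // shrink, as coalesce(n) would
    case _ => current                // never grow the partition count
  }
}
```

Since each partition is written over its own JDBC connection, the option effectively caps write concurrency against the database.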

So we first need to model a similar utility class and a similar method for building the statement.

Looking at JdbcOptionsInWrite, the type of the options object, we can see:

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, DriverManager}
import java.util.{Locale, Properties}

import org.apache.commons.io.FilenameUtils

import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors

/**
 * Options for the JDBC data source.
 */
class JDBCOptions(
    val parameters: CaseInsensitiveMap[String])
  extends Serializable with Logging {

  import JDBCOptions._

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  def this(url: String, table: String, parameters: Map[String, String]) = {
    this(CaseInsensitiveMap(parameters ++ Map(
      JDBCOptions.JDBC_URL -> url,
      JDBCOptions.JDBC_TABLE_NAME -> table)))
  }

  // ... a large chunk omitted here ...
}

class JdbcOptionsInWrite(
    override val parameters: CaseInsensitiveMap[String])
  extends JDBCOptions(parameters) {

  import JDBCOptions._

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  def this(url: String, table: String, parameters: Map[String, String]) = {
    this(CaseInsensitiveMap(parameters ++ Map(
      JDBCOptions.JDBC_URL -> url,
      JDBCOptions.JDBC_TABLE_NAME -> table)))
  }

  require(
    parameters.get(JDBC_TABLE_NAME).isDefined,
    s"Option '$JDBC_TABLE_NAME' is required. " +
      s"Option '$JDBC_QUERY_STRING' is not applicable while writing.")

  val table = parameters(JDBC_TABLE_NAME)
}

object JDBCOptions {
  private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
  private val jdbcOptionNames = collection.mutable.Set[String]()

  private def newOption(name: String): String = {
    jdbcOptionNames += name.toLowerCase(Locale.ROOT)
    name
  }

  val JDBC_URL = newOption("url")
  val JDBC_TABLE_NAME = newOption("dbtable")
  val JDBC_QUERY_STRING = newOption("query")
  val JDBC_DRIVER_CLASS = newOption("driver")
  val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
  val JDBC_LOWER_BOUND = newOption("lowerBound")
  val JDBC_UPPER_BOUND = newOption("upperBound")
  val JDBC_NUM_PARTITIONS = newOption("numPartitions")
  val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
  val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
  val JDBC_TRUNCATE = newOption("truncate")
  val JDBC_CASCADE_TRUNCATE = newOption("cascadeTruncate")
  val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
  val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
  val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
  val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
  val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
  val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
  val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
  val JDBC_PUSHDOWN_AGGREGATE = newOption("pushDownAggregate")
  val JDBC_PUSHDOWN_LIMIT = newOption("pushDownLimit")
  val JDBC_PUSHDOWN_TABLESAMPLE = newOption("pushDownTableSample")
  val JDBC_KEYTAB = newOption("keytab")
  val JDBC_PRINCIPAL = newOption("principal")
  val JDBC_TABLE_COMMENT = newOption("tableComment")
  val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
  val JDBC_CONNECTION_PROVIDER = newOption("connectionProvider")
}

Many of the options commonly used when writing JDBC code can be found here.
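Because the parameters are wrapped in a CaseInsensitiveMap and option names are registered in lower case, the spelling of an option key does not matter. A minimal sketch of that behavior, using a hypothetical `Options` class rather than Spark's own map:

```scala
import java.util.Locale

// Hypothetical stand-in for a case-insensitive option map.
object CaseInsensitiveOptions {
  final class Options(original: Map[String, String]) {
    private val lowered: Map[String, String] =
      original.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

    /** Looks a key up regardless of the case used by the caller. */
    def get(key: String): Option[String] = lowered.get(key.toLowerCase(Locale.ROOT))
  }
}
```

So `"batchSize"`, `"batchsize"`, and `"BATCHSIZE"` all address the same option in this model.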

For now the utility class is written like this:

package org.apache.spark.sql

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.{getInsertStatement, savePartition}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.StructType

object ZhiyongMysqlUpsertJdbcUtils {

  // Placeholder: the real statement-building logic still has to be written
  def getUpsertStatement(table: String,
                         rddSchema: StructType,
                         tableSchema: Option[StructType],
                         isCaseSensitive: Boolean,
                         dialect: JdbcDialect): String = {

    println("【CSDN@虎鲸不是鱼】: start building the upsert statement")

    s"""
       |
       |""".stripMargin

  }

  def upsertTable(
                   df: DataFrame,
                   tableSchema: Option[StructType],
                   isCaseSensitive: Boolean,
                   options: JdbcOptionsInWrite): Unit = {
    val url = options.url
    val table = options.table
    val dialect = JdbcDialects.get(url)
    val rddSchema: StructType = df.schema
    val batchSize: Int = options.batchSize
    val isolationLevel = options.isolationLevel

    println("dialect=" + dialect)
    println("rddSchema=" + rddSchema)

    //    val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
    // This is the part that changes: build an upsert statement instead

    val upsertStmt = getUpsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)

    val repartitionedDF = options.numPartitions match {
      case Some(n) if n <= 0 => throw QueryExecutionErrors.invalidJdbcNumPartitionsError(
        n, JDBCOptions.JDBC_NUM_PARTITIONS)
      case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
      case _ => df
    }
    repartitionedDF.rdd.foreachPartition { iterator =>
      savePartition(
        table, iterator, rddSchema, upsertStmt, batchSize, dialect, isolationLevel, options)
    }
  }

}

The method that builds the upsert statement is filled in later.

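Writing the implicit class

Reference: https://docs.scala-lang.org/zh-cn/overviews/core/implicit-classes.html

Scala 2.10 introduced a feature called implicit classes. An implicit class is a class marked with the implicit keyword. When the class is in scope, the keyword makes its primary constructor available for implicit conversions.

Implicit classes were proposed in SIP-13. Following that, we can write: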

package org.apache.spark.sql

object ZhiyongMysqlDataFrameWriter {

  implicit class ZhiyongMysqlUpsertDataFrameWriter(writer: DataFrameWriter[Row]){
    def upsert():Unit={

      println("反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法")

    }
  }

}
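To see the mechanism in isolation, here is a minimal sketch without Spark. Writer is a hypothetical stand-in for a class we cannot modify (such as DataFrameWriter), and WriterOps is an assumed name. The compiler rewrites writer.upsert() into new WriterOps(writer).upsert() at compile time, so this is an implicit conversion rather than runtime reflection.

```scala
object ImplicitClassSketch {
  // Hypothetical stand-in for a library class we cannot change.
  final class Writer(val target: String)

  // Wraps Writer; its methods become callable on any Writer while this class is in scope.
  implicit class WriterOps(private val writer: Writer) {
    def upsert(): String = s"upsert into ${writer.target}"
  }

  // Writer has no upsert method, so the compiler inserts the WriterOps conversion here.
  def demo(): String = new Writer("test_table").upsert()
}
```

Outside the enclosing object, the extension only becomes visible after an import such as import ImplicitClassSketch._.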

Modify the write section of the main class slightly and run it:

    df1.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .options(
        Map(
          "url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
          "dbtable" -> "test_origin_20001128",
          "user" -> "root",
          "password" -> "123456",
          "driver" -> "com.mysql.cj.jdbc.Driver"
        )
      )
      //.save()
      .upsert // provided by the implicit class

The error log shows:

反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
Syntax error, unexpected empty statement(line 1, pos 0)

== SQL ==

^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:304)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
	at com.zhiyong.day20230425upDate.UpDateDemo1$.main(UpDateDemo1.scala:48)
	at com.zhiyong.day20230425upDate.UpDateDemo1.main(UpDateDemo1.scala)

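This shows that the call reaches the new method: the first log line is printed by upsert in the implicit class, while the ParseException that follows is thrown, according to the stack trace, by a SparkSession.sql call in UpDateDemo1 on an empty statement. Strictly speaking, the compiler resolves the implicit class as an implicit conversion; no runtime reflection is involved at this step. Java cannot be used this way; there you would have to modify the source, recompile, and redeploy. Both are JVM languages, yet one has to admire how powerful Martin Odersky's Scala is. Scala is not easy to learn, though, and a self-taught programmer who skips systematic study ends up as an apprentice like the author.

Next, we fill in the upsert method and the getUpsertStatement method from the previous step.

Completing the getUpsertStatement method

For reference, the existing getInsertStatement: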

/**
 * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
 */
def getInsertStatement(
    table: String,
    rddSchema: StructType,
    tableSchema: Option[StructType],
    isCaseSensitive: Boolean,
    dialect: JdbcDialect): String = {
  val columns = if (tableSchema.isEmpty) {
    rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
  } else {
    // The generated insert statement needs to follow rddSchema's column sequence and
    // tableSchema's column names. When appending data into some case-sensitive DBMSs like
    // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
    // RDD column names for user convenience.
    val tableColumnNames = tableSchema.get.fieldNames
    rddSchema.fields.map { col =>
      val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
        throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
      }
      dialect.quoteIdentifier(normalizedName)
    }.mkString(",")
  }
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}

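Clearly this just concatenates a statement of the form:

insert into db_name.tb_name (col1,col2,col3) values ("value1","value2","value3")

with ? placeholders standing in for the values. Spark then iterates over the partitions and writes the rows in batches of 1000, the default batchsize.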
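The batching step can be pictured with a small sketch on a plain Scala iterator. This is not Spark's savePartition, only an illustration of splitting one partition's rows into fixed-size batches; the names BatchingSketch and batchSizes are assumptions.

```scala
object BatchingSketch {
  // Splits the rows of one partition into batches and returns the size of each batch.
  // A real JDBC writer would send each batch with addBatch/executeBatch instead.
  def batchSizes[A](rows: Iterator[A], batchSize: Int): List[Int] = {
    require(batchSize > 0, "batchSize must be positive")
    rows.grouped(batchSize).map(_.size).toList
  }
}
```

For 2500 rows and a batch size of 1000, this yields two full batches and a final batch of 500.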

So the next step is to build a statement of the form:

insert into db_name.tb_name (pk1,col1,col2,col3) values ("pk1_value","value1","value2","value3") on duplicate key update col1="value1",col2="value2",col3="value3"

as follows:

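// Note: as in JdbcUtils, `conf` comes from org.apache.spark.sql.catalyst.SQLConfHelper (the
// enclosing object must extend it), and org.apache.spark.sql.errors.QueryCompilationErrors must be imported.
def getUpsertStatement(table: String,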
                       rddSchema: StructType,
                       tableSchema: Option[StructType],
                       isCaseSensitive: Boolean,
                       dialect: JdbcDialect): String = {

  println("【CSDN@虎鲸不是鱼】:开始获取UpSert状态")

  val columns = if (tableSchema.isEmpty) {
    rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
  } else {
    // The generated insert statement needs to follow rddSchema's column sequence and
    // tableSchema's column names. When appending data into some case-sensitive DBMSs like
    // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
    // RDD column names for user convenience.
    val tableColumnNames = tableSchema.get.fieldNames
    rddSchema.fields.map { col =>
      val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
        throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
      }
      dialect.quoteIdentifier(normalizedName)
    }.mkString(",")
  }
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  s"""
     |INSERT INTO $table ($columns) VALUES ($placeholders)
     |ON DUPLICATE KEY UPDATE
     |${columns.split(",").map(col=>s"$col=VALUES($col)").mkString(",")}
     |""".stripMargin
  

}
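The string assembly can be tried without Spark. The sketch below is a simplified variant under stated assumptions: it takes plain column names instead of a StructType, quotes identifiers with backticks in MySQL style, and, unlike the method above, leaves the key columns out of the UPDATE list. The names UpsertSqlSketch, upsertSql, and keyColumns are hypothetical. VALUES(col) matches the MySQL 5.7 server used in this article; newer MySQL 8.0 releases deprecate it in favor of row aliases.

```scala
object UpsertSqlSketch {
  // Quote an identifier MySQL-style with backticks, doubling any embedded backtick.
  private def quote(name: String): String = "`" + name.replace("`", "``") + "`"

  // Build INSERT ... ON DUPLICATE KEY UPDATE, leaving key columns out of the UPDATE list.
  def upsertSql(table: String, columns: Seq[String], keyColumns: Set[String]): String = {
    require(columns.nonEmpty, "at least one column is required")
    val quoted = columns.map(quote)
    val placeholders = columns.map(_ => "?").mkString(",")
    val updates = columns
      .filterNot(keyColumns.contains)
      .map(c => s"${quote(c)}=VALUES(${quote(c)})")
    require(updates.nonEmpty, "at least one non-key column is required")
    s"INSERT INTO $table (${quoted.mkString(",")}) VALUES ($placeholders) " +
      s"ON DUPLICATE KEY UPDATE ${updates.mkString(",")}"
  }
}
```

Working from a sequence of names also avoids splitting the joined column string on commas, which would break on a column name that itself contains a comma.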

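This is considerably more concise in Scala than it would be in Java.

Completing the upsert method

The members of DataFrameWriter that upsert needs are all private, so unless we want to modify the source, recompile, and redeploy, we have to reach them through reflection: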

    // Imports needed by this method
    import java.lang.reflect.Field
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
    import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
    import org.apache.spark.sql.execution.datasources.DataSource

    def upsert(): Unit = {

      println("反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法")

      // DataFrameWriter keeps these members private, so look them up reflectively
      val extraOptionsField: Field = writer.getClass.getDeclaredField("extraOptions")
      val dfField = writer.getClass.getDeclaredField("df")
      val partitioningColumnsField = writer.getClass.getDeclaredField("partitioningColumns")
      // setAccessible(true) turns off the access check, which lets us read private fields
      // (and also makes reflective access faster)
      extraOptionsField.setAccessible(true)
      dfField.setAccessible(true)
      partitioningColumnsField.setAccessible(true)
      val extraOptions = extraOptionsField.get(writer).asInstanceOf[CaseInsensitiveMap[String]]
      val df: DataFrame = dfField.get(writer).asInstanceOf[DataFrame]
      val partitioningColumns = partitioningColumnsField.get(writer).asInstanceOf[Option[Seq[String]]]
      val logicalPlanField = df.getClass.getDeclaredField("logicalPlan")
      logicalPlanField.setAccessible(true)
      var logicalPlan = logicalPlanField.get(df).asInstanceOf[LogicalPlan]
      val session = df.sparkSession

      // Plan the write through the custom relation provider instead of the built-in JDBC one
      logicalPlan =
        DataSource(
        sparkSession = session,
        className = "org.apache.spark.sql.ZhiyongMysqlUpsertRelationProvider",
        partitionColumns = partitioningColumns.getOrElse(Nil),
        options = extraOptions.toMap).planForWriting(SaveMode.Append, logicalPlan)
      val qe: QueryExecution = session.sessionState.executePlan(logicalPlan)
      SQLExecution.withNewExecutionId(qe)(qe.toRdd)

    }
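The reflection pattern used above, reduced to its core, looks like the sketch below. Holder is a hypothetical class standing in for DataFrameWriter, and readPrivate is an assumed helper name; the only library calls are getDeclaredField, setAccessible, and get from java.lang.reflect.

```scala
object PrivateFieldSketch {
  // Hypothetical class with a private field, standing in for DataFrameWriter.
  final class Holder(private val secret: String) {
    def length: Int = secret.length
  }

  // Read a private field by name through Java reflection.
  def readPrivate[T](target: AnyRef, fieldName: String): T = {
    val field = target.getClass.getDeclaredField(fieldName)
    field.setAccessible(true) // bypass the access check for this private field
    field.get(target).asInstanceOf[T]
  }

  def demo(): String = readPrivate[String](new Holder("hidden-value"), "secret")
}
```

A lookup like this depends on the field name inside the target class, so it can break when the library is upgraded; that is the price of not patching and recompiling the source.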

At this point the work is essentially done.

Verification

Preparing the MySQL tables and data

[root@zhiyong1 ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.30 MySQL Community Server (GPL)
mysql> use db_lzy;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed

Then create the tables:

create table if not exists test_upsert_20230429_res(
    id int comment 'primary key',
    col1 varchar(2000) comment 'column 1',
    col2 varchar(2000) comment 'column 2',
    col3 varchar(2000) comment 'column 3',
    primary key (id)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

create table if not exists test_upsert_20230429_src(
    id int comment 'primary key',
    col1 varchar(2000) comment 'column 1',
    col2 varchar(2000) comment 'column 2',
    col3 varchar(2000) comment 'column 3',
    primary key (id)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

Then load the source data:

insert into test_upsert_20230429_src values
(1,'a1','b1','c1'),
(2,'a2','b2','c2'),
(3,'a3','b3','c3'),
(5,'a55','b555','c5555'),
(6,'a66','b666','c6666')
;

Query the data:

mysql> select * from test_upsert_20230429_src;
+----+------+------+-------+
| id | col1 | col2 | col3  |
+----+------+------+-------+
|  1 | a1   | b1   | c1    |
|  2 | a2   | b2   | c2    |
|  3 | a3   | b3   | c3    |
|  5 | a55  | b555 | c5555 |
|  6 | a66  | b666 | c6666 |
+----+------+------+-------+
5 rows in set (0.00 sec)

This table holds the original data.

Initializing the data

truncate table test_upsert_20230429_res;
insert into test_upsert_20230429_res
select * from test_upsert_20230429_src;

Query the data:

mysql> select * from test_upsert_20230429_res;
+----+------+------+-------+
| id | col1 | col2 | col3  |
+----+------+------+-------+
|  1 | a1   | b1   | c1    |
|  2 | a2   | b2   | c2    |
|  3 | a3   | b3   | c3    |
|  5 | a55  | b555 | c5555 |
|  6 | a66  | b666 | c6666 |
+----+------+------+-------+
5 rows in set (0.00 sec)

Preparing the DataFrame

val df2: DataFrame = sc.sparkContext.parallelize(
  Seq(
    (1, "a1", "b1", "c1"),
    (2, "a2", "b2", "c2"),
    (3, "a3", "b3", "c3"),
    (4, "a4", "b4", "c4"),
    (5, "a5", "b5", "c5"),
    (6, "a6", "b6", "c6"),
    (7, "a7", "b7", "c7"),
    (8, "a8", "b8", "c8"),
    (9, "a9", "b9", "c9"),
    (5, "a10", "b10", "c10")
  )
).toDF("id", "col1", "col2", "col3")

df2.show()

Result:

+---+----+----+----+
| id|col1|col2|col3|
+---+----+----+----+
|  1|  a1|  b1|  c1|
|  2|  a2|  b2|  c2|
|  3|  a3|  b3|  c3|
|  4|  a4|  b4|  c4|
|  5|  a5|  b5|  c5|
|  6|  a6|  b6|  c6|
|  7|  a7|  b7|  c7|
|  8|  a8|  b8|  c8|
|  9|  a9|  b9|  c9|
|  5| a10| b10| c10|
+---+----+----+----+

Running the upsert

df2.write
  .format("jdbc")
  .mode(SaveMode.Append)
  .options(
    Map(
      "url" -> "jdbc:mysql://192.168.88.100:3306/db_lzy",
      "dbtable" -> "test_upsert_20230429_res",
      "user" -> "root",
      "password" -> "123456",
      "driver" -> "com.mysql.cj.jdbc.Driver"
    )
  )
  .upsert()

Result:

反射调用隐式类ZhiyongMysqlUpsertDataFrameWriter的upsert方法
dialect=MySQLDialect
rddSchema=StructType(StructField(id,IntegerType,false),StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true))
【CSDN@虎鲸不是鱼】:开始获取UpSert状态
23/04/29 17:10:21 INFO CodeGenerator: Code generated in 20.5899 ms
23/04/29 17:10:21 INFO SparkContext: Starting job: upsert at UpDateDemo1.scala:57
23/04/29 17:10:21 INFO DAGScheduler: Got job 4 (upsert at UpDateDemo1.scala:57) with 8 output partitions
23/04/29 17:10:21 INFO DAGScheduler: Final stage: ResultStage 4 (upsert at UpDateDemo1.scala:57)
23/04/29 17:10:21 INFO DAGScheduler: Parents of final stage: List()
23/04/29 17:10:21 INFO DAGScheduler: Missing parents: List()
23/04/29 17:10:21 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[12] at upsert at UpDateDemo1.scala:57), which has no missing parents
23/04/29 17:10:21 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 32.2 KiB, free 15.8 GiB)
23/04/29 17:10:21 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 13.3 KiB, free 15.8 GiB)
23/04/29 17:10:21 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on DESKTOP-VRV0NDO:54939 (size: 13.3 KiB, free: 15.8 GiB)
23/04/29 17:10:21 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1513
23/04/29 17:10:21 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 4 (MapPartitionsRDD[12] at upsert at UpDateDemo1.scala:57) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
23/04/29 17:10:21 INFO TaskSchedulerImpl: Adding task set 4.0 with 8 tasks resource profile 0
23/04/29 17:10:21 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 9) (DESKTOP-VRV0NDO, executor driver, partition 0, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 10) (DESKTOP-VRV0NDO, executor driver, partition 1, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID 11) (DESKTOP-VRV0NDO, executor driver, partition 2, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 3.0 in stage 4.0 (TID 12) (DESKTOP-VRV0NDO, executor driver, partition 3, PROCESS_LOCAL, 4565 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 4.0 in stage 4.0 (TID 13) (DESKTOP-VRV0NDO, executor driver, partition 4, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 5.0 in stage 4.0 (TID 14) (DESKTOP-VRV0NDO, executor driver, partition 5, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 6.0 in stage 4.0 (TID 15) (DESKTOP-VRV0NDO, executor driver, partition 6, PROCESS_LOCAL, 4539 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO TaskSetManager: Starting task 7.0 in stage 4.0 (TID 16) (DESKTOP-VRV0NDO, executor driver, partition 7, PROCESS_LOCAL, 4573 bytes) taskResourceAssignments Map()
23/04/29 17:10:21 INFO Executor: Running task 0.0 in stage 4.0 (TID 9)
23/04/29 17:10:21 INFO Executor: Running task 1.0 in stage 4.0 (TID 10)
23/04/29 17:10:21 INFO Executor: Running task 3.0 in stage 4.0 (TID 12)
23/04/29 17:10:21 INFO Executor: Running task 2.0 in stage 4.0 (TID 11)
23/04/29 17:10:21 INFO Executor: Running task 4.0 in stage 4.0 (TID 13)
23/04/29 17:10:21 INFO Executor: Running task 5.0 in stage 4.0 (TID 14)
23/04/29 17:10:21 INFO Executor: Running task 6.0 in stage 4.0 (TID 15)
23/04/29 17:10:21 INFO Executor: Running task 7.0 in stage 4.0 (TID 16)
23/04/29 17:10:21 INFO CodeGenerator: Code generated in 12.8791 ms
23/04/29 17:10:21 INFO Executor: Finished task 0.0 in stage 4.0 (TID 9). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 1.0 in stage 4.0 (TID 10). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 3.0 in stage 4.0 (TID 12). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 4.0 in stage 4.0 (TID 13). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 2.0 in stage 4.0 (TID 11). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 5.0 in stage 4.0 (TID 14). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 6.0 in stage 4.0 (TID 15). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO Executor: Finished task 7.0 in stage 4.0 (TID 16). 1226 bytes result sent to driver
23/04/29 17:10:21 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 12) in 208 ms on DESKTOP-VRV0NDO (executor driver) (1/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 11) in 208 ms on DESKTOP-VRV0NDO (executor driver) (2/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 5.0 in stage 4.0 (TID 14) in 207 ms on DESKTOP-VRV0NDO (executor driver) (3/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 6.0 in stage 4.0 (TID 15) in 207 ms on DESKTOP-VRV0NDO (executor driver) (4/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 4.0 in stage 4.0 (TID 13) in 208 ms on DESKTOP-VRV0NDO (executor driver) (5/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 10) in 211 ms on DESKTOP-VRV0NDO (executor driver) (6/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 9) in 212 ms on DESKTOP-VRV0NDO (executor driver) (7/8)
23/04/29 17:10:21 INFO TaskSetManager: Finished task 7.0 in stage 4.0 (TID 16) in 206 ms on DESKTOP-VRV0NDO (executor driver) (8/8)
23/04/29 17:10:21 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
23/04/29 17:10:21 INFO DAGScheduler: ResultStage 4 (upsert at UpDateDemo1.scala:57) finished in 0.269 s
23/04/29 17:10:21 INFO DAGScheduler: Job 4 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/29 17:10:21 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
23/04/29 17:10:21 INFO DAGScheduler: Job 4 finished: upsert at UpDateDemo1.scala:57, took 0.277371 s
23/04/29 17:10:21 INFO SparkContext: Invoking stop() from shutdown hook
23/04/29 17:10:22 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-VRV0NDO:4040
23/04/29 17:10:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/04/29 17:10:22 INFO MemoryStore: MemoryStore cleared
23/04/29 17:10:22 INFO BlockManager: BlockManager stopped
23/04/29 17:10:22 INFO BlockManagerMaster: BlockManagerMaster stopped
23/04/29 17:10:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/04/29 17:10:22 INFO SparkContext: Successfully stopped SparkContext
23/04/29 17:10:22 INFO ShutdownHookManager: Shutdown hook called
23/04/29 17:10:22 INFO ShutdownHookManager: Deleting directory C:\Users\zhiyong\AppData\Local\Temp\spark-2ae1f427-ec0e-4e23-9b59-31b796d980d2

Process finished with exit code 0

The custom class and methods appear to have been invoked successfully.

Checking the result

mysql> select * from test_upsert_20230429_res;
+----+------+------+------+
| id | col1 | col2 | col3 |
+----+------+------+------+
|  1 | a1   | b1   | c1   |
|  2 | a2   | b2   | c2   |
|  3 | a3   | b3   | c3   |
|  4 | a4   | b4   | c4   |
|  5 | a5   | b5   | c5   |
|  6 | a6   | b6   | c6   |
|  7 | a7   | b7   | c7   |
|  8 | a8   | b8   | c8   |
|  9 | a9   | b9   | c9   |
+----+------+------+------+
9 rows in set (0.00 sec)

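Verifying again

The upsert clearly succeeded, but for the duplicated key (id = 5) the first record was kept. The expectation was that the last record would win, so we run the job again and query once more: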

mysql> select * from test_upsert_20230429_res;
+----+------+------+------+
| id | col1 | col2 | col3 |
+----+------+------+------+
|  1 | a1   | b1   | c1   |
|  2 | a2   | b2   | c2   |
|  3 | a3   | b3   | c3   |
|  4 | a4   | b4   | c4   |
|  5 | a10  | b10  | c10  |
|  6 | a6   | b6   | c6   |
|  7 | a7   | b7   | c7   |
|  8 | a8   | b8   | c8   |
|  9 | a9   | b9   | c9   |
+----+------+------+------+
9 rows in set (0.00 sec)

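This is evidently caused by nondeterministic ordering in distributed execution. The job writes through 8 concurrent tasks, and the two rows with id = 5 most likely sit in different partitions, so which of them reaches MySQL last is effectively random. DataStage's single-threaded mode, by contrast, applies rows in arrival order.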
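One way to make the outcome deterministic, offered here as an assumption rather than something the setup above does, is to deduplicate by key before writing so that each key reaches MySQL exactly once. The sketch below shows the "last one wins" rule on a plain Scala collection; KeepLastSketch and keepLastByKey are hypothetical names. On a DataFrame the same idea needs an explicit ordering column, which the sample data does not have.

```scala
object KeepLastSketch {
  // Keep only the last row seen for each key, preserving the order in which keys first appear.
  def keepLastByKey[K, V](rows: Seq[(K, V)]): Seq[(K, V)] = {
    val lastByKey = rows.toMap // later entries overwrite earlier ones with the same key
    rows.map(_._1).distinct.map(k => k -> lastByKey(k))
  }
}
```

Applied to the sample rows, the two records with id = 5 collapse into the later one (a10) before any write happens, so the result no longer depends on task scheduling.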

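This implements upsert against MySQL. The same approach can be adapted for Oracle and other RDBMSs.

Closing remarks

Whatever our predecessors achieved with DataStage can certainly be achieved in code. Extending Spark sometimes requires Scala, which sets a fairly high bar.

For stream processing with Flink, see: https://lizhiyong.blog.csdn.net/article/details/124161096

Because the sink there can be written in Java, the task is much easier, whether you first run a select to check for an existing row and then insert or update accordingly, or build the upsert statement directly.

This approach also avoids the very long SQL with many joins that pure HQL requires. When a statement gets too long, AST parsing tends to fail, and the SQL has to be split by hand or intermediate results written to disk. Tez also does not perform as well as Spark. Common cases like these should be built into the platform to cut repeated work, reduce cost, and improve efficiency.

SQL Boys write SQL because SQL is all they know and they have no alternative. Real big-data engineers write SQL simply because management allows nothing else, and because writing SQL gets you home earlier than writing Java/Scala. When necessary, they have several ways to get the job done.

May there be no more shallow SQL Boys.

Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/130442316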


