并发控制
Hudi支持的并发控制
MVCC
Hudi的表操作,如压缩、清理、提交,hudi会利用多版本并发控制来提供多个表操作写入和查询之间的快照隔离。使用MVCC这种模型,Hudi支持并发任意数量的操作作业,并保证不会发生任何冲突。Hudi默认这种模型。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。
OPTIMISTIC CONCURRENCY
针对写入操作(upsert、insert等)利用乐观并发控制来启用多个writer将数据写到同一个表中,Hudi支持文件级的乐观一致性,即对于发生在同一个表中的任何2个提交(写入),如果它们没有写入正在更改的重叠文件,则允许两个写入都成功。此功能处于实验阶段,需要用到Zookeeper或HiveMetastore来获取锁。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7FMhAPib-1676291934060)(http://image.codekiller.top/img/Hudi/image-20230114180831716.png)]
使用并发写方式
如果需要开启乐观并发写入,需要设置以下属性:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>
Hudi获取锁的服务提供两种模式使用zookeeper或HiveMetaStore:
-
相关zookeeper参数:
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.zookeeper.url hoodie.write.lock.zookeeper.port hoodie.write.lock.zookeeper.lock_key hoodie.write.lock.zookeeper.base_path
-
相关HiveMetastore参数,HiveMetastore URI是从运行时加载的hadoop配置文件中提取的:
hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider hoodie.write.lock.hivemetastore.database hoodie.write.lock.hivemetastore.table
使用Spark DataFrame并发写入
(1)启动spark-shell
spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
(2)编写代码【核心为写入时的 hoodie 相关参数】
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
.option("hoodie.write.lock.zookeeper.url", "hadoop1,hadoop2,hadoop3")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "test_table")
.option("hoodie.write.lock.zookeeper.base_path", "/multiwriter_test")
.option(TABLE_NAME, tableName)
.mode(Append)
.save(basePath)
(3)使用zk客户端,验证是否使用了zk
/opt/module/apache-zookeeper-3.5.7/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
(4)zk下产生了对应的目录,/multiwriter_test下的目录,为代码里指定的lock_key
[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test
使用Delta Streamer并发写入
基于前面DeltaStreamer的例子,使用Delta Streamer消费kafka的数据写入到hudi中,这次加上并发写的参数。
(1)进入配置文件目录,修改配置文件添加对应参数,提交到Hdfs上
cd /opt/module/hudi-props/
cp kafka-source.properties kafka-multiwriter-source.propertis
vim kafka-multiwriter-source.propertis
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=hadoop1,hadoop2,hadoop3
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=test_table2
hoodie.write.lock.zookeeper.base_path=/multiwriter_test2
hadoop fs -put /opt/module/hudi-props/kafka-multiwriter-source.propertis /hudi-props
(2)运行Delta Streamer
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/opt/module/spark-3.2.2/jars/hudi-utilities-bundle_2.12-0.12.0.jar \
--props hdfs://hadoop1:8020/hudi-props/kafka-multiwriter-source.propertis \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field userid \
--target-base-path hdfs://hadoop1:8020/tmp/hudi/hudi_test_multi \
--target-table hudi_test_multi \
--op INSERT \
--table-type MERGE_ON_READ
(3)查看zk是否产生新的目录
/opt/module/apache-zookeeper-3.5.7-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test2
常规调优
-
并行度
Hudi对输入进行分区默认并发度为1500,以确保每个Spark分区都在2GB的限制内(在Spark2.4.0版本之后去除了该限制),如果有更大的输入,则相应地进行调整。建议设置shuffle的并发度,配置项为 hoodie.[insert|upsert|bulkinsert].shuffle.parallelism,以使其至少达到inputdatasize/500MB。
-
Off-heap(堆外)内存
Hudi写入parquet文件,需要使用一定的堆外内存,如果遇到此类故障,请考虑设置类似 spark.yarn.executor.memoryOverhead或 spark.yarn.driver.memoryOverhead的值。 -
Spark 内存
通常Hudi需要能够将单个文件读入内存以执行合并或压缩操作,因此执行程序的内存应足以容纳此文件。另外,Hudi会缓存输入数据以便能够智能地放置数据,因此预留一些 spark.memory.storageFraction通常有助于提高性能。 -
调整文件大小
设置 limitFileSize以平衡接收/写入延迟与文件数量,并平衡与文件数据相关的元数据开销。 -
时间序列/日志数据
对于单条记录较大的数据库/ nosql变更日志,可调整默认配置。另一类非常流行的数据是时间序列/事件/日志数据,它往往更加庞大,每个分区的记录更多。在这种情况下,请考虑通过.bloomFilterFPP()/bloomFilterNumEntries()来调整Bloom过滤器的精度,以加速目标索引查找时间,另外可考虑一个以事件时间为前缀的键,这将使用范围修剪并显着加快索引查找的速度。 -
GC调优
请确保遵循Spark调优指南中的垃圾收集调优技巧,以避免OutOfMemory错误。[必须]使用G1 / CMS收集器,其中添加到spark.executor.extraJavaOptions的示例如下:-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
上述2个示例一样,一个换行,一个没换行。
-
OutOfMemory错误
如果出现OOM错误,则可尝试通过如下配置处理:spark.memory.fraction=0.2,spark.memory.storageFraction=0.2允许其溢出而不是OOM(速度变慢与间歇性崩溃相比)。
-
完整的生产配置
spark.driver.extraClassPath /etc/hive/conf spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof spark.driver.maxResultSize 2g spark.driver.memory 4g spark.executor.cores 1 spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof spark.executor.id driver spark.executor.instances 300 spark.executor.memory 6g spark.rdd.compress true spark.kryoserializer.buffer.max 512m spark.serializer org.apache.spark.serializer.KryoSerializer spark.shuffle.service.enabled true spark.sql.hive.convertMetastoreParquet false spark.submit.deployMode cluster spark.task.cpus 1 spark.task.maxFailures 4 spark.yarn.driver.memoryOverhead 1024 spark.yarn.executor.memoryOverhead 3072 spark.yarn.max.executor.failures 100