Hudi-并发控制

news2024/11/29 20:36:19

并发控制

Hudi支持的并发控制

MVCC

Hudi的表操作,如压缩、清理、提交,hudi会利用多版本并发控制来提供多个表操作写入和查询之间的快照隔离。使用MVCC这种模型,Hudi支持并发任意数量的操作作业,并保证不会发生任何冲突。Hudi默认这种模型。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。

OPTIMISTIC CONCURRENCY

针对写入操作(upsert、insert等)利用乐观并发控制来启用多个writer将数据写到同一个表中,Hudi支持文件级的乐观一致性,即对于发生在同一个表中的任何2个提交(写入),如果它们没有写入正在更改的重叠文件,则允许两个写入都成功。此功能处于实验阶段,需要用到Zookeeper或HiveMetastore来获取锁。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7FMhAPib-1676291934060)(http://image.codekiller.top/img/Hudi/image-20230114180831716.png)]

使用并发写方式

如果需要开启乐观并发写入,需要设置以下属性:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>

Hudi获取锁的服务提供两种模式使用zookeeper或HiveMetaStore:

  • 相关zookeeper参数:

    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
    hoodie.write.lock.zookeeper.url
    hoodie.write.lock.zookeeper.port
    hoodie.write.lock.zookeeper.lock_key
    hoodie.write.lock.zookeeper.base_path
    
  • 相关HiveMetastore参数,HiveMetastore URI是从运行时加载的hadoop配置文件中提取的:

    hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
    hoodie.write.lock.hivemetastore.database
    hoodie.write.lock.hivemetastore.table
    

使用Spark DataFrame并发写入

(1)启动spark-shell

spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

(2)编写代码【核心为写入时的 hoodie 相关参数】

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
 
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
 
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "hadoop1,hadoop2,hadoop3")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", "test_table")
  .option("hoodie.write.lock.zookeeper.base_path", "/multiwriter_test")
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

(3)使用zk客户端,验证是否使用了zk

/opt/module/apache-zookeeper-3.5.7/bin/zkCli.sh 
[zk: localhost:2181(CONNECTED) 0] ls /

(4)zk下产生了对应的目录,/multiwriter_test下的目录,为代码里指定的lock_key

[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test

使用Delta Streamer并发写入

基于前面DeltaStreamer的例子,使用Delta Streamer消费kafka的数据写入到hudi中,这次加上并发写的参数。

(1)进入配置文件目录,修改配置文件添加对应参数,提交到Hdfs上

cd /opt/module/hudi-props/
cp kafka-source.properties kafka-multiwriter-source.propertis
vim kafka-multiwriter-source.propertis 
 
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=hadoop1,hadoop2,hadoop3
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=test_table2
hoodie.write.lock.zookeeper.base_path=/multiwriter_test2
 
hadoop fs -put /opt/module/hudi-props/kafka-multiwriter-source.propertis /hudi-props

(2)运行Delta Streamer

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/opt/module/spark-3.2.2/jars/hudi-utilities-bundle_2.12-0.12.0.jar \
--props hdfs://hadoop1:8020/hudi-props/kafka-multiwriter-source.propertis \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field userid \
--target-base-path hdfs://hadoop1:8020/tmp/hudi/hudi_test_multi  \
--target-table hudi_test_multi \
--op INSERT \
--table-type MERGE_ON_READ

(3)查看zk是否产生新的目录

/opt/module/apache-zookeeper-3.5.7-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test2

常规调优

  1. 并行度

    Hudi对输入进行分区默认并发度为1500,以确保每个Spark分区都在2GB的限制内(在Spark2.4.0版本之后去除了该限制),如果有更大的输入,则相应地进行调整。建议设置shuffle的并发度,配置项为 hoodie.[insert|upsert|bulkinsert].shuffle.parallelism,以使其至少达到inputdatasize/500MB。

  2. Off-heap(堆外)内存
    Hudi写入parquet文件,需要使用一定的堆外内存,如果遇到此类故障,请考虑设置类似 spark.yarn.executor.memoryOverhead或 spark.yarn.driver.memoryOverhead的值。

  3. Spark 内存
    通常Hudi需要能够将单个文件读入内存以执行合并或压缩操作,因此执行程序的内存应足以容纳此文件。另外,Hudi会缓存输入数据以便能够智能地放置数据,因此预留一些 spark.memory.storageFraction通常有助于提高性能。

  4. 调整文件大小
    设置 limitFileSize以平衡接收/写入延迟与文件数量,并平衡与文件数据相关的元数据开销。

  5. 时间序列/日志数据
    对于单条记录较大的数据库/ nosql变更日志,可调整默认配置。另一类非常流行的数据是时间序列/事件/日志数据,它往往更加庞大,每个分区的记录更多。在这种情况下,请考虑通过.bloomFilterFPP()/bloomFilterNumEntries()来调整Bloom过滤器的精度,以加速目标索引查找时间,另外可考虑一个以事件时间为前缀的键,这将使用范围修剪并显着加快索引查找的速度。

  6. GC调优
    请确保遵循Spark调优指南中的垃圾收集调优技巧,以避免OutOfMemory错误。[必须]使用G1 / CMS收集器,其中添加到spark.executor.extraJavaOptions的示例如下:

    -XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
    
    -XX:NewSize=1g 
    -XX:SurvivorRatio=2 
    -XX:+UseCompressedOops 
    -XX:+UseConcMarkSweepGC 
    -XX:+UseParNewGC 
    -XX:CMSInitiatingOccupancyFraction=70 
    -XX:+PrintGCDetails 
    -XX:+PrintGCTimeStamps 
    -XX:+PrintGCDateStamps 
    -XX:+PrintGCApplicationStoppedTime 
    -XX:+PrintGCApplicationConcurrentTime 
    -XX:+PrintTenuringDistribution 
    -XX:+HeapDumpOnOutOfMemoryError 
    -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
    

    上述2个示例一样,一个换行,一个没换行。

  7. OutOfMemory错误

    如果出现OOM错误,则可尝试通过如下配置处理:spark.memory.fraction=0.2,spark.memory.storageFraction=0.2允许其溢出而不是OOM(速度变慢与间歇性崩溃相比)。

  8. 完整的生产配置

    spark.driver.extraClassPath /etc/hive/conf
    spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
    spark.driver.maxResultSize 2g
    spark.driver.memory 4g
    spark.executor.cores 1
    spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
    spark.executor.id driver
    spark.executor.instances 300
    spark.executor.memory 6g
    spark.rdd.compress true
    
    spark.kryoserializer.buffer.max 512m
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.shuffle.service.enabled true
    spark.sql.hive.convertMetastoreParquet false
    spark.submit.deployMode cluster
    spark.task.cpus 1
    spark.task.maxFailures 4
    
    spark.yarn.driver.memoryOverhead 1024
    spark.yarn.executor.memoryOverhead 3072
    spark.yarn.max.executor.failures 100
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/344659.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小米电视安装 Plex 打造家庭影院

背景 最近突然想重温教父&#xff0c;本来想着直接投屏就可以&#xff0c;后来看了别人搭建的基于 NAS 的家庭影院很动心&#xff0c;也想依葫芦画瓢做一个&#xff0c;跟对象申请经费的时候被拒了&#xff0c;理由是有这钱还不如开个会员直接看。 我寻思不同电影在不同的平台…

遥感反演叶面积指数 (LAI)

叶面积指数 叶面积指数&#xff08;Leaf Area Index, LAI&#xff09;是反映一个生态系统中单位面积上的叶面积综合的一半&#xff0c;是模拟陆地生态过程、水热循环和生物地球化学循环的重要参数。 本文主要介绍LAI的遥感反演方法&#xff0c;其主要分为统计方法、植被辐射传输…

传奇私服搭建网站的几种方法

搭建网站的几种方法&#xff1a;一些人&#xff0c;连简单的搭建网站都不会&#xff0c;还要请技术帮忙&#xff0c;真是牛B&#xff0c;这里简单介绍下几种办法一&#xff1a;2003系统下&#xff0c;直接使用IIS&#xff0c;这个太简单了&#xff0c;桌面上就有IIS&#xff0c…

权威报告!这五个消费趋势,告诉你如何抓住中国消费者的心和钱包

有人说2023年是消费复苏的一年&#xff0c;市场回暖趋势明显&#xff1b;也有人说之前的亏空太大&#xff0c;想要短时间追上来不太可能&#xff0c;因此2023的消费市场最多是不低迷&#xff0c;达不到火热。这可把做生意的各位老板整纠结了&#xff0c;究竟今年要不要投个大手…

mysql 跳过事务 gtid

企业生产场景mysql主从复制故障原因 企业生产场景mysql主从复制故障原因 实验一&#xff1a; 目的&#xff1a;解决主从不同步&#xff08;本例中sql线程出现问题&#xff09; 方法&#xff1a;模拟故障场景 1.在SLAVE上建立一个名为yingying数据库。…

Webstorm 代码没有提示,uniapp 标签报错

问题 项目是用脚手架创建的&#xff1a; vue create -p dcloudio/uni-preset-vue my-project 打开之后&#xff0c;添加view标签警告报错的。代码也没有提示&#xff0c;按官方说法&#xff1a;CLI 工程默认带了 uni-app 语法提示和 5App 语法提示。 但是我这里就是有问题。…

Oracle实现高可用性的工具(负载均衡/故障切换)

Oracle实现高可用性的工具&#xff08;负载均衡/故障切换&#xff09;1 Oracle RAC故障转移负载均衡2 Data Guard负载均衡-读写分离Data Guard Broker3 GDSGSM&#xff1a;连接管理工具主要功能Data Guard Broker功能是监控Data Guard状态&#xff0c;当主库异常时自动切换角色…

idea2021版本新建maven项目

首先我们需要下载maven版本(maven下载地址Maven – Download Apache Maven)&#xff0c;并且配置好maven仓库与环境变量&#xff0c;这里不细述了。打开idea选择新建项目&#xff0c;选择maven&#xff0c;效果如下图 我们选择maven-archetype-webapp类型。 下一步&#xff0c;…

4.9 内部类

文章目录1.内部类概述2.特点3.练习 : 内部类入门案例4.成员内部类4.1 练习 : 被private修饰4.2 练习 : 被static修饰5.局部内部类6.匿名内部类1.内部类概述 如果一个类存在的意义就是为指定的另一个类&#xff0c;可以把这个类放入另一个类的内部。 就是把类定义在类的内部的情…

MQ中间件概念一览

一、概述 1. 大多应用中&#xff0c;可通过消息服务中间件来提升系统异步通信、扩展解耦能力 2. 消息服务中两个重要概念&#xff1a; 消息代理&#xff08;message broker&#xff09;和目的地&#xff08;destination&#xff09; 当消息发送者发送消息以后&#xff0c;将由…

有了ChatGPT 微软对元宇宙不香了?

押注ChatGPT是微软最近的主要发力点&#xff0c;另一边&#xff0c;它开始向元宇宙业务挥出裁员“大刀”。海外消息称&#xff0c;微软解散了成立仅四个月的工业元宇宙团队&#xff0c;约100名员工被全被解雇。 这只是微软放缓元宇宙战略的长尾动作&#xff0c;此前&#xff0…

【MFC】模拟采集系统——界面设计(17)

功能介绍 启动界面 开始采集&#xff1a; PS&#xff1a;不涉及 数据保存&#xff0c;重现等功能 界面设计 界面分为三块&#xff1a;顶部黑条带关闭按钮、左边对话框&#xff0c;右边的主界面 资源&#xff1a; 顶部黑条 top.bmp 2* 29 &#xff08;宽 * 高 像素点&…

SAS应用入门学习笔记7

代码说明&#xff1a; 1&#xff09;distinct 想获得region变量有的多少种&#xff1f; 2&#xff09;如果是常规语句&#xff0c;我们是使用proc freq 语句&#xff1a; where for filter&#xff1a; 然后有一个escape语句的概念&#xff1a; 这是一个简单的语法&#xff…

Redis集群离线安装

近日&#xff0c;由于客户的系统运行环境在一个封闭的网络内&#xff0c;不能与互联网联通&#xff0c;也不能提供yum库&#xff0c;所以运行环境只能采用离线安装的方式&#xff0c;我总结了一下本次的安装经过&#xff0c;希望对需要的人有所帮助。一、安装gcc查看gcc版本要求…

牛客网Python篇数据分析习题(五)

1.现有牛客网12月每天练习题目的数据集nowcoder.csv。包含如下字段&#xff08;字段之间用逗号分隔&#xff09;&#xff1a; user_id:用户id question_id&#xff1a;问题编号 result&#xff1a;运行结果 date&#xff1a;练习日期 请你统计答对和答错的总数分别是多少。 imp…

态路小课堂丨下一代数据中心100G接口第二篇——SFP-DD封装

100G光模块根据封装模式可分为QSFP28、CXP、CFP、CFP2、FCP4、DSFP和SFP-DD等。态路小课堂之前已经大量介绍了相关内容&#xff08;。 态路小课堂丨下一代数据中心100G接口——DSFP态路小课堂丨100G解决方案-425G NRZ光模块态路小课堂丨什么是100G QSFP28单波光模块&#xff1f…

为什么要用springboot进行开发呢?

文章目录前言1、那么Springboot是怎么实现自动配置的1.1 启动类1.2 SpringBootApplication1.3 Configuration1.4 ComponentScan1.5 EnableAutoConfiguration1.6 两个重要注解1.7 AutoConfigurationPackage注解1.8 Import(AutoConfigurationImportSelector.class)注解1.9自动配置…

素数相关(结合回文数,合数)线性筛素数(欧拉筛法)Euler【算法模板笔记】

一、朴素筛法&#xff08;埃拉托斯特尼筛法&#xff09;Eratosthenes 筛法&#xff08;埃拉托斯特尼筛法&#xff0c;简称埃氏筛法&#xff09;时间复杂度是O(nloglogn)不常用&#xff0c;被欧拉筛代替&#xff0c;略二、线性筛素数&#xff08;欧拉筛法&#xff09;简介线性筛…

C++007-C++循环结构

文章目录C007-C循环结构for循环for循环举例for循环格式题目描述 输出十次手机号题目描述 打印区间内的整数题目描述 打印字符之间的所有字符题目描述 打印区间内符合条件的整数数数量作业在线练习&#xff1a;总结C007-C循环结构 在线练习&#xff1a; http://noi.openjudge.cn…

HappyAI 算法任务调度平台 - 开篇

HappyAI 算法任务调度平台 - 开篇 HappyAI 算法任务调度平台 - 接入 HappyAI 算法任务调度平台 - 开发 1. 支持不同算法侧接入即算法集群&#xff08;如&#xff1a;paddle算法平台&#xff1b;mmdetection算法平台&#xff09; 2. 支持不同相机取流&#xff08;如&#xff…