Hudi(10):Hudi集成Spark之并发控制

news2025/1/23 7:25:59

目录

0. 相关文章链接

1. Hudi支持的并发控制

1.1. MVCC

1.2. OPTIMISTIC CONCURRENCY

2. 使用并发写方式

3. 使用Spark DataFrame并发写入

4. 使用Delta Streamer并发写入


0. 相关文章链接

 Hudi文章汇总 

1. Hudi支持的并发控制

1.1. MVCC

        Hudi的表操作,如压缩、清理、提交,hudi会利用多版本并发控制来提供多个表操作写入和查询之间的快照隔离。使用MVCC这种模型,Hudi支持并发任意数量的操作作业,并保证不会发生任何冲突。Hudi默认这种模型。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。

1.2. OPTIMISTIC CONCURRENCY

        针对写入操作(upsert、insert等)利用乐观并发控制来启用多个writer将数据写到同一个表中,Hudi支持文件级的乐观一致性,即对于发生在同一个表中的任何2个提交(写入),如果它们没有写入正在更改的重叠文件,则允许两个写入都成功。此功能处于实验阶段,需要用到Zookeeper或HiveMetastore来获取锁。

2. 使用并发写方式

  • 如果需要开启乐观并发写入,需要设置以下属性:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>
  • Hudi获取锁的服务提供两种模式使用zookeeper或HiveMetaStore:

相关zookeeper参数:

hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path

相关HiveMetastore参数,HiveMetastore URI是从运行时加载的hadoop配置文件中提取的:

hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table

3. 使用Spark DataFrame并发写入

(1)启动spark-shell

spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

(2)编写代码【核心为写入时的 hoodie 相关参数】

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "hadoop1,hadoop2,hadoop3")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", "test_table")
  .option("hoodie.write.lock.zookeeper.base_path", "/multiwriter_test")
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

(3)使用zk客户端,验证是否使用了zk

/opt/module/apache-zookeeper-3.5.7/bin/zkCli.sh 
[zk: localhost:2181(CONNECTED) 0] ls /

(4)zk下产生了对应的目录,/multiwriter_test下的目录,为代码里指定的lock_key

[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test

4. 使用Delta Streamer并发写入

基于前面DeltaStreamer的例子,使用Delta Streamer消费kafka的数据写入到hudi中,这次加上并发写的参数。

1)进入配置文件目录,修改配置文件添加对应参数,提交到Hdfs上

cd /opt/module/hudi-props/
cp kafka-source.properties kafka-multiwriter-source.propertis
vim kafka-multiwriter-source.propertis 

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=hadoop1,hadoop2,hadoop3
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=test_table2
hoodie.write.lock.zookeeper.base_path=/multiwriter_test2

hadoop fs -put /opt/module/hudi-props/kafka-multiwriter-source.propertis /hudi-props

2)运行Delta Streamer

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/opt/module/spark-3.2.2/jars/hudi-utilities-bundle_2.12-0.12.0.jar \
--props hdfs://hadoop1:8020/hudi-props/kafka-multiwriter-source.propertis \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field userid \
--target-base-path hdfs://hadoop1:8020/tmp/hudi/hudi_test_multi  \
--target-table hudi_test_multi \
--op INSERT \
--table-type MERGE_ON_READ

3)查看zk是否产生新的目录

/opt/module/apache-zookeeper-3.5.7-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[zk: localhost:2181(CONNECTED) 1] ls /multiwriter_test2


注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/147307.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

阿里云 EDAS Java服务日志中打印调用链TraceId

最近要搭建阿里云的日志服务SLS&#xff0c;收集服务日志&#xff0c;进行统一的搜索查询。但遇到一个问题如何在日志中打印链路的TraceId&#xff0c;本文章记录一下对EDAS免费的解决方法。 先看一下阿里官方文档 业务日志关联调用链的TraceId信息 从文档上看&#xff0c;想要…

基于SSM的资源发布系统

项目介绍&#xff1a; 该系统基于SSM技术&#xff0c;数据层为MyBatis&#xff0c;数据库使用mysql&#xff0c;MVC模式&#xff0c;B/S架构&#xff0c;具有完整的业务逻辑。系统共分为管理员&#xff0c;用户两种角色&#xff0c;主要功能&#xff1a;登陆注册&#xff0c;用…

数据结构:跳表

文章目录跳表跳表的由来单链表的查找效率太低提高单链表的查找效率跳表的时间复杂度分析跳表的空间复杂度分析跳表的插入操作跳表的删除操作跳表索引动态更新跳表 对链表进行改造&#xff0c;在链表上加多级索引的结构就是跳表&#xff0c;使其可以支持类似“二分”的查找算法。…

Redis查询之RediSearch和RedisJSON讲解

文章目录1 Redis查询1.1 RedisMod介绍1.2 安装Redis1.3 RediSearchRedisJSON安装1.3.1 下载安装1.3.2 修改配置1.4 RedisJSON操作1.4.1 基本操作1.4.1.1 保存操作JSON.SET1.4.1.2 读取操作JSON.GET1.4.1.3 批量读取操作JSON.MGET1.4.1.4 删除操作JSON.DEL1.4.1.5 其他命令1.4.1…

鲲鹏Bigdata pro之Hive的基本操作(创建表、查询表)

1 介绍 本文主要依据《鲲鹏Bigdata pro之Hive集群部署》实验教程上的Hive操作例子讲解&#xff0c;方便大数据学员重用相应的操作语句。同时对实验过程中出现的问题给以解决方法&#xff0c;重现问题解决的过程。以让大家认识到&#xff0c;出现问题很正常&#xff1b;同时&am…

Java设计模式中接口隔离原则是什么?迪米特原则又是什么,啥又是合成复用原则,这些又怎么运用

继续整理记录这段时间来的收获&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 3.5 接口隔离原则 3.5.1 特点 使用的类不应该被迫依赖于不想使用的方法&#xff0c;应该依赖接口方法 3.5.2 案例(安全门) 防火功能代码 public interface Fi…

第一章:统计学习方法概论

大纲1.1统计学习的特点1.2统计学习方法步骤1.3 统计学习的分类基本分类&#xff1a;1.4 监督学习方法的三要素模型&#xff1a;条件概率分布P(Y∣X)P(Y|X)P(Y∣X)或决策分布Yf(X)Yf(X)Yf(X)策略&#xff1a;在所有假设空间中选择一个最优模型注意事项&#xff1a;算法&#xff…

Java设计模式中适配器模式是什么/适配器模式可以干什么/又如何实现

继续整理记录这段时间来的收获&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 5.3 适配器模式 5.3.1 概述 将一个类的接口转换为客户希望的另一种接口&#xff0c;使得原本由于接口不兼容而不能一起工作的那些类能一起工作分为类适配器模式和…

一套采用ASP.NET开发的工作通OA协同办公系统源码 流程审批 公文流转 文档管理

分享一套采用ASP.NET基于C#开发&#xff0c;使用桌面式的OA协同办公系统&#xff0c;超好用户体验效果的后台管理界面&#xff0c;集成 资讯、邮件、日程、文档&#xff08;在线文件档案管理&#xff09;、流程审批、公文流转、沟通与分享&#xff08;在线聊天和内部论坛&#…

基于LLVM的C编译器--lcc——以CLion用SSH连接WSL Ubuntu22.04为例

Windows 10 22H2CLion 2022.3.1Ubuntu 20.04 &#xff08;Microsoft Store内的WSL发行版&#xff09; 一、下载WSL&#xff0c;换源&#xff0c;切换到WSL2 1.1 保证windows版本 在设置->系统->关于中查看 必须是win10及以上对于x64系统&#xff1a;版本1903或更高版…

ArcGIS基础实验操作100例--实验63由图片创建点符号

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 高级编辑篇--实验63 由图片创建点符号 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff08;1&…

Java设计模式中代理模式是什么/JDK动态代理分为哪些,静态代理又怎么实现,又适合哪些场景

继续整理记录这段时间来的收获&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 5.结构型模式 5.1 概述 根据如何将类或对象按某种布局组成更大的结构&#xff0c;分为类结构模式和对象结构模式&#xff0c;前者采用继承机制来组织接口和类&am…

视频序列对比学习

前言 视频embedding化也即表征有很多实际的应用场景&#xff0c;比如文本-视频 pair的检索等等。由于视频一般来说较长&#xff0c;所以对于给定的一段话&#xff0c;其中的某些sentence句子一般对应着视频中某几个clip片段&#xff0c;之前常规的做法都是去匹配所有的sentence…

人工服务、人工智能和分析是联络中心的主要趋势

数字联络中心提供商 IPI 宣布了其对 2023 年的预测。IPI 非常重视提供卓越的客户联系&#xff0c;认为未来一年将由以下趋势定义&#xff1a;专注于人工服务&#xff1b;增加对人工智能和自动化的采用&#xff1b;以及更多地使用数据和分析。 关注人性化服务 据 IPI 称&#…

实现QTreeView、QTableView子项中的复选框勾选/取消勾选功能

1.前言本博文所说的技术点适用于同时满足下面条件的所有视图类&#xff1a;模型类从 QAbstractItemModel派生。代理类从QStyledItemDelegate派生。故本博文所说的技术点也适用于QTableView。2.需求提出基于Qt的model/view framework技术&#xff0c;利用QTreeView树视图实现业务…

【异常】SpringSecurity登录失败:Full authentication is required to access this resource

一、报错提示 SpringSecurity提示如下内容&#xff1a; 2023-01-07 06:08:51.843 [cdi-ids-commonprovider] [http-nio-9092-exec-14] WARN com.desaysv.tsp.logic.ids.config.MyAuthenticationEntryPoint - 登录失败&#xff1a;Full authentication is required to acces…

基于Java+Jsp+SpringMVC漫威手办商城系统设计和实现

基于JavaJspSpringMVC漫威手办商城系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码联…

2023 年值得关注的 7 大人工智能 (AI) 技术趋势

&#x1f482; 个人网站:【海拥】【摸鱼游戏】【神级源码资源网】&#x1f91f; 前端学习课程&#xff1a;&#x1f449;【28个案例趣学前端】【400个JS面试题】&#x1f485; 想寻找共同学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】 人工智能 (AI) 已经接…

图数据库Neo4j实战(全网最详细教程)

1.图数据库Neo4j介绍 1.1 什么是图数据库&#xff08;graph database&#xff09; 随着社交、电商、金融、零售、物联网等行业的快速发展&#xff0c;现实社会织起了了一张庞大而复杂的关系网&#xff0c;传统数据库很难处理关系运算。大数据行业需要处理的数据之间的关系随数…

《Go 并发数据结构和算法实践》学习笔记 Day 1

极客时间21天打卡活动&#xff1a;2023.1.16-2.5 链表的接口&#xff1a; 插入元素删除元素读取元素 并发化改造&#xff1a; 并发插入元素并发删除元素并发读取元素 锁&#xff0c;每个节点都定义一把锁。 并发插入 区域猜想&#xff1a;如果某个CPU 锁定了某个节点&…