使用spark进行hbase的bulkload

news2024/12/24 2:43:10

使用spark进行hbase的bulkload

一、 背景

HBase 是一个面向列,schemaless,高吞吐,高可靠可水平扩展的 NoSQL 数据库,用户可以通过 HBase client 提供的 put get 等 api 实现在数据的实时读写。在过去的几年里,HBase 有了长足的发展,它在越来越多的公司里扮演者越来越重要的角色。
HBase 擅长于海量数据的实时读取,原生 HBase 没有二级索引,复杂查询场景支持的不好。同时因为 split,磁盘,网络抖动,Java GC 等多方面的因素会影响其 RT 表现,所以通常我们在使用HBase的同时也会使用其他的存储中间件,比如 ES,Reids,Mysql 等等。避免 HBase 成为信息孤岛,我们需要数据导入导出的工具在这些中间件之间做数据迁移,而最常用的莫过于阿里开源的 DataX。Datax从 其他数据源迁移数据到 HBase 实际上是走的 HBase 原生 api 接口,在少量数据的情况下没有问题,但当我们需要从 Hive 里,或者其他异构存储里批量导入几亿,几十亿的数据,那么用 DataX 这里就显得不那么适合,因为走原生接口为了避免影响生产集群的稳定性一定要做好限流,那么海量数据的迁移就很很慢,同时数据的持续写入会因为 flush,compaction 等机制占用较多的系统资源。为了解决批量导入的场景,Bulkload 应运而生。

二、HBase Bulkload
在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。

1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,
在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘
(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。


2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

  • Extract,异构数据源数据导入到 HDFS 之上。
  • Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
  • Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。

 三、实践

hive表


 

 hbase表

 依赖

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<properties>

        <maven.compiler.source>1.8</maven.compiler.source>

        <maven.compiler.target>1.8</maven.compiler.target>

        <encoding>UTF-8</encoding>

        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>

        <log4j.version>1.7.30</log4j.version>

        <zk.version>3.4.5-cdh5.16.2</zk.version>

        <scala.version>2.12.10</scala.version>

        <scala.tools.version>2.12</scala.tools.version>

        <spark.version>3.2.0</spark.version>

        <hbase.version>1.2.0-cdh5.16.2</hbase.version>

        <config.version>1.4.0</config.version>

    </properties>

     

    <repositories>

        <repository>

            <id>nexus-aliyun</id>

            <url>http://maven.aliyun.com/nexus/content/groups/public</url>

        </repository>

        <repository>

            <id>cloudera</id>

            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

        </repository>

    </repositories>

    <dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>${hadoop.version}</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-log4j12</artifactId>

            <version>${log4j.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>${zk.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-core_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-server</artifactId>

            <version>${hbase.version}</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql_${scala.tools.version}</artifactId>

            <version>${spark.version}</version>

        </dependency>

         

    </dependencies>

spark 代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

package com.jojo

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}

import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

 * Description:Hbase批量加载   同一列族多列

 */

object HbaseBulkLoadApp {

  val zookeeperQuorum = "cdh01,cdh02,cdh03"//zookeeper信息

  val dataSourcePath = "hdfs://cdh03:8020/user/hive/warehouse/sample_07" //源文件

  val hFilePath = "hdfs://cdh03:8020/tmp/result"//hfile的存储路径

  val hdfsRootPath = "hdfs://cdh03:8020/"//根路径

  val tableName = "sample_07"//表名

  val familyName = "basic"//列族

  val arr = Array("code","description""total_emp","salary")//列的名字集合

  def main(args: Array[String]): Unit = {

    //获取content

    val sparkConf = new SparkConf()

      .setAppName(s"${this.getClass.getSimpleName}")

      .setMaster("local")

      //指定序列化格式,默认是java序列化

      .set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

      //告知哪些类型需要序列化

      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)

    //hadoop配置

    val hadoopConf = new Configuration()

    hadoopConf.set("fs.defaultFS", hdfsRootPath)

    //获取输出路径

    val fileSystem = FileSystem.get(hadoopConf)

    //获取hbase配置

    val hconf = HBaseConfiguration.create()

    //设置zookeeper集群

    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)

    //设置端口

    hconf.set("hbase.zookeeper.property.clientPort""2181");

    //设置hfile最大个数

    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")

    //设置hfile的大小

    hconf.set("hbase.hregion.max.filesize","10737418240")

    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    //获取hbase连接

    val hbaseConn = ConnectionFactory.createConnection(hconf)

    val admin = hbaseConn.getAdmin

    /**

     * 保存生成的HFile文件

     * 注:bulk load  生成的HFile文件需要落地

     * 然后再通过LoadIncrementalHFiles类load进Hbase

     * 此处关于  sortBy 操作详解:

     * 0. Hbase查询是根据rowkey进行查询的,并且rowkey是有序,

     * 某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因,

     * 这就要求我们在插入数据的时候,要插在rowkey该在的位置。

     * 1. Put方式插入数据,会有WAL,同时在插入Hbase的时候会根据RowKey的值选择合适的位置,此方式本身就可以保证RowKey有序

     * 2. bulk load 方式没有WAL,它更像是hive通过load方式直接将底层文件HFile移动到制定的Hbase路径下,所以,在不东HFile的情况下,要保证本身有序才行

     * 之前写的时候只要rowkey有序即可,但是2.0.2版本的时候发现clounm也要有序,所以会有sortBy(x => (x._1, x._2.getKeyString), true)

     *

     * @param hfileRDD

     */

    // 0. 准备程序运行的环境

    // 如果 HBase 表不存在,就创建一个新表

    if (!admin.tableExists(TableName.valueOf(tableName))) {

      val desc = new HTableDescriptor(TableName.valueOf(tableName))

      val hcd = new HColumnDescriptor(familyName)

      desc.addFamily(hcd)

      admin.createTable(desc)

      print("创建了一个新表")

    }

    // 如果存放 HFile文件的路径已经存在,就删除掉

    if(fileSystem.exists(new Path(hFilePath))) {

      fileSystem.delete(new Path(hFilePath), true)

      print("删除hdfs上存在的路径")

    }

    // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:

    // java.io.IOException: Added a key not lexically larger than previous.

    val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)

      .map(row => {

        // 处理数据的逻辑

        val arrs = row.split("\t")

        var kvlist: Seq[KeyValue] = List()//存储多个列

        var rowkey: Array[Byte] = null

        var cn: Array[Byte] = null

        var v: Array[Byte] = null

        var kv: KeyValue = null

        val cf = familyName.getBytes //列族

        rowkey = Bytes.toBytes(arrs(0)) //key

        for (i <- 1 to (arrs.length - 1)) {

          cn = arr(i).getBytes() //列的名称

          v = Bytes.toBytes(arrs(i)) //列的值

          //将rdd转换成HFile需要的格式,上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

        }

        (new ImmutableBytesWritable(rowkey), kvlist)

      })

    val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data

      .flatMapValues(_.iterator)

    // 2. Save Hfiles on HDFS

    val table = hbaseConn.getTable(TableName.valueOf(tableName))

    val job = Job.getInstance(hconf)

    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    job.setMapOutputValueClass(classOf[KeyValue])

    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    hfileRDD

      .sortBy(x => (x._1, x._2.getKeyString), true//要保持 整体有序

      .saveAsNewAPIHadoopFile(hFilePath,

        classOf[ImmutableBytesWritable],

        classOf[KeyValue],

        classOf[HFileOutputFormat2],

        hconf)

    print("成功生成HFILE")

    val bulkLoader = new LoadIncrementalHFiles(hconf)

    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()

    sc.stop()

  }

}

 其中可能遇到的问题:

1

EndOfStreamException: Unable to read additional data from server sessionid 0x17f44ca01833e45, likely server has closed socket

 解决:

  主要是zk的版本不匹配,在依赖选择匹配的zk版本。

输出结果

https://www.cnblogs.com/huangguoming/articles/12967868.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/783385.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++面向对象程序设计-基础入门(超详细)

目录 一、c概述 二、初识c 1、第一个c程序 2、c面向对象的三大特性&#xff08;重要&#xff09; 三、作用域运算符&#xff1a;&#xff1a; 1、使用关键字namespace创建一个命名空间 2、命名空间只能定义在全局 3、 命名空间嵌套 4、随时将新的成员加入命名空间 5、命…

uni-app : 监听路由变化

在App.vue中 在 onLaunch中,利用拦截器监听 navigateTo等, 切记要在 invoke回调函数中查看, 要是再 success回调函数中,都路由完成了,还看啥? onLaunch(){ uni.addInterceptor(navigateTo, { //监听跳转invoke(e) {console.log(******** invoke-navigateTo ********, e.url)}…

AI工具集:【stablefoundation】satblediffusion官方免费实验机器人

stablefoundation是satble diffusion官方免费实验机器人,与midjourney一样在discord上操作 视频教程 https://v.douyin.com/ibgQTU7/ 图文教程 1、打开网址: https://stabledigest.substack.com/ 2、点击discord 3、加入stable foundation 4、点击找到机器人频道&#xf…

Linux —— 环境变量

环境变量&#xff08;environment variables&#xff09;&#xff0c;一般指在操作系统中用来指定操作系统运行环境的一些参数&#xff1b;如在编写的C/C代码链接时&#xff0c;所链接的动态、静态库的位置&#xff0c;就是通过相关环境变量帮助编译器进行查找的&#xff1b;环…

Stephen Wolfram:一次只添加一个词

It’s Just Adding One Word at a Time 一次只添加一个词 That ChatGPT can automatically generate something that reads even superficially like human-written text is remarkable, and unexpected. But how does it do it? And why does it work? My purpose here is t…

PID输出反馈回路调控算法原理

本文章学习研究PID闭环回路控制算法&#xff0c;介绍帮助大家理解这个算法&#xff0c;希望看后觉得有用就三连支持一下。 目录 认识PID: PID算法知识理论学习&#xff1a; 首先看PID原理的框图&#xff1a; 一、比例算法P&#xff1a; 二、积分算法I&#xff1a; 三、微…

new Vue () 中的 render 函数与 templete 模板

首先新建一个空的 Vue 项目&#xff0c;我们会在 main.js 文件中发现如下代码 import Vue from vue import App from ./App.vue// 关闭vue的生产提示 Vue.config.productionTip falsenew Vue({render: h > h(App), }).$mount(#app) $mount(#app) &#xff1a;可以参考之前…

Git使用--多人协作

多人协作 多⼈协作git branch -rgit checkout -b dev origin/dev 远程分⽀删除后&#xff0c;本地git branch -a依然能看到的解决办法git remote show origingit remote prune origin 多⼈协作 截止到目前&#xff0c;我们学习了如下Git的相关知识&#xff1a; 基本完成Git的…

Wireshark抓包验证TCP协议的三次握手与四次挥手

TCP的基本知识与Wireshark TCP的一些先知知识可以看下面的文章&#xff0c;了解TCP协议的基本原理&#xff0c;与报文的首部格式。 https://blog.csdn.net/weixin_52308622/article/details/131141490?spm1001.2014.3001.5501 https://blog.csdn.net/weixin_52308622/artic…

30天自制操作系统 day2 换种方式制作磁盘镜像 makefile

制作磁盘镜像工具 用的是它自己写的工具&#xff0c;叫edimg。使用方式如下 edimg imgin:../z_tools/fdimg0at.tek wbinimg src:ipl.bin len:512 from:0 to:0 imgout:helloos.img读取fdimg0at.tek&#xff0c;在读取ipl.bin&#xff0c;从ipl.bin的开头读512个字节到fdim…

《TCP IP网络编程》第八章

第 8 章 域名及网络地址 DNS 是对IP地址和域名进行相互转换的系统&#xff0c;其核心是 DNS 服务器。域名就是我们常常在地址栏里面输入的地址&#xff0c;将比较难记忆的IP地址变成人类容易理解的信息。 计算机内置的默认DNS服务器并不知道网络上所有域名的IP地址信息。若该DN…

第 355 场 LeetCode 周赛

A 按分隔符拆分字符串 简单模拟 class Solution { public:vector<string> splitWordsBySeparator(vector<string> &words, char separator) {vector<string> res;for (auto &s: words) {int n s.size();for (int i 0, j 0; i < n;) {while (j …

C++——String类的增删查改

目录 前言 1.String类的增删查改 1.1增 实验代码&#xff1a; 运行结果&#xff1a; 实验代码&#xff1a; 运行结果:​编辑 1.2删 实验代码&#xff1a; 结果: 1.3查找 练习&#xff1a;查找文件后缀 运行结果&#xff1a; 1.4 改 前言 上篇博客中&#xff0c;我介绍了St…

VisualStudio如何进行插件开发?

文章目录 0.引言1.工具准备2.创建插件项目&#xff08;VSIX&#xff09;3.自定义VSIX属性4.创建一个command命令5.设置command名称6.编写command功能7.调试插件8.安装插件 0.引言 使用Visual Studio插件可以极大地提升开发效率、提供更好的集成环境、丰富扩展生态系统、方便调试…

【团队协作开发】IDEA中Git新建自己的dev工作分支,合并到master主分支教程(极其简单,新手)

文章目录 一、创建新dev工作分支二、push到自己的远程dev工作分支三、工作分支合并到master主分支1、先切换到master主分支2、将远程工作dev分支的内容merge到当前master分支中3、将merge提交到远程master分支 一、创建新dev工作分支 创建完新dev分支以后将默认切换到新dev分支…

K8S 证书过期后,kubeadm 重新生成证书

前言 K8S 各个组件需要与 api-server 进行通信&#xff0c;通信使用的证书都存放在 /etc/kubernetes/pki 路径下&#xff0c;kubeadm 生成的证书默认有效为 1 年&#xff0c;因此需要定时更新证书&#xff0c;否则证书到期会导致整个集群不可用。 本篇文章主要介绍如何通过 k…

openGauss学习笔记-17 openGauss 简单数据管理-表达式

文章目录 openGauss学习笔记-17 openGauss 简单数据管理-表达式17.1 简单表达式17.2 条件表达式17.3 子查询表达式17.4 数组表达式17.5 行表达式 openGauss学习笔记-17 openGauss 简单数据管理-表达式 表达式类似一个公式&#xff0c;我们可以将其应用在查询语句中&#xff0c…

SpringMVC注解介绍(二)

目录 1.RequestPart上传文件 2.获取Cookie 1.使用CookieValue 3.获取Session 3.1SessionAttribute 4.设置Session 4.1HttpSession设置Session 5.获取Header 5.1RequestHeader 6.返回数据 1.返回Json对象 7.请求转发或请求重定向 7.1forward和redirect区别 7.2请求…

element的el-upload实现多个图片上传以及预览与删除

<el-form-itemlabel"实验室照片:"prop"labUrlList"v-if"ruleForm.labHave"><el-upload:action"urlUpload":headers"loadHeader"list-type"picture-card":file-list"ruleForm.labUrlList"class…

LabVIEW在IMAQ图像中手动选择多个ROI

LabVIEW在IMAQ图像中手动选择多个ROI 设计了一个VI&#xff0c;用于在图像上生成和叠加一系列感兴趣区域&#xff08;ROI&#xff09;&#xff0c;并在IMAQ图像控件中显示它们。想挑选其中的一些进行后续处理。可以在控件中手动选择 ROI 吗&#xff1f; 以编程方式生成的 ROI…