【HBase高级】3. HBase批量装载——Bulk load(1)Bulk load简介与案例介绍

news2024/11/20 7:13:38

2. HBase批量装载——Bulk load

2.1 简介

很多时候,我们需要将外部的数据导入到HBase集群中,例如:将一些历史的数据导入到HBase做备份。我们之前已经学习了HBase的Java API,通过put方式可以将数据写入到HBase中,我们也学习过通过MapReduce编写代码将HDFS中的数据导入到HBase。但这些方式都是基于HBase的原生API方式进行操作的。这些方式有一个共同点,就是需要与HBase连接,然后进行操作。HBase服务器要维护、管理这些连接,以及接受来自客户端的操作,会给HBase的存储、计算、网络资源造成较大消耗。此时,在需要将海量数据写入到HBase时,通过Bulk load(大容量加载)的方式,会变得更高效。可以这么说,进行大量数据操作,Bulk load是必不可少的。

我们知道,HBase的数据最终是需要持久化到HDFS。HDFS是一个文件系统,那么数据可定是以一定的格式存储到里面的。例如:Hive我们可以以ORC、Parquet等方式存储。而HBase也有自己的数据格式,那就是HFile。Bulk Load就是直接将数据写入到StoreFile(HFile)中,从而绕开与HBase的交互,HFile生成后,直接一次性建立与HBase的关联即可。使用BulkLoad,绕过了Write to WAL,Write to MemStore及Flush to disk的过程

更多可以参考官方对Bulk load的描述:https://hbase.apache.org/book.html#arch.bulk.load

2.2 Bulk load MapReduce程序开发

Bulk load的流程主要分为两步:

  1. 通过MapReduce准备好数据文件(Store Files)
  2. 加载数据文件到HBase

2.3 银行转账记录海量冷数据存储案例

银行每天都产生大量的转账记录,超过一定时期的数据,需要定期进行备份存储。本案例,在MySQL中有大量转账记录数据,需要将这些数据保存到HBase中。因为数据量非常庞大,所以采用的是Bulk Load方式来加载数据。

  • 项目组为了方便数据备份,每天都会将对应的转账记录导出为CSV文本文件,并上传到HDFS。我们需要做的就将HDFS上的文件导入到HBase中。
  • 因为我们只需要将数据读取出来,然后生成对应的Store File文件。所以,我们编写的MapReduce程序,只有Mapper,而没有Reducer。

2.3.1 数据集

idID
code流水单号
rec_account收款账户
rec_bank_name收款银行
rec_name收款人姓名
pay_account付款账户
pay_name付款人姓名
pay_comments转账附言
pay_channel转账渠道
pay_way转账方式
status转账状态
timestamp转账时间
money转账金额

2.3.2 项目准备工作

HBase中创建银行转账记录表

create_namespace "ITCAST_BANK"
# disable "TRANSFER_RECORD"
# drop "TRANSFER_RECORD"
create "ITCAST_BANK:TRANSFER_RECORD", { NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => "HexStringSplit"}

创建项目

groupidcn.itcast
artifactidbankrecord_bulkload

导入POM依赖

<repositories><!-- 代码库 -->
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>

<dependencies>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>

创建包结构

说明
cn.itcast.bank_record.bulkload.mrMapReduce相关代码
cn.itcast.bank_record.entity实体类

导入配置文件
将 core-site.xml、hbase-site.xml、log4j.properties三个配置文件拷贝到resources目录中。

确保Windows环境变量配置正确
1.HADOOP_HOME
在这里插入图片描述
在资料包里面,有一个hadoop_windows客户端文件夹,该文件夹中有一个压缩包,从压缩包中很多windows版本的客户端,找一个2.7.4版本,解压到指定目录即可。

2.HADOOP_USER_NAME
在这里插入图片描述

2.3.3 编写实体类

实现步骤:

  1. 创建实体类TransferRecord
  2. 添加一个parse静态方法,用来将逗号分隔的字段,解析为实体类
  3. 使用以下数据测试解析是否成功
7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0

参考代码:

public class TransferRecord {
    private String id;
    private String code;
    private String rec_account;
    private String rec_bank_name;
    private String rec_name;
    private String pay_account;
    private String pay_name;
    private String pay_comments;
    private String pay_channel;
    private String pay_way;
    private String status;
    private String timestamp;
    private String money;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getRec_account() {
        return rec_account;
    }

    public void setRec_account(String rec_account) {
        this.rec_account = rec_account;
    }

    public String getRec_bank_name() {
        return rec_bank_name;
    }

    public void setRec_bank_name(String rec_bank_name) {
        this.rec_bank_name = rec_bank_name;
    }

    public String getRec_name() {
        return rec_name;
    }

    public void setRec_name(String rec_name) {
        this.rec_name = rec_name;
    }

    public String getPay_account() {
        return pay_account;
    }

    public void setPay_account(String pay_account) {
        this.pay_account = pay_account;
    }

    public String getPay_name() {
        return pay_name;
    }

    public void setPay_name(String pay_name) {
        this.pay_name = pay_name;
    }

    public String getPay_comments() {
        return pay_comments;
    }

    public void setPay_comments(String pay_comments) {
        this.pay_comments = pay_comments;
    }

    public String getPay_channel() {
        return pay_channel;
    }

    public void setPay_channel(String pay_channel) {
        this.pay_channel = pay_channel;
    }

    public String getPay_way() {
        return pay_way;
    }

    public void setPay_way(String pay_way) {
        this.pay_way = pay_way;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getMoney() {
        return money;
    }

    public void setMoney(String money) {
        this.money = money;
    }

    @Override
    public String toString() {
        return "TransferRecord{" +
                "id='" + id + '\'' +
                ", code='" + code + '\'' +
                ", rec_account='" + rec_account + '\'' +
                ", rec_bank_name='" + rec_bank_name + '\'' +
                ", rec_name='" + rec_name + '\'' +
                ", pay_account='" + pay_account + '\'' +
                ", pay_name='" + pay_name + '\'' +
                ", pay_comments='" + pay_comments + '\'' +
                ", pay_channel='" + pay_channel + '\'' +
                ", pay_way='" + pay_way + '\'' +
                ", status='" + status + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", money='" + money + '\'' +
                '}';
    }

    public static TransferRecord parse(String line) {
        TransferRecord transferRecord = new TransferRecord();
        String[] fields = line.split(",");

        transferRecord.setId(fields[0]);
        transferRecord.setCode(fields[1]);
        transferRecord.setRec_account(fields[2]);
        transferRecord.setRec_bank_name(fields[3]);
        transferRecord.setRec_name(fields[4]);
        transferRecord.setPay_account(fields[5]);
        transferRecord.setPay_name(fields[6]);
        transferRecord.setPay_comments(fields[7]);
        transferRecord.setPay_channel(fields[8]);
        transferRecord.setPay_way(fields[9]);
        transferRecord.setStatus(fields[10]);
        transferRecord.setTimestamp(fields[11]);
        transferRecord.setMoney(fields[12]);

        return transferRecord;
    }

    public static void main(String[] args) {
        String str = "7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0";
        TransferRecord tr = parse(str);

        System.out.println(tr);
    }
}

2.3.4 构建读取数据的Mapper

HBase提供了两个类来专门对MapReduce支持:

  1. ImmutableBytesWritable:对应rowkey
  2. MapReduceExtendedCell:对应 列 → 值(键值对)

实现步骤:

  1. 创建一个BankRecordMapper的类继承Mapper类,Mapper的泛型为
    a)输入key:LongWritable
    b)输入value:Text
    c)输出key:ImmutableBytesWritable
    d)输出value:MapReduceExtendedCell
  2. 将Mapper获取到Text文本行,转换为TransferRecord实体类
  3. 从实体类中获取ID,并转换为rowkey
  4. 使用KeyValue类构建单元格,每个需要写入到表中的字段都需要构建出来单元格
  5. 使用context.write将输出输出
    a)构建输出key:new ImmutableBytesWrite(rowkey)
    b)构建输出的value:new MapReduceExtendedCell(keyvalue对象)

参考代码:

import cn.itcast.bank_record.entity.TransferRecord;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BankRecordMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // HBase需要有rowkey,列名 => 值
        TransferRecord transferRecord = TransferRecord.parse(value.toString());
        String rowkey = transferRecord.getId();

        // 列蔟
        byte[] cf = Bytes.toBytes("C1");
        byte[] colId = Bytes.toBytes("id");
        byte[] colCode = Bytes.toBytes("code");
        byte[] colRec_account = Bytes.toBytes("rec_account");
        byte[] colRec_bank_name = Bytes.toBytes("rec_bank_name");
        byte[] colRec_name = Bytes.toBytes("rec_name");
        byte[] colPay_account = Bytes.toBytes("pay_account");
        byte[] colPay_name = Bytes.toBytes("pay_name");
        byte[] colPay_comments = Bytes.toBytes("pay_comments");
        byte[] colPay_channel = Bytes.toBytes("pay_channel");
        byte[] colPay_way = Bytes.toBytes("pay_way");
        byte[] colStatus = Bytes.toBytes("status");
        byte[] colTimestamp = Bytes.toBytes("timestamp");
        byte[] colMoney = Bytes.toBytes("money");

        KeyValue idKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getId()));
        KeyValue codeKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getCode()));
        KeyValue rec_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_account()));
        KeyValue rec_bank_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_bank_name()));
        KeyValue rec_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_name()));
        KeyValue pay_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_account()));
        KeyValue pay_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_name()));
        KeyValue pay_commentsKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_comments()));
        KeyValue pay_channelKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_channel()));
        KeyValue pay_wayKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_way()));
        KeyValue statusKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getStatus()));
        KeyValue timestampKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getTimestamp()));
        KeyValue moneyKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getMoney()));

        ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));
        context.write(rowkeyWritable, new MapReduceExtendedCell(idKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(codeKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(rec_accountKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(rec_bank_nameKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(rec_nameKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_accountKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_nameKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_commentsKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_channelKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_wayKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(statusKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(timestampKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(moneyKeyValue));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/191354.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis(三)使用MyBatis完成CRUD(增删改查)

准备工作 1、创建module&#xff08;Maven的普通Java模块&#xff09;&#xff1a;mybatis-002-crud 2、pom.xml 打包方式jar依赖&#xff1a;mybatis依赖mysql驱动依赖junit依赖logback依赖3、mybatis-config.xml放在类的根路径下 4、CarMapper.xml放在类的根路径下 5、lo…

redis的完整学习

Redis 1.Nosql 单机mysql缓存机制分库分表水平拆分mysql集群&#xff1a;本质上是数据库的读写 MyISAM:表锁&#xff0c;效率低Innodb&#xff1a;行锁 特点 解耦&#xff01; 1.方便扩展 2.大数据量高性能 3.数据类型是多样型的&#xff08;不需要设计数据库&#xff…

c语言 预处理

int main() {//printf("%s\n", __FILE__);//打印所在文件夹位置//printf("%d\n", __LINE__);//打印当前所在行号//printf("%s\n", __DATE__);//打印当前系统日期//printf("%s\n", __TIME__);//时间//printf("%s\n", __FUNCT…

分享155个ASP源码,总有一款适合您

ASP源码 分享155个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 154个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/12oYeESSXJCd32n463LBt4w?pwd5i1n 提取码&#x…

Java线程池中的execute和submit

一、概述 execute和submit都是线程池中执行任务的方法。 execute是Executor接口中的方法 public interface Executor {void execute(Runnable command); }submit是ExecuteService接口中的方法。 public interface ExecutorService extends Executor {<T> Future<T…

vue+element模仿腾讯视频电影网站(二),增加视频播放详情页

一.前言 1. 本项目在线预览&#xff1a;点击访问 2. 作者其他博客成品汇总预览&#xff1a;点击访问 3. 接上一篇&#xff1a;《vueelement模仿腾讯视频电影网站》 暂时源码并没有提供其他获取渠道&#xff0c;私聊作者获取即可&#xff0c;或通过博客后面名片添加作者&#…

【SSM】Mybatis小技巧汇总

Mybatis技巧一&#xff1a;#{} 和 ${} 的区别使用 ${} 特例一&#xff08;排序&#xff09;使用 ${} 特例二&#xff08;表连接&#xff09;使用 ${} 特例三&#xff08;批量删除&#xff09;技巧二&#xff1a;typeAliases 别名机制别名 Alias 性质技巧三&#xff1a;mappersm…

串级PID控制原理-1

串级计算机控制系统的典型结构如图1所示&#xff0c;系统中有两个PID控制器&#xff0c;Gc2(s)称为副调节器传递函数&#xff0c;包围Gc2(s)的内环称为副回路。Gc1(s)称为主调节器传递函数&#xff0c;包围Gc1(s)的外环称为主回路。主调节器的输出控制量u1作为副回路的给定量R2…

Vuex基本概念

一、基本概念vuex&#xff1a;为了解决不关联的组件整个网站状态数据共享问题&#xff0c;专为Vue.js开发的状态管理模式。采用集中式存储管理应用的所有组件状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。vuex有5个主要成员&#xff1a;state&#xff1…

DAMA数据管理知识体系指南之数据架构管理

第4章 4.1 简介 数据架构管理是定义和维护如下规范的过程&#xff1a; 提供标准的、通用的业务术语/辞典。 表达战略性的数据需求。 为满足如上需求&#xff0c;概述高层次的整合设计。 使企业战略和相关业务架构相一致。 数据架构是用于定义数据需求、指导对数据资产的整合和…

【C++】从0到1入门C++编程学习笔记 - 提高编程篇:STL常用容器(vector容器)

文章目录一、vector基本概念二、vector构造函数三、vector赋值操作四、vector容量和大小五、vector插入和删除六、vector数据存取七、vector互换容器八、vector预留空间一、vector基本概念 功能&#xff1a; vector数据结构和数组非常相似&#xff0c;也称为单端数组 vector…

Discord多账号抢白名单,如何避免账号关联被封号?

相信玩NFT项目的都不会对Discord陌生&#xff0c;现在NFT的项目都会开Discord伺服器&#xff0c;并且将内容公告在上面、在伺服器里互动&#xff0c;所以如果你想参与NFT的世界&#xff0c;学会使用Discord是一件非常重要的事情。 东哥前2天也出了关于discord如何使用、如何抢白…

很多网站、APP 前段时间一下都变灰了。 先来感受一下变灰后的效果。

很多网站、APP 前段时间一下都变灰了。 先来感受一下变灰后的效果。 这种灰色的效果怎么实现的呢&#xff1f;如何做到图片、文字、按钮都变灰的效果呢&#xff1f; 方案 1&#xff0c;换一套灰色的 UI&#xff0c;那显然成本太大了&#xff0c;用脚指头想一想就知道不太可能…

C语言---选择排序和堆排序

文章目录前言一、简单选择排序1.简介2.算法思路3.代码实现二、堆排序1.简介2.算法思路3.代码实现总结前言 堆排序是选择排序的一种&#xff0c;今天我们讲解一下堆排序和简单选择排序 一、简单选择排序 1.简介 选择排序&#xff08;Selection sort&#xff09;是一种简单直观…

ZoomCharts JavaScript 1.20.2 Crack

深入探索数据 令人惊叹的数据可视化方式 - 这里是 ZoomCharts JavaScript 图表的不同交互可能性和功能。 内容向下钻取和向上钻取 深入研究特定数据点或获得更大的图景。通过放大或缩小与图表进行物理交互&#xff0c;浏览不同的数据级别。 数据过滤 选择一个或多个数据点查看具…

【软件测试面试】他凭什么能在面试中狂揽10个offer?

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 小高&#xff1a; 记…

2023年屏蔽iOS16系统更新,去除小红点,最新方法

昨天开始&#xff0c;屏蔽iOS系统更新的旧文件已经过期&#xff0c;许多老粉收到了更新提醒&#xff0c;因此现在给大家带来最新的屏蔽文件 这个文件可以屏蔽iOS系统更新和提醒&#xff0c;防止手机自动下载更新。 这个方法支持所有 iPhone 和 iPad&#xff0c;支持所有 iOS 和…

# Android未来几年发展规划【纵横发展】

前言 如果你是移动开发人员&#xff0c;那么首先要跟上技术的最新发展趋势&#xff0c;并时刻关注新事物&#xff0c;即使有时你甚至需要质疑自己的信仰。应用开发人员一方面一直在努力想办法简化和缩短开发过程&#xff0c;另一方面也在努力构建最佳的设计和用户体验。每年我…

【MPP数据库】StarRocks分区、分桶探索与实践

1.先学习一下StarRocks的架构图&#xff1a; 2.基本概念 2.1 Row & Column 一张表包括行&#xff08;Row&#xff09;和列&#xff08;Column&#xff09;。Row 即用户的一行数据。Column 用于描述一行数据中不同的字段。 Column 可以分为两大类&#xff1a;Key 和 Value…

leetcode刷题记录总结-7.递归回溯算法(进行中)

文章目录零、回溯算法理论总览什么是回溯法回溯法的效率回溯法解决的问题如何理解回溯法回溯法模板一、组合问题[77. 组合](https://leetcode.cn/problems/combinations/)题解递归实现组合型枚举&#xff1a;每个点选与不选子集问题模板组合问题解决思路回溯思路&#xff1a;遍…