2. HBase Bulk Loading (Bulk load)
2.1 Introduction
We often need to import external data into an HBase cluster, for example importing historical data into HBase as a backup. We have already learned HBase's Java API, which can write data into HBase with put, and we have also written MapReduce code that imports data from HDFS into HBase. Both approaches go through HBase's native API: they have to connect to HBase and issue operations over that connection, and the HBase servers must maintain and manage those connections and serve the incoming requests, which puts considerable load on HBase's storage, compute and network resources. When massive amounts of data need to be written to HBase, the Bulk load approach is much more efficient; for large-scale data loads it is practically indispensable.
We know that HBase data is ultimately persisted to HDFS. HDFS is a file system, so the data must be stored in it in some format; Hive, for example, can store data as ORC or Parquet, and HBase likewise has its own data format, the HFile. Bulk Load writes data directly into StoreFiles (HFiles), bypassing interaction with the HBase servers; once the HFiles are generated, they are attached to the table in a single step. Using Bulk Load therefore skips the write-to-WAL, write-to-MemStore and flush-to-disk path.
For more detail, see the official description of Bulk load: https://hbase.apache.org/book.html#arch.bulk.load
2.2 Bulk load MapReduce program development
The Bulk load workflow consists of two main steps:
- Prepare the data files (StoreFiles) with a MapReduce job
- Load the data files into HBase (a minimal sketch of this step follows the list)
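For step 2, HBase 2.x ships a dedicated loader class, org.apache.hadoop.hbase.tool.LoadIncrementalHFiles (in the hbase-mapreduce module), which attaches already-generated HFiles to a table. The following is a minimal sketch, assuming the HFiles from step 1 were written to the HDFS directory /bank/output and the target table is the ITCAST_BANK:TRANSFER_RECORD table created in the case study below (both names are illustrative here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class LoadHFilesSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(tableName);
             RegionLocator locator = connection.getRegionLocator(tableName)) {
            // Moves the generated HFiles into the table's region directories;
            // no WAL write, MemStore write or flush is involved.
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path("/bank/output"), admin, table, locator);
        }
    }
}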
2.3 Case study: storing massive cold bank transfer records
Banks generate large numbers of transfer records every day, and records older than a certain period need to be archived regularly. In this case study, a large volume of transfer record data sits in MySQL and must be saved into HBase. Because the data volume is so large, the Bulk Load approach is used to load it.
- To make backups easier, the project team exports each day's transfer records as a CSV text file and uploads it to HDFS. Our task is to import those files from HDFS into HBase.
- Because we only need to read the data and generate the corresponding StoreFiles, the MapReduce program we write has only a Mapper and no Reducer of our own.
2.3.1 Dataset
Field | Description |
---|---|
id | ID |
code | Transaction serial number |
rec_account | Receiving account |
rec_bank_name | Receiving bank |
rec_name | Recipient name |
pay_account | Paying account |
pay_name | Payer name |
pay_comments | Transfer comment |
pay_channel | Transfer channel |
pay_way | Transfer method |
status | Transfer status |
timestamp | Transfer time |
money | Transfer amount |
2.3.2 Project setup
Create the bank transfer record table in HBase
create_namespace "ITCAST_BANK"
# disable "TRANSFER_RECORD"
# drop "TRANSFER_RECORD"
create "ITCAST_BANK:TRANSFER_RECORD", { NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => "HexStringSplit"}
Create the project
Setting | Value |
---|---|
groupId | cn.itcast |
artifactId | bankrecord_bulkload |
Import POM dependencies
<repositories><!-- artifact repositories -->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
Create the package structure
Package | Description |
---|---|
cn.itcast.bank_record.bulkload.mr | MapReduce-related code |
cn.itcast.bank_record.entity | Entity classes |
Import configuration files
Copy the three configuration files core-site.xml, hbase-site.xml and log4j.properties into the resources directory.
Make sure the Windows environment variables are configured correctly
1. HADOOP_HOME
The course materials include a hadoop_windows client folder containing an archive with several Windows builds of the Hadoop client. Find the 2.7.4 build, extract it to a directory of your choice, and point HADOOP_HOME at that directory.
2. HADOOP_USER_NAME
Set HADOOP_USER_NAME to a user with read/write permission on HDFS (for example, root), so that the job submitted from Windows accesses the cluster as that user; a code-level alternative is sketched below.
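If changing the system environment is inconvenient, the same effect can usually be achieved from code, because Hadoop also honors a HADOOP_USER_NAME JVM system property when it determines the login user. A minimal sketch, to be placed at the very top of the driver's main method (the user name root is an assumption for a typical test cluster; adjust it to your environment):

// Must run before the first HDFS/HBase connection is created,
// because Hadoop caches the login user after first use.
// "root" is an assumed HDFS user for a test cluster.
System.setProperty("HADOOP_USER_NAME", "root");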
2.3.3 Writing the entity class
Implementation steps:
- Create the entity class TransferRecord
- Add a static parse method that parses a comma-separated line into the entity class
- Use the following record to test that parsing succeeds
7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0
Reference code:
public class TransferRecord {
private String id;
private String code;
private String rec_account;
private String rec_bank_name;
private String rec_name;
private String pay_account;
private String pay_name;
private String pay_comments;
private String pay_channel;
private String pay_way;
private String status;
private String timestamp;
private String money;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getRec_account() {
return rec_account;
}
public void setRec_account(String rec_account) {
this.rec_account = rec_account;
}
public String getRec_bank_name() {
return rec_bank_name;
}
public void setRec_bank_name(String rec_bank_name) {
this.rec_bank_name = rec_bank_name;
}
public String getRec_name() {
return rec_name;
}
public void setRec_name(String rec_name) {
this.rec_name = rec_name;
}
public String getPay_account() {
return pay_account;
}
public void setPay_account(String pay_account) {
this.pay_account = pay_account;
}
public String getPay_name() {
return pay_name;
}
public void setPay_name(String pay_name) {
this.pay_name = pay_name;
}
public String getPay_comments() {
return pay_comments;
}
public void setPay_comments(String pay_comments) {
this.pay_comments = pay_comments;
}
public String getPay_channel() {
return pay_channel;
}
public void setPay_channel(String pay_channel) {
this.pay_channel = pay_channel;
}
public String getPay_way() {
return pay_way;
}
public void setPay_way(String pay_way) {
this.pay_way = pay_way;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMoney() {
return money;
}
public void setMoney(String money) {
this.money = money;
}
@Override
public String toString() {
return "TransferRecord{" +
"id='" + id + '\'' +
", code='" + code + '\'' +
", rec_account='" + rec_account + '\'' +
", rec_bank_name='" + rec_bank_name + '\'' +
", rec_name='" + rec_name + '\'' +
", pay_account='" + pay_account + '\'' +
", pay_name='" + pay_name + '\'' +
", pay_comments='" + pay_comments + '\'' +
", pay_channel='" + pay_channel + '\'' +
", pay_way='" + pay_way + '\'' +
", status='" + status + '\'' +
", timestamp='" + timestamp + '\'' +
", money='" + money + '\'' +
'}';
}
public static TransferRecord parse(String line) {
TransferRecord transferRecord = new TransferRecord();
String[] fields = line.split(",");
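// NOTE: a plain split(",") assumes no field itself contains a comma.
// The sample record's comment field ("老婆,节日快乐") does contain one,
// which shifts all trailing fields; real input must escape or avoid
// the delimiter inside field values.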
transferRecord.setId(fields[0]);
transferRecord.setCode(fields[1]);
transferRecord.setRec_account(fields[2]);
transferRecord.setRec_bank_name(fields[3]);
transferRecord.setRec_name(fields[4]);
transferRecord.setPay_account(fields[5]);
transferRecord.setPay_name(fields[6]);
transferRecord.setPay_comments(fields[7]);
transferRecord.setPay_channel(fields[8]);
transferRecord.setPay_way(fields[9]);
transferRecord.setStatus(fields[10]);
transferRecord.setTimestamp(fields[11]);
transferRecord.setMoney(fields[12]);
return transferRecord;
}
public static void main(String[] args) {
String str = "7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0";
TransferRecord tr = parse(str);
System.out.println(tr);
}
}
2.3.4 Building the Mapper that reads the data
HBase provides two classes dedicated to MapReduce support:
- ImmutableBytesWritable: corresponds to the rowkey
- MapReduceExtendedCell: corresponds to a column → value (key-value) pair
Implementation steps:
- Create a class BankRecordMapper that extends Mapper, with the generic types:
a) input key: LongWritable
b) input value: Text
c) output key: ImmutableBytesWritable
d) output value: MapReduceExtendedCell
- Convert the Text line received by the Mapper into a TransferRecord entity
- Get the ID from the entity and use it as the rowkey
- Build a cell for each field with the KeyValue class; every field to be written to the table needs its own cell
- Emit the output with context.write
a) output key: new ImmutableBytesWritable(rowkey)
b) output value: new MapReduceExtendedCell(KeyValue object)
Reference code:
package cn.itcast.bank_record.bulkload.mr;

import cn.itcast.bank_record.entity.TransferRecord;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BankRecordMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// HBase needs a rowkey plus column name => value pairs
TransferRecord transferRecord = TransferRecord.parse(value.toString());
String rowkey = transferRecord.getId();
// column family
byte[] cf = Bytes.toBytes("C1");
byte[] colId = Bytes.toBytes("id");
byte[] colCode = Bytes.toBytes("code");
byte[] colRec_account = Bytes.toBytes("rec_account");
byte[] colRec_bank_name = Bytes.toBytes("rec_bank_name");
byte[] colRec_name = Bytes.toBytes("rec_name");
byte[] colPay_account = Bytes.toBytes("pay_account");
byte[] colPay_name = Bytes.toBytes("pay_name");
byte[] colPay_comments = Bytes.toBytes("pay_comments");
byte[] colPay_channel = Bytes.toBytes("pay_channel");
byte[] colPay_way = Bytes.toBytes("pay_way");
byte[] colStatus = Bytes.toBytes("status");
byte[] colTimestamp = Bytes.toBytes("timestamp");
byte[] colMoney = Bytes.toBytes("money");
// Build one KeyValue (cell) per column; each column uses its own qualifier
KeyValue idKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getId()));
KeyValue codeKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colCode, Bytes.toBytes(transferRecord.getCode()));
KeyValue rec_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colRec_account, Bytes.toBytes(transferRecord.getRec_account()));
KeyValue rec_bank_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colRec_bank_name, Bytes.toBytes(transferRecord.getRec_bank_name()));
KeyValue rec_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colRec_name, Bytes.toBytes(transferRecord.getRec_name()));
KeyValue pay_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colPay_account, Bytes.toBytes(transferRecord.getPay_account()));
KeyValue pay_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colPay_name, Bytes.toBytes(transferRecord.getPay_name()));
KeyValue pay_commentsKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colPay_comments, Bytes.toBytes(transferRecord.getPay_comments()));
KeyValue pay_channelKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colPay_channel, Bytes.toBytes(transferRecord.getPay_channel()));
KeyValue pay_wayKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colPay_way, Bytes.toBytes(transferRecord.getPay_way()));
KeyValue statusKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colStatus, Bytes.toBytes(transferRecord.getStatus()));
KeyValue timestampKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colTimestamp, Bytes.toBytes(transferRecord.getTimestamp()));
KeyValue moneyKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colMoney, Bytes.toBytes(transferRecord.getMoney()));
ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));
context.write(rowkeyWritable, new MapReduceExtendedCell(idKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(codeKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(rec_accountKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(rec_bank_nameKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(rec_nameKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_accountKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_nameKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_commentsKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_channelKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_wayKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(statusKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(timestampKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(moneyKeyValue));
}
}
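With the Mapper in place, a driver is still needed to complete step 1: it wires BankRecordMapper to HFileOutputFormat2 so the job writes HFiles that line up with the table's region boundaries. The sketch below is one plausible version, assuming the CSV input directory and the HFile output directory are passed as program arguments; HFileOutputFormat2.configureIncrementalLoad also installs the cell-sorting reducer and the region-aware partitioner, which is why no Reducer had to be written by hand:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BankRecordBulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "bank-record-bulkload");
        job.setJarByClass(BankRecordBulkLoadDriver.class);
        job.setMapperClass(BankRecordMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // CSV input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HFile output directory
        TableName tableName = TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD");
        boolean done;
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(tableName);
             RegionLocator locator = connection.getRegionLocator(tableName)) {
            // Sets the output format, partitioner and sorting reducer so the
            // HFiles match the table's 6 pre-split regions.
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            done = job.waitForCompletion(true);
        }
        System.exit(done ? 0 : 1);
    }
}

Once the job succeeds, the HFiles under the output directory can be attached to the table with LoadIncrementalHFiles, as sketched in section 2.2.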