实现HBase表和RDB表的转化(附Java源码资源)

news2025/1/22 16:53:46

实现HBase表和RDB表的转化

在这里插入图片描述
在这里插入图片描述

一、引入

转化为HBase表的三大来源:RDB Table、Client API、Files

在这里插入图片描述
如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。

首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List<Put>的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:

rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){
	List<Put> batch = rdb.nextBatch();
	hbase.putBatch(batch);
}
hbase.close();
rdb.close();

二、代码讲解

1. 目录结构

在这里插入图片描述

2. 具体实现
  • transfer.properties
    在这里插入图片描述

内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
在这里插入图片描述

  1. Run/Debug Configurations中,新建一个Application
  2. 配置好主类
  3. 配置好配置文件的具体路径
  • RDB 接口
public interface RDB extends Com {
    // 要提升性能,需要使用批处理
    boolean hasNextBatch() throws SQLException;// 是否存在下一个批次
    List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>
}
  • RDB 实现类
public class RDBImpl implements RDB {
    private static Logger logger = Logger.getLogger(RDBImpl.class);
    // JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集
    private Properties config;
    /**
     * 它们需要设置成全局变量的原因是它们需要共享
     */
    private Connection con;
    private PreparedStatement pst;
    private ResultSet rst;
    // 定义每个批次处理的记录数的最大数量
    private int batchSize;
    // hbase的行键对应rdb的列的列名
    private String hbaseRowKeyRdbCol;
    private Map<String,Map<String,String>> hbaseRdbColMapping;

    // RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())
    public RDBImpl(Properties config) {
        this.config = config;
    }

    @Override
    public Properties config() {
        return config;
    }

    /**
     * 内部资源初始化
     */
    @Override
    public void init() throws Exception{
        con = getConnection();
        logger.info("RDB 创建 [ 连接 ] 对象成功");
        pst = getStatement(con);
        logger.info("RDB 创建 [ 执行 ] 对象成功");
        rst = getResult(pst);
        logger.info("RDB 创建 [ 结果集 ] 成功");
        batchSize = batchSize();
        hbaseRdbColMapping = hbaseRdbColumnsMapping();
    }

    @Override
    public void close() {
        closeAll(rst,pst,con);
    }


    private String driver(){
        return checkAndGetConfig("rdb.driver");
    }

    private String url(){
        return checkAndGetConfig("rdb.url");
    }

    private String username(){
        return checkAndGetConfig("rdb.username");
    }

    private String password(){
        return checkAndGetConfig("rdb.password");
    }

    private String sql(){
        return checkAndGetConfig("rdb.sql");
    }

    private int batchSize(){
        return Integer.parseInt(checkAndGetConfig("rdb.batchSize"));
    }

    // java.sql下的Connection
    private Connection getConnection() throws ClassNotFoundException, SQLException {
        // 装载驱动
        Class.forName(driver());
        // 获取并返回连接对象
        return DriverManager.getConnection(url(),username(),password());
    }
    private PreparedStatement getStatement(Connection con) throws SQLException {
        return con.prepareStatement(sql());
    }
    private ResultSet getResult(PreparedStatement statement) throws SQLException {
        return statement.executeQuery();
    }
    /**
     * hbase 列族和列与rdb中列的映射关系
     *             hbase列族   hbase列  rdb列
     * @return Map<String,Map<String,String>>
     */
    private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){
        String mapping = checkAndGetConfig("rdb.hbase.columns.mapping");
        Map<String,Map<String,String>> map = new HashMap<>();
        String[] pss = mapping.split(",");
        for(String ps : pss){
            String[] pp = ps.split("->");
            String[] p = pp[0].split(":");
            String rdbCol = pp[1],hbaseColFamily,hbaseColName;
            if(p.length==1){
                hbaseRowKeyRdbCol = pp[1];
            }else {
                hbaseColFamily = p[0];
                hbaseColName = p[1];
                if(!map.containsKey(hbaseColFamily)){
                    map.put(hbaseColFamily,new HashMap<>());
                }
                map.get(hbaseColFamily).put(hbaseColName,rdbCol);
            }
        }
        return map;
    }

    /**
     * 将RDB的列转化为字节数组(需要确定列的数据类型)
     * @param rdbColumn
     * @return
     * @throws SQLException
     */

    private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {
        Object obj = rst.getObject(rdbColumn);
        if(obj instanceof String){
            return Bytes.toBytes((String)obj);
        } else if(obj instanceof Float){
            return Bytes.toBytes(((Float)obj).floatValue());
        } else if(obj instanceof Double){
            return Bytes.toBytes(((Double)obj).doubleValue());
        } else if(obj instanceof BigDecimal){
            return Bytes.toBytes((BigDecimal)obj);
        } else if(obj instanceof Short){
            return Bytes.toBytes(((Short) obj).shortValue());
        } else if(obj instanceof Integer){
            return Bytes.toBytes(((Integer)obj).intValue());
        } else if(obj instanceof Boolean){
            return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());
        } else {
            throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());
        }
    }

    /**
     * 将HBase的列名或列族名转化为字节数组
     * @param name
     * @return
     */
    private byte[] toBytes(String name){
        return Bytes.toBytes(name);
    }

    // 最后一个批次的数据最少有一条
    @Override
    public boolean hasNextBatch() throws SQLException{
        return rst.next();
    }

    @Override
    public List<Put> nextBatch() throws SQLException{
        // 预先分配容量
        List<Put> list = new ArrayList<>(batchSize);
        int count = 0;
        do{
            /**
             * 如何将一行解析为多个put(结合配置文件)
             * 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名
             */
            Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol));
            for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) {
                String columnFamily = e.getKey();
                for (Map.Entry<String, String> s : e.getValue().entrySet()) {
                    String hbaseColumn = s.getKey();
                    String rdbColumn = s.getValue();
                    // 需要将内容转变为字节数组传入方法
                    put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));
                }
            }
            list.add(put);
        }while(++count<batchSize && rst.next());
        return list;
    }

}

如何理解一行转化为多个put?
在这里插入图片描述
结果集的实质?
在这里插入图片描述
rst.next() 的两个作用

rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行,则指向该有效行

a. 只通过config作为参数构造rdb
b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放,checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件transfer.properties,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。

  • HBase接口
public interface HBase extends Com {
    // RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。
    void putBatch(List<Put> batch) throws IOException;
}
  • HBase实现类
public class HBaseImpl implements HBase {
    private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);
    private Properties config;
    private Connection con;
    private Table hbaseTable;


    public HBaseImpl(Properties config) {
        this.config = config;
    }

    @Override
    public Properties config() {
        return config;
    }

    @Override
    public void init() throws Exception {
        con = getCon();
        loggerHBase.info("HBase 创建 [ 连接 ] 成功");
        hbaseTable = checkAndGetTable(con);
        loggerHBase.info("HBase 创建 [ 数据表 ] 成功");
    }

    @Override
    public void close() {
        closeAll(hbaseTable,con);
    }

    private String tableName(){
        return checkAndGetConfig("hbase.table.name");
    }
    private String zkUrl(){
        return checkAndGetConfig("hbase.zk");
    }

    private Connection getCon() throws IOException {
        // hadoop.conf的configuration
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",zkUrl());
        return ConnectionFactory.createConnection(config);
    }

    private Table checkAndGetTable(Connection con) throws IOException {
        /**
         * Admin : HBase DDL
         */
        Admin admin = con.getAdmin();
        TableName tableName = TableName.valueOf(tableName());
        // 通过tableName判定表是否存在
        if(!admin.tableExists(tableName)){
            throw new IOException("HBase表不存在异常:"+tableName);
        }
        /**
         * Table : HBase DML & DQL
         */
        // 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)
        return con.getTable(tableName);
    }

    @Override
    public void putBatch(List<Put> batch) throws IOException{
        hbaseTable.put(batch);
    }
}

HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。

  • Com接口
public interface Com {
    Logger logger = Logger.getLogger(Com.class);
    // 获取配置对象
    Properties config();

    // 初始化资源
    void init() throws Exception;

    // 释放资源
    void close();

    default String checkAndGetConfig(String key){
        if(!config().containsKey(key)){
            // 因为该方法可能被用于HBase和RDB
            throw new RuntimeException("配置项缺失异常:"+key);
        }
        String item = config().getProperty(key);
        logger.info(String.format("获取配置项 %s : %s",key,item));
        return item;
    }

    default void closeAll(AutoCloseable...acs){
        for (AutoCloseable ac : acs) {
            if (Objects.nonNull(ac)) {
                try {
                    ac.close();
                    logger.info(String.format("释放 %s 成功",ac.getClass().getName()));
                } catch (Exception e) {
                    logger.error("释放资源异常:"+e);
                }
            }
        }
    }
}

在Com接口中,设计了一些普通方法config()实现配置的导出,init()、close()资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。

  • RDBToHBase接口
public interface RDBToHBase {
    // 创建一个RDB对象
    void setRDB(RDB rdb);
    // 创建一个HBase对象
    void setHBase(HBase hbase);
    // 进行数据的传输
    void startTransfer();
}
  • RDBToHBase实现类
public class RDBToHBaseImpl implements RDBToHBase {
    // 日志显示
    private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class);
    private RDB rdb;
    private HBase hbase;

    @Override
    public void setRDB(RDB rdb) {
        this.rdb = rdb;
    }

    @Override
    public void setHBase(HBase hbase) {
        this.hbase = hbase;
    }

    @Override
    public void startTransfer() {
        try {
            rdb.init();
            loggerRH.info("RDB 初始化成功");
            hbase.init();
            loggerRH.info("HBase 初始化成功");
            loggerRH.info("数据从 RDB 迁移至 HBase 开始...");
            int count = 0;
            while (rdb.hasNextBatch()) {
                final List<Put> batch = rdb.nextBatch();
                hbase.putBatch(batch);
                loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));
            }
            loggerRH.info("数据从 RDB 迁移至 HBase 结束...");
        } catch (Exception e){
            loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);
        } finally{
            hbase.close();
            rdb.close();
        }
    }
}
  • AppRDBToHBase 实现类
public class AppRDBToHBase
{
    private static Logger logger = Logger.getLogger(AppRDBToHBase.class);
    private static void start(String[] args){
        try {
            if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) {
                throw new NullPointerException("配置文件路径空指针异常");
            }
            final String PATH = args[0];
            final File file = new File(PATH);
            if (!file.exists() || file.length() == 0 || !file.canRead()) {
                throw new IOException("配置文件不存在、不可读、空白");
            }
            Properties config = new Properties();
            // final String path = args[0];
            config.load(new FileReader(file));

            RDB rdb = new RDBImpl(config);
            HBase hBase = new HBaseImpl(config);
            RDBToHBase rdbToHBase = new RDBToHBaseImpl();
            rdbToHBase.setRDB(rdb);
            rdbToHBase.setHBase(hBase);
            rdbToHBase.startTransfer();
        }catch(Exception e){
            logger.error("配置异常",e);
        }
    }
    public static void main( String[] args ) {
        start(args);
    }
}

对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输

其他:日志文件系统Log.4j的应用
  • 准备:需要在Resources模块下配置log4j.properties文件
  • 注意:
    • 日志文件信息的输出方式有三种logger.error()、logger.info()、logger.warn() ,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。
    • log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。
    • 对于不同类或接口下的logger,需要注意进行名字的区分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1522355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.文本的方式读写文件

文章目录 写入文件代码运行结果 读出文件代码运行结果 文件打开模式标记&#xff08;查表&#xff09; 写入文件 ------读写文件一共五步&#xff1a;------ 第一步&#xff1a;包含头文件 第二步&#xff1a;创建流对象 第三步&#xff1a;指定方式打开文件 第四步&#xff1…

三.使用java的API文档

在Java中&#xff0c;API是指“应用程序接口”&#xff08;Application Programming Interface&#xff09;。Java API是Java编程语言中提供的类和接口的集合&#xff0c;用于开发各种类型的应用程序。类比C的STL&#xff08;标准模板库&#xff09;。 通俗理解就当做些封装好…

【django framework】ModelSerializer+GenericAPIView接口数据流

GenericAPIView数据从序列化到最终返回响应的数据流 // 以ModelSerializergenerics.CreateAPIView为例 程序终归是为了处理数据&#xff0c;怎么处理&#xff0c;以怎样的顺序和方法去处理&#xff0c;就涉及到了具体的业务流程。当我们是用了一个牛掰的框架&#xff0c;发现原…

uniapp 写安卓app,运行到手机端 调试

手机 设置》关于手机》点击版本号 4-5次&#xff0c;弹出手机锁屏页面&#xff0c;输入手机锁屏密码 2.手机 设置中 》搜索 开发人员选项 》 调试》打开USB调试 同页面 找到 选择USB配置》选择 MIDIhbuilder 编辑器 点击 》运行》运行到手机或模拟器》运行到Android App基座 》…

Linux内存管理--系列文章貮

接上文&#xff0c;用户态写完&#xff0c;本章写内核态内存空间。 3.2内核态内存 大家会发现用户态空间不管32还是64位&#xff0c;这种内存分布是相差不大的。是因为使用虚拟内存的系统&#xff0c;会让应用程序感到和别的程序是相互独立的&#xff0c;互不干扰&#xff0c…

网络编程套接字——实现简单的UDP网络程序

目录 1、预备知识 1.1、认识端口号 1.2、端口号 vs 进程pid 1.3、认识TCP协议 1.4、认识UDP协议 1.5、网络字节序 2、socket编程接口 2.1、socket常见API 2.2、sockaddr结构 3、实现一个简易的UDP服务器和客户端通信 log.hpp UdpServer.hpp UdpClient.cc Main.cc…

pytorch CV入门 - 汇总

初次编辑&#xff1a;2024/2/14&#xff1b;最后编辑&#xff1a;2024/3/9 参考网站-微软教程&#xff1a;https://learn.microsoft.com/en-us/training/modules/intro-computer-vision-pytorch 更多的内容可以参考本作者其他专栏&#xff1a; Pytorch基础&#xff1a;https…

力扣热题100_矩阵_240_搜索二维矩阵 II

文章目录 题目链接解题思路解题代码 题目链接 240. 搜索二维矩阵 II 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 示例 1&#xff1a; 输入&#xf…

计算机网络 |内网穿透

其实内网穿透&#xff0c;也挺好玩的&#xff0c;如果在大学的时候&#xff0c;那个时候讲计算机网络的老师能横向延展&#xff0c;估计课也会更有趣不少&#xff0c;本来计算机网络这门课就是计算机课程中可玩性最搞的。 只能说&#xff0c;怪可惜的 回到正题&#xff0c;内网…

提高安全投资回报:威胁建模和OPEN FAIR™风险分析

对大多数人和企业来说&#xff0c;安全意味着一种成本。但重要的是如何获得适合的量&#xff0c;而不是越多越好。然而&#xff0c;你如何决定什么时候可以有足够的安全性&#xff0c;以及你如何获得它&#xff1f;则完全是另一回事。 该篇文章是由The Open Group安全论坛主办&…

k8s之图形界面DashBoard【九】

文章目录 9. DashBoard9.1 部署Dashboard9.2 使用DashBoard 镇场 9. DashBoard 之前在kubernetes中完成的所有操作都是通过命令行工具kubectl完成的。其实&#xff0c;为了提供更丰富的用户体验&#xff0c;kubernetes还开发了一个基于web的用户界面&#xff08;Dashboard&…

leetcode 热题 100_环形链表 II

题解一&#xff1a; 哈希表&#xff1a;遍历链表&#xff0c;用哈希表存储遍历过的链表节点&#xff0c;判断链表节点是否在哈希表中存在&#xff0c;如果存在说明链表出现过&#xff0c;第一个重复出现的节点即为开始入环的第一个节点。 import java.util.HashSet;public cla…

第八阶段:uni-app小程序 --首页开发(2)

一&#xff1a;分析页面布局 1.1: 功能 搜索框&#xff1a; 轮播图&#xff1a; 分类的导航区&#xff1a; 楼层区&#xff1a; 二&#xff1a; 利用命令创建home分支 git branch git checkout -b home git branch 三&#xff1a; 配置网络请求(main.js 入口函数&#x…

python问题:vscode切换环境,pip安装库网络错误

python问题&#xff1a;vscode切换环境&#xff0c;pip安装库网络错误 vscode切换环境pip安装库网络错误 记录一下遇见的python问题。 vscode切换环境 在vscode上面的搜索框输入 > select interpreter然后选择需要的环境。 pip安装库网络错误 用requirements.txt来安装…

CentOS部署 JavaWeb 实现 MySql业务

一、项目打war包 在eclispe或idea中找到项目&#xff0c;右键打war包。 二、上传项目到linux 2.1云服务器虚拟机均可 以tomcat为例 /usr/local/tomcat/webapps 将war包通过ssh连接上传到webapps目录下。 如果是root目录则不需要项目名即 ip或域名端口直接访问&#xff08…

linux下重启ORACLE

切换到oracle用户 su - oracle 登录oracle sqlplus / as sysdba 启动数据库 startup 退出数据库 exit 启动监听 lsnrctl start FINISH

Docker使用(二)Docker安装和常见典型操作

Docker使用(二)Docker安装和常见典型操作 二、软件安装 1、Docker安装 &#xff08;1&#xff09;环境准备 [rootlocalhost ~]# uname -r 3.10.0-327.el7.x86_64 # cat /etc/os-release &#xff08;2&#xff09;卸载旧版本 $ sudo yum remove docker \ ​ docker-cli…

OpenCV4.9.0开源计算机视觉库在 Linux 中安装

返回目录&#xff1a;OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇&#xff1a;OpenCV 环境变量参考 下一篇&#xff1a;将OpenCV与gcc和CMake结合使用 引言&#xff1a; OpenCV是一个开源的计算机视觉库&#xff0c;由英特尔公司所赞助。它是一个跨…

KBP210-ASEMI新能源专用整流桥KBP210

编辑&#xff1a;ll KBP210-ASEMI新能源专用整流桥KBP210 型号&#xff1a;KBP210 品牌&#xff1a;ASEMI 封装&#xff1a;KBP-4 正向电流&#xff08;Id&#xff09;&#xff1a;2A 反向耐压&#xff08;VRRM&#xff09;&#xff1a;1000V 正向浪涌电流&#xff1a;6…

SpingBoot集成Rabbitmq及Docker部署

文章目录 介绍RabbitMQ的特点Rabbitmq术语消息发布接收流程 Docker部署管理界面说明Overview: 这个页面显示了RabbitMQ服务器的一般信息&#xff0c;例如集群节点的名字、状态、运行时间等。Connections: 在这里&#xff0c;可以查看、管理和关闭当前所有的TCP连接。Channels: …