apache-atlas-hbase-bridge-源码分析

news2025/1/21 8:51:51

元数据类型

Hbase元数据类型, 包括命令空间、表、列族、列

public enum HBaseDataTypes {
    // Classes
    HBASE_NAMESPACE,
    HBASE_TABLE,
    HBASE_COLUMN_FAMILY,
    HBASE_COLUMN;

    public String getName() {
        return name().toLowerCase();
    }
}

Hbase元数据采集实现
1)批量采集HBaseBridge
2)实时变更 HBaseAtlasCoprocessor
虽然定义了HBASE_COLUMN,但是实际上是没有实现的,毕竟HBASE_COLUMN是动态添加的。

执行流程

HBaseBridge 执行流程如下图所示
在这里插入图片描述

源码分析

HBaseBridge #main

public class HBaseBridge {
   
//...
    private final String        metadataNamespace;
    private final AtlasClientV2 atlasClientV2;
    private final Admin         hbaseAdmin;

    public static void main(String[] args) {
        int exitCode = EXIT_CODE_FAILED;
        AtlasClientV2 atlasClientV2  =null;

        try {
            Options options = new Options();
            options.addOption("n","namespace", true, "namespace");
            options.addOption("t", "table", true, "tablename");
            options.addOption("f", "filename", true, "filename");

            CommandLineParser parser            = new BasicParser();
            CommandLine       cmd               = parser.parse(options, args);
            String            namespaceToImport = cmd.getOptionValue("n");
            String            tableToImport     = cmd.getOptionValue("t");
            String            fileToImport      = cmd.getOptionValue("f");
            Configuration     atlasConf         = ApplicationProperties.get();
            String[]          urls              = atlasConf.getStringArray(ATLAS_ENDPOINT);

           //...

            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
                atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword);
            } else {
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
            }
//...
            HBaseBridge importer = new HBaseBridge(atlasConf, atlasClientV2);
            if (StringUtils.isNotEmpty(fileToImport)) {
                File f = new File(fileToImport);
                if (f.exists() && f.canRead()) {
                    BufferedReader br   = new BufferedReader(new FileReader(f));
                    String         line = null;

                    while((line = br.readLine()) != null) {
                        String val[] = line.split(":");
                        if (ArrayUtils.isNotEmpty(val)) {
                            //...
                            importer.importHBaseEntities(namespaceToImport, tableToImport);
                        }
                    }
                    exitCode = EXIT_CODE_SUCCESS;
                } else {
                    LOG.error("Failed to read the file");
                }
            } else {
                importer.importHBaseEntities(namespaceToImport, tableToImport);
                exitCode = EXIT_CODE_SUCCESS;
            }
        } catch(ParseException e) {
           //...
        } catch(Exception e) {
            //...
        }finally {
            //...
        }
    }

HBaseBridge#importHBaseEntities

importHBaseEntities 只要负责处理namespaceToImport和tableToImport参数,然后执行相应的流程

private boolean importHBaseEntities(String namespaceToImport, String tableToImport) throws Exception {
    boolean ret = false;

    if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) {
        // when both NameSpace and Table options are not present
        importNameSpaceAndTable();
        ret = true;
    } else if (StringUtils.isNotEmpty(namespaceToImport)) {
        // When Namespace option is present or both namespace and table options are present
        importNameSpaceWithTable(namespaceToImport, tableToImport);
        ret = true;
    } else  if (StringUtils.isNotEmpty(tableToImport)) {
        importTable(tableToImport);
        ret = true;
    }

    return ret;
}

导入所有的命名空间和表

namespaceToImport和tableToImport均为空,导入所有的namespace和table

private void importNameSpaceAndTable() throws Exception {

    NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors();

    if (ArrayUtils.isNotEmpty(namespaceDescriptors)) {
        for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
            String namespace = namespaceDescriptor.getName();
            importNameSpace(namespace);
        }
    }

    TableDescriptor[] htds = hbaseAdmin.listTables();
    if (ArrayUtils.isNotEmpty(htds)) {
        for (TableDescriptor htd : htds) {
            String tableName = htd.getTableName().getNameAsString();
            importTable(tableName);
        }
    }
}

导入指定的命名空间

namespaceToImport不为空,导入指定的namespace和namespace下的table

private void importNameSpaceWithTable(String namespaceToImport, String tableToImport) throws Exception {
    importNameSpace(namespaceToImport);

    List<TableDescriptor> hTableDescriptors = new ArrayList<>();

    if (StringUtils.isEmpty(tableToImport)) {
// 导入指定namespace
        List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(namespaceToImport);
        if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) {
            hTableDescriptors = getTableDescriptors(matchingNameSpaceDescriptors);
        }
    } else {
        tableToImport = namespaceToImport +":" + tableToImport;
        TableDescriptor[] htds = hbaseAdmin.listTables(Pattern.compile(tableToImport));
        hTableDescriptors.addAll(Arrays.asList(htds));
    }

    if (CollectionUtils.isNotEmpty(hTableDescriptors)) {
        for (TableDescriptor htd : hTableDescriptors) {
            String tblName = htd.getTableName().getNameAsString();
            importTable(tblName);
        }
    }
}

导入指定的表

tableToImport不为空,导入指定的table和table的命名空间。importTable会处理表、列族的实体,没有处理列

public void importTable(final String tableName) throws Exception {
    String            tableNameStr = null;
    TableDescriptor[] htds         = hbaseAdmin.listTables(Pattern.compile(tableName));

    if (ArrayUtils.isNotEmpty(htds)) {
        for (TableDescriptor htd : htds) {
            String tblNameWithNameSpace    = htd.getTableName().getNameWithNamespaceInclAsString();
            String tblNameWithOutNameSpace = htd.getTableName().getNameAsString();

            if (tableName.equals(tblNameWithNameSpace)) {
                tableNameStr = tblNameWithNameSpace;
            } else if (tableName.equals(tblNameWithOutNameSpace)) {
                tableNameStr = tblNameWithOutNameSpace;
            } else {
                // when wild cards are used in table name
                if (tblNameWithNameSpace != null) {
                    tableNameStr = tblNameWithNameSpace;
                } else if (tblNameWithOutNameSpace != null) {
                    tableNameStr = tblNameWithOutNameSpace;
                }
            }

            byte[]                 nsByte       = htd.getTableName().getNamespace();
            String                 nsName       = new String(nsByte);
            NamespaceDescriptor    nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName);
            AtlasEntityWithExtInfo entity       = createOrUpdateNameSpace(nsDescriptor);
            ColumnFamilyDescriptor[]    hcdts        = htd.getColumnFamilies();
// 处理表、列族,没有处理列
            createOrUpdateTable(nsName, tableNameStr, entity.getEntity(), htd, hcdts);
        }
    } else {
        throw new AtlasHookException("No Table found for the given criteria. Table = " + tableName);
    }
}

createOrUpdateTable处理表、列族列实体,比较简单这里就不详细描述

导入命名空间

public void importNameSpace(final String nameSpace) throws Exception {
    List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(nameSpace);

    if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) {
        for (NamespaceDescriptor namespaceDescriptor : matchingNameSpaceDescriptors) {
            createOrUpdateNameSpace(namespaceDescriptor);
        }
    } else {
        throw new AtlasHookException("No NameSpace found for the given criteria. NameSpace = " + nameSpace);
    }
}

createOrUpdateNameSpace处理命名空间实体,比较简单这里就不详细描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/26331.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis基于XML的使用——缓存

1、介绍 MyBatis 内置了一个强大的事务性查询缓存机制&#xff0c;它可以非常方便地 配置和定制。 为了使它更加强大而且易于配置&#xff0c;我们对 MyBatis 3 中的缓存实现进行了许多改进。 默认情况下&#xff0c;只启用了本地的会话缓存&#xff0c;它仅仅对一个会话中的数…

【Java】异常处理

异常本质上是程序上的错误&#xff0c;包括程序逻辑错误和系统错误。比如使用空的引用、数组下标越界、内存溢出错误等. 错误在我们编写程序的过程中会经常发生&#xff0c;包括编译期间和运行期间的错误&#xff0c;在编译期间出现的错误有编译器帮助我们一起修正&#xff0c;…

使用java代码向mysql数据库插入100万条数据

使用java代码向mysql数据库插入100万条数据 使用springboot集成Mysql数据库&#xff0c;并使用java代码循环向msql数据库插入100万条数据&#xff0c;并测试插入时间 目录结构使用java代码向mysql数据库插入100万条数据一、使用工具二、项目结构图创建springboot项目启动类创建…

高并发-防止雪崩与穿透

一、DB查询前加锁 /** * 本地堆内缓存&#xff0c;优先级最高 */ ON_HEAP(1), /** * 本地堆外缓存&#xff0c;不影响GC&#xff0c;可以管理比堆内缓存更多的数据 * 数据get/set涉及序列化&#xff0c;性能次于本地堆内缓存 */ OFF_HE…

手撕红黑树

目录 一、概念 二、红黑树的插入操作 第一步: 按照二叉搜索树的规则插入新节点 第二步: 插入后检测性质是否造到破坏&#xff0c;若遭到破坏则进行调整 情况一: cur为红&#xff0c;parent为红&#xff0c;grandfather为黑&#xff0c;uncle存在且为红 情况二: cur为红&a…

JLink 添加新设备用于下载/调试固件

新驱动的安装目录结构如下&#xff1a; 可以看出新版本的 JLink 驱动中已经没有 Devices 目录和 JLinkDevices.xml 文件了&#xff0c;即旧的方法已经不能在新的驱动中使用了。 如果需要继续使用旧的方式添加新设备&#xff0c;则需要下载 JLink_V770d 之前的版本。 在新驱动…

若依框架解读(微服务版)—— 4.认证,登出(Gateway网关)

认证 我们可以查看token值 我们进入授权中心&#xff0c;这里其他的解析解析token的步骤与上一篇文章中的生成token是逆操作&#xff0c;也比较简单。我们进入ignoreWhite.getWhites()方法 此处的两个注解是获取nacos当中的白名单&#xff0c;我们打开nacos&#xff0c;进入网关…

Shell程序退出状态码的命令详解

在本篇文章当中主要给大家介绍了一些常见的程序退出的状态码&#xff01;并且给出一下例子帮助大家仔细理解&#xff0c;并且使用C语言和python语言实现获取子进程退出时候的退出状态码。 程序退出状态码 前言 在本篇文章当中主要给大家介绍一个shell的小知识——状态码。这是…

Object.defineProperty用法

Object.defineProperty() 定义新属性和修改原有的属性 Object.defineProperty( obj&#xff0c;prop,descriptor) 翻译&#xff1a;对象.定义属性&#xff08;对象&#xff0c;属性名必须是字符写法&#xff0c;{ value:所有 }&#xff09; 创建一个对象&#xff1a; var o…

OSI七层参考模型和TCP/IP四层(五层)参考模型

OSI七层参考模型 OSI&#xff08;OSI&#xff0c;Open System Interconnection&#xff09;七层模型&#xff0c;是参考模型是国际标准化组织&#xff08;ISO&#xff09;制定的一个用于计算机或通信系统间互联的标准体系。它是一个七层的、抽象的模型体&#xff0c;不仅…

泰勒展开式

泰勒展开式 文章目录泰勒展开式简介定义近似举例推导理解参考简介 泰勒公式&#xff0c;也称泰勒展开式&#xff0c;可以用来在局部范围内近似复杂函数。 通俗的讲&#xff1a; 设有一个复杂的未知函数f(x)f(x)f(x)&#xff0c;我们想要知道它在某个范围[a,b][a,b][a,b]内的值…

抽象工厂模式

思考抽象工厂模式 抽象工厂专注于产品簇的实现&#xff0c;主要是那些有关联关系的&#xff0c;如果只有一个产品的话就退化成了工厂方法模式 1.抽象工厂模式的本质 抽象工厂模式的本质:选择产品簇的实现。 产品簇&#xff08;Product family&#xff09;指具有相同或相似的功能…

Simulink电机控制代码生成-----关于PI控制器参数整定的一点总结

目录 PI控制器的参数整定方法 方法一&#xff1a; 方法二&#xff1a; 方法对比 总结 看过很多论文&#xff0c;对PI参数的整定方法五花八门&#xff0c;还有PI参数整定的口诀&#xff0c;所谓口诀就是试凑法。除了试凑法&#xff0c;本文提供另外两种方法来整定PI参数&am…

【JavaSE】类与对象--封装性

文章目录一、面向对象的三大特性二、封装性1.什么是封装性&#xff1f;2.为什么要有封装性&#xff1f;3.封装性的作用4.封装性的实现步骤5.访问限定修饰符一、面向对象的三大特性 面向对象的三个基本特征是&#xff1a;封装、继承、多态。 二、封装性 1.什么是封装性&#xf…

BLE学习(4):蓝牙地址类型和设备的隐私

蓝牙地址也被称为蓝牙MAC地址&#xff0c;它能唯一标识一个蓝牙设备的48位的值。在蓝牙规范中&#xff0c;它被称为BD_ADDR。蓝牙的地址类型可以分为两种&#xff1a;public addresses和random addresses&#xff0c;其中random addresses又可再细分为几类&#xff0c;如下图所…

centos7.8手动部署php环境 01 nginx1.8.0编译安装

环境说明&#xff1a; 一、使用电脑 MacBook Pro 二、ssh 工具 finalShell 三、本地虚拟机 VMware Fusion 四、服务器配置 CentOS 7.8 64位 ps&#xff1a;虚拟机安装CentOS省略 登录服务器 一、安装基础工具 yum install net-tools -y yum install wget -y二、将yum源更改为腾…

创新型智慧农业信息化系统建设方案

农业电信增值服务与农村信息化建设要充分考虑新的信息技术、移动通信技术带来的革命&#xff0c;尤其是三网融合、移动互联网、下一代互联网、物联网、云计算、移动支付等新型产品与业务对我们的影响。 存在问题 •网络覆盖基础差 仍然有很多行政村没有宽带网络的覆盖&#x…

TiDB HTAP

TiDB 数据库 HTAP 概述 HTAP技术 OLAP和OLTP带来了多副本的问题。 HTAP的要求 HTAP的架构 异步复制&#xff0c;不参与投票。 HTAP的特性 行列混合 列存支持基于主键的实时更新TiFlash作为列存副本OLTP和OLAP业务隔离 智能选择&#xff08;CBO自动或者人工选择&#xff09;…

IB数学的备战技巧有哪些?

去知名品牌院校进修是许多学员的理想&#xff0c;可是众所周知这些重点大学的入校规定是十分严谨的。因此许多学员为了更好地提高自身的入校概率&#xff0c;就报名参加了一些可以大大加分的国际课程内容和考試。在其中IB数学课则是最受众多学员钟爱的一个课程内容。近些年&…

PyTorch 官方库「上新」,TorchMultimodal 助力多模态人工智能

多模态人工智能是一种新型 AI 范式&#xff0c;是指图像、文本、语音、视频等多种数据类型&#xff0c;与多种智能处理算法相结合&#xff0c;以期实现更高的性能。 近日&#xff0c;PyTorch 官方发布了一个 domain library–TorchMultimodal&#xff0c;用于 SoTA 多任务、多模…