导入JDBC元数据到Apache Atlas

news2024/11/15 10:59:29

前言

前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美

本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas

数据库schema与catalog

按照SQL标准的解释,在SQL环境下CatalogSchema都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog,每个Catalog又包含多个Schema,而每个Schema又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema,而该Schema又必然属于一个Catalog,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog名称.Schema名称.表名称。这里还有一点需要注意的是,SQL标准并不要求每个数据库对象的完全限定名称是唯一的。

从实现的角度来看,各种数据库系统对CatalogSchema的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog名,使用用户名作为Schema名,具体可参见下表:

表1 常用数据库

供应商Catalog支持Schema支持
Oracle不支持Oracle User ID
MySQL不支持数据库名
MS SQL Server数据库名对象属主名,2005版开始有变
DB2指定数据库对象时,Catalog部分省略Catalog属主名
Sybase数据库名数据库属主名
Informix不支持不需要
PointBase不支持数据库名

原文:https://www.cnblogs.com/ECNB/p/4611309.html

元数据模型层级抽象

不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系

在这里插入图片描述

  • Datasource -> Catalog -> Schema -> Table -> Column
  • Datasource -> Catalog -> Table -> Column
  • Datasource -> Schema -> Table -> Column

元数据转换设计

在这里插入图片描述

提供元数据

借鉴Apache DolphinScheduler中获取Connection的方式,不多赘述。

public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
        BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
        String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
        logger.info("Get connection from datasource {}", datasourceUniqueId);

        DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
            Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
            DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
            if (null == dataSourceChannel) {
                throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));
            }
            return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);
        });
        return dataSourceClient.getConnection();
    }

转换元数据

  1. 元数据模型

创建数据库的元数据模型

private AtlasEntityDef createJdbcDatabaseDef() {
   AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,
           Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),
           createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),
           createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),
           createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),
           createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string")
   );

   typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);

   return typeDef;
}

创建数据库模式的元数据模型

private AtlasEntityDef createJdbcSchemaDef() {
    AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(
            SchemaProperties.JDBC_TYPE_SCHEMA,
            Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET)
    );

    typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);
    typeDef.setOptions(new HashMap<>() {{
        put("schemaElementsAttribute", "tables");
    }});

    return typeDef;
}

创建数据库表的元数据模型

private AtlasEntityDef createJdbcTableDef() {
    AtlasEntityDef typeDef = createClassTypeDef(
            TableProperties.JDBC_TYPE_TABLE,
            Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),
            createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string")
    );

    typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);
    typeDef.setOptions(new HashMap<>() {{
        put("schemaElementsAttribute", "columns");
    }});

    return typeDef;
}

创建数据库列的元数据模型

private AtlasEntityDef createJdbcColumnDef() {
    AtlasEntityDef typeDef = createClassTypeDef(
            ColumnProperties.JDBC_TYPE_COLUMN,
            Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string")
    );

    typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);
    HashMap<String, String> options = new HashMap<>() {{
        put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");
    }};
    typeDef.setOptions(options);

    return typeDef;
}

创建实体之间的关系模型

private List<AtlasRelationshipDef> createAtlasRelationshipDef() {
    String version = "1.0";
    // 数据库和模式的关系
    AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,
            BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,
            version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false)
    );
    databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_DATABASE_TABLES,
            BaseProperties.RELATIONSHIP_DATABASE_TABLES,
            version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false)
    );
    databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    // 模式和数据表的关系
    // 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)
    AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_SCHEMA_TABLES,
            BaseProperties.RELATIONSHIP_SCHEMA_TABLES,
            version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false)
    );
    schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    // 表和数据列的关系
    AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_TABLE_COLUMNS,
            BaseProperties.RELATIONSHIP_TABLE_COLUMNS,
            version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false)
    );
    tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
  1. 提取元数据

    不再赘述

  2. 转换元数据

使用工厂模式,提供不同类型的元数据转换方式

public interface JdbcTransferFactory {

    JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);

    boolean supportType(String type);

    String getName();
}

List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysqlinformation_schema

public interface JdbcTransfer {

    void transfer();

    JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}

举例:JdbcMysqlTransfer 和 MysqlTransferFactory

@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {

    public static final String MYSQL = "mysql";

    @Override
    public JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {
        return new JdbcMysqlTransfer(metaData, client);
    }

    @Override
    public boolean supportType(String type) {
        return MYSQL.equalsIgnoreCase(type);
    }

    @Override
    public String getName() {

        return MYSQL;
    }
}
public class JdbcMysqlTransfer implements JdbcTransfer {

    private final Jdbc jdbc;
    private final AtlasService atlasService;
    private List<Pattern> ignorePatterns;

    public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {
        this.jdbc = new Jdbc(new JdbcMetadata(metaData));
        this.atlasService = new AtlasService(client);
        this.ignorePatterns = Collections.emptyList();
    }


    @Override
    public JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {
        this.ignorePatterns = ignorePatterns;
        return this;
    }

    private boolean tableIsNotIgnored(String tableName) {
        return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());
    }

    @Override
    public void transfer() {
        // 1.数据库实体转换
        DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);
        AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);

        // 2.表实体转换
        String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);
        List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream()
                .filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName()))
                .map(new TableTransfer(atlasService, databaseEntity))
                .toList();

        // 3.列转换
        for (AtlasEntity tableEntity : tableEntities) {
            String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);
            List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);
            jdbc.getColumns(catalog, catalog, tableName).parallelStream()
                    .forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));
        }
    }

}
  1. 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {

    private final AtlasService atlasService;

    public DatabaseTransfer(AtlasService atlasService) {
        this.atlasService = atlasService;
    }

    @Override
    public AtlasEntity apply(Jdbc jdbc) {
        String userName = jdbc.getUserName();
        String driverName = jdbc.getDriverName();
        String productName = jdbc.getDatabaseProductName();
        String productVersion = jdbc.getDatabaseProductVersion();

        String url = jdbc.getUrl();
        String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;
        String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);
        // 特殊处理 Oracle
        if (productName.equalsIgnoreCase("oracle")){
            catalogName = userName.toUpperCase();
            urlWithNoParams = urlWithNoParams + "/" + catalogName;
        }

        DatabaseProperties properties = new DatabaseProperties();
        properties.setQualifiedName(urlWithNoParams);
        properties.setDisplayName(catalogName);
        properties.setOwner(userName);
        properties.setUrl(url);
        properties.setDriverName(driverName);
        properties.setProductName(productName);
        properties.setProductVersion(productVersion);

        // 1.创建Atlas Entity
        AtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());

        // 2.判断是否存在实体, 存在则填充GUID
        Map<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);
        Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);
        entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));

        // 3,存储或者更新到Atlas中
        if (entityHeader.isPresent()){
            atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));
        }else {
            AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));
            atlasEntity.setGuid(header.getGuid());
        }
        return atlasEntity;
    }
}

效果展示

  1. 元数据类型定义

在这里插入图片描述

在这里插入图片描述

  1. 测试导入元数据

由于mysql没有采用schema,因此jdbc_schema为空

在这里插入图片描述

如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容

在这里插入图片描述

数据表元数据,qualifiedName使用数据库连接url.表名
在这里插入图片描述

如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1284992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Latex笔记】标题页

整体结构 模板结构如下&#xff1a; \documentclass{book} % 导言区&#xff0c;加载宏包和各项设置&#xff0c;包括参考文献、索引等 \usepackage{makeidx} % 调用makeidx 宏包&#xff0c;用来处理索引 \makeindex % 开启索引的收集 \bibliographystyle{plain} % 指定参考…

OpenCV-Python:图像卷积操作

目录 1.图像卷积定义 2.图像卷积实现步骤 3.卷积函数 4.卷积知识考点 5.代码操作及演示 1.图像卷积定义 图像卷积是图像处理中的一种常用操作&#xff0c;主要用于图像的平滑、锐化、边缘检测等任务。它可以通过滑动一个卷积核&#xff08;也称为滤波器&#xff09;在图像…

【C/PTA —— 14.结构体1(课内实践)】

C/PTA —— 14.结构体1&#xff08;课内实践&#xff09; 6-1 计算两个复数之积6-2 结构体数组中查找指定编号人员6-3 综合成绩6-4 结构体数组按总分排序 6-1 计算两个复数之积 struct complex multiply(struct complex x, struct complex y) {struct complex product;product.…

Shopify二次开发之三:liquid语法学习(Tags)

目录 Tags 变量声明 assign capture decrement increment 条件语句 if else unless case HTML form表单生成 style Iteration (遍历) for else break continue cycle paginate Theme &#xff08;主题&#xff09; render渲染一个snippet&#xff0c;可…

用友NC word.docx接口存在任意文件读取漏洞

声明 本文仅用于技术交流&#xff0c;请勿用于非法用途 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任。 一、产品介绍 用友 NC Cloud&#xff0c;大型企业数字化平台&#xff…

MySQL笔记-第04章_运算符

视频链接&#xff1a;【MySQL数据库入门到大牛&#xff0c;mysql安装到优化&#xff0c;百科全书级&#xff0c;全网天花板】 文章目录 第04章_运算符1. 算术运算符2. 比较运算符3. 逻辑运算符4. 位运算符5. 运算符的优先级拓展&#xff1a;使用正则表达式查询 第04章_运算符 …

轻量封装WebGPU渲染系统示例<43>- PBR材质与阴影实(源码)

原理简介: 1. 基于rendering pass graph实现。 2. WGSL Shader 基于文件系统和宏机制动态组装。 当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/feature/rendering/src/voxgpu/sample/PBRShadowTest.ts 当前示例运行效果: 此示例基于此渲染系统实现&a…

【Windows】使用SeaFile搭建本地私有云盘并结合内网穿透实现远程访问

1. 前言 现在我们身边的只能设备越来越多&#xff0c;各种智能手机、平板、智能手表和数码相机充斥身边&#xff0c;需要存储的数据也越来越大&#xff0c;一张手机拍摄的照片都可能有十多M&#xff0c;电影和视频更是按G计算。而智能设备的存储空间也用的捉襟见肘。能存储大量…

探索CSS:从入门到精通Web开发(二)

前言 当我们谈论网页设计和开发时&#xff0c;CSS&#xff08;层叠样式表&#xff09;无疑是其中的重要一环。作为HTML的伴侣&#xff0c;CSS赋予网页以丰富的样式和布局&#xff0c;使得网站看起来更加吸引人并且具备更好的可读性。本书将通过一系列深入浅出的方式&#xff0…

Java多线程 - 黑马教程

文章目录 Java 多线程一、多线程概述二、 多线程创建方式1、继承 Thread 类创建线程2、实现 Runnable 接口3、实现 Callable 接口三、Thread 常用的方法四、线程安全什么是线程安全问题?线程安全问题出现的原因程序模拟线程安全五、线程同步线程同步方式1:同步代码块线程同步…

GPIO的使用--时钟使能含义--代码封装

目录 一、时钟使能的含义 1.为什么要时钟使能&#xff1f; 2.什么是时钟使能&#xff1f; 3.GPIO的使能信号&#xff1f; 二、代码封装 1.封装前完整代码 2.封装结构 封装后代码 led.c led.h key.c key.h main.c 一、时钟使能的含义 1.为什么要时钟使能&#xff1f…

关于如何解决问题?代码习惯。

警钟长鸣 从师哥身上学到的东西&#xff1a; 关于如何解决问题&#xff1f; 1、沟通&#xff1a;有效的沟通&#xff0c;将问题描述清楚&#xff0c;让老师和师哥明白你出了什么问题&#xff0c;给出建议&#xff0c;很多时候一句良言胜过自己摸索很久 2、出现问题由浅入深地…

国标GB28181安防监控平台EasyCVR录像时间轴优化步骤

视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同&#xff0c;支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。音视频流媒体视频平台EasyCVR拓展性强&#xff0c;视频能力丰富&#xff0c;具体可实现视频监控直播、视频轮播、视频录像、…

Linux系统上RabbitMQ安装教程

一、安装前环境准备 Linux&#xff1a;CentOS 7.9 RabbitMQ Erlang 1、系统内须有C等基本工具 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c kernel-devel m4 ncurses-devel tk tc xz socat2、下载安装包 1&#xff09;首先&a…

Harmony Ble蓝牙App(三)特性和属性

Ble蓝牙App&#xff08;三&#xff09;特性使用 前言正文一、获取属性列表二、属性提供者三、获取特性名称四、特性提供者五、加载特性六、源码 前言 在上一篇中我们完成了连接和发现服务两个动作&#xff0c;那么再发现服务之后要做什么呢&#xff1f;发现服务只是让你知道设备…

<Linux>(极简关键、省时省力)《Linux操作系统原理分析之linux存储管理(5)》(21)

《Linux操作系统原理分析之linux存储管理&#xff08;5&#xff09;》&#xff08;21&#xff09; 6 Linux存储管理6.6 Linux 物理空间管理6.6.1 Linux 物理内存空间6.6.2 物理页面的管理6.6.3 空闲页面管理——buddy 算法 6.7 内存的分配与释放6.7.1 物理内存分配的数据结构 6…

Design patterns--代理模式

设计模式之代理模式 我们使用Qt开发大型应用程序时&#xff0c;经常遇见大型程序启动时需要加载一些配置信息、用户末次操作信息&#xff0c;以及算法模型等数据时比较费时&#xff0c;笔者在程序启动时设计欢迎页或加载页等窗体来提示用户程序正在加载某些数据&#xff0c;加载…

基于SSM框架的《超市订单管理系统》Web项目开发(第五天)供应商管理,增删改查

基于SSM框架的《超市订单管理系统》Web项目开发&#xff08;第五天&#xff09;供应商管理&#xff0c;增删改查 上一次我们实现了多表关联查询&#xff0c;还有分页显示数据的功能。还完善了用户管理这一模块。 因此今天我们需要完成的是供应商管理模块&#xff0c;这一模块…

SQL自学通之表达式条件语句与运算

目录 一、目标 二、表达式条件语句 1、表达式&#xff1a; 2、条件 2.1、WHERE 子句 三、运算 1、数值型运算: 1.1、加法() 1.2、减法 (-) 1.3、除法&#xff08;/&#xff09; 1.4、乘法 &#xff08;*&#xff09; 1.5、取模 &#xff08;%&#xff09; 优先级别…