apache atlas 如何自定义hook

news2025/1/1 9:52:14

atals 是开源的数据元数据和数据资产管理平台,平台设计支持强大的图数数据库,nosql,和搜索引擎3个组件构建。都是基于开源构建。

 目前市场上开源的元数据管理工具有Atlas, Datahub, Openmetadata等,你要说二次开发,谁最好,如果是java 开发,还是 Atlas ,灵活,简单。其他两个都要会python,多种语言。

atlas 虽然支持,hbase,hive,impala,sqoop等这些组件的实时元数据采集。但是其他的可以采用自定义hook来实现钩子函数。下图是一个钩子函数的流程:

我们了解钩子函数先了解,数据源,所谓钩子函数,其实是需要源系统配合,这个其实就是源系统的一个监听机制,就是在客户端(写sql)——执行端,在中间有个监听程序,可以获取sql解析过程。如果源系统没有,那就不能实现监听数据获取。

其实不会写监听程序,atlas 也好处理,中间的kafka 就是一个实时监听通道,只要你按照atlas 的格式要求,提交监控程序,就可以实现元数据管理。kafka 有两个topic:ATLAS_HOOK_TOPICATLAS_ENTITIES_TOPIC 。只要满足这两个topic 的数据格式,可以实时写入元数据。

Atlas 在元数据管理,主要分为两部分API和kafka.在kafka之前我们先说一下什么是model .

其实models 类似我们的jdbc连接或者是presto 的catalog 信息。这个元数据的注册信息。就是你连接的是什么数据库,什么程序,字段,表,视图等这些信息需要进行注册,毕竟不同的库,这些信息不一样,比如hive 和hbase 的属性肯定不一样。那就需要建设model ,建model 有两种方式,一种是java API 

另外一个是通过model json 进行提交

源码里面有很多的json model文件

curl -i -X POST -H "Content-Type: application/json" -d '{
    "enumTypes": [],
    "structTypes": [],
    "classificationDefs": [],
    "entityDefs": [
        {
      "category": "ENTITY",
      "version": 1,
      "name": "clickhouse_db",
      "description": "clickhouse_db",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [
        {
          "name": "location",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": 5
        },
        {
          "name": "clusterName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": 8
        },
        {
          "name": "parameters",
          "typeName": "map<string,string>",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        },
        {
          "name": "ownerType",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        }
      ],
      "superTypes": [
        "DataSet"
      ],
      "subTypes": [],
      "relationshipAttributeDefs": [
        {
          "name": "inputToProcesses",
          "typeName": "array<Process>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "dataset_process_inputs",
          "isLegacyAttribute": false
        },
        {
          "name": "schema",
          "typeName": "array<avro_schema>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "avro_schema_associatedEntities",
          "isLegacyAttribute": false
        },
        {
          "name": "tables",
          "typeName": "array<clickhouse_table>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "clickhouse_table_db",
          "isLegacyAttribute": false
        },
        {
          "name": "meanings",
          "typeName": "array<AtlasGlossaryTerm>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "AtlasGlossarySemanticAssignment",
          "isLegacyAttribute": false
        },
        {
          "name": "outputFromProcesses",
          "typeName": "array<Process>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "process_dataset_outputs",
          "isLegacyAttribute": false
        }
      ],
      "businessAttributeDefs": {}
    }
    ],
    "relationshipDefs": []
}' --user admin:admin "http://localhost:21000/api/atlas/v2/types/typedefs"
 
 
 
 

这一步是要注册数据库类型:注册数据库,注册数据表,注册字段等

下一步要对,库-表,字段进行关系映射

#/v2/types/typedefs
{
  "entityDefs": [],
  "classificationDefs": [],
  "structDefs": [],
  "enumDefs": [],
  "relationshipDefs": [
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_db",
      "description": "clickhouse_table_db",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "AGGREGATION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "db",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_db",
        "name": "tables",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": false
      }
    },
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_columns",
      "description": "clickhouse_table_columns",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "COMPOSITION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "columns",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_column",
        "name": "table",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      }
    },
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_storagedesc",
      "description": "clickhouse_table_storagedesc",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "ASSOCIATION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "sd",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_storagedesc",
        "name": "table",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      }
    }
  ]
}

关系是 数据库-表-字段-属性等关系映射,这个是为了映射跳转。

第二步:kafka写数据

写入数据,可以通过api调研,也可以通过kafka 提交:

{
    "version": {
        "version": "1.0.0",
        "versionParts": Array[1]
    },
    "msgCompressionKind": "NONE",
    "msgSplitIdx": 1,
    "msgSplitCount": 1,
    "msgSourceIP": "10.45.1.116",
    "msgCreatedBy": "bi",
    "msgCreationTime": 1710575827820,
    "message": {
        "type": "ENTITY_CREATE_V2",
        "user": "bi",
        "entities": {
            "entities": [
                {
                    "typeName": "clickhouse_table",
                    "attributes": {
                        "owner": "bi",
                        "ownerType": "USER",
                        "sd": Object{...},
                        "tableType": "MANAGED",
                        "createTime": 1710575827000,
                        "qualifiedName": "test.wuxl_0316_ss@primary",
                        "columns": [
                            Object{...},
                            Object{...}
                        ],
                        "name": "wuxl_0316_ss",
                        "comment": "测试表",
                        "parameters": {
                            "transient_lastDdlTime": "1710575827"
                        },
                        "db": {
                            "typeName": "clickhouse_db",
                            "attributes": {
                                "owner": "bi",
                                "ownerType": "USER",
                                "qualifiedName": "test@primary",
                                "clusterName": "primary",
                                "name": "test",
                                "description": "",
                                "location": "hdfs://HDFS80727/bi/test.db",
                                "parameters": {

                                }
                            },
                            "guid": "-861237351166886",
                            "version": 0,
                            "proxy": false
                        }
                    },
                    "guid": "-861237351166888",
                    "version": 0,
                    "proxy": false
                },
                Object{...},
                Object{...},
                Object{...},
                Object{...}
            ]
        }
    }
}

可以通过flink 提交

-- 使用Flinksql往Atlas自带的topic里写消息
CREATE TABLE ads_zdm_offsite_platform_daren_rank_df_to_kafka (
        data string
) WITH (
  'connector' = 'kafka',
  'topic' = 'ATLAS_HOOK',
  'properties.bootstrap.servers' = 'localhost:9092', 
  'format' = 'raw'
);
 
insert into ads_zdm_offsite_platform_daren_rank_df_to_kafka
select '{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"10.45.1.116","msgCreatedBy":"bi","msgCreationTime":1710575827820,"message":{"type":"ENTITY_CREATE_V2","user":"bi","entities":{"entities":[{"typeName":"clickhouse_table","attributes":{"owner":"bi","ownerType":"USER","sd":{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},"tableType":"MANAGED","createTime":1710575827000,"qualifiedName":"test.wuxl_0316_ss@primary","columns":[{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}],"name":"wuxl_0316_ss","comment":"测试表","parameters":{"transient_lastDdlTime":"1710575827"},"db":{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false}},"guid":"-861237351166888","version":0,"proxy":false},{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false},{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}]}}}' as data
;
 

atlas 在自定义表,应用程序,报表等都有很方便的接口,可以通过接口或者kafka提交实时的变更信息,方便实时监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1653286.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

写一个函数返回参数二进制中1的个数--四种方法及原理解释

虽然本方法是java写的&#xff0c;但是其原理适用于大部分语言 方法一&#xff1a;通过取模%运算取出每一位比特位数值&#xff0c;再进行判断&#xff08;该方法不可判断负数&#xff09; 原理&#xff1a; 通过取模num % 2 1 取出该数的每一个二进制位数&#xff0c;再判…

HackMyVM-VivifyTech

目录 信息收集 arp nmap nikto whatweb WEB web信息收集 wpscan feroxbuster hydra 提权 系统信息收集 横向渗透 git提权 get root 信息收集 arp ┌──(root㉿0x00)-[~/HackMyVM] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 08:00:27:9d:6d:7b, …

JAVA语言开发的:一套智慧校园平台主要由哪些系统组成、又有哪些前景呢?让我们一起来看一看

▶技术架构&#xff1a;后端&#xff1a;Java 框架&#xff1a;springboot 前端页面&#xff1a;vue 小程序&#xff1a;小程序原生开发 ▶电子班牌&#xff1a;Java Android 源码有演示&#xff0c;自主研发&#xff0c;官方正版授权&#xff0c;联系客服咨询&#xff0…

使用Java编写的简单彩票中奖概率计算器

前言 在当今社会&#xff0c;彩票已经成为许多人追逐梦想和改变生活的一种方式。然而&#xff0c;中奖的概率却是一个让人犹豫和兴奋的话题。在这篇文章中&#xff0c;我们将探讨如何使用Java编程语言实现一个简单的彩票中奖概率计算器。通过这个计算器&#xff0c;我们可以根…

初识Node.js-REPL(详解交互式解释器)

目录 一、REPL介绍 1.概念 2.主要特点和用途 3.应用 二、 REPL语法 1.简单的表达式运算 2.使用变量 3.多行表达式 下划线(_)变量 三、REPL 命令 四、停止 REPL 五、Gif 实例演示 六、REPL应用实例 七、总结 一、REPL介绍 1.概念 Read&#xff08;读取&#xff09…

【吃透Java手写】1- Spring(上)-启动-扫描-依赖注入-初始化-后置处理器

【吃透Java手写】Spring&#xff08;上&#xff09;启动-扫描-依赖注入-初始化-后置处理器 1 准备工作1.1 创建自己的Spring容器类1.2 创建自己的配置类 ComponentScan1.3 ComponentScan1.3.1 Retention1.3.2 Target 1.4 用户类UserService Component1.5 Component1.6 测试类 2…

吴恩达机器学习笔记:第 9 周-16推荐系统(Recommender Systems) 16.3-16.4

目录 第 9 周 16、 推荐系统(Recommender Systems)16.3 协同过滤16.4 协同过滤算法 第 9 周 16、 推荐系统(Recommender Systems) 16.3 协同过滤 在之前的基于内容的推荐系统中&#xff0c;对于每一部电影&#xff0c;我们都掌握了可用的特征&#xff0c;使用这些特征训练出了…

TikTok 正式起诉美国政府;全新 iPad Pro 将搭载苹果 M4 芯片丨 RTE 开发者日报 Vol.199

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」&#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「…

视频号小店想要长久发展,做店的核心是什么?一篇详解!

大家好&#xff0c;我是电商小V 想要做好视频号小店&#xff0c;那么他的核心是什么呢&#xff1f; 视频号小店的核心还是商品&#xff0c;其实电商运营底层的逻辑都是一样的&#xff0c;都是以商品为核心去运营的&#xff0c;再说的浮夸一点就是&#xff0c;你的商品选择的好&…

ICode国际青少年编程竞赛- Python-2级训练场-基础训练4

ICode国际青少年编程竞赛- Python-2级训练场-基础训练4 1、 for i in range(4):if i > 2:Flyer[i].step(3)else:Flyer[i].step(1) Dev.step(Item[3].x - Dev.x)2、 for i in range(6):if i < 3:Flyer[i].step(2)else:Flyer[i].step(3) Dev.step(Item[2].x - Dev.x)3、 …

linux Nginx安装与启动

一、先到官网下载Nginx 官网地址&#xff1a; http://nginx.org/en/download.html 我下载的是nginx-1.20.2 二、下载好的文件上传到服务器&#xff0c;然后解压 1、上传到指定的服务器地址&#xff0c;我这里是公司服务器&#xff0c;目录都是定义好的&#xff0c;自己玩建…

哪个文件加密软件好?迅软加密软件特性解析

哪个文件加密软件好&#xff1f; 这里推荐一款好用的文件加密软件&#xff0c;迅软DSE加密软件&#xff0c;有17年的加密经验了&#xff0c;已为三十万企业解决信息安全问题。简单易用&#xff0c;兼容性强&#xff0c;各类型文件都可加密。完善的售后保障&#xff0c;各地有服…

C++之STL-priority_queue和仿函数的讲解

目录 一、priority_queue的介绍和使用 1.1 priority_queue的介绍 1.2 priority_queue的基本接口 二、仿函数的介绍 2.1 基本概念 2.2 适用场景 三、模拟实现priority_queue 3.1 向上调整算法 3.2 向下调整算法 3.3 整体框架 一、priority_queue的介绍和使用 1.1 prio…

数据可视化训练第一天(matplotlib直线;散点图,随机漫步)

前言 本人自己的练习记录&#xff1b;如有错误请指正&#xff1b; https://matplotlib.org/stable/gallery/lines_bars_and_markers/index.html 官方有许多例子&#xff0c;可以找到自己需要的图像模仿进行绘制 1.一个简单的直线例子 就如同我们学习C语言的第一个helloword时…

白话机器3:PCA与SVM详细数学原理

一、PCA数学原理 1.数据标准化 首先&#xff0c;需要对原始数据进行标准化处理&#xff0c;使得每个特征的均值为0&#xff0c;方差为1。假设有一个的数据矩阵X&#xff0c;其中每一列是一个样本&#xff0c;每一行是一个特征。 标准化公式如下&#xff1a; 其中&#xff0c;…

加速数据要素流通,“隐语杯”全国高校隐私计算大赛正式启动报名!

当前&#xff0c;我国数字经济正处在一个快速增长的阶段&#xff0c;数据要素逐渐成为促进社会经济繁荣的关键驱动力。随着国家对数据治理及隐私保护政策的不断完善&#xff0c;隐私计算技术的创新和实践应用变得愈发重要。面对数据安全与隐私保护的双重挑战&#xff0c;如何实…

系统稳定性判定分析(二)----频域分析法相关辐角原理

文章目录 辐角原理&#xff08;即Cauchy原理&#xff09;引理分析辐角原理定义与证明 参考文献 为后续更好从频域层面分析控制系统的稳定性&#xff0c;本节首先介绍在后续分析中用到的辐角原理。 根据复变函数对数的定义&#xff0c;有 l n f ( s ) l n ∣ f ( z ) ∣ i ( a…

libcity笔记:libcity/evaluator/traj_loc_pred_evaluator.py

1 构造函数 2 _check_config 检查配置是否符合评估器的要求&#xff0c;确保评估过程能够顺利执行 3 collect 4 evaluate 5 save_result & clear

如何使用多协议视频汇聚/视频安防系统EasyCVR搭建智慧园区视频管理平台?

智慧园区作为现代化城市发展的重要组成部分&#xff0c;不仅承载着产业升级的使命&#xff0c;更是智慧城市建设的重要体现。随着产业园区竞争的逐渐白热化&#xff0c;将项目打造成完善的智慧园区是越来越多用户关注的内容。 然而我们往往在规划前期就开始面临众多难题&#…

三、Redis五种常用数据结构-Hash

Hash是redis中常用的一种无序数据结构。结构类似HashMap。 具体结构如下&#xff1a;key field value 1、优缺点 1.1、优点 同类数据归类整合储存&#xff0c;方便数据管理。相比于string操作消耗内存和CPU更小。分字段存储&#xff0c;节省网络流量。 1.2、缺点 过期时间…