apache-atlas-hive-hook-源码分析

news2025/1/17 0:48:55

Atlas Hook类图

Hive 元数据变更有2种实现:
1)基于 Hook 函数实现,实现类为 HiveHook
2)基于MetaStoreEventListener 实现, 实现类为HiveMetastoreHookImpl
所以提供2 种配置,即配置钩子函数或监听器,我们目前采用的是第二种方案。Atlas hook的类图如下所示
在这里插入图片描述Hive hook 开发的步骤
1)创建类实现ExecuteWithHookContext
2)打包上传到HIVE_HOME/lib
3)配置,可在hive client中临时配置,或者设置hive-site.xml配置

Hive Hook源码分析

HiveHook重写了AtlasHook的run方法,根据hive的操作类型,生成事件,通过AtlasHook#notifyEntities发送事件消息。

HiveHook#run

public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
@Override
public void run(HookContext hookContext) throws Exception {
//...
    try {
// 从hookContext中获取HiveOperation
        HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
        AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects());
        BaseHiveEvent        event   = null;
// 匹配不同的HiveOperation, 创建事件        
        switch (oper) {
            case CREATEDATABASE:
                event = new CreateDatabase(context);
            break;
            case DROPDATABASE:
                event = new DropDatabase(context);
            break;
//...
            case ALTERTABLE_RENAME:
            case ALTERVIEW_RENAME:
                event = new AlterTableRename(context);
            break;
            case ALTERTABLE_RENAMECOL:
                event = new AlterTableRenameCol(context);
            break;
            default:
                //...
            break;
        }
        if (event != null) {
// 用户组信息
            final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();
// 委托给AtlasHook#notifyEntities
            super.notifyEntities(event.getNotificationMessages(), ugi);
        }
    } catch (Throwable t) {
        LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
    }
//...
}
}

hive事件类图
在这里插入图片描述BaseHiveEvent事件主要分为以下几类
1)Hive类型 HIVE_TYPE_
2)Hbase类型 HBASE_TYPE_
3)Hive属性 ATTRIBUTE_
4)Hbase属性 HBASE_
5)Hive关系属性 RELATIONSHIP_
6)其他

AtlasHook#notifyEntities

notifyEntities消息通知有同步和异步两种策略,notifyEntities委托给notifyEntitiesInternal

public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
    if (executor == null) { // send synchronously
// 同步通知
        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
    } else {
// 异步通知
        executor.submit(new Runnable() {
            @Override
            public void run() {
                notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
            }
        });
    }
}

AtlasHook#notifyEntitiesInternal

notifyEntitiesInternal实现了重试策略, 掉用notificationInterface.send方法发送消息

@VisibleForTesting
static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi,
                                   NotificationInterface notificationInterface,
                                   boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
// ...
    final int maxAttempts         = maxRetries < 1 ? 1 : maxRetries;
    Exception notificationFailure = null;
// 多次尝试
    for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
        if (numAttempt > 1) { // retry attempt
// 重试,线程sleep
            try {
                LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval);
                Thread.sleep(notificationRetryInterval);
            } catch (InterruptedException ie) {
                LOG.error("Notification hook thread sleep interrupted");
                break;
            }
        }
        try {
//  发送通知,什么地方接收通知
            if (ugi == null) {
                notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
            } else {
                PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
                    @Override
                    public Object run() throws Exception {
                        notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                        return messages;
                    }
                };
                ugi.doAs(privilegedNotify);
            }
            notificationFailure = null; // notification sent successfully, reset error
            break;
        } catch (Exception e) {
            notificationFailure = e;
            LOG.error("Failed to send notification - attempt #{}; error={}", numAttempt, e.getMessage());
        }
    }

// 通知失败处理
// ...
}

通知消息最终由KafkaNotification将消息发送到kafka, KafkaNotification在AtlasHook静态方法种由NotificationProvider创建

notificationInterface的初始化

notificationInterface的初始化在AtlasHook 的static方法中初始化,notificationInterface的实现类为KafkaNotification,消息将发送到kafka中,Atlas服务端对消息进行消费,然后入库。

public abstract class AtlasHook {
//...
    protected static Configuration         atlasProperties;
    protected static NotificationInterface notificationInterface;
//...
    private static       ExecutorService      executor = null;

    static {
        try {
            atlasProperties = ApplicationProperties.get();
        } catch (Exception e) {
            LOG.info("Failed to load application properties", e);
        }
//...
        metadataNamespace         = getMetadataNamespace(atlasProperties);
        placeCode                 = getPlaceCode(atlasProperties);
        notificationMaxRetries    = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
        notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
// 消息通知接口实现
        notificationInterface     = NotificationProvider.get();
//...
}

HiveMetastoreHookImpl源码分析

HiveMetastoreHookImpl

HiveMetastoreHookImpl 继承了MetaStoreEventListener 类,重写了 onCreateDatabase、onDropDatabase等方法,并委托给HiveMetastoreHook#handleEvent处理

public class HiveMetastoreHookImpl extends MetaStoreEventListener {
    private static final Logger            LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class);
    private        final HiveHook          hiveHook;
    private        final HiveMetastoreHook hook;

    public HiveMetastoreHookImpl(Configuration config) {
        super(config);
        this.hiveHook = new HiveHook();
        this.hook     = new HiveMetastoreHook();
    }
//...
    @Override
    public void onCreateDatabase(CreateDatabaseEvent dbEvent) {
        HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent);
    // 委托给HiveMetastoreHook执行
        hook.handleEvent(context);
    }
    @Override
    public void onDropDatabase(DropDatabaseEvent dbEvent) {
        HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent);
        hook.handleEvent(context);
}
//...
}

HiveMetastoreHookImpl#handleEvent

handleEvent将变更消息通过AtlasHook#notifyEntities,消息将发送到kafka中,Atlas服务端对消息进行消费,然后入库。

public void handleEvent(HiveOperationContext operContext) {
        ListenerEvent listenerEvent = operContext.getEvent();
//...
        try {
            HiveOperation        oper    = operContext.getOperation();
            AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent);
            BaseHiveEvent        event   = null;
            switch (oper) {
                case CREATEDATABASE:
                    event = new CreateDatabase(context);
                    break;
               //...
                case ALTERTABLE_RENAMECOL:
                    FieldSchema columnOld = operContext.getColumnOld();
                    FieldSchema columnNew = operContext.getColumnNew();
                    event = new AlterTableRenameCol(columnOld, columnNew, context);
                    break;
                default:
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent);
                    }
                    break;
            }
            if (event != null) {
                final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI();
// 消息通知,AtlasHook#notifyEntities
                super.notifyEntities(event.getNotificationMessages(), ugi);
            }
        } catch (Throwable t) {
            LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, t);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10177.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nvm包管理工具下载安装

1&#xff0c;去github官网&#xff0c;输入nvm-windows&#xff0c;点击第一个nvm项目&#xff0c;在右侧点击releases,选择箭头指向的安装包 2&#xff0c;下载很快&#xff0c;但是安装前&#xff0c;得先卸载本机的nodejs,并且为nvm的包创建一个英文文件夹&#xff0c;…

Java---Map双列集合

目录 一、双列集合的介绍 二、Map的使用 1&#xff1a;Map中常见的API &#xff08;1&#xff09;put方法 &#xff08;2&#xff09;remove方法 2&#xff1a;Map的遍历 &#xff08;1&#xff09;通过键找值的方式遍历 &#xff08;2&#xff09;通过键值对对象遍历 &…

FRNet代码

代码目录简简单单&#xff0c;令人心旷神怡。 模型框架&#xff1a; 数据增强包括; 接着看一下数据集&#xff1a; import os from PIL import Image import numpy as np from sklearn.model_selection import train_test_splitimport torch import torch.utils.data as da…

Tomcat+Maven+Servlet安装与部署

文章目录前言一、Tomcat8下载安装二、MavenServlet部署1.创键Maven项目&#xff08;idea2021community&#xff09;2.pom.xml下引入servlet依赖3.main下创建webapp/WEB-INF/web.xml4.验证HttpServlet是否导入&#xff08;配置WebServlet路径&#xff09;5.手动打包web项目6.浏览…

DAMA-CDGA/CDGP数据治理认证包括哪几个方面?

DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职业…

【夯实Kafka知识体系及基本功】分析一下消费者(Consumer)实现原理分析「原理篇」

Consumer消费者 消费者可以从broker中读取数据。 一个消费者可以消费多个topic中的数据&#xff08;其中一个partion&#xff09;。 Consumer Group&#xff08;消费组&#xff09; 每个Consumer属于一个特定的Consumer Group。 可为每个Consumer指定group name&#xff0c…

自动驾驶--定位技术

[整理自百度技术培训中心课程](https://bit.baidu.com/products?id70) 为什么无人车需要精确的定位系统 在地下车库实现自动泊车的一个非常关键的技术是什么呢&#xff1f;那就是定位技术。 为什么无人车需要一个精确的定位系统。为什么无人车需要精确的定位系统&#xff…

网络线缆连接器和线槽

一、信息插座 1、信息插座简介 信息插座是终端(工作站)与水平干线子系统连接的接口,在水平干线子系统中双绞线的两 端是直接压接到配线架和信息插座中的,不需要跳线。 2、信息插座的配置 综合布线系统的设计,应该根据实际情况确定 所需信息插座个数和分布位置,也就决…

计算机的另一半

本篇先介绍了计算机中数字编码&#xff0c;地址的概念。然后介绍了组成计算机的另外一半内容&#xff0c;也就是CPU&#xff0c;和前面的计算机一半合起来就可以组成一个简易版的计算机了。至此终于大概看到了计算机的全貌。 数字编码系统 这里我们简单说一下计算机里数字编码…

智慧环卫解决方案-最新全套文件

智慧环卫解决方案-最新全套文件一、建设背景二、思路架构三、建设方案四、获取 - 智慧环卫全套最新解决方案合集一、建设背景 城市环境卫生管理是一项复杂而系统的社会工程&#xff0c;是与人民群众生活联系最密切的重要工作之一。环卫水平不仅是一个城市的“脸面”&#xff0…

kotlin 之单例类详解

object 单例对象的声明&#xff1a; object Model{var temp "1"val temp2 "2"const val temp3 "3" }抛出疑问&#xff1a;使用object修饰的类&#xff0c;是哪种类型的单例模式 这里我们先回顾一下java六种单例模式 1. 饿汉式 public c…

String、StringBuffer和StringBuilder类的区别

在 Java 中字符串属于对象&#xff0c;Java 提供了 String 类来创建和操作字符串。String 类是不可变类&#xff0c;即一旦一个 String 对象被创建以后&#xff0c;包含在这个对象中的字符序列是不可改变的&#xff0c;直至这个对象被销毁。 Java 提供了两个可变字符串类 Stri…

EFLFK——ELK日志分析系统+kafka+filebeat架构(3)

ELFK——ELK结合filebeat日志分析系统&#xff08;2&#xff09;_Evens7xxX的博客-CSDN博客 紧接上期&#xff0c;在ELFK的基础上&#xff0c;添加kafka做数据缓冲 附kafka消息队列 nginx服务器配置filebeat收集日志&#xff1a;192.168.116.40&#xff0c;修改配置将采集到的…

SoC-ZCU106求解非线性方程(一):环境安装

一、大家好久不见&#xff0c;本次给大家带来的是SoC求解非线性方程问题。计划发布三篇文章&#xff0c;这是第一篇----环境安装。 主要的解决的问题是&#xff1a;PL侧给PS传输数据&#xff0c;然后PS将数据作为已知量求解非线性方程&#xff0c;为了简化问题复杂度&#xff…

中睿天下实力入选2022信创产业独角兽TOP100

近日&#xff0c;中国科学院主管的权威媒体《互联网周刊》、德本咨询、eNet研究院联合发布了“2022信创产业独角兽100强”榜单。中睿天下凭借在网络安全攻击溯源领域的深耕、硬的技术能力和突出的产品创新力&#xff0c;实力入选榜单&#xff0c;在上榜的安全企业中&#xff0c…

Docker(四)—— 部署Nginx、Tomcat

一、部署Nginx 将Nginx后台挂载后&#xff0c;用curl命令访问&#xff0c;进行本机自测&#xff1a; 二、部署Tomcat 出现404页面的原因&#xff1a;为了缩小镜像的大小&#xff0c;官方下载的Tomcat镜像是精简版的&#xff0c;只提供了必要、核心的内容。我们进入容器内部的/w…

You辉编程_kafka

一、什么是kafka? 是分布式(项目部署于多个服务器)的基于发布/订阅模式的消息队列&#xff0c;主要用于处理活跃的数据&#xff0c;如&#xff1a;登录、浏览、点击、分享等用户行为产生的数据&#xff0c;说白了就是一个消息系统&#xff08;消息队列&#xff09;。 进一步…

java项目-第132期ssm学生会管理系统-ssm+shiro+activity社团毕业设计

java项目-第132期ssm学生会管理系统-ssmshiroactivity社团毕业设计 【源码请到资源专栏下载】 今天分享的项目是《学生会管理系统》 该项目分为不同的角色&#xff0c;其中包含超级管理员、生活文体部部长、行政秘书部部长、 外联部部长、策划部部长、学生会干事等角色&#xf…

[附源码]java毕业设计基于的网上饮品店

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

FITC标记葡聚糖(40kDa),FITC Dextran-40,CAS号:60842-46-8

中文名称&#xff1a;FITC标记葡聚糖(40kDa) 英文名称&#xff1a;FITC Dextran-40 CAS号&#xff1a;60842-46-8 产品规格&#xff1a;50mg|250mg|1g 本制品是对平均分子量约40kDa葡聚糖进行标记的荧光素衍生物&#xff0c;即异硫氰酸荧光素葡聚糖40&#xff08;fluoresce…