HiveMetastore 的架构简析

news2024/11/7 11:59:54

HiveMetastore 的架构简析

Hive Metastore 是 Hive 元数据管理的服务。可以把元数据存储在数据库中。对外通过 api 访问。

hive_metastore.thrift

对外提供的 Thrift 接口定义在文件 standalone-metastore/src/main/thrift/hive_metastore.thrift 中。

内容包括用到的结构体和枚举,和常量,和 rpc Service。
如分区定义如下:

struct Partition {
  1: list<string> values // string value is converted to appropriate partition key type
  2: string       dbName,
  3: string       tableName,
  4: i32          createTime,
  5: i32          lastAccessTime,
  6: StorageDescriptor   sd,
  7: map<string, string> parameters,
  8: optional PrincipalPrivilegeSet privileges,
  9: optional string catName
}

Service的定义了 client 和 server 的 RPC 请求。如增加分区的定义如下:

service ThriftHiveMetastore extends fb303.FacebookService
{
	Partition add_partition(1:Partition new_part)
        throws(1:InvalidObjectException o1, 
        	   2:AlreadyExistsException o2, 
        	   3:MetaException o3)
}

ThriftHiveMetastore.java

hive_metastore.thrift 编译之后生成文件 ThriftHiveMetastore.java

ThriftHiveMetastore 结构如下:

public class ThriftHiveMetastore {
	// Iface 定义了 Service 的所有的接口。仅列出了 add_partition
	public interface Iface extends com.facebook.fb303.FacebookService.Iface {
		public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException;
		// omit other methods

	}

   // AsyncIface 定义了异步接口。
	public interface AsyncIface extends com.facebook.fb303.FacebookService .AsyncIface {
		public void add_partition(Partition new_part, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
		// omit other methods

	}

// Client 的实现
	public static class Client extends com.facebook.fb303.FacebookService.Client implements Iface {

		public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException
    {
      send_add_partition(new_part);
      return recv_add_partition();
    }
    // receiveBase 调用 result.read 方法,从 protocal
    public Partition recv_add_partition() throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException
    {
      add_partition_result result = new add_partition_result();
      receiveBase(result, "add_partition");
      if (result.isSetSuccess()) {
        return result.success;
      }
      if (result.o1 != null) {
        throw result.o1;
      }
      if (result.o2 != null) {
        throw result.o2;
      }
      if (result.o3 != null) {
        throw result.o3;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "add_partition failed: unknown result");
    }
	}

// Processor 是服务端处理框架,把所有要处理的 rpc 的名称和处理的映射放到 map里,客户端请求 rpc,先输出 rpc 的名称。
	public static class Processor<I extends Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements org.apache.thrift.TProcessor {

	private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
	    
			processMap.put("add_partition", new add_partition());
		}
	}

    // AsyncProcessor
	public static class AsyncProcessor<I extends AsyncIface> extends com.facebook.fb303.FacebookService.AsyncProcessor<I> {
	}
}

IHMSHandler

IHMSHandler 是服务器端需要实现的接口。除了 ThriftHiveMetastore.Iface 外,还包括其他一些方法。

IHMSHandler extends ThriftHiveMetastore.Iface, Configurable

HMSHandler

HMSHandler 是服务器端的具体实现。创建服务时,创建 HMSHandler,多个线程调用同一个 HMSHandler 对象来处理 client 的请求。

public class HMSHandler extends FacebookBase implements IHMSHandler {
	@Override
    public Partition add_partition(final Partition part)
        throws InvalidObjectException, AlreadyExistsException, MetaException {
      return add_partition_with_environment_context(part, null);
    }
    // omit other methods


    @Override
    public Partition add_partition_with_environment_context(
        final Partition part, EnvironmentContext envContext)
        throws InvalidObjectException, AlreadyExistsException,
        MetaException {
      startTableFunction("add_partition",
          part.getCatName(), part.getDbName(), part.getTableName());
      Partition ret = null;
      Exception ex = null;
      try {
        ret = add_partition_core(getMS(), part, envContext);
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidObjectException) {
          throw (InvalidObjectException) e;
        } else if (e instanceof AlreadyExistsException) {
          throw (AlreadyExistsException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("add_partition", ret != null, ex, part != null ?  part.getTableName(): null);
      }
      return ret;
    }
}

getMS 从 ThreadLocal 里,每个线程单独的。

    @Override
    public RawStore getMS() throws MetaException {
      Configuration conf = getConf();
      return getMSForConf(conf);
    }
    public static RawStore getMSForConf(Configuration conf) throws MetaException {
      RawStore ms = threadLocalMS.get();
      if (ms == null) {
        ms = newRawStoreForConf(conf);
        ms.verifySchema();
        threadLocalMS.set(ms);
        ms = threadLocalMS.get();
      }
      return ms;
    }

为什么 Handler 总是一个线程处理一个 client 的请求

如果不是一个线程处理一个 client 的请求,那么 client 先发送一个请求,然后再发送第二个请求时, RawStore ms = threadLocalMS.get(); 就有可能拿到的是其他线程的 ms。

因为 org.apache.thrift.server.TThreadPoolServer.serve方法中。为每个 socket 创建一个 client 对象,并且把 client 的所有请求有 WorkerProcess 进行处理。WorkerProcess 是一个 Runnable。最终提交到 executorService_ 中。

while(!this.stopped_) {
        try {
            TTransport client = this.serverTransport_.accept();
            WorkerProcess wp = new WorkerProcess(client);

            while(true) {
                try {
                    this.executorService_.execute(wp);
                    break;
                } catch (Throwable var13) {
                    // omit
                }
            }
        } catch (TTransportException var14) {
           // 
        }
    }
  • WorkerProcess
    WorkerProcess 也是除了设置停止标志外死循环。
do {
        if (eventHandler != null) {
            eventHandler.processContext(connectionContext, inputTransport, outputTransport);
        }
    } while(!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));

processor 类型是 TUGIBasedProcessor。

  • 当客户端正常退出时。
    client 会调用 metastore 的 shutdown 方法。此方法里,清除所有的 threadlocal 对象。
	public void shutdown() {
        cleanupRawStore();
        PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();
    }
    protected static void cleanupRawStore() {
        try {
            RawStore rs = HMSHandler.getRawStore();
            if (rs != null) {
                HMSHandler.logInfo("Cleaning up thread local RawStore...");
                rs.shutdown();
            }
        } finally {
            HMSHandler handler = HMSHandler.threadLocalHMSHandler.get();
            if (handler != null) {
                handler.notifyMetaListenersOnShutDown();
            }
            HMSHandler.threadLocalHMSHandler.remove();
            HMSHandler.threadLocalConf.remove();
            HMSHandler.threadLocalModifiedConfig.remove();
            HMSHandler.removeRawStore();
            HMSHandler.logInfo("Done cleaning up thread local RawStore");
        }
    }
  • 异常退出时
    WorkerProcess 的 finally 处理不论是否当前连接调用 shutdown,都执行 eventHandler.deleteContext
finally {
        if (eventHandler != null) {
            eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
}

在 HiveMetaStore.java里定义了 eventHandler, 也调用了 cleanupRawStore,和 shutdown 方法里调用的一样。

@Override
public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
  openConnections.decrementAndGet();
  // If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already
  // cleaned up thread local RawStore. Otherwise, do it now.
  HMSHandler.cleanupRawStore();
}

threadLocal 对象的 remove 方法多次调用是没有副作用的。

其他考虑的点

在异常退出时,没有调用 PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();, 是否会内存溢出。
调用 PerfLogger.getPerfLogger(false). 当参数是 false 时,如果 ThreadLocal 里已经有,则不会创建对象。处理线程的个数是固定的。不会导致内存问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2234989.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JAVA毕业设计】基于Vue和SpringBoot的墙绘产品展示交易平台

本文项目编号 T 049 &#xff0c;文末自助获取源码 \color{red}{T049&#xff0c;文末自助获取源码} T049&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

在全域数据整合过程中,如何确保数据的一致性和准确性

在全域数据整合过程中&#xff0c;确保数据的一致性和准确性是至关重要的&#xff0c;这不仅关系到数据分析结果的可靠性&#xff0c;还直接影响到企业决策的科学性和有效性。Aloudata AIR 逻辑数据编织平台通过数据虚拟化技术&#xff0c;为这一过程提供了强有力的支持。以下是…

w024基于SpringBoot的企业客户管理系统的设计与实现

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0…

element-plus快速实现table组件表头吸顶和滚动条吸底

使用devecoui 组件快速实现 element-plus table 组件&#xff0c;表头吸顶和滚动条吸底&#xff0c;且完美实现固定表头的吸顶效果&#xff0c;同时devecoui组件还可以实现&#xff0c;查询列表的快速开发&#xff0c;里面集成了更多的功能。详细功能请前往&#xff1a;添加链接…

C++STL容器详解——list

目录 一.list 1.list的介绍 2.为什么会有list? 二.list的常见接口 1.list的构造函数 2.list的遍历 3.迭代器类型 4.list的头插头删和尾插尾删 5.list任意位置的插入和删除 6.list的sort()及reverse() 7.迭代器失效 三.整体代码 一.list 1.list的介绍 list的文档说…

服务器数据恢复—EVA存储故障导致上层应用不可用的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台EVA某型号控制器EVA扩展柜FC磁盘。 服务器存储故障&检测&#xff1a; 磁盘故障导致该EVA存储中LUN不可用&#xff0c;导致上层应用无法正常使用。 服务器存储数据恢复过程&#xff1a; 1、将所有磁盘做好标记后从扩展柜中取出。硬…

hf_transformers

强者自定义&#xff0c;弱者用默认&#xff0c;傻逼不看说明书 1. 2.在 model.generate()里填参数&#xff0c;默认为20个新token generated_ids model.generate(**model_inputs, max_new_tokens50) 3. 默认情况下&#xff0c;除非在GenerationConfig文件中指定&#xff0…

Sigrity Power SI 3D-EM Inductance Extraction模式如何进行电感的提取操作指导(一)

Sigrity Power SI 3D-EM Inductance Extraction模式如何进行电感的提取操作指导(一) Sigrity Power SI使用3D-EM Inductance Extraction模式可以进行电感的提取,以下图为例 2D 视图 <

学习记录:js算法(八十七):单词搜索

文章目录 单词搜索思路一思路二 单词搜索 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其…

应急救援无人车:用科技守护安全!

一、核心功能 快速进入危险区域&#xff1a; 救援无人车能够迅速进入地震、火灾、洪水等自然灾害或重大事故的现场&#xff0c;这些区域往往对人类救援人员构成极大威胁。 通过自主导航和环境感知技术&#xff0c;无人车能够避开危险区域&#xff0c;确保自身安全的同时&…

辩论赛——动态IP与静态IP的巅峰对决

尊敬的各位观众&#xff1a; 大家好&#xff01;欢迎来到今天的演说舞台&#xff0c;我是主持人小蝌蚪。今天&#xff0c;我们将见证一场精彩绝伦的辩论&#xff0c;辩论的双方是动态IP和静态IP。他们将围绕各自的优缺点展开激烈的辩论&#xff0c;为我们揭示代理IP世界中的奥…

红米k70怎么设置「短信通知」在锁屏时隐藏内容,不锁屏时不隐藏内容

红米 K70 设置短信通知在锁屏时隐藏内容、不锁屏时不隐藏内容&#xff0c;可以按照以下步骤进行操作&#xff1a; 打开手机设置&#xff1a;在主屏幕上找到并点击 “设置” 图标&#xff0c;进入手机设置页面。进入通知与控制中心&#xff1a;在设置页面中&#xff0c;找到并点…

【计算机网络】零碎知识点(易忘 / 易错)总结回顾

一、计算机网络的发展背景 1、网络的定义 网络是指将多个计算机或设备通过通信线路、传输协议和网络设备连接起来&#xff0c;形成一个相互通信和共享资源的系统。 2、局域网 LAN 相对于广域网 WAN 而言&#xff0c;局域网 LAN 主要是指在相对较小的范围内的计算机互联网络 …

Python 在PDF中绘制形状(线条、矩形、椭圆形等)

在PDF中绘制图形可以增强文档的视觉效果。通过添加不同类型的形状&#xff0c;如实线、虚线、矩形、圆形等&#xff0c;可以使文档更加生动有趣&#xff0c;提高读者的阅读兴趣。这对于制作报告、演示文稿或是教材特别有用。本文将通过以下几个示例介绍如何使用Python 在PDF中绘…

三菱MR-J4伺服绝对位置检测系统

发生[AL.25 绝对位置丢失]或[AL.E3 绝对位置计数器警告]时&#xff0c;必须再次进行原点设定。否则可能会因此发生预料之外的动作。 概要 常规运行时&#xff0c;编码器由检测1转内位置的编码器和检测转数的旋转累计计数器构成。 绝对位置检测系统与伺服系统控制器电源…

程序员行业会因此受到什么冲击?

床铺再次当选&#xff0c;会对两家关系产生深远影响。在此篇博客中&#xff0c;我们将探讨床铺的政策对我们外贸、就业、留学以及特别是互联网产业和我们程序员职业的潜在影响。 关系趋紧&#xff1a;摩擦可能会更多 床铺在其任期期间对我们施加了诸多贸易税&#xff0c;采取…

Edge浏览器打开PDF无法显示电子签章

Edge浏览器打开PDF无法显示电子签章 直接说处理方式 直接说处理方式 浏览器地址栏&#xff0c;输入 edge://flags/搜索&#xff1a;pdf禁用&#xff1a;New PDF Viewer效果如下

02- 模块化编程-006 ADC0808数码显示对比

1、ADC0808 芯片介绍 ADC0808是一款集成的CMOS设备&#xff0c;包含8位模拟至数字转换器、8通道多路复用器和与微处理器兼容的控制逻辑。8位A/D转换器采用逐次逼近作为转换技术。转换器特点包括高阻抗斩波稳定比较器、256R电压分压器、模拟开关树和逐次逼近寄存器。8通道多路复…

计算机体系结构之多级缓存、缓存miss及缓存hit(二)

前面章节《计算机体系结构之缓存机制原理及其应用&#xff08;一&#xff09;》讲了关于缓存机制的原理及其应用&#xff0c;其中提出了多级缓存、缓存miss以及缓存hit的疑问。故&#xff0c;本章将进行展开讲解&#xff0c; 多级缓存、缓存miss以及缓存hit存在的意义是为了保持…

scala set训练

Set实训内容&#xff1a; 1.创建一个可变Set&#xff0c;用于存储图书馆中的书籍信息&#xff08;假设书籍信息用字符串表示&#xff09;&#xff0c;初始化为包含几本你喜欢的书籍 2.添加两本新的书籍到图书馆集合中&#xff0c;使用操作符 3.删除一本图书馆集合中的书籍&…