从‘discover.partitions‘=‘true‘分析Hive的TBLPROPERTIES

news2024/10/2 20:25:36

从’discover.partitions’='true’分析Hive的TBLPROPERTIES

前言

Hive3.1.2先建表:

show databases ;

use db_lzy;

show tables ;

create external table if not exists test_external_20230502(
    id int,
    comment1 string,
    comment2 string
)
stored as parquet 
;

create external table if not exists test_external_par_20230502(
    id int,
    comment1 string,
    comment2 string
)
partitioned by (
    dt string
    )
stored as parquet
;

然后查看建表语句:

show create table test_external_20230502;
show create table test_external_par_20230502;

可以看到:

hive (db_lzy)> show create table test_external_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1683028671')
Time taken: 0.181 seconds, Fetched: 15 row(s)
hive (db_lzy)>
             > show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1683028770')
Time taken: 0.121 seconds, Fetched: 17 row(s)
hive (db_lzy)>

可以看到都有TBLPROPERTIES表属性的信息,这个信息在写DDL时并没有指定,显然是自动生成的。

在CDP建表有时候会默认添加一个表属性:

'discover.partitions'='true'

这个表属性用于表分区的自动发现。接下来就从这个属性入手,分析Hive的TBLPROPERTIES。

走JDBC增加表属性

先写个简单的JDBC:

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * @program: zhiyong_study
 * @description: 测试Hive的TblProperties
 * @author: zhiyong
 * @create: 2023-05-02 19:31
 **/
public class TblpropertiesTest1 {
    public static void main(String[] args) throws Exception{
        final String HIVE_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        final String HIVE_HOST = "192.168.88.101";
        final String HIVE_PORT = "10000";
        final String HIVE_USER = "root";
        final String HIVE_PASSWORD = "123456";

        String HIVE_URL = "jdbc:hive2://" + HIVE_HOST + ":" + HIVE_PORT + "/default";

        Connection conn = null;
        Statement stmt = null;
        ResultSet resultSet = null;

        String sql_Alter_TBLPROPERTIES = "alter table db_lzy.test_external_par_20230502 set TBLPROPERTIES('zhiyong_key1' = 'zhiyong_value1')";


        Class.forName(HIVE_JDBC_DRIVER);
        conn = DriverManager.getConnection(HIVE_URL, HIVE_USER, HIVE_PASSWORD);
        stmt = conn.createStatement();

        System.out.println("修改表属性:");
        stmt.execute(sql_Alter_TBLPROPERTIES);

        }
    }
}

打断点调试来查看调用的堆栈,可以发现从HiveConnectionTCLIServiceTBinaryProtocolTSocket,调用过程和之前的:

https://lizhiyong.blog.csdn.net/article/details/129742904

分析的基本一致,底层就是走了Thrift协议。那么从语言无关的ssql-Client这端显然是看不出啥端倪了。更多有用的源码信息是在server端,暂时先不看。

此时:

hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_modified_by'='hadoop',
  'last_modified_time'='1683030293',
  'transient_lastDdlTime'='1683030293',
  'zhiyong_key1'='zhiyong_value1')
Time taken: 0.155 seconds, Fetched: 20 row(s)
hive (db_lzy)>

可以看到已经将配置项写入了表属性。

show databases ;
use db_hive_metastore;
show tables ;
select * from TBLS;

显示:

在这里插入图片描述

从表名可以找到刚才修改了表属性的就是这个TBL_ID=17的表。

select * from TABLE_PARAMS
where TBL_ID=17;

显示:

在这里插入图片描述

显然set的表属性就记录在MetaStore元数据库【笔者用的Apache3.1.2是存到了MySQL】的TABLE_PARAMS表中。

可以看到这个表还记录了表的外部属性。所以SQL Boy们删除表数据时有时候会先执行:

alter table 库名.表名 set TBLPROPERTIES ('EXTERNAL'='FALSE');

比较奇怪的是在元数据表中没找到:

alter table 库名.表名 set TBLPROPERTIES  ("external.table.purge"="true");

这个表属性对应的配置项。

但是在HiveStrictManagedMigration.java

boolean migrateToExternalTable(Table tableObj, TableType tableType) throws HiveException {
  String msg;
  switch (tableType) {
  case MANAGED_TABLE:
    if (AcidUtils.isTransactionalTable(tableObj)) {
      msg = createExternalConversionExcuse(tableObj,
          "Table is a transactional table");
      LOG.debug(msg);
      return false;
    }
    LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
    if (!runOptions.dryRun) {
      String command = String.format(
          "ALTER TABLE %s SET TBLPROPERTIES ('EXTERNAL'='TRUE', 'external.table.purge'='true')",
          getQualifiedName(tableObj));
      runHiveCommand(command);
    }
    return true;
  case EXTERNAL_TABLE:
    msg = createExternalConversionExcuse(tableObj,
        "Table is already an external table");
    LOG.debug(msg);
    break;
  default: // VIEW/MATERIALIZED_VIEW
    msg = createExternalConversionExcuse(tableObj,
        "Table type " + tableType + " cannot be converted");
    LOG.debug(msg);
    break;
  }
  return false;
}

可以看到和这个配置项相关的内容。说明在Hive3.1.2中该配置项有效。并且可以看出通过purge这个属性,使得外部表可以挂载文件。。。

手动新增一条:

在这里插入图片描述

此时:

hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_modified_by'='hadoop',
  'last_modified_time'='1683030293',
  'transient_lastDdlTime'='1683030293',
  'zhiyong_key1'='zhiyong_value1',
  'zhiyong_key2'='zhiyong_value2')
Time taken: 0.098 seconds, Fetched: 21 row(s)
hive (db_lzy)>

重新拿到的建表DDL已经出现了这个配置。。。

至此就可以明白为神马Spark使用Overwrite模式写入时会重建表,并且增加一大坨的spark.sql相关的配置。。。也能够明白为神马IceBerg和Hudi会选用Hive的MetaStore做元数据存储。。。也能够明白为神马Flink会使用Hive的MetaStore做Catalog。。。

就地取材,拿这个表当kv键值对存储,就可以保存表的多种属性,存取都很方便。并且可以直接复用Hive的MetaStore,解析别的表的元数据也很方便。走Thrift来进行rpc远程调用和、走JDBC读源数据表,都可以无缝衔接Hive生态圈的多种组件。这可能也是云原生时代Hadoop和Yarn被OSS和K8S暴打,但是Hive还依旧霸气的原因。。。存储、运算、资源调度都可以用别的方式取代,但是文件映射表的MetaStore一时半会儿还没有太好的替代品。

在Github查找

笔者在CDP7.1.5建表时就见识过:

'discover.partitions'='true'

这个属性了。。。所以一定不是无中生有的。。。既然Apache3.1.2中找不到,那就到Github查看是否别的版本有它。。。

PartitionManagementTask类

在这里插入图片描述

在:https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java

可以看到:

package org.apache.hadoop.hive.metastore;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.TimeValidator;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Partition management task is primarily responsible for partition retention and discovery based on table properties.
 *
 * Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
 * metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
 * Dropping partitions after retention period will also delete the data in that partition.
 *
 * Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors table location
 * for newly added partition directories and create partition objects if it does not exist. Also, if partition object
 * exist and if corresponding directory does not exists under table location then the partition object will be dropped.
 *
 */
public class PartitionManagementTask implements MetastoreTaskThread {
  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
  private static final Lock lock = new ReentrantLock();
  // these are just for testing
  private static int completedAttempts;
  private static int skippedAttempts;

  private Configuration conf;

  @Override
  public long runFrequency(TimeUnit unit) {
    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
  }

  @Override
  public void setConf(Configuration configuration) {
    // we modify conf in setupConf(), so we make a copy
    conf = new Configuration(configuration);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
    return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
            params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
  }

  @Override
  public void run() {
    if (lock.tryLock()) {
      skippedAttempts = 0;
      String qualifiedTableName = null;
      IMetaStoreClient msc = null;
      try {
        msc = new HiveMetaStoreClient(conf);
        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
        Set<String> tableTypesSet = new HashSet<>();
        for (String type : tableTypes.split(",")) {
          try {
            tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
          } catch (IllegalArgumentException e) {
            // ignore
            LOG.warn("Unknown table type: {}", type);
          }
        }
        // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
        // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
        if (tableTypesSet.isEmpty()) {
          LOG.info("Skipping partition management as no table types specified");
          return;
        }

        StringBuilder filterBuilder = new StringBuilder()
            .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
            .append("discover__partitions").append(" like \"true\" ");
        boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
        boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
        if (!managed && external) {
          // only for external tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
              .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
        } else if (managed && !external) {
          // only for managed tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
              .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
        }
        if (!tablePattern.trim().isEmpty()) {
          filterBuilder.append(" and ")
              .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
              .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
        }

        List<String> databases = msc.getDatabases(catalogName, dbPattern);
        List<TableName> candidates = new ArrayList<>();
        for (String db : databases) {
          Database database = msc.getDatabase(catalogName, db);
          if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
            LOG.debug("Skipping table under database: {}", db);
            continue;
          }
          if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
            LOG.info("Skipping table belongs to database {} being failed over.", db);
            continue;
          }
          List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db,
              filterBuilder.toString(), -1);
          tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
        }

        if (candidates.isEmpty()) {
          LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
          return;
        }

        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
        // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
        // defeats the purpose of thread pooled msck repair.
        int threadPoolSize = MetastoreConf.getIntVar(conf,
          MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
        final ExecutorService executorService = Executors
          .newFixedThreadPool(Math.min(candidates.size(), threadPoolSize),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
        CountDownLatch countDownLatch = new CountDownLatch(candidates.size());
        LOG.info("Found {} candidate tables for partition discovery", candidates.size());
        setupMsckPathInvalidation();
        Configuration msckConf = Msck.getMsckConf(conf);
        for (TableName table : candidates) {
          // this always runs in 'sync' mode where partitions can be added and dropped
          MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(),
            null, null, true, true, true, -1);
          executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch));
        }
        countDownLatch.await();
        executorService.shutdownNow();
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        if (msc != null) {
          msc.close();
        }
        lock.unlock();
      }
      completedAttempts++;
    } else {
      skippedAttempts++;
      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
    }
  }

  public static long getRetentionPeriodInSeconds(final Table table) {
    String retentionPeriod;
    long retentionSeconds = -1;
    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      if (retentionPeriod.isEmpty()) {
        LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
          PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      } else {
        try {
          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
          timeValidator.validate(retentionPeriod);
          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
        } catch (IllegalArgumentException e) {
          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
          // will return -1
        }
      }
    }
    return retentionSeconds;
  }

  private void setupMsckPathInvalidation() {
    // if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
    // when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
    // have to fix the invalid paths via external means.
    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
  }

  private static class MsckThread implements Runnable {
    private MsckInfo msckInfo;
    private Configuration conf;
    private String qualifiedTableName;
    private CountDownLatch countDownLatch;

    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
      this.msckInfo = msckInfo;
      this.conf = conf;
      this.qualifiedTableName = qualifiedTableName;
      this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
      try {
        Msck msck = new Msck( true, true);
        msck.init(conf);
        msck.repair(msckInfo);
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        // there is no recovery from exception, so we always count down and retry in next attempt
        countDownLatch.countDown();
      }
    }
  }

  @VisibleForTesting
  public static int getSkippedAttempts() {
    return skippedAttempts;
  }

  @VisibleForTesting
  public static int getCompletedAttempts() {
    return completedAttempts;
  }
}

功夫不负有心人,终于在这个类找到了这个表属性。这个任务显然就是守护进程,定时刷新Hive表的MetaStore,从而自动删除多余的分区【对应HDFS只有路径没有文件,但是存在于MetaStore的分区】并且添加缺失的分区【对应HDFS只有路径和文件,但是不存在于MetaStore的分区】。

在获取到同步锁后,就会跑MSCK命令。

runFrequency方法中获取到的刷新频率,显然就是线程池的吊起频率。。。

package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Any task that will run as a separate thread in the metastore should implement this
 * interface.
 */
public interface MetastoreTaskThread extends Configurable, Runnable {

  /**
   * Get the frequency at which the thread should be scheduled in the thread pool.  You must call
   * {@link #setConf(Configuration)} before calling this method.
   * @param unit TimeUnit to express the frequency in.
   * @return frequency
   */
  long runFrequency(TimeUnit unit);
}

可惜在3.1.2的Hive中还没有这个类:

在这里插入图片描述

那么配置类中也就找不到对应的配置项:

在这里插入图片描述

还是要去Github找新一点的版本。。。

MetastoreConf类

在:https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java

可以看到:

在这里插入图片描述

// Partition management task params
    PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency",
      "metastore.partition.management.task.frequency",
      6, TimeUnit.HOURS, "Frequency at which timer task runs to do automatic partition management for tables\n" +
      "with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n" +
      "discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n" +
      "management will look for partitions in table location and add partitions objects for it in metastore.\n" +
      "Similarly if partition object exists in metastore and partition location does not exist, partition object\n" +
      "will be dropped. The second piece in partition management is retention period. When 'discover.partition'\n" +
      "is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n" +
      "than the specified retention period will be automatically dropped from metastore along with the data."),

显然这个配置项也不是出现在Hive3.1.2,而是更新的版本!!!而且,默认值是6小时???

HiveMetaStore

HiveMetaStore中:

private static void startRemoteOnlyTasks(Configuration conf) throws Exception {
    if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
        ThreadPool.initialize(conf);
        Collection<String> taskNames = MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_REMOTE_ONLY);
        Iterator var2 = taskNames.iterator();

        while(var2.hasNext()) {
            String taskName = (String)var2.next();
            MetastoreTaskThread task = (MetastoreTaskThread)JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
            task.setConf(conf);
            long freq = task.runFrequency(TimeUnit.MILLISECONDS);
            ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
        }

    }
}

可以看到会按照频率来使用线程池调度这些任务。单位是毫秒。。。

COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", false, "Whether to run the initiator and cleaner threads on this metastore instance or not.\nSet this to true on one instance of the Thrift metastore service as part of turning\non Hive transactions. For a complete list of parameters required for turning on\ntransactions, see hive.txn.manager."),

又是和:

/**
 * An interface that allows Hive to manage transactions.  All classes
 * implementing this should extend {@link HiveTxnManagerImpl} rather than
 * implementing this directly.
 */
public interface HiveTxnManager {
}

有关的。显然这是保证Hive事务的接口。。。

至此,可知Hive会在MetastoreConf类写一些编码级的配置【堆内存中,可以在session会话中set部分值】,并且在元数据表TABLE_PARAMS固化一些kv键值对配置。这样,Hive的MetaStore守护进程就可以从这2种方式存储的kv键值根据key拿到value,进而执行事务的操作。

CDP7.1.5【或者更早的发行版】就支持的配置项,对应Apache这边需要alpha4.0才支持。。。白piao版总是要比缴纳了保护费的企业版差劲一些,不管是稳定性还是功能性。。。Kylin4.0企业版的功能在开源的5.0姗姗来迟。。。也是这道理。。。甜点要先服务付费客户,卖剩下的给白piao的客户免费品尝。。。不过能白piao就不错了,还要啥自行车。。。

自动分区发现的使用

Apache版本

按照Apache Hive的官方文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DiscoverPartitions

在这里插入图片描述

这是Hive4.0的特性。

Discover Partitions

Table property “discover.partitions” can now be specified to control automatic discovery and synchronization of partition metadata in Hive Metastore.

When Hive Metastore Service (HMS) is started in remote service mode, a background thread (PartitionManagementTask) gets scheduled periodically every 300s (configurable via metastore.partition.management.task.frequency config) that looks for tables with “discover.partitions” table property set to true and performs MSCK REPAIR in sync mode. If the table is a transactional table, then Exclusive Lock is obtained for that table before performing MSCK REPAIR. With this table property, “MSCK REPAIR TABLE table_name SYNC PARTITIONS” is no longer required to be run manually.

Version information

As of Hive 4.0.0 (HIVE-20707).

对应这个issue:

在这里插入图片描述

https://issues.apache.org/jira/browse/HIVE-20707。

Partition Retention

Table property “partition.retention.period” can now be specified for partitioned tables with a retention interval. When a retention interval is specified, the background thread running in HMS (refer Discover Partitions section), will check the age (creation time) of the partition and if the partition’s age is older than the retention period, it will be dropped. Dropping partitions after retention period will also delete the data in that partition. For example, if an external partitioned table with ‘date’ partition is created with table properties “discover.partitions”=“true” and “partition.retention.period”=“7d” then only the partitions created in last 7 days are retained.

Version information

As of Hive 4.0.0 (HIVE-20707).

这个特性是分区保留。。。

CDP版本

也就只能在CDP去使用这个特性了。。。Alpha的Apache Hive4.0估计一时半会儿也没什么人去用。

在CDP中使用Hive时,参照官方的说法:https://docs.cloudera.com/runtime/7.0.3/using-hiveql/topics/hive-automate-msck.html

Automated partition discovery and repair is useful for processing log data, and other data, in Spark and Hive catalogs. You learn how to set the partition discovery parameter to suit your use case. An aggressive partition discovery and repair configuration can delay the upgrade process.

Apache Hive can automatically and periodically discover discrepancies in partition metadata in the Hive metastore and in corresponding directories, or objects, on the file system. After discovering discrepancies, Hive performs synchronization.

The discover.partitions table property enables and disables synchronization of the file system with partitions. In external partitioned tables, this property is enabled (true) by default when you create the table. To a legacy external table (created using a version of Hive that does not support this feature), you need to add discover.partitions to the table properties to enable partition discovery.

By default, the discovery and synchronization of partitions occurs every 5 minutes. This is too often if you are upgrading and can result in the Hive DB being queried every few milliseconds, leading to performance degradation. During upgrading the high frequency of batch routines dictates running discovery and synchronization infrequently, perhaps hourly or even daily. You can configure the frequency as shown in this task.

  1. Assuming you have an external table created using a version of Hive that does not support partition discovery, enable partition discovery for the table.

    ALTER TABLE exttbl SET TBLPROPERTIES ('discover.partitions' = 'true');
    
  2. In Cloudera Manager, click Clusters > Hive > Configuration, search for Hive Server Advanced Configuration Snippet (Safety Valve) for hive-site.xml.

  3. Add the following property and value to hive-site.xml: Property: metastore.partition.management.task.frequency Value: 600.

    This action sets synchronization of partitions to occur every 10 minutes expressed in seconds. If you are upgrading, consider running discovery and synchonization once every 24 hours by setting the value to 86,400 seconds.

也就是说,CDP的Hive默认外部的分区表是开启了分区自动发现特性,并且默认是5分钟吊起一次。这里设置时单位是秒。至少CDP7.0.3就已经可用了。。。

使用

如果不符合期望,可以手动修改:

ALTER TABLE 表名 SET TBLPROPERTIES ('discover.partitions'='true');
ALTER TABLE 表名 SET TBLPROPERTIES ('metastore.partition.management.task.frequency' = 600);

这。。。是写到元数据表做持久化的,和Apache的堆内存对象又不同了。。。

不过按照:

/**
 * Get the variable as a long indicating a period of time
 * @param conf configuration to retrieve it from
 * @param var variable to retrieve
 * @param outUnit Timeout to return value in
 * @return value, or default value if value not in config file
 */
public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
  assert var.defaultVal.getClass() == TimeValue.class;
  String val = conf.get(var.varname);

  if (val == null) {
    // Look for it under the old Hive name
    val = conf.get(var.hiveName);
  }

  if (val != null) {
    return convertTimeStr(val, ((TimeValue)var.defaultVal).unit, outUnit);
  } else {
    return outUnit.convert(((TimeValue)var.defaultVal).val, ((TimeValue)var.defaultVal).unit);
  }
}

如果堆内存没有,就去配置中拿,也没啥毛病。。。

别的问题

自动发现是5分钟,也就是近实时。如果一个任务跑完,下一个任务立即被吊起,这5分钟【甚至更久】里Hive On Tez任务是读不到新分区的!!!Spark或者Flink直接读Parquet文件是另一回事。。。

所以:https://lizhiyong.blog.csdn.net/article/details/127680034

笔者之前做平台开发时,还需要+一个msck或者alter table add partition,只有这条命令也刷完了,才能保证最终一致性!!!

别的问题

自动发现是5分钟,也就是近实时。如果一个任务跑完,下一个任务被吊起,这5分钟【甚至更久】里Hive On Tez任务是读不到新分区的!!!Spark或者Flink直接读Parquet文件是另一回事。。。

所以:https://lizhiyong.blog.csdn.net/article/details/127680034

笔者之前做平台开发时,还需要+一个msck或者alter table add partition,只有这条命令也刷完了,才能保证最终一致性!!!

总结

通过一顿操作猛如虎。。。笔者找到了Hive的MetaStore存储的元数据,并且分析出了和表配置相关的运行机理。恰巧,也意识到了白piao版在功能上的滞后性。。。还是有所收获的。。。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/130468723

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/484535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言通过控制台命令行传入参数

Linux 与 windows运行c语言程序 切换到对应目录下 1. gcc hello.c -o hello 2.Linux: ./hello Windows: hello.exe int main(){}默认无参数 但在一些情况下想要直接通过在上述过程中第二步就传入参数而不是使用scanf..之类的输入语句就需要使用有参数的main方法: int main() {…

Docker--harbor私有库部署与管理

目录 一、本地私有仓库 搭建本地私有仓库 Docker容器的重启策略 二、Harbor 1、什么是Harbor 2、Harbor特性 3、Harbor的构成 三、Harbor部署 实验步骤 1、安装Docker-Compose服务 2、部署Harbor服务 1、下载或上传Harbor安装程序 2、修改Harbor安装的配置文件 3、…

基于TI板MSP430 玩转PID

文章目录 前言一、整体框架二、PID算法1. 位置式PID2. 增量式PID3. 比例外置式PID4. 积分限幅、输出限幅和PID参数整定5. 位置式PID和增量式PID的区别及抉择 三、初值获取1. 定时器输入捕获2. 外部中断3. ADC采样 前言 具体啥是PID&#xff0c;我这里不做介绍&#xff0c;网上…

SpringMVC(后)SSM整合

10、文件上传和下载 10.1、文件下载 ResponseEntity用于控制器方法的返回值类型&#xff0c;该控制器方法的返回值就是响应到浏览器的响应报文 使用ResponseEntity实现下载文件的功能 RequestMapping("/testDown") public ResponseEntity<byte[]> testResp…

【Hello Algorithm】复杂度 二分法

作者&#xff1a;小萌新 专栏&#xff1a;算法 作者简介&#xff1a;大二学生 希望能和大家一起进步 本篇博客简介&#xff1a;介绍算法的复杂度 对数器和二分法 复杂度 对数器 二分法 复杂度常数时间操作非常数时间操作时间复杂度空间复杂度 二分法有序数组中找一个值寻找有序…

树的存储和遍历

文章目录 6.5 树与森林6.5.1 树的存储结构1. 双亲表示法(顺序存储结构)2 孩子链表表示法3 孩子兄弟表示法(二叉树表示法) 6.5.2 森林与二叉树的转换1 树转换成二叉树2 二叉树转换成树3 森林转换成二叉树4 二叉树转换成森林 6.5.3 树和森林的遍历1. 树的遍历2. 森林的遍历 6.6 赫…

数据库篇:表设计、创建编辑以及导出导入数据

微信小程序云开发实战-答题积分赛小程序系列 数据库篇:表设计、添加编辑以及导出导入数据 原型: 最终实现界面截图:

Moqui REST API的两种实现方法

实现Restful API的方法 实现REST API有两种方法。第一种&#xff1a; The main tool for building a REST API based on internal services and entity operations is to define resource paths in a Service REST API XML file such as the moqui.rest.xml file in moqui-fr…

chatGPT国内可用镜像源地址

chatGPT国内可用镜像源地址 彷丶徨丶 关注 IP属地: 湖北 0.811 2023.03.15 16:02:16 字数 1,152 阅读 249,582 如果你正在尝试访问Chatgpt网站&#xff0c;但由于某些原因无法访问该网站&#xff0c;那么你可以尝试使用Chatgpt的国内镜像网站。以下是一些Chatgpt国内镜像网站的…

java基础知识——27.动态代理

这篇文章&#xff0c;我们来学一下java的动态代理 目录 1.动态代理的介绍 2.具体的代码实现 1.动态代理的介绍 动态代理&#xff1a;无侵入式的额外给代码增加功能 很不好理解&#xff0c;下面&#xff0c;我们通过两个例子来说明一下什么是动态代理&#xff1a; 例一&a…

shell编程 -- 基础

shell是一个命令行解释器&#xff0c;它接收应用程序/用户命令&#xff0c;然后调用操作系统内核。 linux笔记 链接&#xff1a;https://pan.baidu.com/s/16GZCPfUTRzUqIyGnYwPuUg?pwds5xt 提取码&#xff1a;s5xt 脚本执行 采用bash或者sh脚本的相对路径或绝对路径&#x…

TikTok跨境电商如何选品和营销?

鑫优尚电子商务&#xff1a;TikTok目前发展飞速&#xff0c;全球的MAU是5.6亿。现在作为全球炙手可热的短视频平台&#xff0c;全球流量相当庞大&#xff0c;覆盖75个语种、全球150个国家和地区。 对于从事跨境电商行业的人来说&#xff0c;又怎能错过一个流量这么好的平台呢&a…

ChatGPT注册详细步骤教程-ChatGPT申请教程

注册chatGPT账号的详细经验教程 注册ChatGPT账号是使用这一自然语言生成技术的关键步骤。下面是注册ChatGPT账号的详细经验教程&#xff1a; 访问OpenAI注册页面 在Web浏览器中打开OpenAI注册页面。 2.输入个人信息 在注册页面上&#xff0c;您需要提供以下个人信息&#…

树莓派 二维云台调零控制

目录 舵机的工作原理 案例程序 要求&#xff1a; 程序&#xff1a; 二维云台是通过IIC总线进行控制的&#xff0c;我们可以通过窗口命令输入&#xff1a;i2cdetect -y 1来检测IIC总线是否连接正常。 当有40显示的时候就说明IIC总线正常。 操控舵机我们需要一个PCA9685的模…

【移动端网页布局】流式布局案例 ④ ( Banner 栏制作 | 固定定位 | 标准流 | 百分比宽度设置 )

文章目录 一、Banner 栏样式及核心要点1、实现效果2、核心要点分析 二、完整代码示例1、HTML 标签结构2、CSS 样式3、展示效果 一、Banner 栏样式及核心要点 1、实现效果 在上一篇博客中 , 实现了 搜索栏 , 在本篇博客开始实现 搜索栏 下方的 Banner 栏 ; 2、核心要点分析 Bann…

OpenCV实战(21)——基于随机样本一致匹配图像

OpenCV实战&#xff08;21&#xff09;——基于随机样本一致匹配图像 0. 前言1. 基于随机样本一致匹配图像1.1 计算基本矩阵与匹配集1.2 随机样本一致算法 2. 算法优化2.1 优化基本矩阵2.2 优化匹配集 3. 完整代码小结系列链接 0. 前言 当两台摄像机拍摄同一场景时&#xff0c…

【Vue面试题】Vue2.x生命周期?

文章目录 1.有哪些生命周期&#xff08;系统自带&#xff09;?beforeCreate( 创建前 )created ( 创建后&#xff09;beforeMount (挂载前)mount (挂载后)beforeUpdate (更新前)updated (更新后)beforeDestroy&#xff08;销毁前&#xff09;destroy&#xff08;销毁后&#xf…

突发:深度学习之父Hinton为了警告AI的风险,不惜从谷歌离职!

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 今天&#xff0c;AI领域发生了一件标志性事件。那就是Hinton 为了能更自由的表达对AI失控的担忧&#xff0c;不惜从工作了10年的谷歌离职&#xff0c;可见他真的深切的感受到了危机。 不久前&#xff0c;纽约时报的一篇采访…

干货! ICLR:将语言模型绑定到符号语言中个人信息

点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入&#xff01; ╱ 作者简介╱ 承洲骏 上海交通大学硕士生&#xff0c;研究方向为代码生成&#xff0c;目前在香港大学余涛老师的实验室担任研究助理。 个人主页&#xff1a;http://blankcheng.github.io 谢天宝 香港大学一年级…

武忠祥老师每日一题||不定积分基础训练(六)

解法一&#xff1a; 求出 f ( x ) , 进而对 f ( x ) 进行积分。 求出f(x),进而对f(x)进行积分。 求出f(x),进而对f(x)进行积分。 令 ln ⁡ x t , 原式 f ( t ) ln ⁡ ( 1 e t ) e t 令\ln xt,原式f(t)\frac{\ln (1e^t)}{e^t} 令lnxt,原式f(t)etln(1et)​ 则 ∫ f ( x ) d…