【kettle】pdi/data-integration 集成kerberos认证连接hdfs、hive或spark thriftserver

news2025/1/6 17:12:21

一、背景

kerberos认证是比较底层的认证,掌握好了用起来比较简单。
kettle完成kerberos认证后会存储认证信息在jvm中,之后直接连接hive就可以了无需提供额外的用户信息。

spark thriftserver本质就是通过hive jdbc协议连接并运行spark sql任务。

二、思路

kettle中可以使用js调用java类的方法。编写一个jar放到kettle的lib目录下并。在启动kettle后会自动加载此jar中的类。编写一个javascript转换完成kerbero即可。

二、kerberos认证模块开发

准备使用scala语言完成此项目。
hadoop 集群版本: cdh-6.2.0
kettle 版本: 8.2.0.0-342

2.1 生成kerberos工具jar包

2.1.1 创建maven项目并编写pom

创建maven项目,这里依赖比较多觉得没用的删掉即可:
注意:这里为了便于管理,很多包都是compile,最后通过maven-assembly-plugin复制到zip文件中!!!

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.12</scala.version>
    <scala.major.version>2.11</scala.major.version>
    <target.java.version>1.8</target.java.version>
    <hadoop.version>3.0.0-cdh6.2.0</hadoop.version>
    <spark.version>2.4.0-cdh6.2.0</spark.version>
    <hive.version>2.1.1-cdh6.2.0</hive.version>
    <zookeeper.version>3.4.5-cdh6.2.0</zookeeper.version>
    <jackson.version>2.14.2</jackson.version>
    <httpclient5.version>5.2.1</httpclient5.version>
  </properties>
  
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>${scala.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.28</version>
      <scope>provided</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.9.1</version>
      <scope>provided</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.11.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.11.1</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.major.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.major.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.major.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive-thriftserver_${scala.major.version}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>${zookeeper.version}</version>
      <scope>compile</scope>
    </dependency>

    <!-- jackon -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>
    
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>
    
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-xml</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>
    
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.6.2</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.11</artifactId>
      <version>3.2.8</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.scalactic</groupId>
      <artifactId>scalactic_2.12</artifactId>
      <version>3.2.8</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.14</version>
      <scope>provided</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.5.6</version>
        <configuration>
        </configuration>
        <executions>
          <execution>
            <id>scala-compiler</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compiler</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- disable surefire -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <!-- enable scalatest -->
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.2.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
<!--          <descriptorRefs>-->
<!--            <descriptorRef>jar-with-dependencies</descriptorRef>-->
<!--          </descriptorRefs>-->
          <descriptors>
            <descriptor>src/assembly/assembly.xml</descriptor>
          </descriptors>
          <archive>
            <!--          <manifest>-->
            <!--            &lt;!&ndash; 你的mainclass入口,我就是test.scala 在scala文件夹下, 目录就是scr/main/scala &ndash;&gt;-->
            <!--            <mainClass>com.chenxii.myspark.sparkcore.Test</mainClass>-->
            <!--          </manifest>-->
          </archive>
        </configuration>

        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>cloudera</id>
      <name>cloudera</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
</project>

新建一个空白的xml文件,如下图:
在这里插入图片描述
maven-assembly-plugin插件会使用到assembly.xml,粘贴如下内容,至assembly.xml文件中,再修改下 自己的项目groupid

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
  <id>appserverB</id>
  <formats>
    <format>zip</format>
  </formats>
  <dependencySets>
    <dependencySet>
      <outputDirectory>/ext-lib</outputDirectory>
      <includes>
        <include>org.apache.hadoop:*</include>
        <include>org.apache.hive:*</include>
        <include>org.apache.hive.shims:*</include>
        <include>org.apache.spark:spark-hive-thriftserver_*</include>
        <include>org.apache.zookeeper:*</include>
        <include>org.apache.curator:*</include>
        <include>org.apache.commons:commons-lang3</include>
        <include>org.apache.commons:commons-configuration2</include>
        <include>org.apache.commons:commons-math3</include>
        <include>com.fasterxml.jackson.core:*</include>
        <include>com.fasterxml.jackson.dataformat:*</include>
        <include>com.fasterxml.jackson.module:*</include>
        <include>org.scala-lang:*</include>
        <include>org.apache.thrift:libthrift</include>
        <include>com.thoughtworks.paranamer:paranamer</include>
        <include>com.google.re2j:re2j</include>
        <include>com.fasterxml.woodstox:woodstox-core</include>
        <include>org.codehaus.woodstox:stax2-api</include>
        <include>org.apache.httpcomponents.core5:*</include>
        <include>org.apache.httpcomponents.client5:*</include>
        <include>org.apache.htrace:*</include>
        <include>com.github.rholder:guava-retrying</include>
        <include>org.eclipse.jetty:jetty-util</include>
        <include>org.mortbay.jetty:*</include>
        <include>自己的项目groupid:*</include>
      </includes>
    </dependencySet>
  </dependencySets>
</assembly>
2.1.2 编写类

KerberosConf 暂时没啥用。

case class KerberosConf(principal: String, keyTabPath: String, conf: String="/etc/krb5.conf")

ConfigUtils 类用于生成hadoop 的Configuration,kerberos认证的时候会用到。

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import java.io.FileInputStream
import java.nio.file.{Files, Paths}

object ConfigUtils {

  val LOGGER = org.slf4j.LoggerFactory.getLogger(KerberosUtils.getClass)

  var hadoopConfiguration: Configuration = null
  var hiveConfiguration: Configuration = null
  private var hadoopConfDir: String = null
  private var hiveConfDir: String = null

  def setHadoopConfDir(dir: String): Configuration = {
    hadoopConfDir = dir
    refreshHadoopConfig
  }

  def getHadoopConfDir: String = {
    if (StringUtils.isEmpty(hadoopConfDir)) {
      val tmpConfDir = System.getenv("HADOOP_CONF_DIR")
      if (StringUtils.isNotEmpty(tmpConfDir) && fileOrDirExists(tmpConfDir)) {
        hadoopConfDir = tmpConfDir
      } else {
        val tmpHomeDir = System.getenv("HADOOP_HOME")
        if (StringUtils.isNotEmpty(tmpHomeDir) && fileOrDirExists(tmpHomeDir)) {
          val tmpConfDirLong = s"${tmpHomeDir}/etc/hadoop"
          val tmpConfDirShort = s"${tmpHomeDir}/conf"
          if (fileOrDirExists(tmpConfDirLong)) {
            hadoopConfDir = tmpConfDirLong
          } else if (fileOrDirExists(tmpConfDirShort)) {
            hadoopConfDir = tmpConfDirShort
          }
        }
      }
    }
    LOGGER.info(s"discover hadoop conf from : ${hadoopConfDir}")
    hadoopConfDir
  }

  def getHadoopConfig: Configuration = {
    if (hadoopConfiguration == null) {
      hadoopConfiguration = new Configuration()
      configHadoop()
    }
    hadoopConfiguration
  }

  def refreshHadoopConfig: Configuration = {
    hadoopConfiguration = new Configuration()
    configHadoop()
  }

  def configHadoop(): Configuration = {
    var coreXml = ""
    var hdfsXml = ""
    val hadoopConfDir = getHadoopConfDir
    if (StringUtils.isNotEmpty(hadoopConfDir)) {
      val coreXmlTmp = s"${hadoopConfDir}/core-site.xml"
      val hdfsXmlTmp = s"${hadoopConfDir}/hdfs-site.xml"
      val coreExists = fileOrDirExists(coreXmlTmp)
      val hdfsExists = fileOrDirExists(hdfsXmlTmp)
      if (coreExists && hdfsExists) {
        LOGGER.info(s"discover hadoop conf from hadoop conf dir: ${hadoopConfDir}")
        coreXml = coreXmlTmp
        hdfsXml = hdfsXmlTmp
        hadoopAddSource(coreXml, hadoopConfiguration)
        hadoopAddSource(hdfsXml, hadoopConfiguration)
      }
    }
    LOGGER.info(s"core-site path : ${coreXml}, hdfs-site path : ${hdfsXml}")
    hadoopConfiguration
  }

  def getHiveConfDir: String = {
    if (StringUtils.isEmpty(hiveConfDir)) {
      val tmpConfDir = System.getenv("HIVE_CONF_DIR")
      if (StringUtils.isNotEmpty(tmpConfDir) && fileOrDirExists(tmpConfDir)){
        hiveConfDir = tmpConfDir
      } else {
        val tmpHomeDir = System.getenv("HIVE_HOME")
        if (StringUtils.isNotEmpty(tmpHomeDir) && fileOrDirExists(tmpHomeDir)) {
          val tmpConfDirShort = s"${tmpHomeDir}/conf}"
          if (fileOrDirExists(tmpConfDir)) {
            hiveConfDir = tmpConfDirShort
          }
        }
      }
    }
    LOGGER.info(s"discover hive conf from : ${hiveConfDir}")
    hiveConfDir
  }

  def configHive(): Configuration = {
    if (hiveConfiguration != null) {
      return hiveConfiguration
    } else {
      hiveConfiguration = new Configuration()
    }
    var hiveXml = ""
    val hiveConfDir = getHiveConfDir
    if (StringUtils.isEmpty(hiveConfDir)) {
      val hiveXmlTmp = s"${hiveConfDir}/hive-site.xml"
      val hiveExist = fileOrDirExists(hiveXml)
      if (hiveExist) {
        LOGGER.info(s"discover hive conf from : ${hiveConfDir}")
        hiveXml = hiveXmlTmp
        hadoopAddSource(hiveXml, hiveConfiguration)
      }
    }
    LOGGER.info(s"hive-site path : ${hiveXml}")
    hiveConfiguration
  }

  def getHiveConfig: Configuration = {
    if (hiveConfiguration == null) {
      hiveConfiguration = new Configuration()
      configHive()
    }
    hiveConfiguration
  }

  def refreshHiveConfig: Configuration = {
    hiveConfiguration = new Configuration()
    configHive()
  }

  def hadoopAddSource(confPath: String, conf: Configuration): Unit = {
    val exists = fileOrDirExists(confPath)
    if (exists) {
      LOGGER.warn(s"add [${confPath} to hadoop conf]")
      var fi: FileInputStream = null
      try {
        fi = new FileInputStream(confPath)
        conf.addResource(fi)
        conf.get("$$")
      } finally {
        if (fi != null) fi.close()
      }
    } else {
      LOGGER.error(s"[${confPath}] file does not exists!")
    }
  }

  def toUnixStyleSeparator(path: String): String = {
    path.replaceAll("\\\\", "/")
  }

  def fileOrDirExists(path: String): Boolean = {
    Files.exists(Paths.get(path))
  }
}

KerberosUtils 就是用于认证的类。

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kerby.kerberos.kerb.keytab.Keytab
import org.slf4j.Logger

import java.io.File
import java.net.URL
import java.nio.file.{Files, Paths}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

object KerberosUtils {
  val LOGGER: Logger = org.slf4j.LoggerFactory.getLogger(KerberosUtils.getClass)

  def loginKerberos(krb5Principal: String, krb5KeytabPath: String, krb5ConfPath: String, hadoopConf: Configuration): Boolean = {
    val authType = hadoopConf.get("hadoop.security.authentication")
    if (!"kerberos".equalsIgnoreCase(authType)) {
      LOGGER.error(s"kerberos utils get hadoop authentication type [${authType}] ,not kerberos!")
    } else {
      LOGGER.info(s"kerberos utils get hadoop authentication type [${authType}]!")
    }
    UserGroupInformation.setConfiguration(hadoopConf)
    System.setProperty("java.security.krb5.conf", krb5ConfPath)
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
    UserGroupInformation.loginUserFromKeytab(krb5Principal, krb5KeytabPath)
    val user = UserGroupInformation.getLoginUser
    if (user.getAuthenticationMethod == UserGroupInformation.AuthenticationMethod.KERBEROS) {
      val usnm: String = user.getShortUserName
      LOGGER.info(s"kerberos utils login success, curr user: ${usnm}")
      true
    } else {
      LOGGER.info("kerberos utils login failed")
      false
    }

  }

  def loginKerberos(krb5Principal: String, krb5KeytabPath: String, krb5ConfPath: String): Boolean = {
    val hadoopConf = ConfigUtils.getHadoopConfig
    loginKerberos(krb5Principal, krb5KeytabPath, krb5ConfPath, hadoopConf)
  }

  def loginKerberos(kerberosConf: KerberosConf): Boolean = {
    loginKerberos(kerberosConf.principal, kerberosConf.keyTabPath, kerberosConf.conf)
  }

  def loginKerberos(krb5Principal: String, krb5KeytabPath: String, krb5ConfPath: String,hadoopConfDir:String):Boolean={
    ConfigUtils.setHadoopConfDir(hadoopConfDir)
    loginKerberos(krb5Principal,krb5KeytabPath,krb5ConfPath)
  }

  def loginKerberos(): Boolean = {
    var principal: String = null
    var keytabPath: String = null
    var krb5ConfPath: String = null
    val classPath: URL = this.getClass.getResource("/")
    val classPathObj = Paths.get(classPath.toURI)
    var keytabPathList = Files.list(classPathObj).iterator().asScala.toList
    keytabPathList = keytabPathList.filter(p => p.toString.toLowerCase().endsWith(".keytab")).toList
    val krb5ConfPathList = keytabPathList.filter(p => p.toString.toLowerCase().endsWith("krb5.conf")).toList
    if (keytabPathList.nonEmpty) {
      val ktPath = keytabPathList.get(0)
      val absPath = ktPath.toAbsolutePath
      val keytab = Keytab.loadKeytab(new File(absPath.toString))
      val pri = keytab.getPrincipals.get(0).getName
      if (StringUtils.isNotEmpty(pri)) {
        principal = pri
        keytabPath = ktPath.toString
      }
    }
    if (krb5ConfPathList.nonEmpty) {
      val confPath = krb5ConfPathList.get(0)
      krb5ConfPath = confPath.toAbsolutePath.toString
    }
    if (StringUtils.isNotEmpty(principal) && StringUtils.isNotEmpty(keytabPath) && StringUtils.isNotEmpty(krb5ConfPath)) {
      ConfigUtils.configHadoop()
      // ConfigUtils.configHive()
      val hadoopConf = ConfigUtils.hadoopConfiguration
      loginKerberos(principal, keytabPath, krb5ConfPath, hadoopConf)
    } else {
      false
    }
  }
}
2.1.3 编译打包

mvn packagemaven-assembly-plugin会在target/生成出一个zip包。zip包最里层的各种jar就是需要的jar包了,将这些jar包都放到 kettle 的lib目录或自定义的目录(自定义方法请看下文)就好。

注意:
(1)本例中的kettle8.2中的KETTLE_HOME/plugins\pentaho-big-data-plugin\hadoop-configurations 目录下有几个hadoop plugin,在kettle9之前的版本全局只能有一个hadoop,选择使用哪个hadoop需要在文件KETTLE_HOME/plugins\pentaho-big-data-plugin\plugin.propertiesactive.hadoop.configuration=...配置,就是文件夹的名字,比如此次配置hdp30作为hadoop plugin 基础的hadoop版本,注kettle9以后的版本不是这么配置的,这个hadoop版本越接近实际集群的版本越好,kettle每次此启动都尝试加载此目录下的类,不一样也可以!!!
在这里插入图片描述

(2)由于集群版本可能和任何上边的hadoop plugin都不一致,此时需要把集群版本的依赖jar包提前加载,所以需要把相关依赖放在KETTLE_HOME/lib下,如果想放在另设目录比如KETTLE_HOME/ext-lib。win下可以把Spoon.bat中的两处set LIBSPATH=后都追加;..\ext-lib,注意是分号分隔。当然linux下,修改spoon.shLIBPATH=$CURRENTDIR最后都追加:..\ext-lib,注意是冒号分隔,之所有要有..是因为项目启动类是KETTLE_HOME/launcher\launcher.jar中。

在这里插入图片描述

类和包报错说明:
报错一:
kettle8.2报错很难理解,9之后的版本好得多:如下内容实际Watcher类找不到,但实际是有的。

2024/01/03 17:34:01 - spark-read-ha-sample.0 - Error connecting to database: (using class org.apache.hive.jdbc.HiveDriver)
2024/01/03 17:34:01 - spark-read-ha-sample.0 - org/apache/zookeeper/Watcher

报错二:
出现这种问题的是也是java类加载错误,应该是不同的classloader加载类导致不识别。

loader constraint violation: loader (instance of java/net/URLClassLoader) previously initiated loading for a different type with name "org/apache/curator/RetryPolicy"

报错三:
出现此问题,暂时无法解释。

java.lang.IllegalArgumentException: port out of range:-1
        at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
        at java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254)
        at org.apache.zookeeper.client.ConnectStringParser.<init>(ConnectStringParser.java:76)
        at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:447)
        at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:150)
        at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
        at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
        at org.apache.curator.ConnectionState.reset(ConnectionState.java:262)
        at org.apache.curator.ConnectionState.start(ConnectionState.java:109)
        at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:191)
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:259)
        at org.apache.hive.jdbc.ZooKeeperHiveClientHelper.configureConnParams(ZooKeeperHiveClientHelper.java:63)
        at org.apache.hive.jdbc.Utils.configureConnParams(Utils.java:520)
        at org.apache.hive.jdbc.Utils.parseURL(Utils.java:440)
        at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:134)
        at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107)

以上三个问题的解决方法是:
将hadoop,hive,zookeeper 和 curator包都放在lib目录或自定义加载目录!!!虽然KETTLE_HOME\plugins\pentaho-big-data-plugin\hadoop-configurations 目录下选定的hadoop plugin (本例中是hdp30)也有相关依赖,但实际上lib目录和hadoop plugin类不能混用,相关的包必须放在一起,切记,血泪教训!!!不用管lib目录和hadoop plugin中存在不同版本的jar包!!!
由于kettle设计的比较负责支持很多插件,此问题应该是由于不同进程不同类加载器所加载的类不能通用所致。

2.2 启动kettle和类加载说明

debug模式启动:SpoonDebug.bat
在这里插入图片描述如果还想看类加载路径可以在Spoon.bat中的set OPT= 行尾添加jvm选项 "-verbose:class"
如果cmd黑窗口中文乱码可以把SpoonDebug.bat中的 "-Dfile.encoding=UTF-8" 删除即可。
kettle会把所有jar包都缓存,都存储在kettle-home\system\karaf\caches目录下。
日志里打印的所有 bundle数字目录下得jar包都是在缓存目录下。
在这里插入图片描述如果kettle在运行过程中卡掉了,不反应了,八成是因为操作过程中点击了cmd黑窗口,此时在cmd黑窗口内敲击回车,cmd日志就会继续打印,窗口也会恢复响应。

2.3 编写js通过kerberos认证

在这里插入图片描述
配置信息就是填写kerberos的配置。
javascript代码完成kerberos认证。
在这里插入图片描述配置信息内填写如下:
在这里插入图片描述javascript代码内容如下:
kerberos认证中是需要HADOOP_CONF_DIR的,如果调用没有hadoop_conf_dir参数据方法就是去环境变量中取了。
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

// 给类起个别名,java没有这种写法,python有。
var utils = Packages.全类路径.KerberosUtils;
// 使用 HADOOP_CONF_DIR 或 HADOOP_HOME 环境变量,配置登录 Kerberos
var loginRes = utils.loginKerberos(krb5_principal,krb5_keytab,krb5_conf);

// 使用用户提供的 hadoop_conf_dir 登录kerberos
// hadoop_conf_dir 参数可以从上一步获取,也可以直接写死。
// var loginRes = utils.loginKerberos(krb5_principal,krb5_keytab,krb5_conf,hadoop_conf_dir);

添加一个写结果的模块!
在这里插入图片描述
好了,执行启动!
在这里插入图片描述

如果报如下错误,说明kettle没有找到java类,检查类路径和包是否错误!

TypeError: Cannot call property loginKerberos in object [JavaPackage utils]. It is not a function, it is "object". (script#6)

如果打印如下内容,说明执行认证成功了。
在这里插入图片描述

2024/01/02 18:18:04 - 写日志.0 -
2024/01/02 18:18:04 - 写日志.0 - ------------> 行号 1------------------------------
2024/01/02 18:18:04 - 写日志.0 - loginRes = Y

三、包装模块开发

keberos认证会在jvm存储信息,这些信息如果想使用必须前于hive或hadoop任务一个job
结构如下:
在这里插入图片描述kerberos-login 就是刚刚写的转换。

必须如上包装,层数少了,认证不过去!!!

四、连接hdfs

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述如果项目中使用也必须使用前面的包装模块把hadoop任务包在里边!

五、连接hive或者spark thriftserver

连接hive和spark thriftserver是一样的。以下以spark举例说明。
在这里插入图片描述
注意连接hive或者spark之前一定先手动运行下刚刚的kerberos-login认证模块!!!,否则测试连接特征列表都将就失败或报错!!!

4.1 zookeeper的ha方式连接

在这里插入图片描述

# 主机名称:
# 注意这里主机名会后少写一个:2181
zk-01.com:2181,zk-02.com:2181,zk-03.com

# 数据库名称:
# 后边把kerberos连接参数也加上。zooKeeperNamespace 参数从HIVE_HOME/conf/hive-site.xml或SPARK_HOME/conf/hive-site.xml文件获取即可。而serviceDiscoveryMode=zooKeeper是固定写法。
default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=spark2_server

# 端口号:
# 主机名故意少写一个,就在这里补上了。
2181

在这里插入图片描述先手动运行下kerberos认证模块,再测试连接下:
填写完毕后,可以点击特征列表按钮,找到URL项查看格式,应为
jdbc:hive2://zk-01.com:2181,zk-01.com:2181,zk-01.com:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=...
在这里插入图片描述测试连接:
在这里插入图片描述

4.2 单点连接方式

在这里插入图片描述

在这里插入图片描述

# 主机名称
# 就是hive server2 的主机 host,不要写IP

# 数据库名称:
# HIVE_HOME/conf/hive-site.xml或SPARK_HOME/conf/hive-site.xml中找到配置 hive.server2.authentication.kerberos.principal 
# 比如spark/_HOST@XXXXX.COM
# 本质也是在default数据库后边拼接连接字符串
default;principal=spark/_HOST@XXXXX.COM

# 端口号也在SPARK_HOME/conf/hive-site.xml中找到配置hive.server2.thrift.port有
10016

填写完毕后,可以点击特征列表按钮,找到URL项查看格式,应为:jdbc:hive2://host:port/default;principal=... 格式。
在这里插入图片描述
测试连接:
在这里插入图片描述

五、其他

kettle读取Hive表不支持bigint和timstamp类型解决

六 kettle使用技巧

6.1 kettle 任务嵌套使用相对目录

在这里插入图片描述
以上内容殊荣输入也是可以的:${Internal.Entry.Current.Directory}

参考文章:

hive 高可用详解: Hive MetaStore HA、hive server HA原理详解;hive高可用实现
kettle开发篇-JavaScript脚本-Day31
kettle组件javaScript脚本案例1
kettle配置javascript环境 kettle javascript
Javascript脚本组件
Kettle之【执行SQL脚本】控件用法 文章介绍了环境变量和 占位符 ? 的使用方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1360870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android开发编程从入门到精通,安卓技术从初级到高级全套教学

一、教程描述 本套教程基于JDK1.8版本&#xff0c;教学内容主要有&#xff0c;1、环境搭建&#xff0c;UI布局&#xff0c;基础UI组件&#xff0c;高级UI组件&#xff0c;通知&#xff0c;自定义组件&#xff0c;样式主题&#xff1b;2、四大组件&#xff0c;Intent&#xff0…

C#编程-实现继承

C#允许您通过扩展现有类的功能以创建新类来实现继承。 从基类创建派生类 使用以下语法在C#中创建派生类: class <derived_class>:<base_class>{...}确定继承的层次结构 要确定继承层次结构,必须检查派生类与基类之间的关系种类。确保派生类是一种基类。 请考虑以…

数据结构 模拟实现Stack栈(数组模拟)

目录 一、栈的概念 二、栈的接口 三、栈的方法实现 &#xff08;1&#xff09;push方法 &#xff08;2&#xff09;pop方法 &#xff08;3&#xff09;peek方法 &#xff08;4&#xff09;size方法 ​编辑 &#xff08;5&#xff09;empty方法 四、最终代码 一、栈的…

自动重置密码

在运维工作中为用户重置密码是常见的操作&#xff0c;虽然手工运行 passwd 命令就可以很方便地设置&#xff0c;但在用户忘记密码后还需要管理员操作。在用户数量很大时也是不小的工作量。因此为用户提供工具来自动重置密码就很有必要。 技术方案 技术方案比较简单&#xff0…

和鲸社区数据分析每周挑战【第一百二十一期:电商店铺经营分析】

和鲸社区数据分析每周挑战【第一百二十一期&#xff1a;电商店铺经营分析】 文章目录 和鲸社区数据分析每周挑战【第一百二十一期&#xff1a;电商店铺经营分析】一、数据文档二、探索性数据分析三、品类销售效果评估四、用户参与活动优惠的购物行为分析五、不同订单来源对购买…

Qt实现文本编辑器(二)

上一章节讲述了如何制作文本编辑页面&#xff0c;以及应该有哪些功能需要实现&#xff0c;只是做了展示效果&#xff0c;实际的点击事件并没有处理。今天来具体讲解下是如何实现菜单栏以及工具栏上对应的需求吧~ 功能实现 功能&#xff1a; 1、动作消息触发 2、具体功能&am…

idea设置注释在鼠标当前位置,使其不从顶格位置添加注释

idea设置注释在鼠标当前位置&#xff0c;使其不从顶格位置添加注释 默认情况下&#xff0c;注释都是从改行的顶格开始&#xff0c;看起来不太美观而且不易清除分级 设置让其从代码处开始&#xff0c;步骤&#xff1a;File–>Sttings–>Editor–>Code Style &#xff…

使用 CompletableFuture 分批处理任务

一、无返回值任务函数 // 数据分批 List<List<StatisticsDTO>> batches Lists.partition(statisticsList, BATCH_SIZE); List<CompletableFuture<Void>> futures new ArrayList<>(batches.size());// 数据处理 for (int i 0; i < batches…

C++:类和对象(3)

目录 1.构造函数调用规则 2.深拷贝和浅拷贝 3.初始化列表 4.类对象作为类成员 1.构造函数调用规则 默认情况下&#xff0c;C编译器至少给类添加三个函数&#xff1a; 1.默认构造函数(无参&#xff0c;函数体为空) 2.默认析构函数(无参&#xff0c;函数体为空) 3.默认拷贝构…

GNSS位移监测站对尾矿库坝体表面位移进行自动化监测

表面位移监测&#xff1a;通过GNSS位移监测站对尾矿库坝体表面位移进行自动化监测&#xff0c;掌握尾矿坝整体表面位置的变化及其变化速率&#xff08;包括平面位移和垂直沉降&#xff09;&#xff0c;确定尾矿坝坝体整体位移变形的情况&#xff0c;是确定尾矿库安全性的重要指…

一文讲透SPSS相关性分析结果怎么看?

推荐采用《SPSS统计分析入门与应用精解&#xff08;视频教学版&#xff09;》 杨维忠、张甜 清华大学出版社“5.1 双变量相关分析” 的解答。 本节内容选自《SPSS统计分析入门与应用精解&#xff08;视频教学版&#xff09;》 杨维忠、张甜 清华大学出版社“5.1 双变量相关分析…

Protobuf 编码结构

编码结构 什么是protobuf protocol buffers 是一种语言无关、平台无关、可扩展的序列化结构数据的方法&#xff0c;可用于数据通信协议和数据存储等&#xff0c;它是 Google 提供的一个具有高效协议数据交换格式工具库&#xff0c;是一种灵活、高效和自动化机制的结构数据序列…

24款奔驰C260L升级C63包围 渣男的外表

今天店里来了一台24款奔驰C260L 一提车就过来升级 我们公司还有包上牌服务 车主说 升级完包围 帮忙安排一下 原车的包围 没有那么霸气 特别是后杠 光溜溜的 升级后 四出尾喉 尾翼 直接牌面就起来了&#xff0c;星骏汇小许Xjh15863

真核微生物基因序列鉴定工具EukRep工具的安装和详细使用方法

介绍 EukRep是一种用于鉴定并分析环境中的真核微生物的工具。它基于16S rRNA基因序列&#xff0c;可以帮助研究人员确定和分类环境样品中存在的真核微生物群落。 EukRep 从宏基因组数据集中分类真核和原核序列 安装 要求Python3 推荐使用conda安装&#xff1a; $ conda cre…

HUAWEI华为荣耀MagicBook X 15酷睿i5-10210U处理器集显(BBR-WAH9)笔记本电脑原装出厂Windows10系统

链接&#xff1a;https://pan.baidu.com/s/1YVcnOP5YKfFOoLt0z706rg?pwdfwp0 提取码&#xff1a;fwp0 MagicBook荣耀原厂Win10系统自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、华为/荣耀电脑管家等预装程序 文件格式&#xff1a;esd/wim/swm 安装…

【项目实战】分布式计算和通信框架(AKKA)入门介绍

一、AKKA是什么&#xff1f; Akka是一个用于构建高并发、分布式、可容错、事件驱动的应用程序的工具包和运行时。它基于Actor模型&#xff0c;提供了一种高效的并发编程模型&#xff0c;可以轻松地编写出高并发、分布式、可容错的应用程序。Akka还提供了一些常用的组件&#xf…

DMX512输出协议详解

目录 ​编辑 1、DMX512协议简介 2、DMX512协议分析 DMX512指令帧介绍 DMX512信息包 3、DMX512接口电路 4、参考代码 1、DMX512协议简介 DMX512是一种用于舞台灯光控制的数字传输协议。它是由美国舞台灯光协会&#xff08;USITT&#xff09;于1990年发布的工业标准&…

利用小红书笔记详情API:为内容运营提供强大的支持

利用小红书笔记详情API&#xff0c;内容运营者可以获得对小红书平台上的笔记内容的深入洞察&#xff0c;从而为其运营工作提供强大的支持。以下是该API如何支持内容运营的几个关键方面&#xff1a; 获取笔记内容与数据&#xff1a; API允许内容运营者直接获取小红书平台上的笔记…

Python轴承故障诊断 (九)基于VMD+CNN-BiLSTM的故障分类

往期精彩内容&#xff1a; Python-凯斯西储大学&#xff08;CWRU&#xff09;轴承数据解读与分类处理 Python轴承故障诊断 (一)短时傅里叶变换STFT Python轴承故障诊断 (二)连续小波变换CWT_pyts 小波变换 故障-CSDN博客 Python轴承故障诊断 (三)经验模态分解EMD_轴承诊断 …

57.6K star!一个免费开源的 API 开发生态系统

&#xff01;&#xff01;&#xff01;文末有链接&#xff01;&#xff01;&#xff01; 小伙伴们&#xff0c;你们有没有遇到这样的问题呢&#xff1f;当你作为前端开发者和后端开发者一起协同工作时&#xff0c;联调接口成了必须要做的工作。 而为了验证接口的稳定性和安全…