[Spark SQL]Spark SQL读取Kudu,写入Hive

news2024/9/28 9:25:26

SparkUnit

Function:用于获取Spark Session

package com.example.unitl

import org.apache.spark.sql.SparkSession

object SparkUnit {
  def getLocal(appName: String): SparkSession = {
    SparkSession.builder().appName(appName).master("local[*]").getOrCreate()
  }

  def getLocal(appName: String, supportHive: Boolean): SparkSession = {
    if (supportHive) getLocal(appName,"local[*]",true)
    else getLocal(appName)
  }

  def getLocal(appName:String,master:String,supportHive:Boolean): SparkSession = {
    if (supportHive) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
    else  SparkSession.builder().appName(appName).master(master).getOrCreate()
  }

  def stopSs(ss:SparkSession): Unit ={
    if (ss != null) {
      ss.stop()
    }
  }
}

log4j.properties

Function:设置控制台输出级别

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

KTV

Function:读取kudu,写入hive。Kudu_To_Hive,简称KTV

package com.example.dao

import com.example.unitl.SparkUnit
import org.apache.spark.sql.SparkSession

object KTV {
  def getKuduTableDataFrame(ss: SparkSession): Unit = {
    // 读取kudu
    // 获取tb对象
    val kuduTb = ss.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", "10.168.1.12:7051")
      .option("kudu.table", "impala::realtimedcs.bakup_db") // Tips:注意指定库
      .load()

    // create view
    kuduTb.createTempView("v1")

    val kudu_unit1_df = ss.sql(
      """
        |SELECT * FROM `sources_tb1`
        |WHERE `splittime` = "2021-07-11"
        |""".stripMargin)

    // print
    kudu_unit1_df.printSchema()
    kudu_unit1_df.show()

    // load of memory
    kudu_unit1_df.createOrReplaceTempView("v2")
  }

  def insertHive(ss: SparkSession): Unit = {
    // create table
    ss.sql(
      """
        |USE `bakup_db`
        |""".stripMargin)

    ss.sql(
      """
        |  CREATE TABLE IF NOT EXISTS `bak_tb1`(
        |   `id` int,
        |   `packtimestr` string,
        |   `dcs_name` string,
        |   `dcs_type` string,
        |   `dcs_value` string,
        |   `dcs_as` string,
        |   `dcs_as2` string)
        | PARTITIONED BY (
        |   `splittime` string)
        |""".stripMargin)
    println("创建表成功!")

    // create view
    ss.sql(
      """
        |INSERT INTO `bakup_db`
        |SELECT * FROM bak_tb1
        |""".stripMargin)
    println("保存成功!")
  }

  def main(args: Array[String]): Unit = {
    //get ss
    val ss = SparkUnit.getLocal("KTV", true)
    // 做动态分区, 所以要先设定partition参数
    // default是false, 需要额外下指令打开这个开关
    ss.sqlContext.setConf("hive.exec.dynamic.partition;","true");
    ss.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict");

    // 调用方法
    getKuduTableDataFrame(ss)
    insertHive(ss)

    // 关闭连接
    SparkUnit.stopSs(ss)
  }
}

运行:

运行时请将hive的配置文件 hive-site.xml文件,复制到项目resource下。

hue查看写入的数据:

略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1506856.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.下载安装ESP32开发环境ESP-IDE

ESP32简介 ESP32介绍 说到ESP32,首先ESP32不是一个芯片,ESP32是一个系列芯片, 是乐鑫自主研发的一系列芯片微控制器。它主要的功能就是支持WiFi和蓝牙, ESP32指的是ESP32裸芯片。但是,“ESP32”一词通常指ESP32系列芯…

python基础练习 特殊回文数

资源限制 内存限制:512.0MB C/C时间限制:1.0s Java时间限制:3.0s Python时间限制:5.0s 问题描述 123321是一个非常特殊的数,它从左边读和从右边读是一样的。   输入一个正整数n, 编程求所有这样的…

RocketMQ、Kafka、RabbitMQ 消费原理,顺序消费问题【图文理解】

B站视频地址 文章目录 一、开始二、结果1、RocketMQ 消费关系图1-1、queue和consumer的关系1-2、consumer 和线程的关系 2、Kafka 消费关系图1-1、partitions和consumer的关系1-2、consumer 和线程的关系 3、RabbitMQ 消费关系图1-1、queue和consumer的关系1-2、consumer 和线程…

Python+Django+Html网页前后端指纹信息识别

程序示例精选 PythonDjangoHtml网页前后端指纹信息识别 如需安装运行环境或远程调试,见文章底部个人QQ名片,由专业技术人员远程协助! 前言 这篇博客针对《PythonDjangoHtml网页前后端指纹信息识别》编写代码,代码整洁&#xff0…

深入理解Java多线程与线程池:提升程序性能的利器

✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心哦!✨✨ 🎈🎈作者主页: 喔的嘛呀🎈🎈 目录 引言 一、实现多线程 1.1. 继承Thread类 1.2. 实现Runnab…

WPF(1)的MVVM的数据驱动学习示例

MVVM Model:数据模型、View 界面、ViewModel 业务逻辑处理 项目结构 界面数据绑定 <Window x:Class"WpfApp1.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/x…

opencv人脸识别实战3:多线程和GUI界面设计(PyCharm实现)

一、多线程设计 1、在一个新线程中调用了 scan_face() 函数来进行人脸识别操作。根据识别结果&#xff0c;更新界面显示结果&#xff0c;最后释放资源。 def f_scan_face_thread():var.set(刷脸)ans scan_face()if ans 0:print("最终结果&#xff1a;无法识别")va…

C++ 中的头文件和源文件

#include<>一般用于包含系统头文件&#xff0c;诸如stdlib.h、stdio.h、iostream等&#xff1b; 类库目录下查找失败&#xff0c;编译器会终止查找&#xff0c;直接报错&#xff1a;No such file or directory. #include""一般用于包含自定义头文件&#xff…

NLP 算法实战项目:使用 BERT 进行模型微调,进行文本情感分析

本篇我们使用公开的微博数据集(weibo_senti_100k)进行训练&#xff0c;此数据集已经进行标注&#xff0c;0: 负面情绪&#xff0c;1:正面情绪。数据集共计82718条(包含标题)。如下图&#xff1a; 下面我们使用bert-base-chinese预训练模型进行微调并进行测试。 技术交流&#x…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Gauge)

数据量规图表组件&#xff0c;用于将数据展示为环形图表。 说明&#xff1a; 该组件从API Version 8开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 可以包含单个子组件。 说明&#xff1a; 建议使用文本组件构建当前数值文本和辅…

信息系统项目管理师005:工业互联网(1信息化发展—1.2现代化基础设施—1.2.2工业互联网)

文章目录 1.2.2 工业互联网1.内涵和外延2.平台体系3.融合应用 记忆要点总结 1.2.2 工业互联网 工业互联网(Industrial Internet)是新一代信息通信技术与工业经济深度融合的新型基础设施、应用模式和工业生态&#xff0c;通过对人、机、物、系统等的全面连接&#xff0c;构建起覆…

【EDK II】作为UEFI的实现,EDK II 的架构是什么样的

目录 前言 EDK II 架构 配置文件 结语 前言 基本输入输出系统 (Basic Input Output System, BIOS) 最早由 IBM&#xff08;International Business Machines Corporation) 公司于1981年提出并开发&#xff0c;后来成为个人计算机(PC)的标准固件接口。但受限于传统BIOS (Le…

Git分支管理(IDEA)

文章目录 Git分支管理&#xff08;IDEA&#xff09;1.Git分支管理&#xff08;IDEA&#xff09;1.基本介绍1.分支理解2.示意图 2.搭建分支和合并的环境1.创建Gitee仓库2.创建普通maven项目3.克隆Gitee项目到E:\GiteeRepository4.复制erp文件夹下的内容到IDEA项目下5.IDEA项目中…

Kafka的分区机制

Kafka的分区机制是其核心功能之一&#xff0c;旨在提高可扩展性和并行处理能力。下面概述了Kafka分区的基本概念和工作原理&#xff1a; Kafka分区基本概念 分区&#xff08;Partition&#xff09;&#xff1a;Kafka中的主题&#xff08;Topic&#xff09;可以细分为多个分区…

软件测试APP完整测试作业流程(附流程图),公司级软件测试流程化办公

目录 1. 概述 2. 软件测试流程 3. 软件测试周期人员活动图 4. 总结 1. 概述 1.1 目的 有效的保证软件质量&#xff1b; 有效的制定不同测试类型&#xff08;软件系统测试、音频主观性测试、Field Trial、专项测试、自动化测试、性 能测试、用户体验测试&#xff09;的软件…

【HarmonyOS】ArkUI - 自定义卡片样式

ArkUI - 自定义卡片样式 HarmonyOS API 9 没有提供原生的卡片样式&#xff0c;我定义了一个卡片样式&#xff0c;可以方便大家在日常开发中使用。 效果图&#xff1a; 卡片样式代码如下&#xff1a; Styles function card() {.width(95%).padding(20).backgroundColor(Col…

【CSP】2022-03-2 出行计划 经典差分和前缀和 (包含完整思路、代码和写代码过程中遇到的问题)

2022-03-2 出行计划 差分和前缀和 2022-03-2 出行计划 差分和前缀和思路遇到的问题&#xff08;不小心出现的细节问题&#xff09;完整代码 2022-03-2 出行计划 差分和前缀和 这题很久之前做过一次&#xff0c;现在已经基本忘记了&#xff0c;所以重新做一遍&#xff0c;然后一…

Linux动态追踪——ftrace

目录 摘要 1 初识 1.1 tracefs 1.2 文件描述 2 函数跟踪 2.1 函数的调用栈 2.2 函数调用栈 2.3 函数的子调用 3 事件跟踪 4 简化命令行工具 5 总结 摘要 Linux下有多种动态追踪的机制&#xff0c;常用的有 ftrace、perf、eBPF 等&#xff0c;每种机制适应于不同的场…

银河麒麟V10 安装部署大数据管理软件 DataSophon

一、概览 1、愿景 致力于快速实现部署、管理、监控以及自动化运维大数据云原生平台&#xff0c;帮助您快速构建起稳定、高效、可弹性伸缩的大数据云原生平台。 2、DataSophon是什么 《三体》&#xff0c;这部获世界科幻文学最高奖项雨果奖的作品以惊艳的"硬科幻"…

Jmeter+Ant+Git/SVN+Jenkins实现持续集成接口测试,一文精通(一)

前言 Jmeter&#xff0c;Postman一些基本大家相比都懂。那么真实在项目中去使用&#xff0c;又是如何使用的呢&#xff1f;本文将一文详解jmeter接口测试 一、接口测试分类 二、目前接口架构设计 三、市面上的接口测试工具 四、Jmeter简介&#xff0c;安装&#xff0c;环境…