【Spark】Spark的DataFrame向Impala写入数据异常及源码解析

news2024/11/17 1:42:35

背景

事情是这样的,当前业务有一个场景: 从业务库的Mysql抽取数据到Hive
由于运行环境的网络限制,当前选择的方案:
使用spark抽取业务库的数据表,然后利用impala jdbc数据灌输到hive。(没有spark on hive 的条件)

问题

结果就出现问题了:
报错信息如下:

java.sql.SQLFeatureNotSupportedException: [Cloudera][JDBC](10220) Driver does not support this optional feature.
	at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)
	at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
	at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:658)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23/03/04 23:24:51 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.sql.SQLFeatureNotSupportedException: [Cloudera][JDBC](10220) Driver does not support this optional feature.
	at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)
	at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
	at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:658)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

问题溯源

sparkmysql中读出来的数据中,存在字段有string的类型。
这个类型在使用DataFrame.write.jdbc()通过impala jdbcHive中写数据的时候,如果没有创建Impalajdbc Dialect的时候,此时这个String的类型,会被转换成
在这里插入图片描述
源自 org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
java.sql.Types.ClOB类型,戳进这个变量。可以看到它代表的值
在这里插入图片描述
接着,我们找到impala jdbccom.cloudera.impala.jdbc.common.SPreparedStatement#checkTypeSupported
方法,发现这个列表里面没有2005所以,程序代码会报错。
在这里插入图片描述
对应的数字编码:
com.cloudera.impala.dsi.dataengine.utilities.TypeUtilities#sqlTypeToString

    public static String sqlTypeToString(short var0) {
        switch(var0) {
        case -11:
            return "SQL_GUID";
        case -10:
            return "SQL_WLONGVARCHAR";
        case -9:
            return "SQL_WVARCHAR";
        case -8:
            return "SQL_WCHAR";
        case -7:
            return "SQL_BIT";
        case -6:
            return "SQL_TINYINT";
        case -5:
            return "SQL_BIGINT";
        case -4:
            return "SQL_LONGVARBINARY";
        case -3:
            return "SQL_VARBINARY";
        case -2:
            return "SQL_BINARY";
        case -1:
            return "SQL_LONGVARCHAR";
        case 0:
            return "NULL";
        case 1:
            return "SQL_CHAR";
        case 2:
            return "SQL_NUMERIC";
        case 3:
            return "SQL_DECIMAL";
        case 4:
            return "SQL_INTEGER";
        case 5:
            return "SQL_SMALLINT";
        case 6:
            return "SQL_FLOAT";
        case 7:
            return "SQL_REAL";
        case 8:
            return "SQL_DOUBLE";
        case 12:
            return "SQL_VARCHAR";
        case 16:
            return "SQL_BOOLEAN";
        case 91:
            return "SQL_TYPE_DATE";
        case 92:
            return "SQL_TYPE_TIME";
        case 93:
            return "SQL_TYPE_TIMESTAMP";
        case 101:
            return "SQL_INTERVAL_YEAR";
        case 102:
            return "SQL_INTERVAL_MONTH";
        case 103:
            return "SQL_INTERVAL_DAY";
        case 104:
            return "SQL_INTERVAL_HOUR";
        case 105:
            return "SQL_INTERVAL_MINUTE";
        case 106:
            return "SQL_INTERVAL_SECOND";
        case 107:
            return "SQL_INTERVAL_YEAR_TO_MONTH";
        case 108:
            return "SQL_INTERVAL_DAY_TO_HOUR";
        case 109:
            return "SQL_INTERVAL_DAY_TO_MINUTE";
        case 110:
            return "SQL_INTERVAL_DAY_TO_SECOND";
        case 111:
            return "SQL_INTERVAL_HOUR_TO_MINUTE";
        case 112:
            return "SQL_INTERVAL_HOUR_TO_SECOND";
        case 113:
            return "SQL_INTERVAL_MINUTE_TO_SECOND";
        case 2003:
            return "SQL_ARRAY";
        default:
            return null;
        }
    }

解决

我们在代码中添加一个这样的类:

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StringType;
import scala.Option;

import java.sql.Types;


/**
 * @author wmh
 * @date 2021/1/12
 * impala的sql的方言,为了使impala sql能在spark中正确的执行
 */
public class ImpalaDialect extends JdbcDialect {

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:impala") || url.contains("impala");
    }

    @Override
    public String quoteIdentifier(String colName) {
        return "`" + colName + "`";
    }

    @Override
    public Option<DataType> getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {
        return super.getCatalystType(sqlType, typeName, size, md);
    }

    @Override
    public Option<JdbcType> getJDBCType(DataType dt) {
        if (dt instanceof StringType) {
            return Option.apply(new JdbcType("String", Types.VARCHAR));
        }
        return super.getJDBCType(dt);
    }
}

会出现这个问题:
在这里插入图片描述

	at com.cloudera.impala.hivecommon.api.HS2Client.executeStatementInternal(Unknown Source)
	at com.cloudera.impala.hivecommon.api.HS2Client.executeStatement(Unknown Source)
	at com.cloudera.impala.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelper(Unknown Source)
	at com.cloudera.impala.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.execute(Unknown Source)
	at com.cloudera.impala.jdbc.common.SPreparedStatement.executePreparedAnyBatch(Unknown Source)
	at com.cloudera.impala.jdbc.common.SPreparedStatement.executeBatch(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Caused by: com.cloudera.impala.support.exceptions.GeneralException: [Cloudera][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:AnalysisException: Char size must be > 0: 0

上述问题解释一下:
注意最后一句:errorMessage:AnalysisException: Char size must be > 0: 0
是因为在DataFrame里面存在’'没有长度的空字符串,这样的空字符串会导致如上报错
因为在spark构建insert into xx table values(cast('' as char(0)) ,因为这个char(0)的数字不能等于0,所以会出现如上错误。所以字符串中不能为
‘’
源代码路径:impalajdbc41/2.6.4/impalajdbc41-2.6.4.jar!/com/cloudera/impala/impala/querytranslation/ImpalaInsertQueryGenerator.class
在这里插入图片描述
那么针对这个问题,我们要在impala的jdbc的参数上面加上一个UseNativeQuery=1, 即可解决该问题。
这个UseNativeQuery=1参数含义是:
在这里插入图片描述
上图来自impala jdbc的官方文档
我这里来翻译一下:
此属性指定驱动程序是否转换应用程序发出的查询。
1:驱动程序不会转换应用程序发出的查询,直接使用sql查询。
0:驱动程序将应用程序发出的查询转换为Impala SQL中的等效形式。

也就是说,如果查询sql本来就是impala查询sql,那么就不用进行转换了。

总结

如果有什么更好的方法,请在下方评论区留言,谢谢大哥们了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/389486.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cmd命令教程

小提示&#xff1a; 在本文中&#xff0c;我将向您展示可以在 Windows 命令行上使用的 40 个命令 温馨提示&#xff1a;在本教程中学习使用适用于 Windows 10 和 CMD 网络命令的最常见基本 CMD 命令及其语法和示例 文章目录为什么命令提示符有用一、cmd是什么&#xff1f;如何在…

AcWing 4868. 数字替换(DFS + 剪枝优化)

AcWing 4868. 数字替换&#xff08;DFS 剪枝优化&#xff09;一、问题二、思路三、代码一、问题 二、思路 题目中要求变换次数最小&#xff0c;其实第一印象应该是贪心&#xff0c;即我们每一次都去成各位中最大的那个数字。但是这个想法很容易推翻。因为你这次乘了一个最大的…

gdb/git的基本使用

热爱编程的你&#xff0c;一定经常徘徊在写bug和改bug之间&#xff0c;调试器也一定是你随影而行的伙伴&#xff0c;离开了它你应该会寝食难安吧&#xff01; 目录 gdb的使用 断点操作 运行调试 观察数据 Git的使用 仓库的创建和拉取 .gitignore “三板斧” 常用指令 gd…

CV——day82 读论文:遥感目标检测的改进注意力特征融合SSD (AF-SSD)方法

遥感目标检测的改进注意力特征融合SSD 方法I. INTRODUCTIONII. RELATED WORKB. 特征融合C.注意力机制III. PROPOSED METHODA. 特性融合模块——**FFM**B.双路径注意模块——DAMC. 多尺度接受域——MRFIV. EXPERIMENTSA. Data Sets and TrainingV. CONCLUSIONAttention and Feat…

mac安装开发工具:clipy、iterm2、go、brew、mysql、redis、wget等

wget brew install wget clipy Releases Clipy/Clipy GitHub 环境变量 ~下有三个文件 .zshrc .zprofile .bash_profile > cat .zshrc export PATH$PATH:/usr/local/mysql/bin> cat .zprofile eval "$(/opt/homebrew/bin/brew shellenv)"> cat .bas…

[1.3.2]计算机系统概述——中断和异常

文章目录第一章 计算机系统概述中断和异常&#xff08;一&#xff09;中断的作用&#xff08;二&#xff09;中断的类型&#xff08;三&#xff09;中断机制的基本原理小结第一章 计算机系统概述 中断和异常 中断的作用中断的类型 内中断&#xff08;也称“异常”&#xff09;…

29- 迁移学习 (TensorFlow系列) (深度学习)

知识要点 迁移学习: 使用别人预训练模型参数时&#xff0c;要注意别人的预处理方式。 常见的迁移学习方式&#xff1a; 载入权重后训练所有参数.载入权重后只训练最后几层参数.载入权重后在原网络基础上再添加一层全连接层&#xff0c;仅训练最后一个全连接层.训练数据是 10_m…

滚蛋吧,正则表达式!

大家好&#xff0c;我是良许。 不知道大家有没有被正则表达式支配过的恐惧&#xff1f;看着一行火星文一样的表达式&#xff0c;虽然每一个字符都认识&#xff0c;但放在一起直接就让人蒙圈了~ 你是不是也有这样的操作&#xff0c;比如你需要使用「电子邮箱正则表达式」&…

面试之String、StringBuffer、StringBuilder区别

String、StringBuffer、StringBuilder区别 (1)是否可变 string对象不可变&#xff1b; StringBuffer、StringBuilder继承自AbstractStringBuilder类&#xff0c;实现原理都基于可修改的char数组&#xff0c;默认大小为16 (2)线程安全性 string中的对象不可变&#xff0c;可…

Java中String类intern()详解

1、背景在开发过程中很多朋友&#xff0c;由于不会正确使用intern()&#xff0c;导致开发的程序&#xff0c;执行效率比较差。同时最近发现一道非常有意思的关于intern()的面试题&#xff0c;这道面试题还是有不小的难度&#xff0c;相信很多朋友看到以后也不知道怎么解答&…

c++类与对象整理(上)

目录 1.类的引入 2.类的定义 3.类的访问限定符及封装 1&#xff09;访问限定符 2&#xff09;封装 4.类的作用域 5.类的实例化 6.类的对象大小的计算 1&#xff09;类对象的存储方式 2&#xff09;内存对齐和大小计算 ​编辑 7.类成员函数的this指针 1&#xff09…

linux配置网络详解

linux配置网络详解 文章目录linux配置网络详解前置准备配置流程错误排查前置准备 确定是否有网&#xff0c;比如在家里&#xff0c;确定是否连上网线&#xff1f;确定这个网线的网关是什么&#xff1f;&#xff08;这个需要和给你办网的人确定&#xff09;&#xff0c;在公司的…

超详细JDK1.8所有版本下载地址

JDK1.8即为JDK8&#xff0c;JDK8是目前是最成熟最稳定的版本&#xff0c;本文将详细介绍JDK1.8历史版本的下载方式。 在此附上JDK1.8安装与配置教程 超详细JDK1.8安装与配置 一、JDK官网 首先打开oracle官网&#xff0c;官网首页地址为 JDK官网首页地址 点击Products 点击…

Kotlin实现简单的学生信息管理系统

文章目录一、实验内容二、实验步骤1、页面布局2、数据库3、登录活动4、增删改查三、运行演示四、实验总结五、源码下载一、实验内容 根据Android数据存储的内容&#xff0c;综合应用SharedPreferences和SQLite数据库实现一个用户信息管理系统&#xff0c;强化对SharedPreferen…

ks通过恶意低绩效来变相裁员(六)各方核心利益点分析

目录 公司利益点 管理层利益点 直接管理者利益点 一线干活的同学 一线嫡系同学 公司利益点 核心利益点&#xff1a;围绕财报营收&#xff0c;降本&#xff0c;拿到好看的财报数据&#xff0c;让资本市场继续看好自己 核心手段&#xff1a; 扩展新业务&#xff0c;挖掘已…

基于数据驱动的智能空调系统需求响应可控潜力评估研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

深入理解多线程

一、线程基本概念 1、概述 线程是允许应用程序并发的一种机制。线程共享进程内的所有资源。 线程是调度的基本单位。 每个线程都有自己的 errno。 所有 pthread 函数均以返回 0 表示成功&#xff0c;返回一个正值表示失败。 编译 pthread 程序需要添加链接库&#xff08;…

【Java】反射机制和代理机制

目录一、反射1. 反射概念2. 反射的应用场景3. 反射机制的优缺点4. 反射实战获取 Class 对象的四种方式二、代理机制1. 代理模式2. 静态代理3. 动态代理3.1 JDK动态代理机制1. 介绍2.JDK 动态代理类使用步骤3. 代码示例3.2 CGLIB 动态代理机制1.介绍2.CGLIB 动态代理类使用步骤3…

程序员压力大?用 PyQt 做一个美*女GIF设置桌面,每天都有好心情

嗨害大家好鸭&#xff01;我是小熊猫~ 要说程序员工作的最大压力不是来自于工作本身&#xff0c; 而是来自于需要不断学习才能更好地完成工作&#xff0c; 因为程序员工作中面对的编程语言是在不断更新的&#xff0c; 同时还要学习熟悉其他语言来提升竞争力… 好了&#xff0c…

使用Python通过拉马努金公式快速求π

使用Python通过拉马努金公式快速求π 一、前言 π是一个数学常数&#xff0c;定义为&#xff1a;圆的周长与直径的比值。 π是一个无理数&#xff0c;也是一个超越数&#xff0c;它的小数部分无限不循环。 π可以用来精确计算圆周长、圆面积、球体积等几何形状的关键值。 有关…