Delta lake with Java--分区表

news2024/9/28 13:28:34

今天尝试一下将昨天的数据操作建立的表换成分区表,参考Delta Lake Up and Running做法用分区表的方式来更新数据。还要比较一下分区表的查询与非分区表的查询,结果显示分区表的查询速度要比非分区表要快。直接上代码:

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;

public class DeltaLakePartitionCURD {

    //将字符串转换成java.sql.Timestamp
    public static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {
        SimpleDateFormat sf = new SimpleDateFormat(dateFormat);
        Date date = null;
        try {
            date = sf.parse(strDate);
        } catch (Exception e) {
            e.printStackTrace();
        }
        java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());
        return dateSQL;
    }

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.databricks.delta.autoCompact.enabled", "true")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();


        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String savePath="D:\\bigdata\\detla-lake-with-java\\YellowTaxiPartitioned";
        String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";
        String tableName = "taxidb.YellowTaxiPartitioned";


        String savePath2="D:\\bigdata\\detla-lake-with-java\\YellowTaxi";


        spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");

        //定义表
        DeltaTable.createIfNotExists(spark)
                .tableName(tableName)
                .addColumn("RideId","INT")
                .addColumn("VendorId","INT")
                .addColumn("PickupTime","TIMESTAMP")
                .addColumn("DropTime","TIMESTAMP")
                .partitionedBy("VendorId")
                .location(savePath)
                .execute();


        //加载csv数据并导入delta表
        var df=spark.read().format("delta").table(tableName);
        var schema=df.schema();
        System.out.println(schema.simpleString());
        var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);
        System.out.println("记录总行数:"+df_for_append.count());
        System.out.println("导入数据,开始时间"+  sdf.format(new Date()));
        df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);
        System.out.println("导入数据,结束时间" + sdf.format(new Date()));


        DeltaTable deltaTable = DeltaTable.forName(spark,tableName);
        deltaTable.optimize().executeZOrderBy("RideId");

        //插入数据
        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));
        var yellowTaxipDF=spark.createDataFrame(list,schema);//建立需要新增数据并转换成dataframe
        System.out.println("插入数据,开始时间"+  sdf.format(new Date()));
        yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);
        System.out.println("插入数据,结束时间"+  sdf.format(new Date()));
        System.out.println("插入后数据");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);





        System.out.println("更新数据,开始时间"+  sdf.format(new Date()));
        spark.read().format("delta").load(savePath).where("VendorId==-1 and RideId==-1")
                .withColumn("PickupTime", functions.lit("2023-11-01 10:00:00").cast(DataTypes.TimestampType))
                .write().format("delta")
                .option("replaceWhere","VendorId==-1")
                .mode(SaveMode.Overwrite).saveAsTable(tableName);
        System.out.println("更新数据,结束时间"+  sdf.format(new Date()));
        System.out.println("更新后数据");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);


        //更新数据
        System.out.println("更新前数据");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);
        System.out.println("更新数据,开始时间"+  sdf.format(new Date()));
        deltaTable.update(
                functions.col("RideId").equalTo("-1"),
                new HashMap<String, Column>() {{
                    put("PickupTime", functions.lit("2023-01-01 10:00:00").cast(DataTypes.TimestampType));
                }}
        );
        System.out.println("更新数据,结束时间"+  sdf.format(new Date()));
        System.out.println("更新后数据");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);


        //查询数据
        System.out.println("不分区表查询数据,开始时间"+  sdf.format(new Date()));
        spark.read().format("delta").load(savePath2).where("VendorId==4 and RideId==859744").show(false);
        System.out.println("不分区表查询数据,结束时间" + sdf.format(new Date()));


        System.out.println("分区表查询数据,开始时间"+  sdf.format(new Date()));
        spark.read().format("delta").load(savePath).where("VendorId==4 and RideId==859744").show(false);
        System.out.println("分区表查询数据,结束时间" + sdf.format(new Date()));




    }
}

最终运行结果:

从最后一张图红框可以看到比较结果,分区表查询效率要高一点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1643818.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用memcache 和 redis 、 实现session 会话复制和保持

一、NoSQL介绍 NoSQL是对Not Only SQL、非传统关系型数据库的统称 NoSQL一词诞生于1998年&#xff0c;2009年这个词汇再次提出指非关系型、分布式、不提供ACID的数据库设计模式 随着互联网时代的数据爆发时增长、数据库技术发展的日新月异&#xff0c;要适应新的业务需求&am…

JavaWeb_请求响应_简单参数实体参数

一、SpringBoot方式接收携带简单参数的请求 简单参数&#xff1a;参数名与形参变量名相同&#xff0c;定义形参即可接收参数。并且在接收过程中&#xff0c;会进行自动的类型转换。 启动应用程序后&#xff0c;在postman中进行测试&#xff1a; 请求成功&#xff0c;响应回了O…

eve 导入linux

mkdir /opt/unetlab/addons/qemu/linux-centos7 cd /opt/unetlab/addons/qemu/linux-centos7 上传hda.qcow2 /opt/unetlab/wrappers/unl_wrapper -a fixpermissions Linux images - (eve-ng.net) Due to very high demand of this section and problems with how to crea…

# 在 Windows 命令提示符(cmd)中,可以通过以下方法设置长命令自动换行

在 Windows 命令提示符&#xff08;cmd&#xff09;中&#xff0c;可以通过以下方法设置长命令自动换行 1、点击 cmd 窗口左上角标题栏&#xff0c;选择【属性】。 2、在【属性】菜单中&#xff0c;依次点击【选项】&#xff0c;找到【编辑选项】下面的【自动换行】&#xff…

Vue CLI脚手架项目目录和运行流程介绍

目录 一、项目目录介绍 二、运行流程介绍 一、项目目录介绍 二、运行流程介绍 项目在启动时会先运行main.js main.js的核心代码如下 1.导入Vue import Vue from vue 2.导入App.vue import App from ./App.vue 3.实例化Vue,将App.vue渲染到index.html容器中 new Vue({r…

为什么说虚拟化技术是现代网络安全的重要组成部分?

虚拟化技术是一种对计算机资源的抽象和资源管理技术&#xff0c;将电脑的各种实体资源&#xff08;CPU、内存、磁盘空间、网络适配器等&#xff09;予以抽象、转换后呈现出来&#xff0c;并可供分割、组合为一个或多个电脑配置环境。今天德迅云安全带您了解为什么虚拟化技术能成…

dp 动态规划 力扣

64. 最小路径和 给定一个包含非负整数的 m x n 网格 grid &#xff0c;请找出一条从左上角到右下角的路径&#xff0c;使得路径上的数字总和为最小。 说明&#xff1a;每次只能向下或者向右移动一步。 示例 1&#xff1a; 输入&#xff1a;grid [[1,3,1],[1,5,1],[4,2,1]] 输…

实时聊天系统设计

设计一个聊天系统最主要是保证消息能够及时可靠的从一端传入到另外一端&#xff0c;同时要支持对历史消息的查看。按照同时聊天人数聊天系统可以分&#xff0c;一对一&#xff08;one on one&#xff09;和群聊&#xff08;group chat&#xff09;&#xff0c;按照消息传递的及…

.排序总讲.

在这里赘叙一下我对y总前四节所讲排序的分治思想以及递归的深度理解。 就以788.逆序对 这一题来讲&#xff08;我认为这一题对于分治和递归的思想体现的淋淋尽致&#xff09;。 题目&#xff1a; 给定一个长度为 n&#x1d45b; 的整数数列&#xff0c;请你计算数列中的逆序对…

C++进阶 | [2] 多态

摘要&#xff1a;多态的概念&#xff0c;多态的条件&#xff0c;虚函数的重写&#xff0c;抽象类&#xff0c;多态的原理&#xff0c;虚函数与虚函数表&#xff0c;与多态有关的问答题 1. Concept 多态的概念&#xff1a;通俗来说&#xff0c;就是多种形态&#xff0c;具体点就…

Redis(Jedis和SpringBoot整合Redis)

文章目录 1.Jedis1.介绍2.环境配置1.创建maven项目2.pom.xml引入依赖3.新建一个包并创建一个文件 3.Jedis远程连接到Redis1.Redis放到服务器可以连接的前提条件2.为Redis设置密码1.编辑配置文件2.找到 requirepass3.设置密码为root4.重启Redis&#xff0c;在shutdown的时候报错…

C语言写一个终端进度条

C语言写一个终端进度条 这个功能挺简单的&#xff0c;主要有以下两点&#xff1a; 如何获取终端宽度如何让字符在原地闪烁 如何获取终端宽度 这里用到了设备控制接口函数ioctl()&#xff0c;下面简单的介绍一下这个函数的用法&#xff1a; ioctl是一个在Unix和类Unix系统中…

[C语言]指针进阶详解

指针是C语言的精髓所以内容可能会比较多&#xff0c;需要我们认真学习 目录 1、字符指针 2、指针数组 3、数组指针 3.1数组指针的定义 3.2&数组名vs数组名 3.3数组指针的使用 4、数组传参和指针传参 4.1一维数组传参 4.2二维数组传参 4.3一级指针传参 4.4二级指…

如何使用SSH密钥克隆仓库

1.创建SSH Key 在用户目录下查看有没有.ssh目录。如果有且该.ssh目录下有id_rsa&#xff08;私钥&#xff09;&#xff0c;和id_rse_pub(公钥)这俩文件&#xff0c;那么这一步就可以跳过。否则使用以下指令创建SSH Key ssh-keygen -t rsa -C "xxxqq.com" "xx…

【C语言】详解预处理

、 最好的时光&#xff0c;在路上;最好的生活&#xff0c;在别处。独自上路去看看这个世界&#xff0c;你终将与最好的自己相遇。&#x1f493;&#x1f493;&#x1f493; 目录 •✨说在前面 &#x1f34b;预定义符号 &#x1f34b; #define • &#x1f330;1.#define定义常…

解决HTTP 403 Forbidden错误:禁止访问目录索引问题的解决方法

解决HTTP 403 Forbidden错误&#xff1a;禁止访问目录索引问题的解决方法 过去有人曾对我说&#xff0c;“一个人爱上小溪&#xff0c;是因为没有见过大海。”而如今我终于可以说&#xff0c;“我已见过银河&#xff0c;但我仍只爱你一颗星。” 在Web开发和服务器管理中&#x…

3-qt综合实例-贪吃蛇的游戏程序

引言&#xff1a; 如题&#xff0c;本次实践课程主要讲解贪吃蛇游戏程序。 qt贪吃蛇项目内容&#xff1a; 一、功能需求 二、界面设计 各组件使用&#xff1a; 对象名 类 说明 Widget QWidge 主窗体 btnRank QPushButton 排行榜-按钮 groupBox QGroupBox 难…

C/C++开发,opencv-ml库学习,ml模块代码实现研究

目录 一、opencv-ml模块 1.1 ml简介 1.2 StatModel基类及通用函数 1.3 ml模块各算法基本应用 二、ml模块的实现原理 2.1 cv::ml::StatModel的train函数实现原理 2.2 cv::ml::StatModel的predict函数实现原理 2.3 cv::ml::StatModel的save函数和load函数 一、opencv-ml模…

Nginx(搭建高可用集群)

文章目录 1.基本介绍1.在微服务架构中的位置2.配置前提3.主从模式架构图 2.启动主Nginx和两个Tomcat1.启动linux的tomcat2.启动win的tomcat3.启动主Nginx&#xff0c;进入安装目录 ./sbin/nginx -c nginx.conf4.windows访问 http://look.sunxiansheng.cn:7777/search/cal.jsp 3…

力扣 647. 回文子串

题目来源&#xff1a;https://leetcode.cn/problems/palindromic-substrings/description/ C题解1&#xff1a;暴力解法。不断地移动窗口&#xff0c;判断是不是回文串。 class Solution { public:int countSubstrings(string s) {int len s.size();int res 0;for(int i 0;…