Delta lake with Java--数据增删改查

news2025/1/12 16:11:12

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java结合真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

package detal.lake.java;

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;

public class DeltaLakeCURD {

    //将字符串转换成java.sql.Timestamp
    public static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {
        SimpleDateFormat sf = new SimpleDateFormat(dateFormat);
        java.util.Date date = null;
        try {
            date = sf.parse(strDate);
        } catch (Exception e) {
            e.printStackTrace();
        }
        java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());
        return dateSQL;
    }

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.databricks.delta.autoCompact.enabled", "true")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();


        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";
        String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";
        String tableName = "taxidb.YellowTaxis";

        spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");

        //定义表
        DeltaTable.createIfNotExists(spark)
                .tableName(tableName)
                .addColumn("RideId","INT")
                .addColumn("VendorId","INT")
                .addColumn("PickupTime","TIMESTAMP")
                .addColumn("DropTime","TIMESTAMP")
                .location(savePath)
                .execute();


        //加载csv数据并导入delta表
        var df=spark.read().format("delta").table(tableName);
        var schema=df.schema();
        System.out.println(schema.simpleString());
        var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);
        System.out.println("记录总行数:"+df_for_append.count());
        System.out.println("导入数据,开始时间"+  sdf.format(new Date()));
        df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);
        System.out.println("导入数据,结束时间" + sdf.format(new Date()));


        DeltaTable deltaTable = DeltaTable.forName(spark,tableName);


        //插入数据
        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));
        structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));
        StructType structType = DataTypes.createStructType(structFields);
        var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframe

        System.out.println("插入数据,开始时间"+  sdf.format(new Date()));
        yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);
        System.out.println("插入数据,结束时间"+  sdf.format(new Date()));
        System.out.println("插入后数据");
        deltaTable.toDF().select("*").where("RideId=-1").show(false);


        //更新数据
        System.out.println("更新前数据");
        deltaTable.toDF().select("*").where("RideId=999994").show(false);
        System.out.println("更新数据,开始时间"+  sdf.format(new Date()));
        deltaTable.updateExpr(
                "RideId = 999994",
                new HashMap<String, String>() {{
                    put("VendorId", "250");
                }}
        );
        System.out.println("更新数据,结束时间"+  sdf.format(new Date()));
        System.out.println("更新后数据");
        deltaTable.toDF().select("*").where("RideId=999994").show(false);


        //查询数据
        System.out.println("查询数据,开始时间"+  sdf.format(new Date()));
        var selectDf= deltaTable.toDF().select("*").where("RideId=1");
        selectDf.show(false);
        System.out.println("查询数据,结束时间" + sdf.format(new Date()));


        //删除数据
        System.out.println("删除数据,开始时间"+  sdf.format(new Date()));
        deltaTable.delete("RideId=1");
        System.out.println("删除数据,结束时间"+  sdf.format(new Date()));
        deltaTable.toDF().select("*").where("RideId=1").show(false);

    }
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1650793.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mac idea gradle解决异常: SSL peer shut down incorrectly

系统&#xff1a;mac 软件&#xff1a;idea 解决异常: SSL peer shut down incorrectly 查看有没有安装 gradle -v安装 根据项目gradle提示安装版本 brew install gradle7idea的配置 在settings搜索gradle&#xff0c;配置Local installation&#xff0c;选择自己的安装目录…

Unity Shader中获取像素点深度信息

1.顶点着色器中对深度进行计算 v2f vert(appdata v) {v2f o;o.pos UnityObjectToClipPos(v.vertex);o.uv TRANSFORM_TEX(v.uv, _MainTex);o.depth (o.pos.z / o.pos.w 1.0) * 0.5; // Normalize depth to [0, 1]return o; }但是达不到预期&#xff0c;最后返回的值一直大于…

连通“数据”,让制造变“聪明”

说起数据智能&#xff0c;你第一时间想到的是什么呢&#xff1f;是科技感十足的智慧城市&#xff1f;还是炫酷的人工智能景象&#xff1f; 数据作为企业的战略资产越来越受到重视&#xff0c;从最初的数据协助业务协同&#xff0c;转化为数据驱动业务&#xff0c;数据驱动运营…

Hive两代命令行客户端(Hive、Beeline)

Hive命令行客户端 Hive有两个主要的客户端工具&#xff0c;分别是旧版的Hive CLI&#xff08;Command Line Interface&#xff09;和新版的Beeline。 Hive CLI&#xff1a; Hive CLI 是 Hive 最早期的命令行客户端工具&#xff0c;它使用 JDBC 连接到 Hive 服务器&#xff0c;…

栈的2道面试题【有效的括号】【用栈实现队列】

栈的面试题&#xff1a; 1.有效的括号 题目&#xff1a; 有效的括号 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合…

深入剖析Tomcat(七) 日志记录器

在看原书第六章之前&#xff0c;一直觉得Tomcat记日志的架构可能是个“有点东西”的东西。在看了第六章之后呢&#xff0c;额… 就这&#xff1f;不甘心的我又翻了翻logback与新版tomcat的源码&#xff0c;额…&#xff0c;日志架构原来也没那么神秘。本篇文章先过一遍原书内容…

为什么会查询不到DNS信息?怎么排查?

DNS&#xff08;域名系统&#xff09;是将域名转换为相应 IP 地址的关键系统。查询 DNS 信息具有重要作用&#xff0c;通过查询 DNS 信息&#xff0c;我们可以知道域名对应的 IP 地址&#xff0c;这是最主要的信息&#xff0c;使设备能与目标服务器进行通信&#xff1b;其次是域…

网络 IO 模式

同步 IO 与异步 IO 同步 IO 和异步 IO 是关于数据读写方式的两种不同模式。 同步 IO 是指在程序读写数据时&#xff0c;需要等待操作完成后才能继续执行后面的程序。这种模式下&#xff0c;当程序使用阻塞式 IO 时&#xff0c;会一直等待IO操作完成&#xff0c;程序会暂停执行…

Python中设计注册登录代码

import hashlib import json import os import sys # user interface 用户是界面 UI """ 用户登录系统 1.注册 2.登陆 0.退出 """ # 读取users.bin def load(path): return json.load(open(path, "rt")) # 保存user.bin def save(dic…

Autosar NvM配置-手动配置Nvblock及使用-基于ETAS软件

文章目录 前言NvDataInterfaceNvBlockNvM配置SWC配置RTE Mapping使用生成的接口操作NVM总结前言 NVM作为存储协议栈中最顶层的模块,是必须要掌握的。目前项目基本使用MCU带的Dflash模块,使用Fee模拟eeprom。在项目前期阶段,应该充分讨论需要存储的内容,包括应用数据,诊断…

ETLCloud工具怎么实现多流SQL实时运算?

多流SQL实时运算的特点和应用场景 多流SQL实时运算是一种先进的数据处理技术&#xff0c;它在大数据处理领域中扮演着至关重要的角色&#xff0c;尤其是在需要对多个数据流进行实时分析和处理的应用场景中。该技术结合了SQL&#xff08;结构化查询语言&#xff09;的易用性和流…

15.计算机网络

1.物理层的互联设备 中继器 和 集线器 2.集线器可以看做特殊的多路中继器 集线器 不可以做到自动寻址的功能 3.数据链路层 网桥 和 交换机 4.交换机是多端口网桥 5.网络层 路由器 6.应用层 网关 7.广播域 网络层 可以形成多个广播域 冲突域 网络层数据链路层 可以形成多个冲突域…

matlab 基于拉依达检验法(3σ准则) 实现多类别多参数的批量异常样本检验 V2.0

简介 拉依达检验法&#xff08;3σ准则&#xff09;是一种统计学方法&#xff0c;用于检测数据中的异常值。这种方法基于正态分布的特性来确定数据点是否可能是异常值。以下是关于拉依达检验法&#xff08;3σ准则&#xff09;的详细介绍&#xff1a; 基本原理&#xff1a; 拉…

分布式锁概述

什么是分布式锁 分布式锁是一种在分布式计算环境中用于同步访问共享资源的机制。它的主要目的是在一个分布式系统中&#xff0c;当多个进程或服务需要同时访问同一个资源时&#xff0c;确保任一时刻只有一个进程或服务能够执行涉及该资源的关键操作。这类似于传统单体应用中的…

C语言判断字符旋转

前言 今天我们使用c语言来写代码来实现字符串选择的判断&#xff0c;我们来看题目 题目描述 写一个函数&#xff0c;判断一个字符串是否为另外一个字符串旋转之后的字符串。 例如&#xff1a;给定s1 AABCD和s2 BCDAA&#xff0c;返回1 给定s1abcd和s2ACBD&#xff0c;返回0. A…

【分治算法】【Python实现】快速排序

文章目录 [toc]Python实现时间复杂性最坏时间复杂性最好时间复杂性平均时间复杂性 个人主页&#xff1a;丷从心 系列专栏&#xff1a;分治算法 学习指南&#xff1a;算法学习指南 Python实现 def partition(arr, low, high):pivot arr[low]# 将 pivot 元素移动到列表的最右…

flutter中固定底部按钮,防止键盘弹出时按钮跟随上移

当我们想要将底部按钮固定在底部&#xff0c;我们只需在Widget中的Scaffold里面加一句 resizeToAvoidBottomInset: false, // 设置为false&#xff0c;固定页面不会因为键盘弹出而移动 效果图如下

OFD(Open Fixed-layout Document)

OFD(Open Fixed-layout Document) &#xff0c;是由工业和信息化部软件司牵头中国电子技术标准化研究院成立的版式编写组制定的版式文档国家标准&#xff0c;属于中国的一种自主格式&#xff0c;要打破政府部门和党委机关电子公文格式不统一&#xff0c;以方便地进行电子文档的…

pwn学习(一)

pwn:二进制漏洞挖掘与利用&#xff08;程序里面的漏洞&#xff09; CTF中的Pwn是仅保留漏洞代码和基本逻辑的二进制程序&#xff0c;选手通过自身对漏洞的熟悉程度来快速的在逆向分析中找到漏洞点&#xff0c;并且结合自身对漏洞利用的熟悉程度来编写EXP脚本&#xff0c;从而获…

pxe远程安装

PXE 规模化&#xff1a;可以同时装配多台服务器 自动化&#xff1a;自动安装操作系统和各种配置 不需要光盘U盘 前置需要一台PXE服务器 pxe是预启动执行环境&#xff0c;再操作系统之前运行 实验&#xff1a; 首先先关闭防火墙等操作 [rootlocalhost ~]# systemc…