datax做增量导入数据到hive:mysql>hive

news2024/11/13 10:07:29

为什么要做增量导入? 例如mysql表中的数据导入hive,如果第一天抽取了mysql中t_user表中的全部数据,则第二天只需要抽取新增数据即可! 增加导入是利用where 条件查询实现的,查询条件一般是自增的id或者时间列 下面演示基于时间列的数据增量抽取。

1.数据准备

# 1. 在mysql数据库创建如下表结构:
create table t_order(
    id               int   primary key auto_increment,
    amt              decimal(10,2),
    `status`         int  default 0,
    user_id          int,
    create_time      timestamp DEFAULT CURRENT_TIMESTAMP,
    modify_time      timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# 2.插入数据
insert into t_order values(null,100,0,1001,'2023-07-01 10:10:10','2023-07-01 10:10:10');
insert into t_order values(null,99,0,1002,'2023-07-01 10:10:10','2023-07-01 10:10:10');

select *
from t_order;


-- 2.在hive创建如下表结构
create table t_order(
        id                    int,
        amt                   decimal(10,2),
        `status`              int,
        user_id               int,
        create_time           string,
        modify_time           string
)partitioned by (dt string)
row format delimited  fields terminated by '\t';

-- 手动添加分区
alter table t_order add partition (dt='2023-07-01');

show partitions t_order;

2.编写增量数据导入datax配置文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop11:3306/test1"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order where DATE_FORMAT(modify_time, '%Y-%m-%d') = '$dt'"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                                {"name": "id","type": "int"},
                                {"name": "amt","type": "double"},
                                {"name": "status","type": "int"},
                                {"name": "user_id","type": "int"},
                                {"name": "create_time","type": "string"},
                                {"name": "modify_time","type": "string"}
                       ],
                        "defaultFS": "hdfs://hdfs-cluster", 
                        "hadoopConfig":{
                                "dfs.nameservices": "hdfs-cluster",
                                "dfs.ha.namenodes.hdfs-cluster": "nn1,nn2",
                                "dfs.namenode.rpc-address.hdfs-cluster.nn1": "hadoop11:8020",
                                "dfs.namenode.rpc-address.hdfs-cluster.nn2": "hadoop12:8020",
                                "dfs.client.failover.proxy.provider.hdfs-cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "path": "/user/hive/warehouse/t_order/dt=$dt",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

3.测试增量导入

 执行:python /opt/installs/datax/bin/datax.py /opt/installs/datax/job/test_job.json -p "-Ddt='2023-07-01'"  原有的两条数据已经导入证明json文件无误

# 再添加到mysql一天的测试数据
insert into t_order values(null,220,0,1001,'2023-07-02 10:10:10','2023-07-02 10:10:10');
update t_order set `status` = 2 , modify_time = '2023-07-02 11:00:00' where id = 2;


 

-- 手动创建hive分区 2023-07-02
alter table t_order add partition (dt='2023-07-02');

 执行:python /opt/installs/datax/bin/datax.py /opt/installs/datax/job/test_job.json -p "-Ddt='2023-07-02'" 

查询结果:已经完成数据导入

4.编写对应的shell脚本执行命令

#! /bin/bash
# 1. 要求用户提供日期如果没有提供,则使用昨天日期
dt=$1

if [ 'x'$1 == 'x' ];then
  dt=$(date -d'-1 day' +%Y-%m-%d)
fi

# 2. 查询dt对应日期的分区是否存在,默认返回结果表里面的列名需要去掉,只保留表中的数据赋值给x1变量
x1=$(hive -e "set hive.cli.print.header=false;show partitions t_order partition(dt='$dt')")

# 3. 如果x1变量等于空,说明分区不存在,则创建分区
echo $x1
if [ "$x1" == "" ]
then
  hive -e "alter table t_order add partition(dt='$dt')"
fi

# 4. 执行py文件
python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/test_job.json

 测试shell脚本,mysql增加2023-07-03的数据增量导入到hive:

insert into t_order values(null,330,0,1003,'2023-07-03 10:10:10','2023-07-03 10:10:10');
update t_order set `status` = 2 , modify_time = '2023-07-03 11:00:00' where id = 3;

测试成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1994770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sns.regplot()用法

概念 seaborn.regplot()函数可以在两个变量之间绘制一个线性回归模型,可以输出线性回归线以及数据的散点图。 参数解释 seaborn.regplot(dataNone, xNone, yNone, x_estimatorNone, x_binsNone, x_cici, scatterTrue, fit_regTrue, ci95, …

s7_200smart采集遇到的问题

对s7_200smart(plc设备不太熟悉)第一次使用了modbus协议来采集数据是采集不到bcd码类型的数据,modbus里面不支持这个数据类型。采用西门子类型来设置采集数据也遇到不少问题? 第一:采集速率不可以太高,最好1秒一次,通…

YOLOv8改进 | 主干网络 | 用EfficientNet卷积替换backbone【教程+代码 】

秋招面试专栏推荐 :深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 专栏目录 :《YOLOv8改进有效…

我们终究会懂得自己并非无所不能

今天参加“全民健身日”公开水域游泳比赛,第一次在游泳上有一种无力感。 以前以为自己游泳怎么都不会累,大不了踩踩水,或者在水上漂着。今天竟然途中可耻地抱着“跟屁虫”休息了。 是不是承认自己的无能,也是一种进步?…

【简历】苏州某大学211硕士:25届Java简历指导通过率低

注:为保证用户信息安全,姓名和学校等信息已经进行同层次变更,内容部分细节也进行了部分隐藏 简历说明 这是一份25届211硕士同学的Java简历,这个学历他的目标必然是冲大厂。 不过他的简历几乎没什么提问点,211在大厂…

1.2 C 语言环境:MinGW 与 CLion 的安装与配置

目录 1 C 语言的由来 2 安装 MinGW 编译器 3 Windows 中安装 CLion 开发环境 3.1 安装 CLion 开发环境 3.2 运行试用 30 天 3.3 新建项目​ 3.4 汉化 4 Mac 中安装 Clion 开发环境 4.1 安装 CLion 开发环境 4.2 运行试用 30 天 4.3 新建项目 ​4.4 汉化 5 向日葵的…

【Linux】系列入门摘抄笔记-6-tar打包压缩和vim编辑器

打包、压缩和解压命令 压缩文件一定要严格区分扩展名 tar 打包程序 tar [主选项+辅选项] [包名] [目标文件或目录]描述:tar命令是Linux下最常用的打包程序。使用tar命令打出来的包称为tar包,因为tar包文件的后缀通常是“.tar”。 每条tar命令只能有一个主选项,而辅助选项…

C语言实现-排序1

文章目录 🎯引言👓排序1.排序的概念以及运用1.1概念1.2运用1.3常见的排序算法 2.排序算法的实现2.1插入排序2.1.1直接插入排序2.1.2希尔排序 2.2选择排序2.2.1直接选择排序2.2.2堆排序 🥇结语 🎯引言 欢迎来到HanLop博客的C语言数…

C#如何对某个词在字符串中出现的次数进⾏计数(LINQ)

文章目录 基础知识实现方法基础计数LINQ优化处理标点符号总结 LINQ(Language-Integrated Query)是C#和VB.NET中强大的查询语言,它可以用来查询集合、SQL数据库、XML文档等。在C#中,我们可以使用LINQ来简化对字符串中特定单词出现次…

C语言实现游戏2048(超详细!!!超易懂!!!)

2048是众所周知的一款经典游戏,在曾经没有智能电脑和手机的年代,也陪伴了我们许多年。那今天就让我们用C语言来回顾一下这款游戏吧~ 一、游戏2048的思路 2048游戏的玩法是在初始的时候,给玩家一个4*4格子的,其中内容全为空的棋盘…

基于SpringBoot+Vue的供应商管理系统(带1w+文档)

基于SpringBootVue的供应商管理系统(带1w文档) 基于SpringBootVue的供应商管理系统(带1w文档) 现今,互联网在我们的日常生活占据着日益重要的地位,我们也越来越离不开对移动设备、电脑等上网设备的使用。传统的供应商管理系统模式主要依靠管理人员纯手工…

PyQt6简易案例代码GUI界面小工具——实现二维码生成器+自定义前后背景色(练手正合适)

目录 专栏导读PyQt6的介绍PyQt6的主要特点包括:使用PyQt6开发应用程序的一般步骤: 库的安装1、初始化与界面设计2、设置前景色、背景色功能完整代码总结 专栏导读 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双…

IP地址的构成

1. IPv4地址 IPv4地址是最早且目前仍然广泛使用的IP地址版本。由32位二进制数构成,应为32为二进制数太长了,所以我们通常用四个十进制数字来表示,每个数字之间用”.”分隔。这些数字的范围是0到255。IPv4地址的格式为: “A.B.C.…

2024世界机器人大会将于8月21日至25日在京举行

2024年的世界机器人大会预定于8月21日至25日,在北京经济技术开发区的北人亦创国际会展中心隆重举办。 本届大会以“共育新质生产力 共享智能新未来”为核心主题,将汇聚来自全球超过300位的机器人行业专家、国际组织代表、杰出科学家以及企业家&#xff0…

再启新征程——灵川县“灵秀山川”区域公共品牌发布会顺利举办

灵川县,自古便享有“楚越通衢,风气之先”的美誉,见证了无数文化的交流与融合。这里,土地肥沃,资源丰富,如同大自然的无尽宝库,孕育了琳琅满目的优质农特产品。立足于本地优势资源,灵…

Python | Leetcode Python题解之第329题矩阵中的最长递增路径

题目: 题解: class Solution:DIRS [(-1, 0), (1, 0), (0, -1), (0, 1)]def longestIncreasingPath(self, matrix: List[List[int]]) -> int:if not matrix:return 0rows, columns len(matrix), len(matrix[0])outdegrees [[0] * columns for _ in…

数字人直播间搭建教程比较:哪种方案更可行?

当前,数字人直播的应用潜力不断显现,各大中小型企业对其关注度和接受度持续上升,连带着各种数字人直播间搭建教程的阅读量也日益上涨。而不少创业者也因此发现了它所蕴含的市场需求和收益空间,并有了通过为企业搭建数字人直播间以…

三防平板定制化:驱动产业高效化发展的新动能

在数字化转型的浪潮中,三防平板作为一种坚固耐用、功能强大的移动设备,正逐渐成为各行各业提升效率、优化管理的关键工具。通过硬件和软件的定制化服务,三防平板不仅能满足特定行业的需求,更能在复杂的工作环境中展现出卓越的性能…

haproxy实验

目录 为什么要用haproxy? haproxy的基本部署实验: 环境准备: 详细步骤: haproxy-多进程与多线程实验: haproxy的全局global配置实验: 为什么要用haproxy? LVS:没有后端检测&a…

Linux学习笔记:Linux基础知识汇总(kill 进程-vi编辑检索-查看当前文件夹的大小-修复硬盘等)

常见指令 Linux 的 find 命令可以用于在指定目录下查找符合条件的文件或目录。find 命令的基本语法为: find [path] [expression]其中,path 指定要查找的目录路径,expression 指定查找条件。下面是一些常用的 find 命令用法和示例&#xff…