离线数仓数据导出-hive数据同步到mysql

news2025/1/16 14:10:17

离线数仓数据导出-hive数据同步到mysql

  • MySQL建库建表
  • 数据导出

为方便报表应用使用数据,需将ads各指标的统计结果导出到MySQL数据库中。
datax支持hive同步MySQL:仅仅支持hive存储的hdfs文件导出。所以reader选hdfs-reader,writer选mysql-writer。

在这里插入图片描述
null值 在hive和mysql里的存储格式不一样,需要告诉DataX应该如何转换。
在这里插入图片描述

MySQL建库建表

12.1.1 创建数据库

CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段顺序也要一致
每张表要有主键

1)各活动补贴率
dt activity_id activity_name 三个主键联合而成

DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats`  (
  `dt` date NOT NULL COMMENT '统计日期',
  `activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',
  `activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',
  `start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',
  `reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',
  PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;

数据导出

DataX配置文件生成脚本
方便起见,此处提供了DataX配置文件批量生成脚本,脚本内容及使用方式如下。
1)在~/bin目录下创建gen_export_config.py脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
脚本内容如下

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/export"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


def generate_json(target_database, target_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(target_database, target_table),
                        "connection": [
                            {
                                "jdbcUrl":
                                    "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8",
                                "table": [target_table]
                            }
                        ]
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value

    generate_json(target_database, target_table)

if __name__ == '__main__':
    main(sys.argv[1:])

在~/bin目录下创建gen_export_config.sh脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
脚本内容如下。

#!/bin/bash

python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats

3)为gen_export_config.sh脚本增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,生成配置文件
[atguigu@hadoop102 bin]$ gen_export_config.sh
5)观察生成的配置文件

[atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/

编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目录下创建hdfs_to_mysql.sh
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2)编写如下内容


#! /bin/bash

DATAX_HOME=/opt/module/datax

#DataX导出路径不允许存在空文件,该函数作用为清理空文件
handle_export_path(){
  target_file=$1
  for i in `hadoop fs -ls -R $target_file | awk '{print $8}'`; do
    hadoop fs -test -z $i
    if [[ $? -eq 0 ]]; then
      echo "$i文件大小为0,正在删除"
      hadoop fs -rm -r -f $i
    fi
  done

}


#数据导出
export_data() {
  datax_config=$1
  export_dir=$2
  hadoop fs -test -e $export_dir
  if [[ $? -eq 0 ]]
  then
    handle_export_path $export_dir
    file_count=$(hadoop fs -ls $export_dir | wc -l)
    if [ $file_count -gt 0 ]
    then
      set -e;
      $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
      set +e;
    else 
      echo "$export_dir 目录为空,跳过~"
    fi
  else
    echo "路径 $export_dir 不存在,跳过~"
  fi
}


case $1 in
  "ads_new_buyer_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  ;;
  "ads_order_by_province")
    export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  ;;
  "ads_page_path")
    export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  ;;
  "ads_repeat_purchase_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  ;;
  "ads_trade_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  ;;
  "ads_trade_stats_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  ;;
  "ads_trade_stats_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  ;;
  "ads_traffic_stats_by_channel")
    export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  ;;
  "ads_user_action")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  ;;
  "ads_user_change")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  ;;
  "ads_user_retention")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  ;;
  "ads_user_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  ;;
  "ads_activity_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  ;;
  "ads_coupon_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  ;;
  "ads_sku_cart_num_top3_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  ;;

"all")
  export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  ;;
esac

(3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1617834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新手小白能做视频号小店吗?可以,但这几点一定要搞清楚

大家好,我是电商笨笨熊 视频号小店的推出吸引了不少的电商玩家,其中也有很多新手小白,大家对于此项目充满好奇,尤其是其私域电商的模式和中年以上的未被开发的用户群体,处处都充满着新风口。 但正所谓拿着旧地图找不…

【好书推荐7】《机器学习平台架构实战》

【好书推荐7】《机器学习平台架构实战》 写在最前面《机器学习平台架构实战》编辑推荐内容简介作者简介目  录前  言本书读者内容介绍充分利用本书下载示例代码文件下载彩色图像本书约定 🌈你好呀!我是 是Yu欸 🌌 2024每日百字篆刻时光&…

ONLYOFFICE协作空间:团队高效协作的终极武器!

文章目录 ONLYOFFICE协作空间初创版专业版(云端)企业版(内部部署) 亮点功能实时多人协作编辑高效的项目管理工具无缝集成第三方存储服务安全性和合规性支持Markdown文件群组功能和存储配额管理嵌入功能和数据导入自托管协作空间支…

YOLOv8 关键点检测模型训练部署

文章目录 1、YOLOv8安装及使用1.2、命令行使用1.3、使用python-API模型预测1.4、pt转换ONNX 2、训练三角板关键点检测模型2.1、训练命令 3、ONNX Runtime部署 1、YOLOv8安装及使用 参考链接: 同济子豪兄视频 github原文链接 # 安装yolov8 pip install ultralytics --upgrade …

操作系统:进程间通信 | 管道

目录 1.进程间通信介绍 1.1.简要介绍 1.2.进程间通信的目的 1.3.进程间通信的本质 2.管道 2.1.管道的通信原理 2.2.匿名管道 2.3.命名管道 2.4.基于匿名管道的进程池demo 2.4.1.进程池的相关引入 2.4.2.整体框架的分析 2.4.3.代码的实现 1.进程间通信介绍 1.1.简…

Etsy多账号关联怎么办?Etsy店铺防关联解决方法

Etsy虽然相对于其他跨境电商平台来说比较小众,但因为平台是以卖手工艺品为主的,所以成本较低,利润很高。许多跨境卖家都纷纷入驻,导致平台规则越发严格,操作不当就会封号,比如一个卖家操作多个账号会出现关…

kubernetes部署控制器Deployment

一、概念 在学习rc和rs控制器资源时,这两个资源都是控制pod的副本数量的,但是,他们两个有个缺点,就是在部署新版本pod或者回滚代码的时候,需要先apply资源清单,然后再删除现有pod,通过资源控制&…

赛氪网参与第61届中国高等教育博览会,助力产教融合与科教融汇

为深入贯彻党的二十大精神,落实立德树人根本任务,推动高等教育装备现代化,第61届中国高等教育博览会(以下简称“高博会”)于近日在福建省福州市隆重开幕。作为高等教育领域内的综合性品牌博览会,此次高博会…

达梦(DM)数据库表索引

达梦DM数据库表索引 表索引索引准则其他准则 创建索引显式地创建索引其他创建索引语句 使用索引重建索引删除索引 表索引 达梦数据库表索引相关内容比较多,常用的可能也就固定的一些,这里主要说一下常用的索引,从物理存储角度进行分类&#…

【声呐仿真】学习记录0.5-配置ssh远程连接docker、在docker中使用nvidia显卡

【声呐仿真】学习记录0.5-配置ssh远程连接docker、在docker中使用nvidia显卡 配置ssh远程连接docker1.端口映射2.配置ssh 在docker中使用nvidia显卡配置CUDA 注意:之前已经创建过容器的,需要打包成镜像,重新创建容器,因为要在创建…

# IDEA2019 如何打开 Run Dashboard 运行仪表面板

IDEA2019 如何打开 Run Dashboard 运行仪表面板 段子手168 1、依次点击 IDEA 上面工具栏 —> 【View】 视图。 —> 【Tool Windows】 工具。 —> 【Run Dashboard】 运行仪表面板。 2、如果 【Tool Windows 】工具包 没有 【Run Dashboard】 运行仪表面板 项 依次…

uniapp制作多选下拉框和富文本(短信页面)

实例 多选下拉框实现 http://t.csdnimg.cn/TNmcF 富文本实现 http://t.csdnimg.cn/Ei1iV

网络带宽相关

1.tcp重传率计算 watch -n 5 “cat /proc/net/snmp” 如下博客所讲 https://blog.csdn.net/michaelwoshi/article/details/121189743 2.iperf测试网络带宽 #客户端 #tcp iperf -c 服务端ip -P 4 -b 200M #udp iperf -c 服务端ip -u -P 4 -b 1000M -l 10K #服务端 iperf -s

OPTEE的GDB调试技术实战

【按语】:如果需要调试OPTEE,那么在远程调试配置中使用GDB可能会很有用。远程调试意味着GDB在您的PC上运行,它可以访问源代码,而被调试的程序在远程系统上运行(在本例中,在QEMU环境的OPTEE中)。本博客来探讨OPTEE的GDB…

React基础知识大汇总

函数组件和类组件 函数组件与类组件有什么区别呢? function getName(params:{name:string}){const count 0;return params.name -count; } getName({name:"test"}) getName({name:"哈哈哈"})getName是一个纯函数,不产生任何副作用…

算法竞赛相关问题总结记录

前言 日常在校生或者是工作之余的同学或多或少都会参加一些竞赛,参加竞赛一方面可以锻炼自己的理解与实践能力,也能够增加自己的生活费,竞赛中的一些方案也可以后续作为自己论文的base,甚至是横向课题的框架。在算法竞赛中算法的差别个人感觉差距都不大&…

区块链钱包开发指南: 探究区块链钱包开发涉及

区块链钱包是连接用户与区块链网络的重要工具,它们不仅提供了安全的存储和管理数字资产的功能,还允许用户进行交易和与区块链上的智能合约进行互动。本文将探究区块链钱包开发涉及的关键方面和技术要点。 1. 区块链钱包类型 区块链钱包可以分为以下几种…

Android Studio查看viewtree

前言:之前开发过程一直看的是手机上开发者选项中的显示布局边界,开关状态需要手动来回切换,今天偶然在Android Studio中弄出了布局树觉得挺方便的。

汽车纵染压制专用液压机比例阀放大器

汽车纵染压制专用液压机比例阀放大器是一种专门用于汽车纵梁拉伸工艺的设备,它也可以用于其他金属薄板的压制成型及校正工艺。该类型的液压机通常具备独立的动力机构和电气系统,采用PLC技术进行控制,以确保操作的准确性和稳定性。除了纵梁拉伸…