尚硅谷大数据项目《在线教育之离线数仓》笔记007

news2024/9/28 17:25:20

视频地址:尚硅谷大数据项目《在线教育之离线数仓》_哔哩哔哩_bilibili

目录

第12章 报表数据导出

P112

01、创建数据表

02、修改datax的jar包

03、ads_traffic_stats_by_source.json文件

P113

P114

P115

P116

P117

P118

P119

P120

P121

P122【122_在线教育数仓开发回顾 04:23】


第12章 报表数据导出

P112

01、创建数据表

# 第12章 报表数据导出
CREATE DATABASE IF NOT EXISTS edu_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

# 12.1.2 创建表


# 01)各来源流量统计
DROP TABLE IF EXISTS ads_traffic_stats_by_source;
CREATE TABLE ads_traffic_stats_by_source
(
    `dt`               DATETIME COMMENT '统计日期',
    `recent_days`      BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `source_id`        VARCHAR(255) COMMENT '引流来源id',
    `source_site`      VARCHAR(255) COMMENT '引流来源名称',
    `uv_count`         BIGINT COMMENT '访客人数',
    `avg_duration_sec` BIGINT COMMENT '会话平均停留时长,单位为秒',
    `avg_page_count`   BIGINT COMMENT '会话平均浏览页面数',
    `sv_count`         BIGINT COMMENT '会话数',
    `bounce_rate`      DECIMAL(16, 2) COMMENT '跳出率',
    PRIMARY KEY (`dt`, `recent_days`, `source_id`)
) COMMENT '各引流来源流量统计';


# 02)页面浏览路径分析
DROP TABLE IF EXISTS ads_traffic_page_path;
CREATE TABLE ads_traffic_page_path
(
    `dt`          DATETIME COMMENT '统计日期',
    `recent_days` BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `source`      VARCHAR(255) COMMENT '跳转起始页面id',
    `target`      VARCHAR(255) COMMENT '跳转终到页面id',
    `path_count`  BIGINT COMMENT '跳转次数',
    PRIMARY KEY (`dt`, `recent_days`, `source`, `target`)
) COMMENT '页面浏览路径分析';


# 03)各引流来源销售状况统计
DROP TABLE IF EXISTS ads_traffic_sale_stats_by_source;
CREATE TABLE ads_traffic_sale_stats_by_source
(
    `dt`                 DATETIME COMMENT '统计日期',
    `recent_days`        BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `source_id`          VARCHAR(255) COMMENT '引流来源id',
    `source_site`        VARCHAR(255) COMMENT '引流来源名称',
    `order_total_amount` DECIMAL(16, 2) COMMENT '销售额',
    `order_user_count`   BIGINT COMMENT '下单用户数',
    `pv_visitor_count`   BIGINT COMMENT '引流访客数',
    `convert_rate`       DECIMAL(16, 2) COMMENT '转化率',
    PRIMARY KEY (`dt`, `recent_days`, `source_id`)
) COMMENT '各引流来源销售状况统计';


# 04)用户变动统计
DROP TABLE IF EXISTS ads_user_user_change;
CREATE TABLE ads_user_user_change
(
    `dt`               DATETIME COMMENT '统计日期',
    `user_churn_count` BIGINT COMMENT '流失用户数',
    `user_back_count`  BIGINT COMMENT '回流用户数',
    PRIMARY KEY (`dt`)
) COMMENT '用户变动统计';


# 05)用户留存率
DROP TABLE IF EXISTS ads_user_user_retention;
CREATE TABLE ads_user_user_retention
(
    `dt`              DATETIME COMMENT '统计日期',
    `create_date`     VARCHAR(255) COMMENT '用户新增日期',
    `retention_day`   INT COMMENT '截至当前日期留存天数',
    `retention_count` BIGINT COMMENT '留存用户数量',
    `new_user_count`  BIGINT COMMENT '新增用户数量',
    `retention_rate`  DECIMAL(16, 2) COMMENT '留存率',
    PRIMARY KEY (`dt`, `create_date`, `retention_day`)
) COMMENT '用户留存率';


# 06)用户新增活跃统计
DROP TABLE IF EXISTS ads_user_user_stats;
CREATE TABLE ads_user_user_stats
(
    `dt`                DATETIME COMMENT '统计日期',
    `recent_days`       BIGINT COMMENT '最近n日,1:最近1日,7:最近7日,30:最近30日',
    `new_user_count`    BIGINT COMMENT '新增用户数',
    `active_user_count` BIGINT COMMENT '活跃用户数',
    PRIMARY KEY (`dt`, `recent_days`)
) COMMENT '用户新增活跃统计';


# 07)用户行为漏斗分析
DROP TABLE IF EXISTS ads_user_user_action;
CREATE TABLE ads_user_user_action
(
    `dt`                DATETIME COMMENT '统计日期',
    `recent_days`       BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `home_count`        BIGINT COMMENT '浏览首页人数',
    `good_detail_count` BIGINT COMMENT '浏览商品详情页人数',
    `cart_count`        BIGINT COMMENT '加入购物车人数',
    `order_count`       BIGINT COMMENT '下单人数',
    `payment_count`     BIGINT COMMENT '支付人数',
    PRIMARY KEY (`dt`, `recent_days`)
) COMMENT '用户行为漏斗分析';


# 08)新增交易用户统计
DROP TABLE IF EXISTS ads_user_new_buyer_stats;
CREATE TABLE ads_user_new_buyer_stats
(
    `dt`                     DATETIME COMMENT '统计日期',
    `recent_days`            BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `new_order_user_count`   BIGINT COMMENT '新增下单人数',
    `new_payment_user_count` BIGINT COMMENT '新增支付人数',
    PRIMARY KEY (`dt`, `recent_days`)
) COMMENT '新增交易用户统计';


# 09)各年龄段下单用户数
DROP TABLE IF EXISTS ads_user_order_user_count_by_age_group;
CREATE TABLE ads_user_order_user_count_by_age_group
(
    `dt`               DATETIME COMMENT '统计日期',
    `recent_days`      BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `age_group`        VARCHAR(255) COMMENT '年龄段,18岁及以下、19-24岁、25-29岁、30-34岁、35-39岁、40-49岁、50岁及以上',
    `order_user_count` BIGINT COMMENT '下单人数',
    PRIMARY KEY (`dt`, `recent_days`, `age_group`)
) COMMENT '各年龄段下单用户数统计';


# 10)各类别课程交易统计
DROP TABLE IF EXISTS ads_course_trade_stats_by_category;
CREATE TABLE ads_course_trade_stats_by_category
(
    `dt`               DATETIME COMMENT '统计日期',
    `recent_days`      BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `category_id`      VARCHAR(255) COMMENT '类别id',
    `category_name`    VARCHAR(255) COMMENT '类别名称',
    `order_count`      BIGINT COMMENT '订单数',
    `order_user_count` BIGINT COMMENT '订单人数' ,
    `order_amount`     DECIMAL(16, 2) COMMENT '下单金额',
    PRIMARY KEY (`dt`, `recent_days`, `category_id`)
) COMMENT '各类别课程交易统计';


# 11)各学科课程交易统计
DROP TABLE IF EXISTS ads_course_trade_stats_by_subject;
CREATE TABLE ads_course_trade_stats_by_subject
(
    `dt`               DATETIME COMMENT '统计日期',
    `recent_days`      BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `subject_id`       VARCHAR(255) COMMENT '学科id',
    `subject_name`     VARCHAR(255) COMMENT '学科名称',
    `order_count`      BIGINT COMMENT '订单数',
    `order_user_count` BIGINT COMMENT '订单人数' ,
    `order_amount`     DECIMAL(16, 2) COMMENT '下单金额',
    PRIMARY KEY (`dt`, `recent_days`, `subject_id`)
) COMMENT '各学科课程交易统计';


# 12)各课程交易统计
DROP TABLE IF EXISTS ads_course_trade_stats_by_course;
CREATE TABLE ads_course_trade_stats_by_course
(
    `dt`               DATETIME COMMENT '统计日期',
    `recent_days`      BIGINT COMMENT '最近天数,1:最近 1 天,7:最近 7天,30:最近 30 天',
    `course_id`        VARCHAR(255) COMMENT '课程id',
    `course_name`      VARCHAR(255) COMMENT '课程名称',
    `order_count`      BIGINT COMMENT '下单数',
    `order_user_count` BIGINT COMMENT '下单人数',
    `order_amount`     DECIMAL(16, 2) COMMENT '下单金额',
    PRIMARY KEY (`dt`, `recent_days`, `course_id`)
) COMMENT '各课程交易统计';


# 13)各课程评价统计
DROP TABLE IF EXISTS ads_course_review_stats_by_course;
CREATE TABLE ads_course_review_stats_by_course
(
    `dt`                DATETIME COMMENT '统计日期',
    `recent_days`       BIGINT COMMENT '最近天数,1:最近 1 天,7:最近 7 天,30:最近 30 天',
    `course_id`         VARCHAR(255) COMMENT '课程id',
    `course_name`       VARCHAR(255) COMMENT '课程名称',
    `avg_stars`         BIGINT COMMENT '用户平均评分',
    `review_user_count` BIGINT COMMENT '评价用户数',
    `praise_rate`       DECIMAL(16, 2) COMMENT '好评率',
    PRIMARY KEY (`dt`, `recent_days`, `course_id`)
) COMMENT '各课程评价统计';


# 14)各分类课程试听留存统计
DROP TABLE IF EXISTS ads_sample_retention_stats_by_category;
CREATE TABLE ads_sample_retention_stats_by_category
(
    `dt`                DATETIME COMMENT '统计日期',
    `retention_days`    BIGINT COMMENT '留存天数,1-7 天',
    `category_id`       VARCHAR(255) COMMENT '分类id',
    `category_name`     VARCHAR(255) COMMENT '分类名称',
    `sample_user_count` BIGINT COMMENT '试听人数',
    `retention_rate`    DECIMAL(16, 2) COMMENT '试听留存率',
    PRIMARY KEY (`dt`, `retention_days`, `category_id`)
) COMMENT '各分类课程试听留存统计';


# 15)各学科试听留存统计
DROP TABLE IF EXISTS ads_sample_retention_stats_by_subject;
CREATE TABLE ads_sample_retention_stats_by_subject
(
    `dt`                DATETIME COMMENT '统计日期',
    `retention_days`    BIGINT COMMENT '留存天数,1-7 天',
    `subject_id`        VARCHAR(255) COMMENT '学科id',
    `subject_name`      VARCHAR(255) COMMENT '学科名称',
    `sample_user_count` BIGINT COMMENT '试听人数',
    `retention_rate`    DECIMAL(16, 2) COMMENT '试听留存率',
    PRIMARY KEY (`dt`, `retention_days`, `subject_id`)
) COMMENT '各学科试听留存统计';


# 16)各课程试听留存统计
DROP TABLE IF EXISTS ads_sample_retention_stats_by_course;
CREATE TABLE ads_sample_retention_stats_by_course
(
    `dt`                DATETIME COMMENT '统计日期',
    `retention_days`    BIGINT COMMENT '留存天数,1-7 天',
    `course_id`         VARCHAR(255) COMMENT '课程id',
    `course_name`       VARCHAR(255) COMMENT '课程名称',
    `sample_user_count` BIGINT COMMENT '试听人数',
    `retention_rate`    DECIMAL(16, 2) COMMENT '试听留存率',
    PRIMARY KEY (`dt`, `retention_days`, `course_id`)
) COMMENT '各课程试听留存统计';


# 17)交易综合指标
DROP TABLE IF EXISTS ads_trade_stats;
CREATE TABLE ads_trade_stats
(
    `dt`                 DATETIME COMMENT '统计日期',
    `recent_days`        BIGINT COMMENT '最近天数,1:最近1日,7:最近7天,30:最近30天',
    `order_total_amount` DECIMAL(16, 2) COMMENT '订单总额,GMV',
    `order_count`        BIGINT COMMENT '订单数',
    `order_user_count`   BIGINT COMMENT '下单人数',
    PRIMARY KEY (`dt`, `recent_days`)
) COMMENT '交易综合指标';


# 18)各省份交易统计
DROP TABLE IF EXISTS ads_trade_order_by_province;
CREATE TABLE ads_trade_order_by_province
(
    `dt`                 DATETIME COMMENT '统计日期',
    `recent_days`        BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `province_id`        VARCHAR(10) COMMENT '省份id',
    `province_name`      VARCHAR(30) COMMENT '省份名称',
    `region_id`          VARCHAR(30) COMMENT '大区id',
    `area_code`          VARCHAR(255) COMMENT '地区编码',
    `iso_code`           VARCHAR(255) COMMENT '国际标准地区编码',
    `iso_code_3166_2`    VARCHAR(255) COMMENT '国际标准地区编码',
    `order_count`        BIGINT COMMENT '订单数' ,
    `order_user_count`   BIGINT COMMENT '下单人数',
    `order_total_amount` DECIMAL(16, 2) COMMENT '订单金额',
    PRIMARY KEY (`dt`, `recent_days`, `province_id`, `region_id`, `area_code`, `iso_code`, `iso_code_3166_2`)
) COMMENT '各省份交易统计';


# 19)各试卷平均统计
DROP TABLE IF EXISTS ads_examination_paper_avg_stats;
CREATE TABLE ads_examination_paper_avg_stats
(
    `dt`             DATETIME COMMENT '统计日期',
    `recent_days`    BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `paper_id`       VARCHAR(255) COMMENT '试卷 id',
    `paper_title`    VARCHAR(255) COMMENT '试卷名称',
    `avg_score`      DECIMAL(16, 2) COMMENT '试卷平均分',
    `avg_during_sec` BIGINT COMMENT '试卷平均时长',
    `user_count`     BIGINT COMMENT '试卷用户数',
    PRIMARY KEY (`dt`, `recent_days`, `paper_id`)
) COMMENT '各试卷平均统计';


# 20)最近 1/7/30 日各试卷成绩分布
DROP TABLE IF EXISTS ads_examination_course_avg_stats;
CREATE TABLE ads_examination_course_avg_stats
(
    `dt`             DATETIME COMMENT '统计日期',
    `recent_days`    BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `course_id`      VARCHAR(255) COMMENT '课程id',
    `course_name`    VARCHAR(255) COMMENT '课程名称',
    `avg_score`      DECIMAL(16, 2) COMMENT '平均分',
    `avg_during_sec` BIGINT COMMENT '平均时长',
    `user_count`     BIGINT COMMENT '用户数',
    PRIMARY KEY (`dt`, `recent_days`, `course_id`)
) COMMENT '各课程考试相关指标';


# 21)最近 1/7/30 日各试卷分数分布统计
DROP TABLE IF EXISTS ads_examination_user_count_by_score_duration;
CREATE TABLE ads_examination_user_count_by_score_duration
(
    `dt`             DATETIME COMMENT '统计日期',
    `recent_days`    BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `paper_id`       VARCHAR(255) COMMENT '试卷 id',
    `score_duration` VARCHAR(255) COMMENT '分数区间',
    `user_count`     BIGINT COMMENT '各试卷各分数区间用户数',
    PRIMARY KEY (`dt`, `recent_days`, `paper_id`, `score_duration`)
) COMMENT '各试卷分数分布统计';


# 22)最近 1/7/30 日各题目正确率
DROP TABLE IF EXISTS ads_examination_question_accuracy;
CREATE TABLE ads_examination_question_accuracy
(
    `dt`          DATETIME COMMENT '统计日期',
    `recent_days` BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `question_id` VARCHAR(255) COMMENT '题目 id',
    `accuracy`    DECIMAL(16, 2) COMMENT '题目正确率',
    PRIMARY KEY (`dt`, `recent_days`, `question_id`)
) COMMENT '各题目正确率';


# 23)单章视频播放情况统计
DROP TABLE IF EXISTS ads_learn_play_stats_by_chapter;
CREATE TABLE ads_learn_play_stats_by_chapter
(
    `dt`           DATETIME COMMENT '统计日期',
    `recent_days`  BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `chapter_id`   VARCHAR(30) COMMENT '章节 id',
    `chapter_name` VARCHAR(200) COMMENT '章节名称',
    `video_id`     VARCHAR(255) COMMENT '视频 id',
    `video_name`   VARCHAR(255) COMMENT '视频名称',
    `play_count`   BIGINT COMMENT '各章节视频播放次数',
    `avg_play_sec` BIGINT COMMENT '各章节视频人均观看时长',
    `user_count`   BIGINT COMMENT '各章节观看人数',
    PRIMARY KEY (`dt`, `recent_days`, `chapter_id`, `video_id`)
) COMMENT '单章视频播放情况统计';


# 24)各课程播放情况统计
DROP TABLE IF EXISTS ads_learn_play_stats_by_course;
CREATE TABLE ads_learn_play_stats_by_course
(
    `dt`           DATETIME COMMENT '统计日期',
    `recent_days`  BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `course_id`    VARCHAR(255) COMMENT '课程id',
    `course_name`  VARCHAR(255) COMMENT '课程名称',
    `play_count`   BIGINT COMMENT '各课程视频播放次数',
    `avg_play_sec` BIGINT COMMENT '各课程视频人均观看时长',
    `user_count`   BIGINT COMMENT '各课程观看人数',
    PRIMARY KEY (`dt`, `recent_days`, `course_id`)
) COMMENT '各课程播放情况统计';


# 25)各课程完课人数统计
DROP TABLE IF EXISTS ads_complete_complete_user_count_per_course;
CREATE TABLE ads_complete_complete_user_count_per_course
(
    `dt`          DATETIME COMMENT '统计日期',
    `recent_days` BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `course_id`   VARCHAR(255) COMMENT '课程 id',
    `user_count`  BIGINT COMMENT '各课程完课人数',
    PRIMARY KEY (`dt`, `recent_days`, `course_id`)
) COMMENT '各课程完课人数统计';


# 26)完课综合指标
DROP TABLE IF EXISTS ads_complete_complete_stats;
CREATE TABLE ads_complete_complete_stats
(
    `dt`                         DATETIME COMMENT '统计日期',
    `recent_days`                BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `user_complete_count`        BIGINT COMMENT '完课人数',
    `user_course_complete_count` BIGINT COMMENT '完课人次',
    PRIMARY KEY (`dt`, `recent_days`)
) COMMENT '完课综合指标';


# 27)各课程人均完成章节视频数
DROP TABLE IF EXISTS ads_complete_complete_chapter_count_per_course;
CREATE TABLE ads_complete_complete_chapter_count_per_course
(
    `dt`                     DATETIME COMMENT '统计日期',
    `recent_days`            BIGINT COMMENT '最近天数,1:最近1天,7:最近7天,30:最近30天',
    `course_id`              VARCHAR(255) COMMENT '课程 id',
    `complete_chapter_count` BIGINT COMMENT '各课程用户平均完成章节数',
    PRIMARY KEY (`dt`, `recent_days`, `course_id`)
) COMMENT '各课程人均完成章节视频数';

02、修改datax的jar包

DataX

  1. GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。
  2. https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
  3. https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md
[atguigu@node001 ~]$ cd /opt/module/datax/
[atguigu@node001 datax]$ python bin/datax.py -p"-Dexportdir=/warehouse/edu/ads/ads_traffic_stats_by_source/" job/ads_traffic_stats_by_source.json

2023-09-05 10:59:01.854 [job-0] ERROR RetryUtil - Exception when calling callable, 即将尝试执行第1次重试.本次重试计划等待[1000]ms,实际等待[1001]ms, 异常Msg:[Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).].  -  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.]
2023-09-05 10:59:03.860 [job-0] ERROR RetryUtil - Exception when calling callable, 即将尝试执行第2次重试.本次重试计划等待[2000]ms,实际等待[2000]ms, 异常Msg:[Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).].  -  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.]
2023-09-05 10:59:07.865 [job-0] ERROR RetryUtil - Exception when calling callable, 即将尝试执行第3次重试.本次重试计划等待[4000]ms,实际等待[4000]ms, 异常Msg:[Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).].  -  具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.]

解决办法:已检查N遍,账号密码没有问题,将/opt/module/datax/plugin/writer/mysqlwriter/libs与/opt/module/datax/plugin/reader/mysqlreader/libs等两个lib目录下的mysql-connector-java-5.1.34.jar包替换为mysql-connector-java-8.0.29.jar。

03、ads_traffic_stats_by_source.json文件

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-01], Description:[获取表字段相关信息失败.].  - 获取表:ads_traffic_stats_by_source 的字段的元信息时失败. 请联系 DBA 核查该库、表信息. - java.sql.SQLSyntaxErrorException: Unknown column 'channel' in 'field list'
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
        at com.alibaba.datax.plugin.rdbms.util.DBUtil.getColumnMetaData(DBUtil.java:563)
        at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:125)
        at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:140)
        at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.doPretreatment(OriginalConfPretreatmentUtil.java:35)
        at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Job.init(CommonRdbmsWriter.java:41)
        at com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter$Job.init(MysqlWriter.java:31)
        at com.alibaba.datax.core.job.JobContainer.initJobWriter(JobContainer.java:704)
        at com.alibaba.datax.core.job.JobContainer.init(JobContainer.java:304)
        at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:113)
        at com.alibaba.datax.core.Engine.start(Engine.java:92)
        at com.alibaba.datax.core.Engine.entry(Engine.java:171)
        at com.alibaba.datax.core.Engine.main(Engine.java:204)

        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:33)
        at com.alibaba.datax.plugin.rdbms.util.DBUtil.getColumnMetaData(DBUtil.java:575)
        at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:125)
        at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:140)
        at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.doPretreatment(OriginalConfPretreatmentUtil.java:35)
        at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Job.init(CommonRdbmsWriter.java:41)
        at com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter$Job.init(MysqlWriter.java:31)
        at com.alibaba.datax.core.job.JobContainer.initJobWriter(JobContainer.java:704)
        at com.alibaba.datax.core.job.JobContainer.init(JobContainer.java:304)
        at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:113)
        at com.alibaba.datax.core.Engine.start(Engine.java:92)
        at com.alibaba.datax.core.Engine.entry(Engine.java:171)
        at com.alibaba.datax.core.Engine.main(Engine.java:204)

/opt/module/datax/job/ads_traffic_stats_by_source.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [
                            "*"
                        ],
                        "defaultFS": "hdfs://node001:8020",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "fileType": "text",
                        "nullFormat": "\\N",
                        "path": "${exportdir}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "dt",
                            "recent_days",
                            "source_id",
                            "source_site",
                            "uv_count",
                            "avg_duration_sec",
                            "avg_page_count",
                            "sv_count",
                            "bounce_rate"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://node001:3306/edu_report?useUnicode=true&characterEncoding=utf-8",
                                "table": [
                                    "ads_traffic_stats_by_source"
                                ]
                            }
                        ],
                        "username": "root",
                        "password": "123456",
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0.02,
                "record": 0
            },
            "speed": {
                "channel": 3
            }
        }
    }
}

P113

12.2.2 DataX配置文件生成脚本

P114

第13章 数据仓库工作流调度

Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。

P115

第2章 DolphinScheduler部署说明

第3章 DolphinScheduler集群模式部署

3.6 一键部署DolphinScheduler

[atguigu@node001 apache-dolphinscheduler-2.0.3-bin]$ jpsall
================ node001 ================
5360 QuorumPeerMain
2832 NameNode
9296 WorkerServer
3411 JobHistoryServer
5988 RunJar
9668 ApiApplicationServer
6100 RunJar
9414 LoggerServer
3000 DataNode
9545 AlertServer
10540 Jps
7020 NodeManager
================ node002 ================
5296 NodeManager
5984 WorkerServer
6032 LoggerServer
6231 Jps
4745 QuorumPeerMain
5178 ResourceManager
4986 DataNode
================ node003 ================
3985 NodeManager
4658 LoggerServer
4884 Jps
1861 DataNode
3594 QuorumPeerMain
1967 SecondaryNameNode
[atguigu@node001 apache-dolphinscheduler-2.0.3-bin]$ 

P116

3.7 DolphinScheduler启停命令

[atguigu@node001 apache-dolphinscheduler-2.0.3-bin]$ cd /opt/module/dolphinScheduler/ds-2.0.3/
[atguigu@node001 ds-2.0.3]$ ll
总用量 60
drwxrwxr-x 2 atguigu atguigu  4096 9月   6 11:21 bin
drwxrwxr-x 5 atguigu atguigu  4096 9月   6 11:21 conf
-rwxrwxr-x 1 atguigu atguigu  5190 9月   6 11:22 install.sh
drwxrwxr-x 2 atguigu atguigu 20480 9月   6 11:22 lib
drwxrwxr-x 2 atguigu atguigu  4096 9月   6 11:23 logs
drwxrwxr-x 2 atguigu atguigu  4096 9月   6 11:22 pid
drwxrwxr-x 2 atguigu atguigu  4096 9月   6 11:22 script
drwxrwxr-x 3 atguigu atguigu  4096 9月   6 11:22 sql
drwxrwxr-x 8 atguigu atguigu  4096 9月   6 11:22 ui
[atguigu@node001 ds-2.0.3]$ cd bin/
[atguigu@node001 bin]$ ll
总用量 20
-rwxrwxr-x 1 atguigu atguigu 6770 9月   6 11:21 dolphinscheduler-daemon.sh
-rwxrwxr-x 1 atguigu atguigu 2427 9月   6 11:21 start-all.sh
-rwxrwxr-x 1 atguigu atguigu 3332 9月   6 11:21 status-all.sh
-rwxrwxr-x 1 atguigu atguigu 2428 9月   6 11:21 stop-all.sh
[atguigu@node001 bin]$ 

node003没有运行WorkerServer,资源不够,将资源改为8g就能运行了,但没必要。

P117

启动hadoop、zookeeper、hive、hive-service2、ds。

  1. [atguigu@node001 ~]$ myhadoop.sh start
  2. [atguigu@node001 ~]$ zookeeper.sh start
  3. [atguigu@node001 ~]$ nohup /opt/module/hive/hive-3.1.2/bin/hive &
  4. [atguigu@node001 ~]$ nohup /opt/module/hive/hive-3.1.2/bin/hive --service hiveserver2 &
  5. [atguigu@node001 ~]$ /opt/module/dolphinScheduler/ds-2.0.3/bin/start-all.sh
[atguigu@node001 ~]$ myhadoop.sh start
 ================ 启动 hadoop集群 ================
 ---------------- 启动 hdfs ----------------
Starting namenodes on [node001]
Starting datanodes
Starting secondary namenodes [node003]
 --------------- 启动 yarn ---------------
Starting resourcemanager
Starting nodemanagers
 --------------- 启动 historyserver ---------------
[atguigu@node001 ~]$ zookeeper.sh start
---------- zookeeper node001 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node002 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node003 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$ nohup /opt/module/hive/hive-3.1.2/bin/hive &
[1] 3741
[atguigu@node001 ~]$ nohup: 忽略输入并把输出追加到"nohup.out"

[atguigu@node001 ~]$ nohup /opt/module/hive/hive-3.1.2/bin/hive --service hiveserver2 &
[2] 3912
[atguigu@node001 ~]$ nohup: 忽略输入并把输出追加到"nohup.out"

[atguigu@node001 ~]$ /opt/module/dolphinScheduler/ds-2.0.3/bin/start-all.sh
node001:default
...

DolphinScheduler工作流运行之后在工作流实例中查不到是什么原因?将node001的运行内存从4G调为8G即可。

P118

第5章 DolphinScheduler进阶

5.1 工作流传参

DolphinScheduler支持对任务节点进行灵活的传参,任务节点可通过${参数名}引用参数值。

由此可得,优先级由高到低:本地参数 > 全局参数 > 上游任务传递的参数。

5.1.5 参数优先级

3)结论

(1)本地参数 > 全局参数 > 上游任务传递的参数;

(2)多个上游节点均传递同名参数时,下游节点会优先使用值为非空的参数;

(3)如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数。

P119

5.2 引用依赖资源

P120

13.2 数据准备

启动hadoop、zookeeper、kafka、maxwell、f1、f2、f3。

P121

13.3 工作流调度实操

[2023-09-06 17:15:26,824] ERROR [Broker id=0] Received LeaderAndIsrRequest with correlation id 1 from controller 1 epoch 33 for partition __consumer_offsets-44 (last update controller epoch 33) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2023-09-06 17:15:26,824] ERROR [Broker id=0] Received LeaderAndIsrRequest with correlation id 1 from controller 1 epoch 33 for partition __consumer_offsets-32 (last update controller epoch 33) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2023-09-06 17:15:26,824] ERROR [Broker id=0] Received LeaderAndIsrRequest with correlation id 1 from controller 1 epoch 33 for partition __consumer_offsets-41 (last update controller epoch 33) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)

[2023-09-06 19:32:27,802] ERROR [Controller id=0 epoch=34] Controller 0 epoch 34 failed to change state for partition __transaction_state-27 from OfflinePartition to OnlinePartition (state.change.logger)
kafka.common.StateChangeFailedException: Failed to elect leader for partition __transaction_state-27 under strategy OfflinePartitionLeaderElectionStrategy(false)
    at kafka.controller.ZkPartitionStateMachine.$anonfun$doElectLeaderForPartitions$7(PartitionStateMachine.scala:424)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at kafka.controller.ZkPartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:421)
    at kafka.controller.ZkPartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:332)
    at kafka.controller.ZkPartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:238)
    at kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:158)
    at kafka.controller.PartitionStateMachine.triggerOnlineStateChangeForPartitions(PartitionStateMachine.scala:74)
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:59)
    at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:536)
    at kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1594)
    at kafka.controller.KafkaController.process(KafkaController.scala:2484)
    at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)
    at kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)
    at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:133)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:133)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2023-09-06 19:32:27,805] INFO [Controller id=0 epoch=34] Changed partition __consumer_offsets-22 from OfflinePartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=37, isr=List(1), zkVersion=37) (state.change.logger)

maxwell 报错: java.lang.RuntimeException: error: unhandled character set ‘utf8mb3‘

  1. maxwell 报错: java.lang.RuntimeException: error: unhandled character set ‘utf8mb3‘_你的482的博客-CSDN博客
  2. Maxwell安装使用 - 掘金

这个问题是因为MySQL从 5.5.3 开始,用 utf8mb4 编码来实现完整的 UTF-8,其中 mb4 表示 most bytes 4,最多占用4个字节。而原来的utf8则被utf8mb3则代替。 一种解决方案是,将MySQL降级,重新安装5.5.3以下的版本。 另一种方法则是修改maxwell源码。 解压打开,找到有问题的类:com.zendesk.maxwell.schema.columndef.StringColumnDef,加上能识别utf8mb3的语句,重新打包。 打包好的maxwell-1.19.0.jar替换maxwell/lib/maxwell-1.19.0.jar,重启即可。

启动hadoop、zookeeper、kafka、maxwell、f1.sh、f2.sh、f3.sh。

关闭采集的相关组件:kafka、flume(f1、f2、f3)、maxwell;启动hadoop、hive、zookeeper、dolphinscheduler ...

忘记启动zookeeper了...

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
[ERROR] 2023-09-07 14:46:32.033 org.springframework.boot.SpringApplication:[843] - Application run failed
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'monitorServiceImpl': Unsatisfied dependency expressed through field 'registryClient'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'registryClient': Invocation of init method failed; nested exception is org.apache.dolphinscheduler.registry.api.RegistryException: zookeeper connect timeout

...

datax将数据同步至hdfs里面,mysql_to_hdfs_full.sh;

数据导入到ods层中,hdfs_to_ods_db.sh、

ods_to_dwd.sh。

export HADOOP_HOME=/opt/module/hadoop/hadoop-3.1.3
export HADOOP_CONF_DIR=/opt/module/hadoop/hadoop-3.1.3/etc/hadoop
export SPARK_HOME=/opt/module/spark/spark-3.0.0-bin-hadoop3.2
export JAVA_HOME=/opt/module/jdk/jdk1.8.0_212
export HIVE_HOME=/opt/module/hive/hive-3.1.2
export DATAX_HOME=/opt/module/datax

export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$DATAX_HOME/bin:$PATH

串联

P122【122_在线教育数仓开发回顾 04:23】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/984719.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Hadoop:HDFS--分布式文件存储系统

目录 HDFS的基础架构 VMware虚拟机部署HDFS集群 HDFS集群启停命令 HDFS Shell操作 hadoop 命令体系: 创建文件夹 -mkdir 查看目录内容 -ls 上传文件到hdfs -put 查看HDFS文件内容 -cat 下载HDFS文件 -get 复制HDFS文件 -cp 追加数据到HDFS文件中 -appendTo…

第 3 章 栈和队列(汉诺塔问题递归解法)

1. 背景说明 假设有 3 个分别命名为 X、Y 和 Z 的塔座,在塔座 X 上插有 n 个直径大小各不相同、依小到大编号为 1, 2,…,n 的圆盘。 现要求将 X 轴上的 n 个圆盘移至塔座 Z 上并仍按同样顺序叠排,圆盘移动时必须遵循下列规则&…

伦敦金的走势高低的规律

伦敦金市场是一个流动性很强的市场,其价格走势会在诸多因素的影响下,出现反复的上下波动,如果投资者能够在这些高低走势中找到一定的规律,在相对有利的时机入场和离场,就能够通过不断的交易,累积大量的财富…

浏览器渲染原理及流程

浏览器主要组成与浏览器线程 浏览器组件 浏览器大体上由以下几个组件组成,各个浏览器可能有一点不同。 界面控件 – 包括地址栏,前进后退,书签菜单等窗口上除了网页显示区域以外的部分浏览器引擎 – 查询与操作渲染引擎的接口渲染引擎 – …

记录vite下使用require报错和解决办法

前情提要 我们现在项目用的是vite4react18开发的项目、但是最近公司有个睿智的人让我把webpack中的bpmn组件迁移过来、结果就出现问题啦:因为webpack是commonjs规范、但是vite不是、好像是es吧、可想而知各种报错 废话不多说啦 直接上代码: 注释是之前c…

生成式AI爆发,安全问题如何解决?

在生成式AI浪潮下,如何为行业用户提供符合实际应用场景需求的生成式AI服务,是行业数字化转型的下一个重点。《亚马逊云科技AIGC加速企业创新指南》白皮书指出,AIGC在游戏、零售电商、金融、媒体娱乐、医疗健康等行业都有典型应用场景。作为 A…

冠达管理:股票退市整理期?

近些年来,随着我国股市的发展,股票市场的出资者逐渐增多。但在出资过程中,退市股票的问题也成为了备受重视的论题。那么,股票退市收拾期到底是什么?如何应对退市股票? 首要,什么是股票退市收拾…

设备管理系统有什么功能?它有什么用?

设备管理系统已成为现代化大规模研究所,信息化管理体系建设中最为关键的要素。随着工业设备的机械化、自动化、大型化、高速化以及复杂化等因素不断叠加,设备设施对于工业生产的作用和影响越来越大,其各项制度和流程也涉及面广、内容繁杂。  …

中企绕道突破封锁,防不胜防 | 百能云芯

韩国的财经媒体Business Korea最新报道指出,尽管美方在《通胀削减法案》(IRA)的补贴中排除了中国,但中国企业正通过多种方式积极应对美国在半导体和电动汽车电池领域的封锁,这包括建立合资企业、设立生产基地以及开展技…

STDF-Viewer 解析工具说明

一、简介 1. 概述 STDF(Standard Test Data Format)(标准测试数据格式)是半导体测试行业的最主要的数据格式,包含了summary信息和所有测试项的测试结果;是半导体行业芯片测试数据的存储规范。 在半导体行业…

城市排水监测方案(dtu终端配合工业路由器精准监测)

台风季节,暴雨容易导致城市内涝积水。为有效监测排水状况,预警和防控积水灾害,星创易联推出智慧排水监测解决方案。 解决方案采用星创易联DTU300作为水位数据采集终端,它可挂载在河道及排水井等地点,实时监测水位变化,一旦超过预警阈值,立即通过4G网络传输报警信息,实现对水位…

使用Kmeans进行图像聚类

Kmeans可以用于与发现聚类相关的其他任务 介绍 聚类是一种无监督机器学习技术。这意味着您的数据集没有标签,即与解释变量发现的模式关联的目标变量。 无监督学习是找到看似相似的模式并将它们放入同一个桶中的过程。 最常用的无监督学习算法之一是Kmeans&#xff…

冠达管理:紧盯必要性 追问合理性 再融资问询透露监管新动向

在“活泼资本市场,提振出资者决心”一系列办法落地之后,再融资市场整体已明确收紧,但审阅尺度、相关细则还有待进一步观察。有接受采访的投行人士指出,现在存量项目仍在持续推进,监管审阅要点已在问询环节有较为充沛的…

2.7 PE结构:重定位表详细解析

重定位表(Relocation Table)是Windows PE可执行文件中的一部分,主要记录了与地址相关的信息,它在程序加载和运行时被用来修改程序代码中的地址的值,因为程序在不同的内存地址中加载时,程序中使用到的地址也…

单月打造8个10w+,情感类视频号如何爆火?

上月,腾讯公布了2023年Q2财报,其中,较为亮眼的是微信视频号的广告收入。据财报显示,二季度视频号用户使用时长与去年同期相比几乎翻倍,广告收入超过30亿元。作为微信生态的核心组件,视频号的内容生态呈现出…

C/C++浮点数向零舍入 2019年9月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C浮点数向零舍入 一、题目要求 1、编程实现 2、输入输出 二、解题思路 1、案例分析 三、程序代码 四、程序说明 五、运行结果 六、考点分析 C/C浮点数向零舍入 2019年9月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 输入一个单精度浮点数&#…

09-JVM垃圾收集底层算法实现

上一篇:08-JVM垃圾收集器详解 1.三色标记 在并发标记的过程中,因为标记期间应用线程还在继续跑,对象间的引用可能发生变化,多标和漏标的情况就有可能发生。 这里我们引入“三色标记”来给大家解释下,把Gcroots可达性…

uni-app动态tabBar,根据不同用户展示不同的tabBar

1.uni框架的api实现 因为我们用的是uni-app框架开发,所以在创建项目的时候直接创建uni-ui的项目即可,这个项目模板中自带了uni的一些好用的组件和api。 起初我想着这个效果不难实现,因为官方也有api可以直接使用,所以我最开始尝试…

DeeTune:基于 eBPF 的百度网络框架设计与应用

作者 | 百度APP云原生技术研发组 导读 随着云计算的技术的不断迭代演进,百度内部服务逐渐搬迁到云环境中,部署成本和效率取得明显收益,但一些可观测能力的短板和缺失逐渐显露,传统的方式往往通过植入代码进行修改来实现&#xff0…

esp-hosted 方案介绍

esp-hosted SDK esp-hosted 方案介绍 esp-hosted 方案主要为 Linux 或者 MCU 提供无线连接功能(WiFi 或者 BT/BLE) esp-hosted 解决方案包含了 esp-hosted FG 和 esp-hosted NG 两套方案 与传统 WiFi 网卡的区别在于 ESP 设备端需要烧录固件&#xff0…