4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)

news2025/1/11 23:36:43

1、离线数仓同步数据

1.1 用户行为数据同步

1.1.1 数据通道

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。
在这里插入图片描述

1.1.2 日志消费Flume配置概述

按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
此次选择KafkaSource、FileChannel、HDFSSink
在这里插入图片描述

2.1.3 日志消费Flume配置实操

1、创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf

vim job/kafka_to_hdfs_log.conf 

2、配置内容如下

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.zhm.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注:配置优化
1、FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2、HDFS Sink优化
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3、编写Flume拦截器
(1)数据漂移问题

(2)在com.zhm.gmall.flume.interceptor包下创建TimestampInterceptor类

package org.zhm.gmall.flume.interceptor;

/**
 * @ClassName TimestampInterceptor
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/19 18:33
 * @Version 1.0
 */
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {


    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        //1、获取header和body的数据
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        //2、将body的数据类型转成jsonObject类型(方便获取数据)
        JSONObject jsonObject = JSONObject.parseObject(log);

        //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}


(3)重新打包
(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面

2.1.4 日志消费Flume测试

1、启动Zookeeper、Kafka、hadoop集群
2、启动日志采集Flume

f1.sh start

3、启动hadoop104的日志消费Flume

bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console

4、生成模拟数据

 lg.sh 

5、观察HDFS是否出现数据

2.1.5 日志消费Flume启停脚本

若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1、在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh

 vim f2.sh

2、在脚本中填写如下内容

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

3、增加脚本权限

chmod 777 f2.sh

2.2 业务数据同步

2.2.1 数据同步策略概述

业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。
为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。
数据的同步策略有全量同步和增量同步。
全量同步就是每天都将业务数据库中的全部数据同步到一份到数据仓库,这是保证两侧数据同步的最简单的方式。
在这里插入图片描述

增量同步就是每天只将业务数据中的新增变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。。
在这里插入图片描述

2.2.2 数据同步策略选择
同步策略优点缺点
全量同步逻辑简单在某些情况下效率比较低。例如某张表的数据量比较大,但是每天数据变化比例很低,若对其采用每日全量同步,则会重复同步和储存大量相同的数据
增量同步效率高,无需同步和储存重复数据逻辑复杂,需要将每日新增及变化数据同原来的数据进行整合,才能使用

根据上述对比,可以得出以下结论:通常情况,业务表数据量比较大,优先考虑增量,数据量比较小,优先考虑全量

下图为各表同步策略:
在这里插入图片描述

2.2.3 数据同步工具概述

数据同步工具种类繁多,大致可分为两类,一类是以DataX、Sqoop为代表的基于Select查询的离线、批量同步工具,另一类是以Maxwell、Canal为代表的基于数据库数据变更日志(例如MySQL的binlog,其会实时记录所有的insert、update以及delete操作)的实时流式同步工具。
全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal等工具,下面对增量同步不同方案进行简要对比。

增量同步方案DataX/SqoopMaxwell/Canal
对数据的要求原理是基于查询,故若想通过select查询获取新增及变化数据,就要求数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据要求数据库记录变更操作,例如MySQL需要开启binlog
数据的中间状态由于是离线批量同步,故若一条数据在一天中变化多次,该方案只能获取最后一个状态,中间状态无法获取由于是实时获取所有的数据变更操作所以可以获取变更数据的所有中间状态
2.2.5 全量表数据同步

1、数据同步工具DataX部署
DataX学习链接

2、数据通道
全量表数据由DataX从MySQL业务数据库直接同步到HDFS,具体数据流向如下图所示。
在这里插入图片描述
3、DataX配置文件
我们需要为每张全量表编写一个DataX的json配置文件,此处以activity_info为例,配置文件内容如下:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "activity_name",
                            "activity_type",
                            "activity_desc",
                            "start_time",
                            "end_time",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "table": [
                                    "activity_info"
                                ]
                            }
                        ],
                        "password": "000000",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "activity_name",
                                "type": "string"
                            },
                            {
                                "name": "activity_type",
                                "type": "string"
                            },
                            {
                                "name": "activity_desc",
                                "type": "string"
                            },
                            {
                                "name": "start_time",
                                "type": "string"
                            },
                            {
                                "name": "end_time",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "activity_info",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

注:由于目标路径包含一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交任务时通过参数动态传入,参数名称为targetdir。

4、 DataX配置文件生成脚本
(1)在~/bin目录下创建gen_import_config.py脚本

# ecoding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

注:(1)安装Python Mysql驱动

sudo yum install -y MySQL-python

(2)脚本使用说明

python gen_import_config.py -d database -t table

##通过-d传入数据库名,-t传入表名,执行上述命令即可生成该表的DataX同步配置文件。

(2)在~/bin目录下创建gen_import_config.sh脚本

vim ~/bin/gen_import_config.sh

脚本内容如下

#!/bin/bash

python ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info

(3)为gen_import_config.sh脚本增加执行权限

chmod 777 ~/bin/gen_import_config.sh 

(4)执行gen_import_config.sh脚本,生成配置文件

gen_import_config.sh 

(5)观察生成的配置文件

5、 测试生成的DataX配置文件
以activity_info为例,测试用脚本生成的配置文件是否可用。
1、创建目标路径
由于DataX同步任务要求目标路径提前存在,故需手动创建路径,当前activity_info表的目标路径应为/origin_data/gmall/db/activity_info_full/2020-06-14

hadoop fs -mkdir /origin_data/gmall/db/activity_info_full/2020-06-14

2、执行DataX同步命令

python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14" /opt/module/datax/job/import/gmall.activity_info.json

3、观察同步结果

6、全量表数据同步脚本
为方便使用以及后续的任务调度,此处编写一个全量表数据同步脚本。

(1)在~/bin目录创建mysql_to_hdfs_full.sh
脚本内容如下

#!/bin/bash

DATAX_HOME=/opt/module/datax

# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}

#数据同步
import_data() {
  datax_config=$1
  target_dir=$2

  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"activity_info")
  import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
  ;;
"activity_rule")
  import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
  ;;
"base_category1")
  import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
  ;;
"base_category2")
  import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
  ;;
"base_category3")
  import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
  ;;
"base_dic")
  import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
  ;;
"base_province")
  import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
  ;;
"base_region")
  import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
  ;;
"base_trademark")
  import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
  ;;
"cart_info")
  import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
  ;;
"coupon_info")
  import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
  ;;
"sku_attr_value")
  import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
  ;;
"sku_info")
  import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
  ;;
"sku_sale_attr_value")
  import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
  ;;
"spu_info")
  import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
  ;;
"all")
  import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
  import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
  import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
  ;;
esac

(2)为mysql_to_hdfs_full.sh增加执行权限

chmod 777 ~/bin/mysql_to_hdfs_full.sh 

(3)测试同步脚本

mysql_to_hdfs_full.sh all 2020-06-14

(4)检查同步结果
查看HDFS目表路径是否出现全量表数据,全量表共15张。

2.2.6 增量表数据同步

1、数据通道
在这里插入图片描述
2、 Flume配置
(1)Flume配置概述
Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。
需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:
在这里插入图片描述
具体数据示例如下:
在这里插入图片描述

(2)Flume配置实操
(a)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf
配置内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zhm.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

(b)编写拦截器
新建一个Maven项目,并在pom.xml文件中加入如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在com.zhm.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类

package com.zhm.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);

 		JSONObject jsonObject = JSONObject.parseObject(log);

 		Long ts = jsonObject.getLong("ts");
 		//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
 		String timeMills = String.valueOf(ts * 1000);

 		String tableName = jsonObject.getString("table");

 		headers.put("timestamp", timeMills);
 		headers.put("tableName", tableName);
		return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor ();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

重新打包。
将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下
(3)通道测试
(a)启动Zookeeper、Kafka集群
(b)启动hadoop104的Flume
(c)生成模拟数据
(d)观察HDFS上的目标路径是否有数据出现
若HDFS上的目标路径已有增量表的数据出现了,就证明数据通道已经打通。
(e)数据目标路径的日期说明
仔细观察,会发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值,是数据的变动日期。而真实场景下,数据的业务日期与变动日期应当是一致的。
(4)编写Flume启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh
填写以下内容

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

增加脚本执行权限

chmod 777 f3.sh

3、MaxWell配置
1、Maxwell时间戳问题
在这里插入图片描述
修改Maxwell配置文件config.properties,增加mock_date参数,如下

log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092

#kafka topic配置
kafka_topic=topic_db

#注:该参数仅在maxwell教学版中存在,修改该参数后重启Maxwell才可生效
mock_date=2020-06-14

# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

重启Maxwell
重新生成模拟数据

4、增量表首日全量同步
通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
(1)在~/bin目录创建mysql_to_kafka_inc_init.sh
脚本内容如下

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

(2)为mysql_to_kafka_inc_init.sh增加执行权限

chmod 777 ~/bin/mysql_to_kafka_inc_init.sh

(3)测试同步脚本
(a)清理历史数据

hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

(b)执行同步脚本

mysql_to_kafka_inc_init.sh all 

(4)检查同步结果
观察HDFS上是否重新出现增量表数据。

2.3 采集通道启动/停止脚本

1、在/home/atguigu/bin目录下创建脚本cluster.sh
在脚本中填写如下内容

#!/bin/bash

case $1 in
"start"){
        echo ================== 启动 集群 ==================

        #启动 Zookeeper集群
        zk.sh start

        #启动 Hadoop集群
        hdp.sh start

        #启动 Kafka采集集群
        kf.sh start

        #启动采集 Flume
        f1.sh start

#启动日志消费 Flume
        f2.sh start

#启动业务消费 Flume
        f3.sh start

#启动 maxwell
        mxw.sh start

        };;
"stop"){
        echo ================== 停止 集群 ==================

#停止 Maxwell
        mxw.sh stop

#停止 业务消费Flume
        f3.sh stop

#停止 日志消费Flume
        f2.sh stop

#停止 日志采集Flume
        f1.sh stop

        #停止 Kafka采集集群
        kf.sh stop

        #停止 Hadoop集群
        hdp.sh stop

        #停止 Zookeeper集群
        zk.sh stop

};;
esac

2、增加脚本执行权限

chmod 777 cluster.sh

3、数仓环境准备

Hive的安装和部署

就是安装配置一下Hive就行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/704030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【选择排序】手撕八大排序之直接选择排序和堆排序

目录 一.选择排序 1.直接选择排序 2.堆排序 一.选择排序 1.直接选择排序 选择排序&#xff08;Selection Sort&#xff09;是一种简单直观的排序算法。它的基本思想是每次遍历找到最小&#xff08;或最大&#xff09;的元素&#xff0c;然后将其放置在已排序序列的末尾。在…

实操接口自动化测试项目之项分层设计

本文以笔者当前使用的自动化测试项目为例&#xff0c;浅谈分层设计的思路&#xff0c;不涉及到具体的代码细节和某个框架的实现原理&#xff0c;重点关注在分层前后的使用对比&#xff0c;可能会以一些伪代码为例来说明举例。 接口测试三要素&#xff1a; 参数构造发起请求&a…

JS 1.如何实现继承 2.原型和原型链

1_使用class实现继承 /** 继承 */ class Person { constructor(name) { this.name name;}drink() { console.log(喝水)} }class Student extends Person{ constructor(name, score) { // new Personsuper(name);this.score score;}introduce() { console.log(我是${this.nam…

EasyCVR播放设备录像出现部分视频不能播放的原因排查与解决

EasyCVR视频融合平台基于云边端协同架构&#xff0c;具有强大的数据接入、处理及分发能力。平台支持多协议接入&#xff0c;包括&#xff1a;国标GB28181、RTMP、RTSP/Onvif、海康Ehome、海康SDK、大华SDK、宇视SDK等&#xff0c;对外可分发多格式视频流&#xff0c;包括RTSP、…

栈和队列(二) 队列的实现,用栈实现队列,用队列实现栈,设计循环队列

文章目录 队列的实现用队列实现栈用栈实现队列设计循环队列 队列的实现 这里的队列我们使用链式队列&#xff0c;好处就是可以很方便的取出队头的元素。 使用顺序队列取出队头元素所花费的时间复杂度为O&#xff08;N&#xff09;&#xff0c;把后面的元素向前移动一个下标所花…

CentOS Linux的最佳替代方案(二)_AlmaLinux OS 8.6基础安装教程

文章目录 CentOS Linux的最佳替代方案&#xff08;二&#xff09;_AlmaLinux OS 8.6基础安装教程一 AlmaLinux介绍和发展历史二 AlmaLinux基础安装2.1 下载地址2.2 安装过程 三 AlmaLinux使用3.1 关闭selinux/firewalld3.2 替换默认源3.3 安装一些必要工具 CentOS Linux的最佳替…

瓶盖扫码回收APP系统 废旧物品创造价值收益

资源回收再利用是近些年国家大力倡导的&#xff0c;人们也在积极践行&#xff0c;从垃圾回收、废旧衣物回收、烟盒回收等等.....今天小白要带大家了解的是瓶盖回收APP软件开发的相关事项。瓶盖回收APP是本着资源回收的初衷&#xff0c;可以时间废旧瓶盖的多次利用&#xff0c;减…

使用Xshell服务器跑程序,用pycharm连接服务器远程开发

目标&#xff1a; 1.使用Xshell在服务器上创建自己项目需要的虚拟环境 2.用pycharm实现远程服务器的连接&#xff08;这样就可以在本地debug或者写代码&#xff0c;然后再用xshell在服务器上跑&#xff09; 一、使用Xshell在服务器上创建自己项目需要的虚拟环境 1.打开Xshe…

工具系列之wireshark使用说明

简介 工具下载&#xff1a; https://www.wireshark.org/官方FAQ: https://www.wireshark.org/faq.html 过滤器设置 通常情况下&#xff0c;将.pcap 数据拖拽至 wireshark中即可打开。通过&#xff1a; 导航栏–》分析 --> 显示过滤器 即可找到对应的筛选器&#xff0c;筛…

美格智能发布高性价比5G RedCap CPE解决方案SRT835,加速5G FWA商业落地

6月30日&#xff0c;在MWC 2023上海5G未来峰会上&#xff0c;美格智能重磅发布高性价比轻量化5G RedCap&#xff08;也称作NR-Light&#xff09;CPE解决方案SRT835。该方案搭载骁龙X35调制解调器及射频系统和WCN6856高速Wi-Fi 6解决方案&#xff0c;通过精简系统架构&#xff0…

【技术教程】H.265网页流媒体播放器EasyPlayer无感知播放体验优化

EasyPlayer是我们流媒体组件系列中关注度较高的产品&#xff0c;经过多年的发展和迭代&#xff0c;目前已经有多个应用版本&#xff0c;包括RTSP版、RTMP版、Pro版&#xff0c;以及js版&#xff0c;其中js版本作为网页播放器&#xff0c;受到了用户的广泛使用。 目前我们所有的…

【每天40分钟,我们一起用50天刷完 (剑指Offer)】第十天 10/50

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

初探 C++ 标准库

有趣的重载 重载左移操作符&#xff0c;将变量或常量左移到一个对象中&#xff01; C 标准库 C 标准库并不是 C 语言的一部分 C 标准库是由类库和函数库组成的集合 C 标准库中定义的类和对象都位于 std 命名空间中 C 标准库的头文件都不带 .h 后缀 C 标准库涵盖了 C 库的功…

优思学院|什么是六西格玛黑带?

六西格玛黑带&#xff0c;这是一个有趣的称谓。这个称号意味着拥有它的人在六西格玛方法和统计工具应用方面有很高的造诣。在企业中&#xff0c;只有中层以上的人才能获得这个称号。 黑带这个词源自跆拳道&#xff0c;因为跆拳道最高段位的人所戴的腰带是黑色的。后来&#xf…

『赠书活动 | 第十三期』《算力经济:从超级计算到云计算》

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 『赠书活动 &#xff5c; 第十三期』 本期书籍&#xff1a;《算力经济&#xff1a;从超级计算到云计算》 赠书规则&#xff1a;评论区&#xff1a;点赞&#xff5c;收…

多个el-checkbox-group复选框组 选项互斥

需求 多个el-checkbox-group复选框组 , 组与组之间的选项是互斥的效果效果 代码实现 template <template><div class"page"><el-checkbox-groupv-for"(item, index) in list"v-model"item.checked":key"index"chan…

网络投票链接的制作方法投票制作方法微网络投票

用户在使用微信投票的时候&#xff0c;需要功能齐全&#xff0c;又快捷方便的投票小程序。 而“活动星投票”这款软件使用非常的方便&#xff0c;用户可以随时使用手机微信小程序获得线上投票服务&#xff0c;很多用户都很喜欢“活动星投票”这款软件。 “活动星投票”小程序在…

docker-compose部署Minio

一、镜像选择 Minio有很多镜像会有一些bug&#xff0c;而且不好解决&#xff0c;这里使用的是镜像 RELEASE.2021-04-18T19-26-29Z 直接: docker pull minio/minio:RELEASE.2021-04-18T19-26-29Z 即可&#xff1a; 拉取之后可以打个tag&#xff0c;这里是new2 RELEASE.2021…

在chrome-console中进行xpath/css/js定位(六)

目录 1.xpath 1.1 绝对定位与相对定位 1.2 通配符与不包含筛选 1.3 Xpath函数运算的简单实用 1.4 各种亲戚标签的定位 1.5xpath实例 1.5.1xpath:属性定位 1.5.2xpath:其它属性 1.5.3xpath:标签 1.5.4xpath:层级 1.5.5xpath:索引 1.5.6xpath:逻辑运算 1.5.7xpath:模…

界面组件DevExpress WPF v23.1新版亮点 - 启动和内存优化

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…