业务数据采集与同步
- 业务采集组件配置
- 业务数据同步概述
- 数据同步策略选择
- 数据同步工具概述
- 1.1.4 全量表数据同步
- DataX配置文件生成
- 全量表数据同步脚本
- 增量表数据同步 MySQL - Maxwell - Kafka - Flume - HDFS
- Maxwell配置
- 增量表首日全量同步
业务采集组件配置
Maxwell将业务采集到的kafka,下游从kafka消费数据用于计算
链接: 配置Maxwell
业务数据同步概述
业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。数据的同步策略有全量同步和增量同步。
全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。
增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。
数据同步策略选择
两种策略都能保证数据仓库和业务数据库的数据同步,那应该如何选择呢?下面对两种策略进行简要对比。
若业务表数据量比较大,且每天数据变化的比例比较低,这时应采用增量同步,否则可采用全量同步。
数据同步工具概述
数据同步工具种类繁多,大致可分为两类,
一类是以DataX、Sqoop为代表的基于Select查询的离线、批量同步工具,
另一类是以Maxwell、Canal、Flink-CDC为代表的基于数据库数据变更日志(例如MySQL的binlog,其会实时记录所有的insert、update以及delete操作)的实时流式同步工具。
全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal、Flink-CDC等工具,下面对增量同步不同方案进行简要对比。
1.1.4 全量表数据同步
链接: 配置DataX
数据通道
全量表数据由DataX从MySQL业务数据库直接同步到HDFS,具体数据流向,如下图所示。
DataX配置文件
我们需要为每张全量表编写一个DataX的json配置文件,此处以 user为例,配置文件内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"create_time",
"update_time",
"email",
"hashed_password",
"telephone",
"username"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/medical?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8"
],
"table": [
"user"
]
}
],
"password": "000000",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "hashed_password",
"type": "string"
},
{
"name": "telephone",
"type": "string"
},
{
"name": "username",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "user",
"fileType": "text",
"path": "${targetdir}",
"writeMode": "truncate",
"nullFormat": ""
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
由于目标路径包含一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交任务时通过参数动态传入,参数名称为targetdir。
DataX配置文件生成
链接: DataX配置文件生成
将生成器上传到服务器的/opt/module/gen_datax_config目录
[atguigu@hadoop102 ~]$ mkdir /opt/module/gen_datax_config
[atguigu@hadoop102 ~]$ cd /opt/module/gen_datax_config
上传生成器
修改configuration.properties配置
mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
mysql.database.import=medical
# mysql.database.export=
mysql.tables.import=dict,doctor,hospital,medicine,patient,user
# mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://hadoop102:8020
import_out_dir=/opt/module/datax/job/medical/import
# export_out_dir=
执行
[atguigu@hadoop102 gen_datax_config]$ java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar
测试生成的DataX配置文件
以user为例,测试用脚本生成的配置文件是否可用。
1)创建目标路径
由于DataX同步任务要求目标路径提前存在,故需手动创建路径,当前user表的目标路径应为/origin_data/medical/user_full/2023-05-09。
[atguigu@hadoop102 bin]$ hadoop fs -mkdir -p /origin_data/medical/user_full/2023-05-09
2)执行DataX同步命令
[atguigu@hadoop102 bin]$ python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/medical/user_full/2023-05-09" /opt/module/datax/job/medical/import/medical.user.json
3)观察同步结果
观察HFDS目标路径是否出现数据。
全量表数据同步脚本
为方便使用以及后续的任务调度,此处编写一个全量表数据同步脚本。
1)在~/bin目录创建medical_mysql_to_hdfs_full.sh
[atguigu@hadoop102 bin]$ vim ~/bin/medical_mysql_to_hdfs_full.sh
#!/bin/bash
DATAX_HOME=/opt/module/datax
DATAX_DATA=/opt/module/datax/job/medical
#清理脏数据
handle_targetdir() {
hadoop fs -rm -r $1 >/dev/null 2>&1
hadoop fs -mkdir -p $1
}
#数据同步
import_data() {
local datax_config=$1
local target_dir=$2
handle_targetdir "$target_dir"
echo "正在处理$1"
python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config >/tmp/datax_run.log 2>&1
if [ $? -ne 0 ]
then
echo "处理失败, 日志如下:"
cat /tmp/datax_run.log
fi
}
#接收表名变量
tab=$1
# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
do_date=$2
else
do_date=$(date -d "-1 day" +%F)
fi
case ${tab} in
dict|doctor|hospital|medicine|patient|user)
import_data $DATAX_DATA/import/medical.${tab}.json /origin_data/medical/${tab}_full/$do_date
;;
"all")
for tmp in dict doctor hospital medicine patient user
do
import_data $DATAX_DATA/import/medical.${tmp}.json /origin_data/medical/${tmp}_full/$do_date
done
;;
esac
2)为mysql_to_hdfs_full.sh增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/medical_mysql_to_hdfs_full.sh
3)测试同步脚本
[atguigu@hadoop102 bin]$ medical_mysql_to_hdfs_full.sh all 2023-05-09
4)检查同步结果
查看HDFS目表路径是否出现全量表数据,全量表共6张。
全量表同步逻辑比较简单,只需每日执行全量表数据同步脚本medical_mysql_to_hdfs_full.sh即可。
增量表数据同步 MySQL - Maxwell - Kafka - Flume - HDFS
数据通道
Flume配置
1)Flume配置概述
Flume需要将Kafka中各topic的数据传输到HDFS,因此选用KafkaSource以及HDFSSink。对于安全性要求高的数据(不允许丢失)选用FileChannel,允许部分丢失的数据如日志可以选用MemoryChannel以追求更高的效率。此处采集的是业务数据,不允许丢失,选用FileChannel,生产环境根据实际情况选择合适的组件。
KafkaSource订阅Kafka medical_ods主题的数据,HDFSSink将不同topic的数据写入不同路径,路径中应包含表名及日期,前者用于区分来源于不同业务表的数据,后者按天对数据进行划分。关键配置如下:
具体数据示例如下:
链接: 医疗数仓配置Flume
Maxwell配置
1)Maxwell时间戳问题
为了让Maxwell时间戳日期与模拟的业务日期保持一致,对Maxwell源码进行改动,增加了mock_date参数,在/opt/module/maxwell/config.properties文件中将该参数的值修改为业务日期即可。
2)补充mock.date参数
配置参数如下。
log_level=info
#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# 目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key
# 修改数据时间戳的日期部分
mock_date=2023-05-09
3)重新启动Maxwell
[atguigu@hadoop102 bin]$ mxw.sh restart
4)重新生成模拟数据
[atguigu@hadoop102 bin]$ medical_mock.sh 1
5)观察HDFS目标路径日期是否与业务日期保持一致
增量表首日全量同步
通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
1)在~/bin目录创建medical_mysql_to_kafka_inc_init.sh
[atguigu@hadoop102 bin]$ vim medical_mysql_to_kafka_inc_init.sh
脚本内容如下
#!/bin/bash
# 该脚本的作用是初始化所有的增量表,只需执行一次
MAXWELL_HOME=/opt/module/maxwell
import_data() {
for tab in $@
do
$MAXWELL_HOME/bin/maxwell-bootstrap --database medical --table $tab --config $MAXWELL_HOME/config.properties
done
}
case $1 in
consultation | payment | prescription | prescription_detail | user | patient | doctor)
import_data $1
;;
"all")
import_data consultation payment prescription prescription_detail user patient doctor
;;
esac
2)为medical_mysql_to_kafka_inc_init.sh增加执行权限
[atguigu@hadoop102 bin]$ chmod 777 ~/bin/medical_mysql_to_kafka_inc_init.sh
3)测试同步脚本
(1)清理历史数据
为方便查看结果,现将HDFS上之前同步的增量表数据删除。
[atguigu@hadoop102 ~]$ hadoop fs -ls /origin_data/medical | grep _inc | awk '{print KaTeX parse error: Expected 'EOF', got '}' at position 2: 8}̲' | xargs hadoo… medical_mysql_to_kafka_inc_init.sh all
4)检查同步结果
观察HDFS上是否重新出现增量表数据。