医疗数仓业务数据采集与同步

news2025/1/3 23:36:40

业务数据采集与同步

  • 业务采集组件配置
  • 业务数据同步概述
  • 数据同步策略选择
  • 数据同步工具概述
  • 1.1.4 全量表数据同步
  • DataX配置文件生成
  • 全量表数据同步脚本
  • 增量表数据同步 MySQL - Maxwell - Kafka - Flume - HDFS
  • Maxwell配置
  • 增量表首日全量同步

业务采集组件配置

在这里插入图片描述
Maxwell将业务采集到的kafka,下游从kafka消费数据用于计算
链接: 配置Maxwell

业务数据同步概述

业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。数据的同步策略有全量同步和增量同步。
全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。
在这里插入图片描述
增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。
在这里插入图片描述

数据同步策略选择

两种策略都能保证数据仓库和业务数据库的数据同步,那应该如何选择呢?下面对两种策略进行简要对比。
在这里插入图片描述
若业务表数据量比较大,且每天数据变化的比例比较低,这时应采用增量同步,否则可采用全量同步。
在这里插入图片描述

数据同步工具概述

数据同步工具种类繁多,大致可分为两类,
一类是以DataX、Sqoop为代表的基于Select查询的离线、批量同步工具
另一类是以Maxwell、Canal、Flink-CDC为代表的基于数据库数据变更日志(例如MySQL的binlog,其会实时记录所有的insert、update以及delete操作)的实时流式同步工具
全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用Maxwell、Canal、Flink-CDC等工具,下面对增量同步不同方案进行简要对比。
在这里插入图片描述

1.1.4 全量表数据同步

链接: 配置DataX
数据通道
全量表数据由DataX从MySQL业务数据库直接同步到HDFS,具体数据流向,如下图所示。
在这里插入图片描述
DataX配置文件
我们需要为每张全量表编写一个DataX的json配置文件,此处以 user为例,配置文件内容如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "create_time",
              "update_time",
              "email",
              "hashed_password",
              "telephone",
              "username"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://hadoop102:3306/medical?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "000000",
            "splitPk": "",
            "username": "root"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "bigint"
              },
              {
                "name": "create_time",
                "type": "string"
              },
              {
                "name": "update_time",
                "type": "string"
              },
              {
                "name": "email",
                "type": "string"
              },
              {
                "name": "hashed_password",
                "type": "string"
              },
              {
                "name": "telephone",
                "type": "string"
              },
              {
                "name": "username",
                "type": "string"
              }
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hadoop102:8020",
            "fieldDelimiter": "\t",
            "fileName": "user",
            "fileType": "text",
            "path": "${targetdir}",
            "writeMode": "truncate",
            "nullFormat": ""
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

由于目标路径包含一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交任务时通过参数动态传入,参数名称为targetdir。

DataX配置文件生成

链接: DataX配置文件生成
将生成器上传到服务器的/opt/module/gen_datax_config目录
[atguigu@hadoop102 ~]$ mkdir /opt/module/gen_datax_config
[atguigu@hadoop102 ~]$ cd /opt/module/gen_datax_config
上传生成器
在这里插入图片描述
修改configuration.properties配置

mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
mysql.database.import=medical
# mysql.database.export=
mysql.tables.import=dict,doctor,hospital,medicine,patient,user
# mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://hadoop102:8020
import_out_dir=/opt/module/datax/job/medical/import
# export_out_dir=

执行

[atguigu@hadoop102 gen_datax_config]$ java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar

测试生成的DataX配置文件
以user为例,测试用脚本生成的配置文件是否可用。
1)创建目标路径
由于DataX同步任务要求目标路径提前存在,故需手动创建路径,当前user表的目标路径应为/origin_data/medical/user_full/2023-05-09。

[atguigu@hadoop102 bin]$ hadoop fs -mkdir -p /origin_data/medical/user_full/2023-05-09

2)执行DataX同步命令

[atguigu@hadoop102 bin]$ python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/medical/user_full/2023-05-09" /opt/module/datax/job/medical/import/medical.user.json

3)观察同步结果
观察HFDS目标路径是否出现数据。
在这里插入图片描述

全量表数据同步脚本

为方便使用以及后续的任务调度,此处编写一个全量表数据同步脚本。
1)在~/bin目录创建medical_mysql_to_hdfs_full.sh
[atguigu@hadoop102 bin]$ vim ~/bin/medical_mysql_to_hdfs_full.sh

#!/bin/bash

DATAX_HOME=/opt/module/datax
DATAX_DATA=/opt/module/datax/job/medical

#清理脏数据
handle_targetdir() {
  hadoop fs -rm -r $1 >/dev/null 2>&1
  hadoop fs -mkdir -p $1
}

#数据同步
import_data() {
  local datax_config=$1
  local target_dir=$2

  handle_targetdir "$target_dir"
  echo "正在处理$1"
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config >/tmp/datax_run.log 2>&1
  if [ $? -ne 0 ]
  then
    echo "处理失败, 日志如下:"
    cat /tmp/datax_run.log 
  fi
}

#接收表名变量
tab=$1
# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=$(date -d "-1 day" +%F)
fi


case ${tab} in
dict|doctor|hospital|medicine|patient|user)
  import_data $DATAX_DATA/import/medical.${tab}.json /origin_data/medical/${tab}_full/$do_date
  ;;
"all")
  for tmp in dict doctor hospital medicine patient user
  do
    import_data $DATAX_DATA/import/medical.${tmp}.json /origin_data/medical/${tmp}_full/$do_date
  done
  ;;
esac

2)为mysql_to_hdfs_full.sh增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/medical_mysql_to_hdfs_full.sh
3)测试同步脚本
[atguigu@hadoop102 bin]$ medical_mysql_to_hdfs_full.sh all 2023-05-09
4)检查同步结果
查看HDFS目表路径是否出现全量表数据,全量表共6张。
全量表同步逻辑比较简单,只需每日执行全量表数据同步脚本medical_mysql_to_hdfs_full.sh即可。

增量表数据同步 MySQL - Maxwell - Kafka - Flume - HDFS

数据通道
在这里插入图片描述
Flume配置
1)Flume配置概述
Flume需要将Kafka中各topic的数据传输到HDFS,因此选用KafkaSource以及HDFSSink。对于安全性要求高的数据(不允许丢失)选用FileChannel,允许部分丢失的数据如日志可以选用MemoryChannel以追求更高的效率。此处采集的是业务数据,不允许丢失,选用FileChannel,生产环境根据实际情况选择合适的组件。

KafkaSource订阅Kafka medical_ods主题的数据,HDFSSink将不同topic的数据写入不同路径,路径中应包含表名及日期,前者用于区分来源于不同业务表的数据,后者按天对数据进行划分。关键配置如下:
在这里插入图片描述
具体数据示例如下:
在这里插入图片描述

链接: 医疗数仓配置Flume

Maxwell配置

1)Maxwell时间戳问题
在这里插入图片描述
为了让Maxwell时间戳日期与模拟的业务日期保持一致,对Maxwell源码进行改动,增加了mock_date参数,在/opt/module/maxwell/config.properties文件中将该参数的值修改为业务日期即可。

2)补充mock.date参数
配置参数如下。

log_level=info

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# 目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db

# MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key

# 修改数据时间戳的日期部分
mock_date=2023-05-09

3)重新启动Maxwell
[atguigu@hadoop102 bin]$ mxw.sh restart
4)重新生成模拟数据
[atguigu@hadoop102 bin]$ medical_mock.sh 1
5)观察HDFS目标路径日期是否与业务日期保持一致

增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
1)在~/bin目录创建medical_mysql_to_kafka_inc_init.sh
[atguigu@hadoop102 bin]$ vim medical_mysql_to_kafka_inc_init.sh
脚本内容如下

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
  for tab in $@
  do 
    $MAXWELL_HOME/bin/maxwell-bootstrap --database medical --table $tab --config $MAXWELL_HOME/config.properties
  done
}

case $1 in
consultation | payment | prescription | prescription_detail | user | patient | doctor)
  import_data $1
  ;;
"all")
  import_data consultation payment prescription prescription_detail user patient doctor
  ;;
esac

2)为medical_mysql_to_kafka_inc_init.sh增加执行权限
[atguigu@hadoop102 bin]$ chmod 777 ~/bin/medical_mysql_to_kafka_inc_init.sh
3)测试同步脚本
(1)清理历史数据
为方便查看结果,现将HDFS上之前同步的增量表数据删除。
[atguigu@hadoop102 ~]$ hadoop fs -ls /origin_data/medical | grep _inc | awk '{print KaTeX parse error: Expected 'EOF', got '}' at position 2: 8}̲' | xargs hadoo… medical_mysql_to_kafka_inc_init.sh all
4)检查同步结果
观察HDFS上是否重新出现增量表数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2268839.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

神经网络-VggNet

2014年VggNet被推出,获取了ILSVRC2014比赛分类项目的第二名,第一名是GoogleNet,该网络在下节介绍,本节主要介绍VggNet。 VggNet可以称为是一个家族,根据层数的不同包括了A、A-LRN、B、C、D等网络结构,其中…

计算机网络 (7)物理层下面的传输媒体

一、定义与位置 物理层是计算机网络体系结构的最低层,它位于传输媒体(传输介质)之上,主要作用是为数据链路层提供一个原始比特流的物理连接。这里的“比特流”是指数据以一个个0或1的二进制代码形式表示。物理层并不是特指某种传输…

敏捷开发中的自动化脚手架在 HarmonyOS 的应用

文章目录 前言什么是自动化脚手架工具构建自动化脚手架实战代码项目目录结构生成功能说明 示例代码生成功能说明 主工具类入口功能说明 如何运行脚手架工具总结参考资料 前言 在敏捷开发环境中,快速搭建项目结构是提升开发效率的关键。然而,手动配置开发…

VScode 格式化代码空格记录

点击 -> “文件” -> “首选项" -> “设置” -> 按下图操作: 怎么格式化代码空格,先看下: 保存代码后,这代码自动格式化发,如下图: 你可以试试看就即可

Python编程技术

设计目的 该项目框架Scrapy可以让我们平时所学的技术整合旨在帮助学习者提高Python编程技能并熟悉基本概念: 1. 学习基本概念:介绍Python的基本概念,如变量、数据类型、条件语句、循环等。 2. 掌握基本编程技巧:教授学生如何使…

《Java核心技术 卷II》流的创建

流的创建 Collection接口中stream方法可以将任何集合转换为一个流。 用静态Stream.of转化成数组。 Stream words Stream.of(contents.split("\\PL")); of方法具有可变长参数,可以构建具有任意数量的流。 使用Array.stream(array,from,to)可以用数组…

ESP32-CAM开发板入门 (下载示例程序)

ESP32-CAM开发板例程使用 1、准备工作1.1、硬件准备1.2、软件准备 2、选择示例程序并录入第一步 1、准备工作 1.1、硬件准备 1.2、软件准备 Arduino IDE : 编程与写入(下载地址 https://www.arduino.cc/en/software) 安装好后将软件设置到…

Cocos Creator 3.8.5 正式发布,更小更快更多平台!

在 Cocos Creator 3.8.5 版本中,我们做了新一轮的优化。 在加载速度、代码裁剪、平台增强等多方面做了优化,提升了开发者体验和游戏性能。 希望能够助 Cocos 开发者们的产品更上一层楼。 一、加载速度优化 1、WASM 模块延迟加载 在早期版本中&#xff0c…

HTML——30.视频引入

<head><meta charset"UTF-8"><title>视频引入</title></head><body><!--video:在网页中引入音频IE8以及之前版本不支持属性名和属性值一样&#xff0c;可以只写属性名src属性:指定视频文件路径&#xff0c;必须要有controls属…

T7 TensorFlow入门实战——咖啡豆识别

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習紀錄博客&#x1f356; 原作者&#xff1a;K同学啊 | 接輔導、項目定制 一、前期准备 1. 导入数据 # Import the required libraries import numpy as np import PIL,pathlib from PIL import Image import ma…

OpenHarmony源码编译后烧录镜像教程,RK3566鸿蒙开发板演示

本文介绍瑞芯微主板/开发板编译OpenHarmony源码后烧录镜像的教程&#xff0c;触觉智能Purple Pi OH鸿蒙开发板演示。搭载了瑞芯微RK3566四核处理器&#xff0c;树莓派卡片电脑设计&#xff0c;支持开源鸿蒙OpenHarmony3.2-5.0系统&#xff0c;适合鸿蒙开发入门学习。 编译源码…

flask后端开发(3):html模板渲染

目录 渲染模板html模板获取路由参数 gitcode地址&#xff1a; https://gitcode.com/qq_43920838/flask_project.git 渲染模板 这样就能够通过html文件来渲染前端&#xff0c;而不是通过return了 html模板获取路由参数

SOME/IP 协议详解——序列化

文章目录 0. 概述1.基本数据序列化2.字符串序列化2.1 字符串通用规则2.2 固定长度字符串规则2.3 动态长度字符串规则 3.结构体序列化4. 带有标识符和可选成员的结构化数据类型5. 数组5.1 固定长度数组5.2 动态长度数组5.3 Enumeration&#xff08;枚举&#xff09;5.4 Bitfield…

HTML5实现好看的喜庆圣诞节网站源码

HTML5实现好看的喜庆圣诞节网站源码 前言一、设计来源1.1 主界面1.2 圣诞介绍界面1.3 圣诞象征界面1.4 圣诞活动界面1.5 圣诞热度界面1.6 圣诞纪念界面1.7 联系我们界面 二、效果和源码2.1 动态效果2.2 源代码 源码下载结束语 HTML5实现好看的喜庆圣诞节网站源码&#xff0c;圣…

嵌入式开发中的机器人表情绘制

机器人的表情有两种&#xff0c;一种是贴图&#xff0c;一钟是调用图形API自绘。 贴图效果相对比较好&#xff0c;在存储空间大的情况下是可以采用的。 自绘比较麻烦&#xff0c;但在资源和空缺少的情况下&#xff0c;也是很有用的。而且自绘很容易通过调整参数加入随机效果&…

opencv实现KNN算法识别图片数字

KNN算法实现识别图片数字 目录 KNN算法实现识别图片数字图片基本情况图片数据 图片数字识别图片数据处理及预测其它数字图片正确率预测 图片基本情况 图片 数据 图片像素是2000x1000,即高&#xff08;行&#xff09;1000&#xff0c;宽&#xff08;列&#xff09;2000&#xf…

美畅物联丨视频上云网关获取视频流地址供第三方调用的方法

在视频监控与流媒体传输领域&#xff0c;视频流地址的获取与调用是极为关键的环节。视频上云网关作为一款高效且稳定的视频传输设备&#xff0c;为获取视频流地址提供了便捷途径&#xff0c;从而使外部系统或平台能够方便地进行调用。今天我们就来讨论一下如何在视频上云网关上…

MySQL数据库——索引结构之B+树

本文先介绍数据结构中树的演化过程&#xff0c;之后介绍为什么MySQL数据库选择了B树作为索引结构。 文章目录 树的演化为什么其他树结构不行&#xff1f;为什么不使用二叉查找树&#xff08;BST&#xff09;&#xff1f;为什么不使用平衡二叉树&#xff08;AVL树&#xff09;&a…

一起学Git【第六节:查看版本差异】

git diff是 Git 版本控制系统中用于展示差异的强大工具。他可以用于查看文件在工作区、暂存区和版本库之间的差异、任意两个指定版本之间的差异和两个分支之间的差异等,接下来进行详细的介绍。 1.显示工作区与暂存区之间的差异 # 显示工作区和暂存区之间的差异,后面不加参数…

Fetch处理大模型流式数据请求与解析

为什么有的大模型可以一次返回多个 data&#xff1f; Server-Sent Events (SSE)&#xff1a;允许服务器连续发送多个 data: 行&#xff0c;每个代表一个独立的数据块。 流式响应&#xff1a;大模型服务通常以流式响应方式返回数据&#xff0c;提高响应速度。 批量处理&#x…