GitLab的用户创建和推送
在root用户-密码界面重新设置密码 添加Leader用户和自己使用的用户 使用root用户创建相应的群组 使用Leader用户创建对应的项目 设置分支配置为“初始推送后完全保护” 设置.gitignore文件,项目配置文件等其他非通用代码无需提交 安装gitlab project 2020插件 点击share project on gitlab 即可将项目上传到gitlab中
Flink集群的搭建
只需要运行Yarn模式 配置Hadoop的环境变量 将Flink1.17解压安装到对应为止即可
Hbase的配置
依赖zookeeper和hadoop这两个框架 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
解压Hbase2.4.11的安装包 添加Hbase的环境变量 修改配置文件
hbase-env.xml
export hbase_manages_zk=false 不使用自带的zookeeper hbase-site.xml
hbase.cluster.distributed = true 使用集群模式 hbase.zookeeper.quorum = hadoop102… zookeeper连接地址 hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径 hbase.wal.provider = filesystem 预写日志 regionservers: 添加hbase小弟的主机名称
Redise的配置
进入redise目录,执行make指令进行编译 make instanll安装 将myredis.conf文件复制到~/目录下 将bind 127.0.0.1 注释掉,并且关闭保护模式 设置daemon 后台启动模式为yes redis-server ./my_redis.conf后台启动
实时数仓ODS层
保证数据模拟器产生的数据是有序的
设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。 Kafka数据有序:Flink并发度和Kafka的分区数一致
设置三个kafka节点的分区个数都为4,num.partitions=4 Flink的并发度=4 历史维度数据
使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka 编写mysql_to_kafka_init.sh脚本 maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可
实时数仓dim层
dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型 dim层的数据存储在Hbase中 开发时需要切换到dev开发分支 为Flink的开发创建一个基类,名为BaseApp
抽象方法handle(): 每个主程序的业务逻辑 具体方法start():里面实现Flink代码的通用逻辑 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group
Flink-cdc获取维度信息
数据清洗 动态拆分维度表功能
方式1:直接将维度表做成List< String > (维度表名称)保存
方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。 方式5:cdc,变更数据抓取,类似与maxwell。 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {
public static void main ( String [ ] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ;
env. setParallelism ( 4 ) ;
System . setProperty ( "HADOOP_USER_NAME" , "atguigu" ) ;
env. setStateBackend ( new HashMapStateBackend ( ) ) ;
MySqlSource < String > mySqlSource = MySqlSource . < String > builder ( )
. hostname ( Constant . MYSQL_HOST )
. port ( Constant . MYSQL_PORT )
. username ( Constant . MYSQL_USER_NAME )
. password ( Constant . MYSQL_PASSWORD )
. databaseList ( "gmall2023_config" )
. tableList ( "gmall2023_config.table_process_dim" )
. deserializer ( new JsonDebeziumDeserializationSchema ( ) )
. startupOptions ( StartupOptions . initial ( ) )
. build ( ) ;
DataStreamSource < String > ds = env. fromSource ( mySqlSource,
WatermarkStrategy . noWatermarks ( ) ,
"kafkasource" ) . setParallelism ( 1 ) ;
ds. print ( ) ;
try {
env. execute ( ) ;
} catch ( Exception e) {
throw new RuntimeException ( e) ;
}
}
}