需要这个完整离线数仓项目的源码和流程PPT可以私信我,可以帮助解决项目中遇到的问题,做完项目可以让你对数仓有更加清晰的认识
项目流程:
配置文件
kafka server.properties
hive : hvie-site.xml
启动mysql 的binlog日志
修改maxwell配置文件监听mysql的数据同步到kafka
配置flume-ng文件采集kafka—incdb 主题消费到的数据并上传至hdfs
flume-config
# ------------------- define data source ----------------------
# source alias
agent.sources = source_from_kafka
# channels alias
agent.channels = mem_channel
# sink alias
agent.sinks = hdfs_sink
# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource agent.sources.source_from_kafka.channels = mem_channel agent.sources.source_from_kafka.batchSize = 5000
# set kafka broker address
agent.sources.source_from_kafka.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# set kafka topic
agent.sources.source_from_kafka.kafka.topics = incdb
# set kafka groupid
agent.sources.source_from_kafka.kafka.consumer.group.id = incdb_id
# defind hdfs sink
agent.sinks.hdfs_sink.type = hdfs
# specify the channel the sink should use
agent.sinks.hdfs_sink.channel = mem_channel
# set store hdfs path
agent.sinks.hdfs_sink.hdfs.path = /flume/kafka/%Y%m%d
# set file size to trigger roll
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30 agent.sinks.hdfs_sink.hdfs.fileType=DataStream agent.sinks.hdfs_sink.hdfs.writeFormat=Text
# define channel from kafka source to hdfs sink
agent.channels.mem_channel.type = memory
# channel store size
agent.channels.mem_channel.capacity = 100000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000
开启maxwell监听:
flume采集:
开启数据管道传输:
maxwell监听mysql ---> kafka ----> flume ----> HDFS
Hdfs结果:
模拟生成的sql文件:
数仓:
### ods层
ods_aoi_full
maxwell josn数据
ods_user_travels_inc表
ods_user_activities_inc表
dwd层
dwd_users_full表
dwd_aoi_full
dwd层:
dws_user_activities_inc
dws_user_travels_inc表
DIM层
维度表
hive终端:
ADS应用层
统计2023年国庆每天出游总人数
统计热门景点top10
统计旅游热门省份、经济大区
统计每个省份景点数
统计出行方式人数