1.1 项目背景
本项目是一个创新的湖仓一体实时电商数据分析平台,旨在为电商平台提供深度的数据洞察和业务分析。技术层面,项目涵盖了从基础架构搭建到大数据技术组件的集成,采用了湖仓一体的设计理念,实现了数据仓库与数据湖的有效融合。这一架构创新旨在打通数据流通的壁垒,支持企业级项目的离线与实时数据指标分析,确保数据分析的全面性和时效性。
在业务应用方面,项目初期重点关注会员和商品两大核心主题。通过精细化的数据分析,我们提供了一系列关键指标,包括但不限于用户实时登录行为、页面浏览量(PV)与独立访客(UV)的实时统计、商品浏览信息的动态分析,以及用户积分体系的深度挖掘。这些分析指标将为平台运营提供数据支撑,优化用户体验,并驱动业务增长。展望未来,我们计划进一步扩展业务分析的广度与深度,增加更多业务指标,并不断完善技术架构,以适应不断变化的市场需求。我们致力于通过该项目,为客户提供一个可靠、灵活且功能强大的数据分析解决方案,帮助他们在电商领域中捕捉机遇,实现价值最大化。
1.2 项目架构
1.2.1实时数仓现状
目前,基于Hive构建的离线数据仓库技术已经达到了高度成熟的状态。然而,随着实时计算引擎技术的飞速进步以及业务领域对实时报表生成需求的日益增长,业界近年来一直在紧密关注并积极投身于实时数据仓库的构建与优化。在数据仓库架构的演进历程中,Lambda架构作为一个标志性的阶段,它融合了离线处理和实时处理两种数据处理途径,为满足多样化的数据处理需求提供了有效的解决方案。这种架构通过精心设计的数据处理流程,确保了数据处理的高效率和高可靠性,同时也为实时数仓的进一步发展奠定了坚实的基础,其架构图如下:
正是由于在Lambda架构中存在离线和实时两条数据处理链路,这种双重路径有时会引发数据一致性问题。为了解决这些问题并简化数据处理流程,Kappa架构应运而生。
Kappa架构以其简洁和高效的特点,被誉为实现真正实时数仓的理想架构。在业界,Flink与Kafka的结合是实现Kappa架构的典型方式。然而,即便如此强大的组合也并非完美无缺。以下是Kappa架构的一些显著缺陷:
-
数据存储限制:Kafka在处理海量数据存储方面存在局限。对于数据量巨大的业务场景,Kafka通常只能保留短期内的数据,如最近一周或甚至仅一天。
-
OLAP查询效率:Kafka并不擅长支持高效的OLAP(在线分析处理)查询。许多业务场景需要在DWD(数据仓库详细层)和DWS(数据仓库服务层)进行即时查询,而Kafka在这方面的支持并不尽如人意。
-
数据管理和血缘:Kappa架构难以复用现有的、成熟的离线数仓数据血缘和数据质量管理机制。企业可能需要重新构建一套完整的数据血缘和数据质量管理系统。
-
数据更新限制:Kafka目前不支持update或upsert操作,仅支持数据的追加。在DWS层,可能需要对数据进行更新,尤其是在数据聚合和时间窗口处理时,Kafka无法满足这些需求。
因此,在实时数仓的构建中,许多企业并没有完全采用Kappa架构,而是选择了混合架构,以兼顾实时性和离线数据处理的需求。尽管Kappa架构在提高数据报表的时效性方面取得了一定进展,但它依然面临诸多挑战。除了上述问题,对于那些实时业务需求密集的公司,Kappa架构可能还需要针对特定层的Kafka数据重新编写实时处理程序,这无疑增加了操作的复杂性和不便。
随着数据湖技术的发展,Kappa架构在实现批量数据和实时数据的统一计算方面展现出了巨大的潜力。这一趋势催生了“批流一体”的概念,它指的是在数据处理的不同层面上实现批量和流式数据的无缝集成。
在业界,对“批流一体”的理解存在两种主要视角:
-
开发层面的统一:一些观点认为,当批量处理和流处理的逻辑能够通过相同的SQL语句来实现时,就达到了批流一体。这种方式简化了开发流程,使得开发者能够用统一的逻辑来处理不同类型的数据。
-
计算引擎层面的集成:另一些观点则强调计算引擎层面的统一。例如,Spark及其相关组件(如Spark Streaming、Structured Streaming)和Flink等框架,已经在计算引擎层面实现了批处理和流处理的集成。这种集成使得同一引擎能够同时处理批量作业和实时数据流。
无论从哪个角度来看,批流一体的核心都在于存储层面的统一。数据湖技术提供了一个统一的存储解决方案,使得批量数据和实时数据能够共存于同一环境中,实现统一的处理和计算。
通过将离线数仓和实时数仓的数据统一存储在数据湖中,我们能够构建起“湖仓一体”的架构。这种架构不仅简化了数据管理,还提高了数据处理的灵活性和效率。例如,使用Iceberg作为数据湖的存储解决方案,可以解决Kappa架构中的许多挑战,使得Kappa架构更加完善和高效。
在这种“湖仓一体”的架构下,Kappa架构的实现变得更加简洁和强大。数据湖技术的应用使得数据的存储、处理和分析更加一体化,为企业提供了一个更加灵活和高效的数据处理平台。这种架构的构建已经成为许多大型公司在处理离线和实时数据时的首选方案,它代表了数据处理领域的未来趋势。
在现代数据处理架构中,数据湖技术尤其是Iceberg的引入,为Kappa架构带来了革新性的改进。这一架构的演进,通过将数据存储统一到数据湖Iceberg上,无论是流处理还是批处理,都实现了存储层面的高效整合。以下是该架构解决Kappa架构痛点的几个关键方面:
-
数据存储容量:数据湖Iceberg建立在HDFS之上,提供了一个可扩展的文件管理系统,有效解决了Kafka在数据存储量上的限制,能够处理大规模数据集。
-
OLAP查询支持:数据湖架构允许DW层数据继续支持高效的OLAP查询。通过适配现有的OLAP查询引擎,可以无缝地进行交互式分析和即席查询。
-
数据管理和治理:统一的存储基础使得可以复用一套数据血缘和数据质量管理工具和流程,简化了数据治理工作,提高了数据的可管理性和可追溯性。
-
实时数据更新:数据湖技术提供了对实时数据更新的支持,这对于需要动态调整和维护数据准确性的场景至关重要。
此外,该架构可以被视为Kappa架构的优化版本,它保留了两条数据链路的设计模式:
- 离线数据链路:基于Spark的批处理能力,适用于数据的全量处理和修正,以及其他非实时数据处理需求。
- 实时数据链路:基于Flink的流处理能力,用于处理实时数据流,确保数据的实时性和动态性。
这种混合链路架构不仅提高了数据处理的灵活性,还增强了数据的准确性和可靠性。在数据修正和非常规场景下,离线链路发挥着重要作用,而实时链路则保障了日常业务的连续性和报表的即时生成。
综上所述,这一架构通过巧妙地融合了数据湖技术和现代计算引擎,不仅解决了Kappa架构的多项挑战,还为构建一个可落地的实时数仓方案提供了坚实的基础,实现了实时报表的快速产出,满足了企业对实时数据分析的迫切需求。
1.2.2 项目架构及数据分层
在本项目中,我们采用了前沿的数据湖技术——Apache Iceberg,来构建一个创新的“湖仓一体”架构。这一架构的核心优势在于其能够无缝地整合实时和离线分析能力,专门针对电商业务指标进行优化。
在本项目中,我们采用了先进的数据湖技术——Apache Iceberg,来构建一个创新的“湖仓一体”架构,旨在实现对电商业务指标的实时和离线分析。以下是我们项目数据处理流程的详细描述:
-
数据采集:
- 我们的项目涉及两类主要数据源:一是来自MySQL的业务数据库数据,二是用户行为日志数据。
- 这两类数据通过特定的数据采集工具,被实时地收集并发送到Kafka的不同topic中。
-
数据摄入与处理:
- 使用Flink作为数据处理引擎,从Kafka的相应topic中读取业务和日志数据。
- Flink对数据进行必要的清洗、转换和聚合操作,以适应后续的分析需求。
-
数据存储与Iceberg-ODS层:
- 处理后的数据首先存储在Iceberg的ODS(操作数据存储)层中。
- 由于Flink在处理Iceberg时可能存在的数据消费位置信息保存问题,我们同时将数据写入Kafka,利用Flink的offset维护机制确保程序在停止和重启后能正确地继续消费数据。
-
数据仓库分层:
- 我们的架构基于Iceberg构建了多层数据仓库模型,每一层都针对不同的数据处理和分析需求。
-
实时数据分析:
- 实时分析的数据结果被存储在Clickhouse中,利用其优异的查询性能来支持快速的实时数据分析。
-
离线数据分析:
- 离线数据分析结果则从Iceberg的DWS(数据仓库服务层)中获取,确保了数据分析的深度和广度。
-
数据存储与展示:
- 分析结果最终存储在MySQL数据库中,用于长期的业务回顾和决策支持。
- 通过先进的数据可视化工具,我们将Clickhouse和MySQL中的分析结果以直观、易理解的形式展现出来。
-
架构优势:
- 整个架构的优势在于其灵活性和扩展性,能够适应不断变化的业务需求和数据量增长。
- 同时,它还能够保证数据处理的实时性和准确性,为业务决策提供强有力的数据支持。
通过这种综合运用多种数据处理技术的方式,我们的项目能够高效地处理和分析大规模数据,为电商业务提供深刻的洞察和实时的决策支持。
1.2.3项目可视化效果
1.3项目使用技术及版本
在项目中,使用了多种大数据技术组件来构建一个高效、可靠的数据处理和分析平台。以下是项目中使用的技术组件及其版本概览:
使用技术 | 版本 |
---|---|
Zookeeper | 3.4.13 |
HDFS | 3.1.4/3.2.2 |
Hive | 3.1.2 |
(MySQL 5.7.32) | |
Iceberg | 0.11.1 |
HBase | 2.2.6 |
Phoenix | 5.0.0 |
Kafka | 0.11.0.3 |
Redis | 2.8.18 |
Flink | 1.11.6 |
Flume | 1.9.0 |
Maxwell | 1.28.2 |
ClickHouse | 21.9.4.35 |
1.4 项目基础环境的准备
基础环境的准备是确保大数据组件顺利安装和运行的关键步骤
考虑到容器化部署的便利性,也可以使用Docker安装相应的组件。但在本项目中,为了更好地理解各个组件和流程,选择使用二进制部署方式。
基础环境:
在 windows 上 安装 VMware 并安装,centos7 。
节点配置:
每台节点配置了4GB内存和4个CPU核心,这为运行大数据组件提供了基本的硬件支持。
网络设置:
关闭了每台节点的防火墙,确保网络通信不受限制。
配置了每台节点的主机名,便于管理和识别。
配置了YUM源,确保软件包能够顺利下载和更新。
时间同步:
确保了所有节点之间的时间同步,这对于分布式系统的一致性和日志记录非常重要。
SSH设置:
实现了各个节点之间的SSH密钥认证,使得节点间可以免密码登录,方便管理和维护。
实现各个节点之间的SSH密钥认证是一个重要的步骤,它允许无密码登录到远程服务器,从而简化了管理和维护过程。以下是实现这一目标的步骤:
-
生成SSH密钥对:
在主节点上(例如node1),生成SSH密钥对。如果不指定,SSH将自动生成一个默认的RSA密钥对。ssh-keygen -t rsa
-
复制公钥到其他节点:
将公钥复制到所有其他节点的~/.ssh/authorized_keys
文件中。这可以通过SSH-copy-id命令完成:ssh-copy-id node2 ssh-copy-id node3 ssh-copy-id node4 ssh-copy-id node5
-
验证SSH密钥认证:
测试从主节点到其他节点的SSH连接,看是否能够无密码登录:ssh node2 ssh node3 ssh node4 ssh node5
-
配置SSH免密登录:
在~/.ssh/config
文件中为每个节点配置免密登录,这样SSH客户端就会自动使用正确的密钥进行认证:Host node2 HostName 192.168.179.5 User your_username IdentityFile ~/.ssh/id_rsa Host node3 HostName 192.168.179.6 User your_username IdentityFile ~/.ssh/id_rsa # 为其他节点重复上述配置
-
确保SSH服务配置正确:
在所有节点上,编辑/etc/ssh/sshd_config
文件,确保以下配置项正确:PubkeyAuthentication yes AuthorizedKeysFile .ssh/authorized_keys
-
重启SSH服务:
在所有节点上,重启SSH服务以应用配置更改:sudo systemctl restart sshd
-
确保防火墙规则允许SSH:
如果您的防火墙在稍后重新启用,确保防火墙规则允许SSH端口(默认为22):sudo firewall-cmd --permanent --add-service=ssh sudo firewall-cmd --reload
通过这些步骤,您可以在多个节点之间实现SSH密钥认证,从而无需每次手动输入密码即可登录到远程服务器。这对于自动化任务和批量管理非常有用。
JDK安装:
安装了Java开发工具包(JDK),为运行Java编写的大数据应用程序提供了环境支持。
节点IP | 节点名称 |
---|---|
192.168.179.4 | node1 |
192.168.179.5 | node2 |
192.168.179.6 | node3 |
192.168.179.7 | node4 |
192.168.179.8 | node5 |
1.4.1搭建Zookeeper
以下展示了Zookeeper集群的节点IP、节点名称以及对应的角色分布:
| 节点IP | 节点名称 | Zookeeper |
| --------------- | -------- | --------- |
| 192.168.179.4 | node1 | |
| 192.168.179.5 | node2 | |
| 192.168.179.6 | node3 | ★ |
| 192.168.179.7 | node4 | ★ |
| 192.168.179.8 | node5 | ★ |
在这个表格中,★
表示该节点将作为Zookeeper服务的实例运行。
以下是搭建Zookeeper集群的具体步骤:
1. 准备环境
- 在所有节点(node1, node2, node3, node4, node5)创建
/software
目录,用于后续安装技术组件。mkdir -p /software
2. 上传并解压Zookeeper
- 将Zookeeper安装包上传至
node3
的/software
目录,并解压。tar -zxvf zookeeper-3.4.13.tar.gz -C /software
3. 配置环境变量
- 在
node3
配置Zookeeper环境变量。echo 'export ZOOKEEPER_HOME=/software/zookeeper-3.4.13' >> /etc/profile echo 'export PATH=$PATH:$ZOOKEEPER_HOME/bin' >> /etc/profile source /etc/profile
4. 配置Zookeeper
- 在
node3
编辑Zookeeper配置文件。mv /software/zookeeper-3.4.13/conf/zoo_sample.cfg /software/zookeeper-3.4.13/conf/zoo.cfg
- 修改
zoo.cfg
文件,设置集群配置:tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/data/zookeeper clientPort=2181 server.1=node3:2888:3888 server.2=node4:2888:3888 server.3=node5:2888:3888
5. 分发Zookeeper配置
- 将配置好的Zookeeper发送至
node4
和node5
。scp -r /software/zookeeper-3.4.13 node4:/software/ scp -r /software/zookeeper-3.4.13 node5:/software/
6. 创建数据目录并配置环境变量
- 在
node3
,node4
,node5
创建数据目录。mkdir -p /opt/data/zookeeper
- 在
node4
和node5
配置Zookeeper环境变量。echo 'export ZOOKEEPER_HOME=/software/zookeeper-3.4.13' >> /etc/profile echo 'export PATH=$PATH:$ZOOKEEPER_HOME/bin' >> /etc/profile source /etc/profile
7. 创建节点ID文件
- 在
node3
,node4
,node5
的/opt/data/zookeeper
路径中添加myid
文件,并分别写入1, 2, 3。echo 1 > /opt/data/zookeeper/myid echo 2 > /opt/data/zookeeper/myid echo 3 > /opt/data/zookeeper/myid
8. 启动Zookeeper并检查状态
- 在各个节点启动Zookeeper。
zkServer.sh start
- 检查Zookeeper进程状态。
zkServer.sh status
通过这些步骤,您可以在node3
, node4
, node5
上搭建一个Zookeeper集群,为后续的大数据组件提供服务。确保在每个节点上重复相应的配置和启动步骤,以确保集群的一致性和稳定性。
以下是搭建HDFS集群的具体步骤,以及对应的角色分布表格,使用Markdown格式编写:
1.4.2搭建HDFS
HDFS集群角色分布
节点IP | 节点名称 | NN | DN | ZKFC | JN | RM | NM |
---|---|---|---|---|---|---|---|
192.168.179.4 | node1 | ★ | ★ | ★ | ★ | ||
192.168.179.5 | node2 | ★ | ★ | ★ | ★ | ||
192.168.179.6 | node3 | ★ | ★ | ★ | |||
192.168.179.7 | node4 | ★ | ★ | ★ | |||
192.168.179.8 | node5 | ★ | ★ | ★ |
搭建HDFS集群步骤
-
安装依赖:在所有节点安装HDFS HA自动切换必须的依赖。
yum -y install psmisc
-
上传并解压Hadoop:将Hadoop安装包上传至
node1
并解压。tar -zxvf hadoop-3.1.4.tar.gz -C /software
-
配置环境变量:在
node1
配置Hadoop环境变量。echo 'export HADOOP_HOME=/software/hadoop-3.1.4/' >> /etc/profile echo 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:' >> /etc/profile source /etc/profile
-
配置
hadoop-env.sh
:设置JAVA_HOME。
在$HADOOP_HOME/etc/hadoop目录下,编辑hadoop-env.sh文件,添加以下内容:export JAVA_HOME=/usr/java/jdk1.8.0_181-amd64/
-
配置
hdfs-site.xml
:编辑配置项,包括逻辑名称、权限禁用、NameNode名称、编辑日志目录等。
配置路径:$HADOOP_HOME/etc/hadoop下的hdfs-site.xml文件
<configuration>
<property>
<!--这里配置逻辑名称,可以随意写 -->
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<!-- 禁用权限 -->
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<!-- 配置namenode 的名称,多个用逗号分割 -->
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<property>
<!-- dfs.namenode.rpc-address.[nameservice ID].[name node ID] namenode 所在服务器名称和RPC监听端口号 -->
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>node1:8020</value>
</property>
<property>
<!-- dfs.namenode.rpc-address.[nameservice ID].[name node ID] namenode 所在服务器名称和RPC监听端口号 -->
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>node2:8020</value>
</property>
<property>
<!-- dfs.namenode.http-address.[nameservice ID].[name node ID] namenode 监听的HTTP协议端口 -->
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>node1:50070</value>
</property>
<property>
<!-- dfs.namenode.http-address.[nameservice ID].[name node ID] namenode 监听的HTTP协议端口 -->
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>node2:50070</value>
</property>
<property>
<!-- namenode 共享的编辑目录, journalnode 所在服务器名称和监听的端口 -->
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node3:8485;node4:8485;node5:8485/mycluster</value>
</property>
<property>
<!-- namenode高可用代理类 -->
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<!-- 使用ssh 免密码自动登录 -->
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<property>
<!-- journalnode 存储数据的地方 -->
<name>dfs.journalnode.edits.dir</name>
<value>/opt/data/journal/node/local/data</value>
</property>
<property>
<!-- 配置namenode自动切换 -->
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>
- 配置
core-site.xml
:设置Hadoop客户端默认路径、Hadoop临时目录、Zookeeper集群地址。
配置路径:$HADOOP_HOME/ect/hadoop/core-site.xml
<configuration>
<property>
<!-- 为Hadoop 客户端配置默认的高可用路径 -->
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
<property>
<!-- Hadoop 数据存放的路径,namenode,datanode 数据存放路径都依赖本路径,不要使用 file:/ 开头,使用绝对路径即可
namenode 默认存放路径 :file://${hadoop.tmp.dir}/dfs/name
datanode 默认存放路径 :file://${hadoop.tmp.dir}/dfs/data
-->
<name>hadoop.tmp.dir</name>
<value>/opt/data/hadoop/</value>
</property>
<property>
<!-- 指定zookeeper所在的节点 -->
<name>ha.zookeeper.quorum</name>
<value>node3:2181,node4:2181,node5:2181</value>
</property>
</configuration>
- 配置
yarn-site.xml
:设置YARN辅助服务、环境变量白名单、YARN ResourceManager高可用配置。
配置路径:$HADOOP_HOME/etc/hadoop/yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<property>
<!-- 配置yarn为高可用 -->
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<!-- 集群的唯一标识 -->
<name>yarn.resourcemanager.cluster-id</name>
<value>mycluster</value>
</property>
<property>
<!-- ResourceManager ID -->
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<!-- 指定ResourceManager 所在的节点 -->
<name>yarn.resourcemanager.hostname.rm1</name>
<value>node1</value>
</property>
<property>
<!-- 指定ResourceManager 所在的节点 -->
<name>yarn.resourcemanager.hostname.rm2</name>
<value>node2</value>
</property>
<property>
<!-- 指定ResourceManager Http监听的节点 -->
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>node1:8088</value>
</property>
<property>
<!-- 指定ResourceManager Http监听的节点 -->
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>node2:8088</value>
</property>
<property>
<!-- 指定zookeeper所在的节点 -->
<name>yarn.resourcemanager.zk-address</name>
<value>node3:2181,node4:2181,node5:2181</value>
</property>
<property>
<!-- 关闭虚拟内存检查 -->
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<!-- 启用节点的内容和CPU自动检测,最小内存为1G -->
<!--<property>
<name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
<value>true</value>
</property>-->
</configuration>
-
配置
mapred-site.xml
:设置MapReduce框架名称。 -
配置
workers
文件:列出所有工作节点。 -
添加用户参数:在启动脚本中添加用户参数,防止启动错误。
-
分发Hadoop安装包:将Hadoop安装包发送到其他节点。
-
配置其他节点的环境变量:在其他节点配置HADOOP_HOME。
-
启动HDFS和YARN服务:格式化Zookeeper、启动JournalNode、格式化NameNode、同步Standby NameNode、启动HDFS和YARN。
-
访问WebUI:通过Web界面访问HDFS和YARN的状态。
-
停止集群:使用相应命令停止Hadoop集群服务。