姓名: 总分:
Hadoop、Hive、HBase、数据集成、Scala阶段测试
一、选择题(共20道,每道0.5分)
1、下面哪个程序负责HDFS数据存储( C )
A. NameNode B. Jobtracher
C. DataNode D. SecondaryNameNode
2、下列哪个属性是hdfs-site.xml中的配置( C ) B
A. fs.defaultFS B. dfs.replication
C. yarn.resourcemanager.address D. mapreduce.framework.name
解析:
fs.defaultFS:用于指定HDFS(Hadoop Distributed File System)的默认文件系统URI。它是Java代码访问HDFS时使用的路径。
dfs.replication:该配置项用于设置HDFS中数据块的副本数。
yarn.resourcemanager.address:用于指定 YARN ResourceManager 的 RPC 服务器地址和端口号。ResourceManager 是 YARN 架构中的核心组件之一,负责接收客户端提交的作业(如 MapReduce 任务、Spark 任务等),并为这些作业分配资源(如内存、CPU)以在集群中的 NodeManager 上执行。
mapreduce.framework.name:该配置项用于指定MapReduce作业的运行框架。在Hadoop 2.x及更高版本中,MapReduce作业通常运行在YARN上,因此该配置项的值通常为
yarn
。它告诉MapReduce框架应该使用YARN来管理其作业的执行。
3、Hadoop-3.x集群中的HDFS的默认的数据块的大小是( D )
A. 256M B.32M
C.64M D.128M
4、Hadoop-3.x集群中的HDFS的默认的副本块的个数是( C )
A. 1 B. 2
C. 3 D. 4
5、请问以下哪个命令组成是错误的( C )
A.bin/hadoop fs -cat /data/c.txt B. sbin/hdfs dfsadmin -report
C. bin/hdfs namenode -format D.sbin/stop-dfs.sh
解析:
使用
hadoop fs
命令来访问HDFS文件系统,-cat
选项用于查看指定文件(这里是/data/c.txt
)的内容。用于请求HDFS报告其状态,包括健康报告、数据节点报告等。
在Hadoop中,
hdfs namenode
命令应该位于sbin
目录下,而不是bin
目录。这个命令用于格式化HDFS的NameNode,通常是在HDFS首次设置或需要重置HDFS元数据时使用的。用于停止HDFS守护进程(NameNode和DataNode)。
sbin
目录下包含了Hadoop的管理脚本,如启动和停止服务的脚本。
6、以下与HDFS类似的框架是( C )
A. NTFS B. FAT32
C. GFS D.EXT3
解析:
HDFS(Hadoop Distributed File System):Hadoop分布式文件系统,是一个高度容错性的系统,设计用来部署在低廉的硬件上,并能提供高吞吐量的数据访问,适合大规模数据集上的应用。
GFS(Google File System):Google文件系统,是一个可扩展的分布式文件系统,同样用于大型的、分布式的、对大量数据进行访问的应用。
NTFS(New Technology File System):是Windows操作系统的一个文件系统,NTFS主要用于个人计算机和小型服务器的数据存储和管理。
FAT32(File Allocation Table 32):是另一种文件系统,主要用于小型的存储设备,如U盘和早期的硬盘驱动器。
EXT3(Third extended filesystem):是Linux系统的一种常见的文件系统,主要用于Linux操作系统的数据存储和管理。
7、HBase启动不需要哪个进程( D )
A. HMaster B. HRegionServer
C. QuorumPeerMain D. NodeManager
A. HMaster:这是HBase的主节点进程,负责协调集群中的所有RegionServer并处理元数据操作。因此,HBase启动时需要HMaster进程。
B. HRegionServer:这是负责处理数据的读写请求以及Region的负载均衡的进程。HBase启动时需要RegionServer进程来提供数据服务。
C. QuorumPeerMain:这是ZooKeeper服务器的主类。如果HBase配置为使用ZooKeeper进行协调和管理,那么ZooKeeper(包括QuorumPeerMain进程)将是HBase启动过程中的一部分。虽然它不是HBase本身的进程,但HBase的运行依赖于ZooKeeper,因此在某种程度上可以认为HBase启动时需要ZooKeeper(包括QuorumPeerMain)的支持。
D. NodeManager:这是YARN(Yet Another Resource Negotiator)框架中的一部分,用于管理集群中的节点资源,如CPU、内存等。YARN是Hadoop生态系统中的一个资源管理和任务调度的框架,与HBase的核心功能不直接相关。HBase并不依赖于YARN或NodeManager来运行其基本的数据存储和查询功能。因此,HBase启动时不需要NodeManager进程。
8、下列哪个是纯离线数据采集工具( B )
A. FlinkX B. Sqoop
C. Flume D. Canal
FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步。
Sqoop是一个纯离线的数据采集工具,主要用于将关系型数据库(如MySQL)中的数据导入到Hadoop的HDFS中,或者从HDFS导出到关系型数据库中。高度依赖MapReduce和YARN
Flume是一个分布式、高可靠性和高可用性的海量日志收集系统,主要用于实时采集日志数据。
Canal是一个基于MySQL数据库增量日志解析的实时数据同步工具,它主要用于提供增量数据订阅和消费。
9、Map的输出结果首先被写入( A )
A. 内存 B. 缓存
C. 磁盘 D. 以上都正确
Map的输出结果首先被写入的是一个内存缓冲区,这个缓冲区可以看作是内存的一部分。
10、MapReduce与HBase的关系,哪些描述是正确的?( B )
A. 两者不可或缺,MapReduce是HBase可以正常运行的保证
B. 两者不是强关联关系,没有MapReduce,HBase可以正常运行
C. MapReduce不可以直接访问HBase
D. 它们之间没有任何关系
A. 两者不可或缺,MapReduce是HBase可以正常运行的保证
- 这个描述并不准确。虽然MapReduce和HBase都是Hadoop生态系统中的重要组件,但它们各自承担着不同的职责。HBase是一个开源的、分布式的非关系型数据库,而MapReduce是一种用于大规模数据处理的编程模型。HBase的正常运行并不直接依赖于MapReduce,它可以独立运行,并通过其他方式(如HBase自带的计算框架Coprocessor)进行数据处理。
B. 两者不是强关联关系,没有MapReduce,HBase可以正常运行
- 这个描述是正确的。HBase和MapReduce是两个独立的项目,它们之间的关系是相互依赖但非强制的。HBase可以独立于MapReduce运行,并通过其他机制(如HBase Shell、HBase API等)进行数据的读写和管理。同时,MapReduce也可以处理非HBase来源的数据,如HDFS上的文件等。
C. MapReduce不可以直接访问HBase
- 这个描述不完全准确。实际上,MapReduce可以通过HBase提供的HBaseTableInputFormat类来直接访问HBase中的数据。这个类允许MapReduce任务将HBase表作为输入源,从而可以对HBase中的数据进行读取和处理。
D. 它们之间没有任何关系
- 这个描述显然是错误的。HBase和MapReduce都是Hadoop生态系统中的关键组件,它们之间存在着紧密的联系和交互。HBase可以作为MapReduce任务的输入或输出源,而MapReduce则可以对HBase中的数据进行高效的并行处理。
11.下列哪个不是Spark的执行模式? ( C )
A. Local B. YARN
C. Mesos D. HDFS
A. Local:这是Spark的一个执行模式,其中Spark应用程序在单个JVM进程中运行,通常用于开发、测试和调试目的。在这个模式下,Spark不需要启动集群,而是在本地机器上运行,非常适合小规模数据处理和快速原型开发。因此,A选项是Spark的一个执行模式。
B. YARN:YARN(Yet Another Resource Negotiator)是Apache Hadoop的资源管理器,用于在Hadoop集群上管理资源和调度任务。Spark可以在YARN上运行,将YARN作为集群管理器,以管理集群资源和调度Spark任务。YARN模式包括yarn-client和yarn-cluster两种运行模式,分别适用于不同的场景。因此,B选项也是Spark的一个执行模式。
C. Mesos:虽然Mesos是一个开源的集群管理器,用于管理跨多种框架(包括Spark)的集群资源,但它本身并不是Spark的一种执行模式。相反,Spark可以在Mesos上运行,利用Mesos提供的资源管理和调度功能。然而,在描述Spark的执行模式时,我们通常不会说“Mesos是Spark的一个执行模式”,而是说“Spark可以在Mesos上运行”。因此,C选项不是Spark的直接执行模式。
D. HDFS:HDFS(Hadoop Distributed File System)是Apache Hadoop的分布式文件系统,用于存储大数据集。它并不是Spark的执行模式,而是Spark可以访问的一种数据存储系统。Spark可以从HDFS中读取数据,进行处理,并将结果写回到HDFS中。然而,HDFS与Spark的执行模式是两个不同的概念。因此,D选项同样不是Spark的执行模式。
12.在Spark中,什么机制用于加速迭代计算? ( B )
A. Checkpointing B. Caching
C. Broadcasting D. Partitioning
解析:
Caching(缓存)
缓存机制是Spark中用于优化迭代计算的重要手段。Spark允许用户将RDD(弹性分布式数据集)或DataFrame等数据集缓存到内存中,以便在后续的计算中重用。
Checkpointing(检查点)
虽然Checkpointing也是Spark中的一种容错机制,但它主要用于在系统故障或节点故障时恢复数据,而不是直接用于加速迭代计算。
Broadcasting(广播变量)
Broadcasting是Spark中用于优化数据传输的一种机制,它允许将大变量(如模型参数、大配置对象等)从Driver端广播到所有Executor端,以减少每个Task的数据传输量。
Partitioning(分区)
Partitioning是Spark中对数据进行分布式存储和计算的基础。通过合理的分区策略,可以将大数据集分割成多个小数据集,并分布到不同的节点上进行并行处理。然而,分区本身并不直接加速迭代计算,而是为并行计算提供了基础。
-
下列哪个函数属于转换操作(Transformation)而不是行动操作(Action)?( C )
A. count() B. collect()
C. filter() D. saveAsTextFile()
-
在Spark中,
persist
和cache
方法有何区别? ( B )
A) cache
是persist
的一个别名,二者完全相同。
B) persist
提供多种存储级别,而cache
总是使用默认的存储级别。
C) cache
用于DataFrame,而persist
用于RDD。
D) persist
用于数据的持久化,而cache
用于数据的临时存储。
- 在Spark中,什么是“窄依赖”(Narrow Dependency)与“宽依赖”(Wide Dependency)?它们如何影响数据的并行处理? ?(A )
A) 窄依赖表示每个父RDD分区映射到子RDD的一个分区,而宽依赖涉及多个父分区到一个子分区,导致shuffle。
B) 宽依赖意味着数据不需要重分布,而窄依赖则需要shuffle。
C) 窄依赖和宽依赖都涉及到数据的shuffle,只是程度不同。
D) 窄依赖与宽依赖仅在DataFrame中存在,对RDD没有意义。
- Spark的
Job
和Stage
在执行过程中如何划分? ( D )
A) Job
由一系列Stage
组成,每个Stage
对应于一个shuffle操作。
B) Job
和Stage
是同义词,没有区别。
C) Stage
由一系列Job
组成,用于并行执行不同的任务。
D) Job
是由用户提交的任务,Stage
是DAGScheduler为优化执行计划而创建的最小执行单元。
解析:
- Job的定义
- Job是Spark中由用户提交的任务,通常是由一个Action操作(如
collect
、count
、save
、reduce
等)触发的。每个Action操作都会生成一个Job。
- Stage的划分
- Stage是Spark中Job处理过程要分为的几个阶段。DAGScheduler(有向无环图调度器)会根据RDD之间的依赖关系(特别是宽依赖,如shuffle操作)将Job划分为多个Stage。
- 划分Stage的依据是RDD之间的宽窄依赖。宽依赖(如
groupByKey
、reduceByKey
、join
等)会导致shuffle操作,从而需要在不同节点间重新分配数据。每当遇到宽依赖时,DAGScheduler就会切分出一个新的Stage。- Stage的数量取决于程序中宽依赖(即shuffle操作)的数量。每个Stage包含一组可以并行执行的任务(Tasks)。
- Task的定义
- Task是Spark中任务运行的最小单位,最终是以Task为单位运行在Executor中的。一个Stage会包含多个Task,这些Task的数量通常取决于Stage中最后一个RDD的分区数。
- Task的内容与Stage相同,但当分区数量为n时,会有n个相同效果的Task被分发到执行程序中执行。
- 在Spark中,
mapPartitions
与map
操作有何区别,以及在什么情况下使用mapPartitions
更合适? ( B )
A) mapPartitions
和map
都是对每个元素进行操作,没有区别。
B) mapPartitions
可以访问整个分区的数据,适用于需要对分区内的数据进行全局操作的场景。
C) map
操作可以改变分区的数量,而mapPartitions
不能。
D) mapPartitions
是map
的别名,用于提高代码可读性。
- Spark的
Kryo
序列化库如何帮助提高性能? ( B )
A) Kryo增加了序列化的复杂度,但提高了数据的完整性。
B) Kryo序列化库提供了一种更紧凑、更快的序列化方式,减少了网络传输和磁盘I/O的开销。
C) Kryo只用于Spark的内部通信,对外部数据无影响。
D) Kryo序列化库是默认的序列化方式,不需要配置。
- 在Spark中,
SparkSession
与SparkContext
的关系是什么?为何推荐使用SparkSession
? ( A )
A) SparkSession
是SparkContext
的封装,提供了更高级的功能,如SQL查询和数据源管理,SparkSession
简化了API,提高了易用性。
B) SparkSession
和SparkContext
可以互换使用,没有推荐使用的原因。
C) SparkContext
是SparkSession
的前身,SparkSession
仅用于Spark SQL。
D) SparkSession
用于管理执行器,SparkContext
用于管理Driver程序。
- Spark的
Broadcast Join
与Shuffle Hash Join
有何区别?在何种情况下应优先考虑使用Broadcast Join
? ( D )A
A) Broadcast Join
将较小的表广播到每个节点,减少shuffle成本;Shuffle Hash Join
需要更多网络传输,适用于大表间的连接。
B) Broadcast Join
和Shuffle Hash Join
没有区别,只是名称不同。
C) Shuffle Hash Join
总是优于Broadcast Join
,因为它更通用。
D) Broadcast Join
用于小数据集,Shuffle Hash Join
用于大数据集,但具体选择与数据大小无关。
解析:
Broadcast Join
和Shuffle Hash Join
是Spark SQL中处理连接(Join)操作的两种不同策略。Broadcast Join
适用于连接操作中的一个小表,它会将这个小表广播到每个节点上,从而避免了大表的shuffle操作,减少了网络传输成本。而Shuffle Hash Join
则适用于大数据集之间的连接,它需要对数据进行shuffle操作来确保连接的正确性。因此,在连接操作中的一个小表时,应优先考虑使用Broadcast Join
。注意,实际选择哪种连接策略还取决于其他因素,如数据分布、集群配置等,但数据大小是一个重要的考虑因素。
二、填空题(共20分,每空0.5分)
1、启动hdfs的shell脚本是:( sh xxx.sh )start-dfs.sh
解析:
启动hdfs的shell脚本是:
start-dfs.sh
。这个脚本用于启动Hadoop分布式文件系统(HDFS)的所有守护进程,包括NameNode和DataNode等。
2、Block是HDFS的基本存储单元,默认大小是( 128 ) MB
3、MapReduce默认输入的格式化类:( InputFormat )TextInputFormat
解析:
MapReduce默认输入的格式化类是
TextInputFormat
。这是MapReduce的默认输入格式,它读取文件的行作为输入,其中行的字节偏移量作为键(Key),行的内容作为值(Value)。
4、Hadoop三大组件:( HDFS )、( MapReduce )、( Yarn )
5、Hiveserver2默认的端口:( 10000 )
解析:
Hiveserver2默认的端口是10000。Hiveserver2是Hive的一个服务组件,它允许用户通过JDBC或ODBC等协议远程连接到Hive并执行SQL查询。
6、HBase的RowKey设计三大原则:( 唯一性 )、( 散列性 )、( 长度适中 )
解析:
- 唯一性:确保RowKey的唯一性,以便能够唯一标识表中的每一行数据。
- 散列性:设计RowKey时应该考虑其散列性,避免大量数据集中在少数几个Region上,从而导致热点问题。
- 长度适中:RowKey的长度应该适中,不宜过长也不宜过短。过长的RowKey会占用较多的存储空间,而过短的RowKey则可能增加数据倾斜的风险。
7、在Spark中,一个( RDD )由一系列的( 分区 )组成,每个Stage由一组( RDD )构成,而Stage之间的依赖关系通常由( 行动算子 )操作触发。
8、hive中数据文件默认存储格式是( txt )TextFile
解析:
hive中数据文件默认存储格式是TextFile。TextFile是一种简单的文本格式,数据以行为单位进行存储,每行数据之间通过换行符分隔。
9、spark core中缓存的实现方式有几种( 使用cache存储到内存中 )( 使用checkpoint存储到磁盘中 )
10、hive中sql转变成mr经过4个器,分别是解析器,( 编译器 )、( 优化器 )、( 执行器 )
三、判断题(共10道,每道1分)
1、Block Size是不可以修改的( F )
2、如果NameNode意外终止,SecondaryNameNode会接替它使集群继续工作( F )
3、MapReduce 切片的大小等于 block的大小( F )
4、在HBase中由HMaster负责用户的IO请求( F )
5、MapReduce中map任务的数量可以自己指定( T )
6、DataX只能用于离线数据采集( T )
7、Flume运行时需要依赖MapReduce( T )F
解析:
Apache Flume是一个分布式、可靠且可用的服务,用于高效地收集、聚合和移动大量日志数据。它并不依赖MapReduce来运行,而是可以独立于Hadoop生态系统运行。
8、MapReduce中环形缓冲区默认大小为128M( F )
9、Spark的SparkContext
和SparkSession
可以同时存在于同一个应用中,SparkContext
提供了更多低级的API,而SparkSession
则提供了高层的API,包括SQL和数据源支持。( T )
10、Spark的map
操作是懒惰求值的,只有在触发行动操作时才会执行计算。( T )
四、简答题(共5道,每道4分)
1、用自己的语言描述SecondaryNameNode的作用。
SecondaryNameNode作为NameNode的秘书,帮助NameNode处理事务。
SecondaryNameNode是用来帮助NameNode完成元数据信息合并,从角色上看属于NameNode的“秘书”
1.定期合并FsImage和Edits文件
-
提供HDFS元数据的冷备份
-
监控HDFS状态
-
提升HDFS的可靠性和性能
2、用自己的语言描述spark的数据倾斜优化方式。
1.使用Hive ETL预处理数据
如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据)。
此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是 原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么 在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。
2.过滤少数导致倾斜的key
如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大。
如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别 重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤 掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时, 动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可。
3.提高shuffle操作的并行度
在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很 多场景来说都有点过小。
增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个 task,从而让每个task处理比原来更少的数据。
4.双重聚合
对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by 语句进行分组聚合时,比较适用这种方案。
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
5.将reduce join转为map join
在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中 的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作, 进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式 连接起来。
6.采样倾斜key并分拆join操作
两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五 ”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。
思路:
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪 几个key。
然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。
再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。
而另外两个普通的RDD就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
7.使用随机前缀和扩容RDD进行join
如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没 什么意义。
该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成 数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
然后将该RDD的每条数据都打上一个n以内的随机前缀。
同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。
最后将两个处理后的RDD进行join即可。
3、用自己的语言描述诉MapReduce流程。
一、输入分片(Input Splitting)
- 过程描述:在MapReduce作业开始前,输入文件(或文件夹)首先被划分为多个InputSplit(输入分片)。默认情况下,每个HDFS的block(数据块)对应一个InputSplit。这样做的目的是将大文件分割成多个小块,以便并行处理。
- 分片大小:分片的大小通常与HDFS的数据块大小相同(默认是128MB),但也可以根据作业需求进行调整。分片时不考虑数据集整体,而是逐个针对每一个文件单独切片。
二、Map阶段
- InputFormat类:使用InputFormat类的子类(如TextInputFormat)把输入文件(夹)划分为InputSplit。
- RecordReader类:每个InputSplit通过RecordReader类被解析成一个个<key,value>键值对。在TextInputFormat中,默认是每行的起始偏移量作为key,每行的内容作为value。
- Mapper类:框架调用Mapper类中的map函数,输入是<k1,v1>键值对,输出是<k2,v2>键值对。程序员可以覆盖map函数,实现自己的逻辑。
三、Combiner阶段(可选)
- 过程描述:Combiner是一个本地化的reduce操作,发生在map端。它的主要作用是减少map端的输出,从而减少shuffle过程中网络传输的数据量,提高作业的执行效率。
- 注意:Combiner的输出是Reducer的输入,因此它绝不能改变最终的计算结果。Combiner适合于等幂操作,如累加、最大值等。
四、Shuffle阶段
- 分区:在map函数处理完数据后,输出的<k2,v2>键值对会根据key进行分区,不同的分区由不同的reduce task处理。分区操作通常通过哈希函数实现。
- 排序与合并:在写入环形缓冲区之前,数据会先进行排序(默认采用快速排序算法)。当缓冲区满(默认为80%容量)时,数据会溢出到磁盘文件中,并在溢出前完成排序。多个溢出文件在最终输出前会进行归并排序,合并成一个大的有序文件。
- 数据传输:map任务完成后,reduce任务会启动数据copy线程,通过HTTP方式请求map任务所在的NodeManager以获取输出文件。
五、Reduce阶段
- 数据合并:Reduce任务将接收到的所有map任务的输出数据(已分区且区内有序)进行合并,相同key的value值会被放到同一个集合中。
- Reducer类:框架调用Reducer类中的reduce函数,对合并后的数据进行处理,最终输出结果。
- OutputFormat类:使用OutputFormat类的子类(如TextOutputFormat)将最终结果输出到文件或数据库等存储介质中。
4、谈谈Hive的优化。
1.本地模式运行,当处理一些小任务时可以选择本地模式运行,这样会使得任务执行的速度会很快。
2.JVM模式,在处理一些需要很多资源的任务时,可以先申请一部分的资源,等运行结束后再将资源释放。
3.严格模式,启动严格模式,禁止分区表的全表扫描,查询数据时必须加limit,禁止笛卡尔积。
4.hive join的数据倾斜问题,当小表join小表时,不用去管它;当小表join大表时,小表放在join的左边;当大表join大表时,应当考虑是否会出现某个reduce数据量过大的情况。空key过滤:当有大量数据同时放入一个reduce时,应当观察该rowkey,一般来说该rowkey对应的数据都是异常数据,需要使用sql语句对其进行过滤。空key转换:当有大量的数据都对应一个空的rowkey时,需要给这些数据随机分配一个rowkey,使它们均匀的分布到一些reduce中。
5.自定义map和reduce的数量,一般不去修改它。
5、用自己的语言描述诉spark的资源调度和任务调度流程。
spark的资源调度:driver向resourcemanager申请资源,resourcemanager选择一个空闲的子节点开启applicationmaster任务,applicationmaster向resourcemanager提交申请资源开启executor的任务。applicationmaster选择一个空闲子节点开启executor,
开启完毕后applicationmaster将executor开启的消息发送给driver,让driver发送执行任务。
spark的任务调度流程:driver端,遇到action算子触发任务执行,将任务提交到有向无环图,DAGscheduler中,根据RDD的血缘关系划分划分stage,将RDD中的分区封装成taskset任务,发送到TASKscheduler。TASKscheduler取出taskset任务,根据RDD 的提供最优的任务执行计划,只移动计算不移动数据,开始对执行任务。
spark的资源调度:
1、Driver提交作业命令
2、向ResourceMananger申请资源
3、ResourceMananger检查一些权限、资源空间,在一个相对空闲的子节点上开启一个ApplicationMaster的进程
4、ApplicationMaster向ResourceMananger申请资源,启动Executor
5、ResourceMananger启动Executor
6、Executor反向注册给Driver,告诉Driver资源申请成功,可以发送任务
spark的任务调度流程:
7、Driver遇到一个行动算子,触发整个作业调度
8、先将整个作业交给DAG有向无环图
DAG Scheduler
9、根据RDD之间的血缘关系,找出宽窄依赖(有没有shuffle的产生)
10、通过宽窄依赖划分stage阶段
11、根据stage阶段,将stage中的task任务封装成一个taskSet对象
12、发送给后续的 Task Scheduler
Task Scheduler
13、从DAG Scheduler发送过来的taskSet中取出task任务
14、根据RDD五大特性的最后一大特性,只移动计算不移动数据,将task任务发送到对应的Executor的线程池中执行
五、代码题(50分)
1、spark sql数据分析以及可视化(30分)
疫情期间各类政府媒体及社交网站,均发布了相关疫情每日统计数据,下面基于数据仓库工具Hive请你统计分析相关疫情数据。
提示:
(数据字段为:日期date、省份province、城市city、新增确诊confirm、新增出院heal、新增死亡dead、消息来源source)
部分数据截图:
题目:
请基于covid19.csv数据,将数据导入到Hive中,使用spark on hive读取数据使用纯SQL完成下列统计分析
请自行在Hive按照数据结构创建对应的表并加载数据
请给出代码语句及结果截图
-
1、统计湖北省每月新增出院病例总数最多的前3个城市(8分)
输出:[月份,城市,每月新增出院病例总数,排名]
create table bigdata30_test3.covid (
dates String,
province String,
city String,
confirm Int,
heal Int,
dead Int,
source String
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/bigdata30/data/';
select
tt1.month,
tt1.city,
tt1.counts as counts,
tt1.rank
from
(select
t1.month,
t1.city,
t1.counts as counts,
-- row_number:数据相同也不会导致排名重复
row_number() over(partition by t1.month order by t1.counts desc) rank
from
(select
province,
city,
-- 只取出月份
subString(dates,0,2) as month,
count(heal) as counts
from bigdata30_test3.covid
where province = "湖北" and city != "境外输入-英国"
-- 只取出月份用来分组
group by city,province,subString(dates,0,2)) t1) tt1
-- 取每个排序的前三名
WHERE tt1.rank <= 3;
结果:
1月 武汉市 24 1
1月 荆门市 10 2
1月 荆州市 10 3
2月 武汉市 62 1
2月 黄石市 54 2
2月 黄冈市 46 3
3月 武汉市 41 1
3月 鄂州市 32 2
3月 孝感市 28 3
4月 武汉市 29 1
4月 荆门市 1 2
4月 襄阳市 1 3
-
2、统计安徽省每月新增确诊人数同比
同比 = (当月指标 - 上月指标)/ 上月指标 (6分)
输出:[月份,每月新增确诊人数,上月新增确诊人数,同比]
将该需求结果写入到mysql中,使用fileBI作图,柱状图 (4分)
-- 纯sql DBeaver中执行 select t1.month, t1.counts, LAG(counts,1,-1) over(order by t1.month) as last_counts, case when LAG(counts,1,-1) over(order by t1.month) < 0 then '没有上一个月的数据' else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2) end as rate from (select subString(dates,0,2) as month, province, count(confirm) as counts from bigdata30_test3.covid where province = "安徽" group by province,subString(dates,0,2)) t1; -- MySQL建表 create table dataMysql ( months varchar(30), counts varchar(30), last_counts varchar(30), rate varchar(30) )
// 为了将数据写入到MySQL,使用sparksql package com.shujia.DSLexam import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import java.sql.{Connection, DriverManager, PreparedStatement} object Exam2_1 { def main(args: Array[String]): Unit = { val sparkSession: SparkSession = SparkSession.builder() .master("local") .appName("考试大题一") //参数设置的优先级:代码优先级 > 命令优先级 > 配置文件优先级 .config("spark.sql.shuffle.partitions", "1") .enableHiveSupport() // 开启hive的配置 .getOrCreate() sparkSession.sql("use bigdata30_test3") //truncate = false 时,完整地显示某列值,不进行任何截断。 val dataDF: DataFrame = sparkSession.sql( """ |select |t1.month, |t1.counts, |LAG(counts,1,-1) over(order by t1.month) as last_counts, |case | when LAG(counts,1,-1) over(order by t1.month) < 0 then 0 | else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2) |end as rate |from |(select |subString(dates,0,2) as month, |province, |count(confirm) as counts |from |bigdata30_test3.covid |where province = "安徽" |group by province,subString(dates,0,2)) t1 |""".stripMargin) dataDF.foreach((rdd: Row) => { //注册驱动 Class.forName("com.mysql.jdbc.Driver") //创建数据库连接对象 val conn: Connection = DriverManager.getConnection( "jdbc:mysql://master:3306/exam?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "123456" ) //创建预编译对象 val statement: PreparedStatement = conn.prepareStatement("insert into dataMysql values(?,?,?,?)") statement.setString(1, rdd.getAs[String]("month")) statement.setLong(2, rdd.getAs[Long]("counts")) statement.setLong(3, rdd.getAs[Long]("last_counts")) statement.setDouble(4, rdd.getAs[Double]("rate")) // 执行sql语句 statement.executeUpdate() statement.close() conn.close() }) } }
fineBI作图
-
3、统计安徽省各城市连续新增确诊人数、连续新增确诊开始日期、连续新增确诊结束日期及连续新增确诊天数(12分)
输出:[城市,连续新增确诊人数,连续新增确诊开始日期,连续新增确诊结束日期,连续新增确诊天数]
select DISTINCT
tt1.city,
max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) - min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as add_confirm,
min(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as start_date,
max(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as end_date,
count(1) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as counts
from
(select
*,
CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,
(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) as flag
from
(select * ,
from_unixtime(unix_timestamp(dates,'MM月dd日'),'MM-dd') as new_day,
row_number() over(partition by city order by dates) rank
from bigdata30_test3.covid
where province = "安徽" and source = "安徽卫健委" and heal IS NOT NULL and confirm IS NOT NULL and dead IS NOT NULL) t1) tt1
-- 连续新增确诊人数应该是,求出的数据应该是本组的最后一条的confirm减去本组第一天的confirm,而不是下面的一组中的最大的confirm减去最小的confirm ??? 该如何求解
max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) -min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as add_confirm,
-- 解决方案:FIRST_VALUE()和LAST_VALUE()函数分别获取了每个分组的第一天和最后一天的确诊人数
-- 为了避免下述中出现的分组全出现在结果中的问题,使用FIRST_VALUE()和LAST_VALUE()函数时,
-- 最好指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
SELECT DISTINCT
tt1.city,
LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) -
FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS add_confirm,
MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS start_date,
MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS end_date,
COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS counts
FROM
(SELECT
*,
CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,
(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) AS flag
FROM
(SELECT *,
FROM_UNIXTIME(UNIX_TIMESTAMP(dates, 'MM月dd日'), 'MM-dd') AS new_day,
ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rank
FROM bigdata30_test3.covid
WHERE province = '安徽' AND source = '安徽卫健委' AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL
) t1
) tt1;
-- 注:
在SQL中,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING是窗口函数(如FIRST_VALUE(), LAST_VALUE(), ROW_NUMBER(), SUM(), AVG()等)的一个子句,用于指定窗口的范围。
UNBOUNDED PRECEDING表示窗口的起始点是分区中的第一行。
UNBOUNDED FOLLOWING表示窗口的结束点是分区中的最后一行。
对于FIRST_VALUE()和LAST_VALUE()这样的函数,它们通常需要一个明确的窗口定义来确定“第一”和“最后”是基于什么范围来计算的。如果不提供ROWS BETWEEN子句,某些数据库系统可能会报错,因为它们不知道应该基于哪些行来计算这些值。
-- 不加也可正常执行,但是不能加上ORDER BY tt1.new_day,否则会出现整个分组都出现在最终的结果里
形如:
|合肥市| 0| 01-28| 01-30| 3|
|合肥市| 10| 01-28| 01-30| 3|
|合肥市| 7| 01-28| 01-30| 3|
SELECT DISTINCT
tt1.city,
LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) -
FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS add_confirm,
MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS start_date,
MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS end_date,
COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS counts
FROM
(SELECT
*,
CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,
(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) AS flag
FROM
(SELECT *,
FROM_UNIXTIME(UNIX_TIMESTAMP(dates, 'MM月dd日'), 'MM-dd') AS new_day,
ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rank
FROM bigdata30_test3.covid
WHERE province = '安徽' AND source = '安徽卫健委' AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL
) t1
) tt1;
没指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,又加上了order by dates: 会出现下面这种情况
2、spark DSL数据分析(20分)
现有三份数据,结构如下:
-
live_types 直播间信息表
结构:live_id live_type
直播间id 直播间类型
-
live_events 用户访问直播间记录表
结构:user_id live_id start_time end_time
用户id 直播间id 开始时间 结束时间
-
user_info 用户信息表
结构:user_id user_name
用户id 用户名
题目:
请给出结果截图及Scala代码
1、统计每位用户观看不同类型直播的次数(6分)
输出:[用户id,用户名,直播间类型,次数]
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
object Exam2 {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("社保练习")
.getOrCreate()
import org.apache.spark.sql.functions._
import sparkSession.implicits._
// user_id live_id start_time end_time
// 用户id 直播间id 开始时间 结束时间
val live_events: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("user_id INT,live_id INT,start_time timestamp,end_time timestamp")
.load("spark/data_exam/live_events.txt")
// live_events.show()
//live_id live_type
// 直播间id 直播间类型
val live_types: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("live_id INT,live_type String")
.load("spark/data_exam/live_types.txt")
// live_types.show()
//user_id user_name
// 用户id 用户名
val user_info: DataFrame = sparkSession.read
.format("csv")
.option("sep", "\t")
.schema("user_id INT,user_name String")
.load("spark/data_exam/user_info.txt")
// user_info.show()
/**
* 1、统计每位用户观看不同类型直播的次数(6分)
* > 输出:[用户id,用户名,直播间类型,次数]
*/
live_events
.withColumn("count",count(expr("1")) over Window.partitionBy($"user_id",$"live_id"))
// TODO 设置表与表之间进行左连接
.join(live_types, live_events("live_id") === live_types("live_id"), "left")
.join(user_info, live_events("user_id") === user_info("user_id"), "left")
/**
* 两张表中的字段名相同,要注明字段所属表
* 否则会报 Reference 'user_id' is ambiguous, could be: user_id, user_id.
*/
.select(live_events("user_id"),$"user_name",$"live_type",$"count")
.distinct()
.show()
+-------+---------+---------+-----+
|user_id|user_name|live_type|count|
+-------+---------+---------+-----+
| 106| Lucy| music| 1|
| 100| LiHua| game| 1|
| 102| Tom| food| 1|
| 104| Bush| game| 1|
| 105| Jam| game| 1|
| 102| Tom| music| 1|
| 100| LiHua| food| 2|
| 101| Bob| food| 1|
| 101| Bob| game| 1|
| 102| Tom| game| 1|
| 104| Bush| food| 1|
+-------+---------+---------+-----+
2、统计每位用户累计观看直播时长,按时长降序排列(6分)
输出:[用户id,用户名,累计时长]
/**
* 2、统计每位用户累计观看直播时长,按时长降序排列(6分)
* > 输出:[用户id,用户名,累计时长]
*/
// 100 1 2021-12-01 19:00:00 2021-12-01 19:28:00 start_time timestamp,end_time
live_events
.withColumn("times",(unix_timestamp($"end_time","yyyy-MM-dd HH:mm:ss") - unix_timestamp($"start_time","yyyy-MM-dd HH:mm:ss")))
.withColumn("all_times", sum($"times") over Window.partitionBy($"user_id"))
.join(user_info,"user_id")
// TODO 时间戳 / 60 ,在最后查询时,可以将秒转换成分钟
.select($"user_id",$"user_name",$"all_times" / 60)
.distinct()
.orderBy($"all_times".desc)
.show()
+-------+---------+----------------+
|user_id|user_name|(all_times / 60)|
+-------+---------+----------------+
| 104| Bush| 178.0|
| 101| Bob| 163.0|
| 102| Tom| 140.0|
| 106| Lucy| 129.0|
| 100| LiHua| 110.0|
| 105| Jam| 8.0|
+-------+---------+----------------+
3、统计不同类型直播用户累计观看时长降序排名(8分)
输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]
/**
* 3、统计不同类型直播用户累计观看时长降序排名(8分)
* > 输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]
*/
live_events
//TODO 在开始得出时间戳的时候就将其转换成以分钟为单位
.withColumn("times", (unix_timestamp($"end_time", "yyyy-MM-dd HH:mm:ss") - unix_timestamp($"start_time", "yyyy-MM-dd HH:mm:ss")) / 60)
.withColumn("all_times", sum($"times") over Window.partitionBy($"user_id"))
.join(user_info, "user_id")
.join(live_types, "live_id")
.withColumn("rank", row_number() over Window.partitionBy($"live_type").orderBy($"all_times".desc))
.select($"live_id",$"live_type",$"user_id",$"user_name",$"all_times",$"rank")
.show()
+-------+---------+-------+---------+---------+----+
|live_id|live_type|user_id|user_name|all_times|rank|
+-------+---------+-------+---------+---------+----+
| 1| food| 104| Bush| 178.0| 1|
| 1| food| 101| Bob| 163.0| 2|
| 1| food| 102| Tom| 140.0| 3|
| 1| food| 100| LiHua| 110.0| 4|
| 1| food| 100| LiHua| 110.0| 5|
| 3| music| 102| Tom| 140.0| 1|
| 3| music| 106| Lucy| 129.0| 2|
| 2| game| 104| Bush| 178.0| 1|
| 2| game| 101| Bob| 163.0| 2|
| 2| game| 102| Tom| 140.0| 3|
| 2| game| 100| LiHua| 110.0| 4|
| 2| game| 105| Jam| 8.0| 5|
+-------+---------+-------+---------+---------+----+