312个问题,问题涵盖广、从自我介绍到大厂实战、19大主题,一网打尽、真正提高面试成功率
一、Linux
1. 说⼀下linux的常⽤命令?
说一些高级命令即可
systemctl 设置系统参数 如:systemctl stop firewalld关闭防火墙
tail / head 查看文件尾部或开头,通常用来查看日志文件
grep 查找文件 通常配合管道符 | 使用
scp 将文件分发给其他服务器
ping 查看网络是否正常
yum 下载并安装软件包
tar 解压缩
2. 如何查看linux⽂件的前⼗⾏ 或者 后⼗⾏
head -n 10 /tmp/tmpfile # 查看⽂件前10⾏
tail -n 10 /tmp/tmpfile # 查看⽂件后10⾏
cat filename | head -n 300 | tail -n +100 # 显示100⾏到300⾏
cat filename | tail -n +10 | head -n 20 # 从10⾏开始,显示20⾏,即显示10-39
⾏
3. 只知道进程名称,⽐如yarn,如何找到此进程并杀死?
ps aux | grep yarn 来查找进程yarn进程 IDkill -9 id 强制杀死进程
⼆、Shell
1. Shell 常⽤⼯具及写过的脚本
1)常⽤⼯具:
scp 通过 SSH 协议复制文件 scp file.txt user@host:/path/to/destination
sort 对文件内容排序 sort file.txt
bash 执行 Shell 脚本 bash ./a.sh
echo 输出值 echo('666')
2)⽤ Shell 写过哪些脚本
(1)集群启动,分发脚本 start-all.sh
(2)数仓与 MySQL 的导⼊导出 (sqoop脚本)sqoop - ...
(3)数仓层级内部的导⼊:ods->dwd->dws->dwt->ads
2. Shell 中单引号和双引号区别?
1)单引号不取变量值2)双引号取变量值3)反引号 ` ,执⾏引号中命令4)双引号内部嵌套单引号,取出变量值5)单引号内部嵌套双引号,不取出变量值
三、MySQL
1. MySQL 索引使⽤有哪些注意事项呢?
1)索引哪些情况会失效
- (1)查询条件包含or,会导致索引失效。
- (2)隐式类型转换(查询时自动将某种数据类型转换为另一种数据类型),会导致索引失效,例如age字段类型是int,我们where age = “1”,这样就会触发 隐式类型转换。
- (3)like通配符会导致索引失效。注意:"ABC%“会⾛range索引,”%ABC"索引才会失效。
- (4)联合索引(包含多个列的索引),查询时的条件列不是联合索引中的第⼀个列,索引失效。
- (5)对索引字段进⾏函数运算。
2)索引不适合哪些场景
- 数据量少的不适合加索引
- 更新⽐较频繁的也不适合加索引
- 离散性低(数据值之间的差异小)的字段不适合加索引(如性别)
2. 数据库的优化
- 1)加索引
它会将表中的某一列或多列的值与对应的记录位置,数据库可以避免全表扫描,而是通过索引直接定位到符合条件的记录,从而显著减少查询的时间
- 2)⾏列裁剪,避免返回不必要的数据(select 字段)
通过只选择需要的列(而不是
SELECT *
),减少网络传输的数据量
3)适当分批量进⾏ 避免一次性大数据量操作
- 4)优化sql结构
- 5)分库分表
3. Hive数仓与mysql数据库的区别?
- 1)数据规模
- Hive⽀持很⼤规模的数据计算;数据库可以⽀持的数据规模较⼩。
- 2)计算引擎
- Hive底层运⾏的是MapReduce程序,但是数据库有⾃⼰的执⾏引擎。
- 3)数据更新
- Hive读多写少,不建议对数据改写;数据库对数据的增删改操作⽐较多。
- 4)存储数据位置
- Hive数据实际是存储在HDFS上,数据库的数据存储在块设备上或者本地⽂件系统中
- 5)数据冗余
- hive允许一定量的数据冗余
四、Hadoop
1. Hadoop 常⽤端⼝号
2. HDFS 读流程和写流程 *
读流程
- 1)客户端向namenode请求下载⽂件,namenode通过查询元数据,找到⽂件块所在的datanode地址。
- 2)挑选⼀台datanode(就近原则,然后随机)服务器,请求读取数据。
- 3)datanode开始传输数据给客户端(从磁盘⾥⾯读取数据放⼊流,以packet为单位来做校验)。
- 4)客户端以packet为单位接收,先在本地缓存,然后写⼊⽬标⽂件。
- 这个读流程是⽐较简单的,⾥⾯还涉及到⽤户读权限的校验以及⽂件是否存在的判断
写流程
- 1)客户端向namenode请求上传⽂件,namenode检查⽬标⽂件是否已存在,⽗⽬录是否存在。
- 2)namenode返回是否可以上传。
- 3)客户端请求第⼀个 block上传到哪⼏个datanode服务器上。
- 4)namenode返回3个datanode节点,分别为dn1、dn2、dn3。
- 5)客户端请求dn1上传数据,dn1收到请求会继续调⽤dn2,然后dn2调⽤dn3,将这个通信管道建⽴完成。
- 6)dn1、dn2、dn3逐级应答客户端
- 7)客户端开始往dn1上传第⼀个block(先从磁盘读取数据放到⼀个本地内存缓存),以packet为单位, dn1
- 8)收到⼀个packet就会传给dn2,dn2传给dn3;dn1每传⼀个packet会放⼊⼀个应答队列等待应答
- 9)当⼀个block传输完成之后,客户端再次请求namenode上传第⼆个block的服务器。(重复执⾏3-7 步)
3. HDFS 常⽤命令
hdfs dfs -help
hdfs dfs -cat <hdfsfile> # 查看⽂件内容
hdfs dfs -mkdir <path> # 新建⽬录
hdfs dfs -put # 上传本地⽂件到hdfs
hdfs dfs -rm # 删除⽂件或⽬录
hdfs dfs -get < hdfs path> < localpath> # 将hdfs⽂件下载到本地
4. HDFS ⼩⽂件处理
a) 存储层⾯(1)1 个⽂件块,占⽤ namenode 多⼤内存 150 字节(2)128G 能存储多少⽂件块?(3)128 * 1024*1024*1024byte/150 字节 = 9 亿⽂件块(4)极⼤的占⽤了NameNode的内存b) 计算层⾯每个⼩⽂件都会起到⼀个 MapTask,占⽤了⼤量计算资源
a)采⽤ har 归档⽅式,将⼩⽂件归档b)采⽤ CombineTextInputFormat
5. MapReduce的天⻰⼋部 *
- 1)设置MapReduce的输⼊InputFormat类型,默认为TextInputFormat
- 2)⾃定义map函数,得到TextInputFormat的k1,v1;经过处理后传出k2,v2
- 3)分区--默认根据k2决定map中的数据该发送到哪个reduce中
- 4)排序--默认根据k2进⾏字典排序
- 5)规约--默认没有此阶段,是优化⼿段,可以提前合并
- 6)分组--相同k2的value会放到同⼀个集合中
- 7)⾃定义reduce函数,讲分组得到的k2,v2转成k3,v3输出
- 8)设置输出的OutputFormat,默认采⽤TextOutputFormat,将结果输出到⼀个纯⽂本⽂件中
6. MapReduce的优化 *
1)Map 阶段
- (1)增⼤环形缓冲区⼤⼩。由 100m 扩⼤到 200m
- (2)增⼤环形缓冲区溢写的⽐例。由 80%扩⼤到 90%
- (3)减少对溢写⽂件的 merge 次数。(10 个⽂件,⼀次 20 个 merge)
- (4)不影响实际业务的前提下,采⽤ Combiner 提前合并,减少 I/O。
2)Reduce 阶段
- (1)合理设置 Map 和 Reduce 数:两个都不能设置太少,也不能设置太多。太少,会 导致 Task 等 待,延⻓处理时间;太多,会导致 Map、Reduce 任务间竞争资源,造成处理超 时等错误。
- (2)设置 Map、Reduce 共存:调整 slowstart.completedmaps 参数,使 Map 运⾏到⼀定 程度后, Reduce 也开始运⾏,减少 Reduce 的等待时间。
- (3)规避使⽤ Reduce,因为 Reduce 在⽤于连接数据集的时候将会产⽣⼤量的⽹络消耗。
- (4)增加每个 Reduce 去 Map 中拿数据的并⾏数
- (5)集群性能可以的前提下,增⼤ Reduce 端存储数据内存的⼤⼩。
3)IO 传输
采⽤数据压缩的⽅式,减少⽹络 IO 的的时间。安装 Snappy 和 LZOP 压缩编码器。 压缩:(1)map 输⼊端主要考虑数据量⼤⼩和切⽚,⽀持切⽚的有 Bzip2、LZO。注意: LZO 要想⽀持切⽚ 必须创建索引;(2)map 输出端主要考虑速度,速度快的 snappy、LZO;(3)reduce 输出端主要看具体需求,例如作为下⼀个 mr 输⼊需要考虑切⽚,永久保 存考虑压缩率⽐ 较⼤的 gzip。
7.Yarn调度器
1)Hadoop 调度器重要分为三类:FIFO 、Capacity Scheduler(容量调度器)和 Fair Sceduler(公平调度器)。2)区别:FIFO 调度器:⽀持单队列 、先进先出容量调度器:⽀持多队列。队列资源分配,优先选择资源占⽤率最低的队列分配资源; 作业资源分 配,按照作业的优先级和提交时间顺序分配资源;容器资源分配,本地原则(同 ⼀节点/同⼀机架/ 不同节点不同机架)公平调度器:⽀持多队列,保证每个任务公平享有队列资源。资源不够时可以按照缺额分配。3)在⽣产环境下怎么选择?⼤⼚:如果对并发度要求⽐较⾼,选择公平,要求服务器性能必须 OK;中⼩公司,集群服务器资源不太充裕选择容量。4)在⽣产环境怎么创建队列?a)调度器默认就 1 个 default 队列,不能满⾜⽣产要求。b)按照框架:Hive /Spark/ flink 每个框架的任务放⼊指定的队列(企业⽤的不是特别 多)c)按照业务模块:登录注册、购物⻋、下单、业务部⻔1、业务25)在⽣产环境怎么创建队列?a)因为担⼼员⼯不⼩⼼,写递归死循环代码,把所有资源全部耗尽。b)实现任务的降级使⽤,特殊时期保证重要的任务队列资源充⾜。c)业务部⻔ 1(重要)=》业务部⻔ 2(⽐较重要)=》下单(⼀般)=》购物⻋(⼀般) =》登录 注册(次要)
8. Hadoop 宕机
1)如果 MR 造成系统宕机。此时要控制 Yarn 同时运⾏的任务数,和每个任务申请的最 ⼤内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是 8192MB)2)如果写⼊⽂件过快造成 NameNode 宕机。那么调⾼ Kafka 的存储⼤⼩,控制从 Kafka 到 HDFS 的写⼊速度。例如,可以调整 Flume 每批次拉取 数据量的⼤⼩参数 batchsize。
9. Hadoop 解决数据倾斜⽅法
1)提前在 map 进⾏ combine,减少传输的数据量 *
提前聚合 在 Mapper 加上 combiner 相当于提前进⾏ reduce,即把⼀个 Mapper 中的相同 key 进⾏ 了聚合,减 少 shuffle 过程中传输的数据量,以及 Reducer 端的计算量。如果导致数据倾斜的 key ⼤量分布在不同的 mapper 的时候,这种⽅法就不是很有效了。
2)导致数据倾斜的 key ⼤量分布在不同的 mapper
局部聚合加全局聚合。第⼀次在 map 阶段对那些导致了数据倾斜的 key 加上 1 到 n 的随机前缀,这样本来相 同的 key 也会被 分到多个 Reducer 中进⾏局部聚合,数量就会⼤⼤降低。第⼆次 mapreduce,去掉 key 的随机前缀,进⾏全局聚合。思想:⼆次 mr,第⼀次将 key 随机散列到不同 reducer 进⾏处理达到负载均衡⽬的。第 ⼆次再根据去 掉 key 的随机前缀,按原 key 进⾏ reduce 处理。这个⽅法进⾏两次 mapreduce,性能稍差。
3)增加 Reducer,提升并⾏度
JobConf.setNumReduceTasks(int)
4)实现⾃定义分区
根据数据分布情况,⾃定义散列函数,将key均匀分配到不同reducer。
五、Flume
1.Flume 组成,Put 事务,Take 事务
1)taildir source
- a)断点续传、多⽬录
- b)哪个 Flume 版本产⽣的?Apache1.7、CDH1.6
- c)没有断点续传功能时怎么做的? ⾃定义
- d)taildir 挂了怎么办? 重启就好了,不会丢数:断点续传,可能会有重复数据:
- e)重复数据怎么处理?
- 不处理:⽣产环境通常不处理,出现重复的概率⽐较低。处理会影响传输效率。 处理
- ⾃身:在 taildirsource ⾥⾯增加⾃定义事务,影响效率
- 找兄弟:下⼀级处理(Hive dwd Sparkstreaming flink 布隆)、去重⼿段 (groupby、开窗取 窗⼝第⼀条、redis)
f)taildir source 是否⽀持递归遍历⽂件夹读取⽂件?
不⽀持。 ⾃定义 递归遍历⽂件夹 + 读取⽂件2)file channel /memory channel/kafka channel
- a)File Channel
- 数据存储于磁盘,优势:可靠性⾼;劣势:传输速度低
- 默认容量:100 万 event
- 注意:FileChannel 可以通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增 ⼤
- Flume 吞吐量。
- b)Memory Channel
- 数据存储于内存,优势:传输速度快;劣势:可靠性差
- 默认容量:100 个 event
- c)Kafka Channel
- 数据存储于 Kafka,基于磁盘;
- 优势:可靠性⾼;
- 传输速度快 Kafka Channel ⼤于 Memory Channel + Kafka Sink 原因省去了 Sink 阶段
- d)⽣产环境如何选择?
- 如果下⼀级是 Kafka,优先选择 Kafka Channel 如果是⾦融、对钱要求准确的公司,选择 File
- Channel 如果就是普通的⽇志,通常可以选择 Memory Channel
3)事务
- Source 到 Channel 是 Put 事务
- Channel 到 Sink 是 Take 事务
2. Flume 采集数据会丢失吗?(防⽌数据丢失的机制)
如果是 FileChannel 不会,Channel 存储可以存储在 File 中,数据传输⾃身有事务。 如果是MemoryChannel 有可能丢。
六、Hive *
1. Hive 的架构
Hive 元数据默认存储在 derby 数据库,不⽀持多客户端访问,所以将元数据存储在 MySQl,⽀持多客 户端访问
2. 内部表和外部表的区别?
内部表:元数据、原始数据,全删除外部表:元数据 只删除
3. 4个by的区别?
- 1)Order By:全局排序,只有⼀个 Reducer;
- 2)Sort By:分区内有序;
- 3)Distrbute By:类似 MR 中 Partition,进⾏分区,结合 sort by 使⽤。
- 4)Cluster By:当 Distribute by 和 Sorts by 字段相同时,可以使⽤ Cluster by ⽅式。Cluster by 除 了具有 Distribute by 的功能外还兼具 Sort by 的功能。但是排序只能是升序排序,不能 指定排序规则 为 ASC 或者 DESC。
4. 系统函数
- 1)date_add、date_sub 函数(加减⽇期)
- 2)next_day 函数(周指标相关)
- 3)date_format 函数(根据格式整理⽇期)
- 4)last_day 函数(求当⽉最后⼀天⽇期)
- 5)collect_set 函数 (去重函数)
- 6)get_json_object 解析 json 函数
- 7)NVL(表达式 1,表达式 2) 如果表达式 1 为空值,NVL 返回值为表达式 2 的值,否则返回表达式 1 的值。
5. ⾃定义 UDF、UDTF 函数
在项⽬中是否⾃定义过 UDF、UDTF 函数,以及⽤他们处理了什么问题,及⾃定义步骤?
- ⽤ UDF 函数解析公共字段;⽤ UDTF 函数解析事件字段。 (2)⾃定义 UDF:继承 UDF,重 写 evaluate ⽅法
- ⾃定义 UDTF:继承⾃ GenericUDTF,重写 3 个⽅法:initialize(⾃定义输出的列 名和类型), process(将结果返回 forward(result)),close
为什么要⾃定义 UDF/UDTF?
因为⾃定义函数,可以⾃⼰埋点 Log 打印⽇志,出错或者数据异常,⽅便调试。 引⼊第三⽅ jar 包 时,也需要。
6. 窗⼝函数
1)Rank
a)RANK() 排序相同时会重复,总数不会变b)DENSE_RANK() 排序相同时会重复,总数会减少c)ROW_NUMBER() 会根据顺序计算
2)OVER()
指定分析函数⼯作的数据窗⼝⼤⼩,这个数据窗⼝⼤⼩可能会随着⾏的变⽽变化
- a)CURRENT ROW:当前⾏
- b)n PRECEDING:往前 n ⾏数据
- c)n FOLLOWING:往后 n ⾏数据
- d)UNBOUNDED : 起 点 ,
- UNBOUNDED PRECEDING 表 示 从 前 ⾯ 的 起 点 ,
- UNBOUNDED FOLLOWING 表示到后⾯的终点
- e)LAG(col,n):往前第 n ⾏数据
- f)LEAD(col,n):往后第 n ⾏数据
- g)NTILE(n):把有序分区中的⾏分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每⼀ ⾏,NTILE 返回此⾏所属的组的编号。注意:n 必须为 int 类型。
7. Hive优化 *
1)SQL
- a)⾏列过滤
- 列处理:在 SELECT 中,只拿需要的列,如果有,尽量使⽤分区过滤,少⽤ SELECT * 。
- ⾏处理:在分区剪裁中,当使⽤外关联时,如果将副表的过滤条件写在 Where 后⾯, 那么就会先全表关联,之后再过滤。
- b)减少job数(例如相同的on条件的join放在⼀起作为⼀个任务)。
- c)⼩表Join⼤表的时候要把⼩表放前⾯
- 原因是在Join操作的Reduce阶段,位于Join操作符左边的 表的内容会被加载进内存,将条⽬少的表放在左边,可以减少数据量,可以有效减少发⽣OOM错误 的⼏率。
- d)使⽤group by 代替 count distinct 完成计算。
- e)优先过滤后再进⾏ Join 操作,最⼤限度的减少参与 join 的数据量
2)建表
- a)创建分区表或者分桶表,避免全表查询
- b)创建表是采⽤列式存储,例如orc或者parquet
3)参数
- a)merge输出合并⼩⽂件
- SET Hive.merge.mapfiles = true; -- 默认 true,在 map-only 任务结束时合并⼩⽂件
- SET Hive.merge.mapredfiles = true; -- 默认 false,在 map-reduce 任务结 束时合并⼩⽂件
- b)在 Map 执⾏前合并⼩⽂件
- 减少 Map 数:CombineHiveInputFormat 具有对⼩⽂件 进⾏合并 的功能(系统默认的格式)。HiveInputFormat 没有对⼩⽂件合并功能。
- c)开启 map 端 combiner(不影响最终业务逻辑)
- set Hive.map.aggr=true;
- d)压缩(选择快的)
- set Hive.exec.compress.intermediate=true --启⽤中间数据压缩 set
- mapreduce.map.output.compress=true --启⽤最终数据压缩 set
- mapreduce.map.outout.compress.codec=…; --设置压缩⽅式
- e)合理设置 map数
- mapred.min.split.size: 指的是数据的最⼩分割单元⼤⼩;
- min 的默认值是 1B mapred.max.split.size: 指的是数据的最⼤分割单元⼤⼩;
- max 的默认值是 256MB 通过调整 max 可以起到调整 map 数的作⽤,减⼩ max 可以增加
- map 数,增⼤ max 可 以减少 map 数。 需要提醒的是,直接调整 mapred.map.tasks 这个参 数是没有效果的。
- f)合理设置 Reduce 数
- 过多的启动和初始化 Reduce 也会消耗时间和资源;
- 另外,有多少个 Reduce,就会有多少个输出⽂件,
8. 数据倾斜的原因及解决办法
1)数据倾斜的表现
- a)hadoop中数据倾斜的表现
- (1)有⼀个或多个reduce任务卡住,卡在99.99%, ⼀直不能结束
- (2)各种container报错OOM
- (3)异常的Reducer读写的数据量极⼤,⾄少远远超过其他正常的Reducer
- (4)伴随着数据倾斜,会出现任务被kill 等各种诡异的表现
- b)Hive中的数据倾斜
- ⼀般都发⽣在 Sql中group by 和 Join on 上,⽽且和数据逻辑绑定⽐较深
- c)Spark 中的数据倾斜
- Spark中的数据倾斜,包括Spark Streaming 和SparkSQL,主要表现有以下⼏种:Executor lost, OOM,Shuffle过程出错;Driver OOM;单个Executor 执⾏时间特别久 ,整体任务卡在某个阶段 不能结束;正常运⾏的任务突然失败。
2)数据倾斜产⽣的原因及解决⽅法
- a)key值分布不均
- 这包括空值以及单⼀key值或⼏个key值过多,这样的情况我们⼀般是打散计算,空值过滤或者将为空 的 key 转变为字符串加随机数或纯随机数,将因空值⽽造成倾斜的数据分不到多个 Reducer。
- b)建表时考虑不周
- 例⼦:⽐如我公司刚开始是就有两张表,⼀张是user⽤户表,⼀张是log⽇志表,为两个不同部⻔创 建的,两表关联字段为user_id,但user表的user_id 为 int类型,log表⾥的user_id 为string类型, 这时候直接关联的话就会产⽣数据倾斜,那我们可以使⽤cast 函数间int类型的字段转为 string类型。
- c)业务数据量激增
- 例⼦:像我们这电商⾏业,每逢双⼗⼀双⼗⼆订单量都会激增,特别是深圳杭州北京上海等⼀线城 市订单增⻓了百分之⼀万,其余城市增⻓的并不是很多,然后我们要统计不同城市的订单情况,这 样⼀做group by 根据城市分组统计⽤于的操作,可能直接就数据倾斜了,像这样的数据倾斜,可以开启数据倾斜时负载均衡和mapjoin 使⽤两次MapReduce,第⼀次打散计 算,第⼆次在最终聚合计算。完成后和其他城市进⾏整合;
- d)像count distinct 去重统计,这样也⽐较容易产⽣数据倾斜
- 使⽤groupby 代替,这⾥使⽤了⼦查询,第⼀条查询语句是按照key分组,第⼆条查询是统计,这时 候就实现了去重统计,避免了数据倾斜。
3)定位导致数据倾斜代码
Hive数据倾斜在SQL上⼀般是在Join和group 两个⽅⾯⼊⼿
- Join操作产⽣的数据倾斜
- 解决⽅案⼀
- 符合条件的话,可以通过Map Join、Bucket Map Join,SMB Join来解决数据倾斜。
- 解决⽅案⼆
- 运⾏期处理⽅案:思路:将那些产⽣倾斜的key和对应v2的数据, 从当前这个MR中移出去, 单独 找⼀个MR来处理即可, 处理后, 和之前的MR进⾏汇总结果即可。
- set Hive.optimize.skewjoin=true; -- 开启运⾏期处理倾斜参数
- set Hive.skewjoin.key=100000; 设置阈值
- -- 阈值, 此参数在实际⽣产环境中, 需要调整在⼀个合理的值(否则极易导致⼤量的key都是倾 斜的)
- 查看 join的 字段 对应重复的数量有多少个, 然后选择⼀个合理值
- 适⽤于并不清楚哪个key容易产⽣倾斜, 此时交由系统来动态检测
- 编译期处理⽅案:思路:就是在创建这个表的时候, 我们就可以预知到后续插⼊到这个表中数 据, 那些key的值会产⽣倾斜, 在建表的时候, 将其提前配置设置好即可, 在后续运⾏的时候, 程序 会⾃动将设置的key的数据单独找⼀个MR来进⾏处理即可, 处理完成后, 再和原有结果进⾏ union all 合并操作。
- 开启编译期处理倾斜参数
- set Hive.optimize.skewjoin.compiletime=true;
- 建表时通过Skewed by(key)on (1,5)为倾斜值创建⼦⽬录单独存放
- 这是适⽤于提前知道哪些key值会发⽣数据倾斜
- union all 优化⽅案
- 说明:不管是运⾏期 还是编译期的join倾斜解决, 最终都会运⾏多个MR, 将多个MR结果通 过union all 进⾏汇总, union all也是需要单独⼀个MR来处理
- 解决⽅案
- 让每⼀个MR在运⾏完成后, 直接将结果输出到⽬的地即可, 默认是各个MR将结果输出临时 ⽬录, 通过 union all 合并到最终⽬的地
- 开启此参数即可:set Hive.optimize.union.remove=true;
- group by 产⽣的数据倾斜
- 解决⽅案⼀
- 基于MR的 combiner(规约, 提前聚合) 减少数据达到reduce数量, 从⽽减轻倾斜问题 只需要在Hive中开启combiner提前聚合配置参数即可:Hive.map.aggr=ture
- 解决⽅案⼆
- 开启负载均衡的解决⽅案(需要运⾏两个MR来处理) (⼤combiner⽅案)
- 原理: 让第⼀个MR进⾏打散并对数据进⾏聚合计算 得出局部结果, 然后让第⼆个MR进⾏最终
- 聚合计算操作, 得出最终结果。
- 区别:
- ⽅案⼆⽐⽅案⼀, 更能彻底解决数据倾斜问题, 因为其处理数据范围更⼤, 针对整个数据集来处
- 理, ⽽⽅案⼀, 只是每个MapTask处理, 仅仅局部处理。
- ⽅案选择:
- 建议在⽣产中, 优先使⽤第⼀种, 如果第⼀种⽆法解决, 尝试使⽤第⼆种解决。
- 注意事项
- 使⽤第⼆种负载均衡的解决group by 的数据倾斜, ⼀定要注意, SQL语句中不能出现多
- 次 distinct操作, 否则 Hive会直接报错的
- 倾斜的参数配置开启条件, ⼀定是出现了数据倾斜的问题, 如果没有出现 不需要开启的。
Spark数据倾斜只会发⽣在shuffle过程;可能会触发shuffle操作的算⼦有:distinct,groupByKey,reduceByKey、aggregateByKey、Joincogroup,repartition等发⽣数据倾斜时可以从这些算⼦⼊⼿某个 task 执⾏特别慢的情况⾸先要看的,就是数据倾斜发⽣在第⼏个 stage 中:如果是⽤ yarn-client 模式提交,那么在提交的机器本地是直接可以看到 log,可以在 log中找到当 前运⾏到了第⼏个 stage;如果是⽤ yarn-cluster 模式提交,则可以通过 Spark Web UI 来查看当前运⾏到了第⼏个stage。此外,⽆论是使⽤ yarn-client 模式还是 yarn-cluster 模式,我们都可以在 Spark Web UI上深⼊看⼀下当前这个 stage 各个 task 分配的数据量,从⽽进⼀步确定是不是 task 分配的数据不均匀导致 了数据倾斜。⽐如:看 task 运⾏时间和数据量、task 数据量,然后推断倾斜代码。某个 task 莫名其妙内存溢出的情况这种情况下去定位出问题的代码就⽐较容易了。我们建议直接看 yarn-client 模式下本地log 的异常 栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。⼀般来说,通过异常栈信息就 可以定位到你的代码中哪⼀⾏发⽣了内存溢出。然后在那⾏代码附近找找,⼀般也会有 shuffle 类算 ⼦,此时很可能就是这个算⼦导致了数据倾斜。因此还是要按照上⾯所讲的⽅法,通过 Spark Web UI 查看报错的那个 stage 的各个 task 的运⾏ 时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。查看导致数据倾斜的 key 分布情况先对 pairs 采样 10%的样本数据,然后使⽤ countByKey 算⼦统计出每个 key 出现的次数,最后在 客户端遍历和打印样本数据中各个 key 的出现次数。
9. Hive常⽤的函数有哪些?
- 排序函数:rank,dense_rank,row_number
- 聚合函数:sum,count,avg,min,max
- 字符串函数:substr,reverse字符串反转,concat,concat_ws,
爆炸函数:explode
- regexp_replace 正则替换(语法:regexp_extract(string subject, string pattern, int index))。
- 关系函数:like⽐较,等值⽐较,⼩于⽐较,⾮空判断等。
- 数学运算:加减乘除,取余%,等。
- 逻辑运算:and,or,not
- 条件运算:if,case,round近似函数。
- ⽇期函数:year,month,day,hour,⽇期时间转⽇期函数:to_date等。
七、Sqoop
1. Sqoop的参数有哪些?
指定导⼊⽅式:import (导⼊hdsf)--connect:jdbc 连接库表信息--username:账号--password:密码--table 表名--columns 选择需要的列
2. Sqoop数据导出Parquet会有问题吗?
ads层数据⽤Sqoop往MySql中导⼊数据的时候,如果⽤了orc(Parquet)不能导⼊,需转化成text格式1)创建临时表,把Parquet中表数据导⼊到临时表,把临时表导出到⽬标表⽤于可视化2)Sqoop⾥⾯有参数,可以直接把Parquet转换为text3)ads层建表的时候就不要建Parquet表
3. Sqoop中导⼊导出Null存储⼀致性问题:
Hive中的Null在底层是以“\N”来存储,⽽MySQL中的Null在底层就是Null,为了保证数据两端的⼀致性。在导出数据时采⽤--input-null-string和--input-null-non-string两个参数。导⼊数据时采⽤--null-string和--null-non-string。
4. Sqoop数据导出⼀致性问题
当Sqoop导出数据到MySql时,使⽤4个map怎么保证数据的⼀致性因为在导出数据的过程中map任务可 能会失败,可以使⽤—staging-table –clear-staging任务执⾏成功⾸先在tmp临时表中,然后将tmp表 中的数据复制到⽬标表中即可(这个时候可以使⽤事务,保证事务的⼀致性)
5. Sqoop在导⼊数据的时候数据倾斜怎么解决?
Sqoop 参数: split-by:按照⾃增主键来切分表的⼯作单元。num-mappers:启动 N 个 map 来并⾏导⼊数据,默认 4 个;
6. Sqoop的同步策略有哪⼀些?
1)全量同步每天将MySQL中该表的全量数据导⼊到Hive中,按照分区进⾏存储,每个分区中都是当天的时间切⽚ (该表必须不能太⼤)。2)增量同步增量同步只适合MySQL中只存在新增(如⽀付流⽔表、订单状态流⽔表),不存在删除与修改的表,在 Hive中以分区的形式进⾏同步和存储,这样的表在Hive中叫增量表。3)新增及变化同步(与全量同步场景唯⼀的区别是数据量很⼤,为了减少冗余)不能使⽤简单的分区⼀般在业务那边维护2个时间字段,⼀个是订单创建时间create_time,⼀个是订单修改时间operator_time。 实现要么通过在业务表中增加create_time、operator_time,要么使⽤⼯具如canal进⾏binlog的解析。
⼋、Spark
1. Spark解决了什么问题?
回顾:hadoop主要解决了海量数据的存储与海量数据的的分析计算。mapreduce太慢了频繁的进行磁盘 I/O操作, Spark将数据存储在内存中主要解决了海量数据的分析计算。
2. Spark常⽤端⼝号:
- Spark-shell任务端⼝:4040
- 内部通讯端⼝:7077
- 查看任务执⾏情况端⼝:8080
- 历史服务器:18080
- Oozie端⼝号:11000
3. Spark有哪⼏种的运⾏模式?
- 1)Local:运⾏在⼀台机器上。 测试⽤。
- 2)Standalone:是 Spark ⾃身的⼀个调度系统。 对集群性能要求⾮常⾼时⽤。国内很少使⽤。
- 3)Yarn:采⽤ Hadoop 的资源调度器。 国内⼤量使⽤。
- 4)Mesos:国内很少使⽤。
4. 有使⽤Spark submit的⽅式提交任务吗?你们的内存资源是怎么安排的?
Spark submit 用于提交 Spark 作业(job)到集群的命令行工具。
spark-submit --master local[4] --class com.example.MyApp my-spark-app.jar
有的,我们的driver-cores设置的内核数为2,driver-memory 给8个g,每个executor-cores给的是4个 内核,num-executor启动executor的数量给10个,executor-memory,executor内存给8个g。
5. Spark算⼦的分类,分别举例?
Spark算⼦有两类,分别是transformation算⼦和action算⼦。Transformation算⼦有:map,mapPartitions,groupBy,filter,distinct,repartition,union, reduceByKey,groupByKey,Join,aggregateByKey等。Action算⼦有:reduce,collect,count,save,countByKey,aggregate,take。
6. Map和mapPartitions的区别?
1)Map:每次处理⼀条数据2)mapPartitions:每次处理⼀个分区数据
7. Repartitons和Coalesce区别
1)关系:两者都是⽤来改变 RDD 的 partition 数量的,repartition 底层调⽤的就是 coalesce ⽅ 法:coalesce(numPartitions, shuffle = true)2)区别:repartition ⼀定会发⽣ shuffle,coalesce 根据传⼊的参数来判断是否发⽣ shuffle ⼀般情况 下增⼤ rdd 的 partition 数量使⽤ repartition,减少 partition 数量时使⽤ coalesce。
8. reduceByKey和groupByKey的区别?
reduceByKey:具有预聚合操作
groupByKey:没有预聚合在不影响业务逻辑的前提下,优先采⽤reduceByKey。
9. Spark任务的执⾏流程
Spark 的任务执⾏流程可以分为如下⼏个步骤:
- 1)任务划分:Spark 将任务划分为多个 stages 和 tasks。每个 stage 包含多个 tasks,且每个 task 是对数据集中⼀个分区的计算。
- 2)任务调度:Spark 会将每个 stage 中的 tasks 分配给集群中的 Executor 进⾏计算。任务调度分为两 个阶段:
- 第⼀阶段为 Task Scheduler,在该阶段 Spark 会将任务分配给可⽤的 Executor;
- 第⼆阶段为 DAG Scheduler,Spark 会在该阶段优化任务执⾏计划。
- 3)数据分区:Spark 将数据集划分为多个分区,每个分区会被分配到集群中的某⼀个 Executor 上进⾏ 计算。每个 Executor 会处理多个分区,且每个分区只会被⼀个 Executor 计算。
- 4)任务执⾏:Executor 会从 Driver 程序中获取任务信息,并根据任务信息从对应的数据分区中获取数 据,然后执⾏具体的计算任务。
- 5)任务结果收集:Executor 执⾏完任务后,将计算结果返回给 Driver 程序。Driver 程序会收集所有 Executor 返回的计算结果,并进⾏汇总。
- 6)结果输出:最后,Driver 程序会将计算结果写⼊到外部存储系统(如 Hadoop HDFS 或者 Apache Cassandra)中。 在这个过程中,Spark 会利⽤⼀些优化技术来提⾼任务执⾏的效率,例如任务合并、数据本地化、缓存 等。通过这些技术,Spark 可以在⼤规模数据处理和分析任务中实现⾼效的计算和快速的结果输出。
10. Spark client 和Spark cluster的区别?
区别是driver 进程在哪运⾏
- client模式driver运⾏在master节点上,不在worker节点上;
- cluster模式 driver运⾏在worker集群某节点上,不在master节点上。
⼀般来说,如果提交任务的节点(即Master)和Worker集群在同⼀个⽹络内,此时client mode⽐较合适。如果提交任务的节点和Worker集群相隔⽐较远,就会采⽤cluster mode来最⼩化Driver和Executor之间 的⽹络延迟。
- yarn client模式:driverzai当前提交任务的节点上,可以打印任务运⾏的⽇志信息。
- yarn cluster模式:driver在AppMaster所有节点上,分布式分配,不能再提交任务的本机打印⽇志信 息。
11. Spark的任务划分有哪⼏步?
RDD任务切分中间分为:Application、Job、Stage和Task
- 1)Application:初始化⼀个SparkContext即⽣成⼀个Application
- 2)Job:⼀个Action算⼦就会⽣成⼀个Job
- 3)Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到⼀个宽依赖则划分⼀个 Stage。
- 4)Task:Stage是⼀个TaskSet,将Stage划分的结果发送到不同的Executor执⾏即为⼀个Task。
- 注意:Application->Job->Stage->Task每⼀层都是1对n的关系。
12. cache 缓存级别
- DataFrame 的 cache 默认采⽤ MEMORY_AND_DISK
- RDD 的 cache 默认⽅式采⽤ MEMORY_ONLY
13. 缓存和检查点的区别?
Cache缓存只是将数据保存起来,不切断⾎缘关系,CheckPoint检查点切断⾎缘依赖。Cache缓存的数据通常存储在磁盘、内存等地⽅,可靠性低。CheckPoint的数据通常存储在HDFS等容 错、⾼可⽤的⽂件系统中,可靠性⾼。建议对checkpoint()的RDD使⽤cache缓存,这样checkpoint()的job只需要从cache缓存中取数据即可, 否则需要再从头计算⼀次RDD。
14. 产⽣shuffle过程的Spark算⼦有哪些?
reduceByKey,groupByKey,aggregateByKey等ByKey算⼦。
15. 你对Spark优化有了解吗?
- 1)避免创建重复的RDD
- 2)尽可能复⽤同⼀个RDD
- 3)对多次使⽤的RDD进⾏持久化
- 4)尽量避免使⽤shuffle类算⼦
- 5)使⽤map-sid预聚合的shuffle操作
所谓的map-side预聚合,说的是在每个节点本地对相同的key进⾏⼀次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有⼀条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会⼤⼤减少需要拉取的数据数量,从⽽也就减少了磁盘IO以及⽹络传输开销。如reduceByKey或者aggregateByKey代替groupByKey。- 6)使⽤⾼性能算⼦
- ⽐如使⽤reduceByKey/aggregateByKey替代groupByKey
- 使⽤mapPartitions替代普通map
- 使⽤foreachPartitions替代foreach
- 使⽤filter之后进⾏coalesce操作
- 7)⼴播⼤变量
- 有时在开发过程中,会遇到需要在算⼦函数中使⽤外部变量的场景(尤其是⼤变量,⽐如100M以上的⼤ 集合),那么此时就应该使⽤Spark的⼴播(Broadcast)功能来提升性能。
- 8)调整参数
- ⽐如num-executors,executor-memory,executor-cores,driver-memory 等参数
16. SparkSQL与Hive语法差异
1)相同函数差异
- (1)Spark运⾏时⽤到的hash函数,与Hive的哈希算法不同,如果使⽤hash(),结果和Hive的hash()会 有差异。
- (2)Hive和SparkSQL使⽤grouping sets⽣成的GROUPING_ID不⼀致。
- (3)regexp_extract未匹配上的话,在Hive⾥回是null,但在Spark⾥返回是空字符。
- (4)SparkSQL中row_number的over中不能省略sort by 或order by
- (5)grouping_id()函数⽣成的数据不同
- (6)reflect()函数中,如果⼊参有⾮法数据或者null,Hive会返回null,⽽Spark会抛出异常
2)仅Hive⽀持
- (1)SparkSQL关联on条件不⽀持函数rand()。
- (2)创建临时表时,Spark不⽀持直接赋值null。
- (3)SparkSQL⽆法读取字段类型为void的表。
- (4)SparkSQL中如果表达式没有指定别名,SparkSQL会将整个表达式作为别名。如果表达式中包含特殊符号(如逗号),则CTAS建表会失败。
3)仅Spark⽀持
九、Kafka
1. Kafka⽇志保存时间
2. Kafka监控器
3. Kakfa 分区数
- 1)创建⼀个只有 1 个分区的 topic
- 2)测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
- 3)假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。
- 4)然后假设总的⽬标吞吐量是 Tt,那么分区数=Tt / min(Tp,Tc)
- 5)例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s; 分区数 =
- 100 / 20 = 5 分区
p31