一、需求
在ODPS上我们有如下数据:
id | category_id | attr_id | attr_name | attr_value |
205348 | 10000046 | 2 | 最优粘度 | ["0W-40"] |
205348 | 10000046 | 1 | 基础油类型 | ["全合成"] |
205348 | 10000046 | 3 | 级别 | ["BMW Longlife 01"] |
我们希望得到的结果如下:
(205348, 10000046, "基础油类型:全合成\n最优粘度:0W-40\n级别:BMW Longlife 01\n")
需求解读:
需要将(id, category_id)作为key,然后将(attr_id, attr_name, attr_value)进行reduce操作,在reduce之后的数据中对attr_id进行排序,再将attr_name和attr_value合并在一起。
二、reduce操作之字符串方式
这个是最简单的方式,大致思路如下:
首先,将(id, category_id)作为key。
然后,将attr_id、attr_name、attr_value合并成一个字符串attr_info:attr_id + "#" + attr_name + "#" + attr_value,然后attr_info再通过"&"进行合并。
示例代码如下:
xx.map{case(id, category_id, attr_id, attr_name, attr_value) => ((id, category_id), attr_id + "#" + attr_name + "#" + attr_value)}
.reduceByKey(_ + "&" + _, 100)
然后在接下来的流程中首先split("#")得到不同的attr信息,再通过split("&")得到不同的attr的列信息。这就要求attr_id,attr_name,attr_value中不能包含"#"和"&"字符串。
所以这种方式有缺陷,就是当attr_id,attr_name,attr_value包含了"#"和"&"字符串时需要先replace一下,这样就改变了原数据的值。
三、reduce操作之列表方式
这种方式相对复杂一点,需要对输入数据进行预处理,但是逻辑清晰。
输入数据中(id, category_id)是key保持不变,(item_id, item_name, item_value)是一组tuple。
reduce操作会在同一个partition中,不同的partition之间进行数据合并,这要求数据的输入、输出类型保持不变。
我们的初步想法:将item_id, item_name, item_value分别放到3个列表中,合并时就是列表之间的合并,合并完毕后使用时只需要遍历列表即可。
因为reduce操作的输入、输出类型不能变化,所以先放item_id, item_name, item_value初始化为一个列表,然后再进行列表之间的合并。
示例代码如下:
xx.map{case(id, category_id, attr_id, attr_name, attr_value) =>
val itemIdList = new ArrayList[Long]()
itemIdList.add(attr_id)
val itemNameList = new ArrayList[String]()
itemNameList.add(attr_name)
val itemValueList = new ArrayList[String]()
itemValueList.add(attr_value)
((id, category_id), (itemIdList, itemNameList, itemValueList))
}.reduceByKey((x, y) => {
val itemIdList = new ArrayList[Long]()
for(i <- 0 until x._1.size()){
itemIdList.add(x._1.get(i))
}
for(i <- 0 until y._1.size()){
itemIdList.add(y._1.get(i))
}
val itemNameList = new ArrayList[String]()
for(i <- 0 until x._2.size()){
itemNameList.add(x._2.get(i))
}
for(i <- 0 until y._2.size()){
itemNameList.add(y._2.get(i))
}
val itemValueList = new ArrayList[String]()
for(i <- 0 until x._3.size()){
itemValueList.add(x._3.get(i))
}
for(i <- 0 until y._3.size()){
itemValueList.add(y._3.get(i))
}
(itemIdList, itemNameList, itemValueList)
}, 100)
四、reduce之partition属性
首先提一下Shuffle过程,它的本意是洗牌、混乱的意思,类似于java中的Colletions.shuffle(List)方法,它会随机地打乱参数list里地元素顺序。MapReduce的Shuffle过程大致可以理解成:数据从map task输出到reduce task输入的这段过程。
而partition过程:分割map每个节点的结果,按照key分别映射给不同的reduce,这个是可以自定义的。
通过设置reduce中的numPartitions值,会在reduce操作之后进行repartition,避免数据不均衡堆在一个partition中。
五、reduceByKey和groupByKey的区别
从 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
从功能的角度: reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey 。reduceByKey的分区内和分区间的计算规则是一样的