MapReduce论文阅读
1. 编程模型
Map 函数(kv -> kv)
Map 函数将输入的键值对处理为一系列中间值(键值对),并将所有的中间结果传递给 Reduce 处理。
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
Reduce 函数(kv -> kv)
Reduce 函数将相同键的值合并,每个 Reduce 函数输出 0 或 1 个结果。
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
类型定义
map (k1, v1) -> list(k2, v2)
reduce (k2, list(v2)) -> list(v)
例子:统计文本中每个单词的出现次数(词频统计)
输入数据:
假设有以下三段文本数据:
- 文本1:“apple banana apple”
- 文本2:“banana orange apple”
- 文本3:“orange banana banana”
Map 阶段
Map 函数接受输入键值对 (k1, v1)
,其中 k1
是文本的标识(例如文本编号或文件名),v1
是文本内容。Map 函数将文本内容拆分为单词,并为每个单词输出中间键值对 (k2, v2)
,其中 k2
是单词,v2
是计数值 1
。
Map 输入:
("文本1", "apple banana apple")
("文本2", "banana orange apple")
("文本3", "orange banana banana")
Map 输出:
-
对于输入
("文本1", "apple banana apple")
,Map 函数输出:("apple", 1) ("banana", 1) ("apple", 1)
-
对于输入
("文本2", "banana orange apple")
,Map 函数输出:("banana", 1) ("orange", 1) ("apple", 1)
-
对于输入
("文本3", "orange banana banana")
,Map 函数输出:("orange", 1) ("banana", 1) ("banana", 1)
Shuffle and Sort(整理和排序)
将所有 Map 阶段的输出根据键 k2
(单词)进行分组,形成 (k2, list(v2))
的形式:
("apple", [1, 1, 1])
("banana", [1, 1, 1, 1])
("orange", [1, 1])
Reduce 阶段
Reduce 函数接受 (k2, list(v2))
,将 list(v2)
中的值累加,得到每个单词的总出现次数。
-
对于
("apple", [1, 1, 1])
,Reduce 函数计算:total = 1 + 1 + 1 = 3 输出:("apple", 3)
-
对于
("banana", [1, 1, 1, 1])
,Reduce 函数计算:total = 1 + 1 + 1 + 1 = 4 输出:("banana", 4)
-
对于
("orange", [1, 1])
,Reduce 函数计算:total = 1 + 1 = 2 输出:("orange", 2)
最终输出
("apple", 3)
("banana", 4)
("orange", 2)
总结流程
- Map 阶段:将输入的文本内容拆分为单词,对每个单词生成键值对
(k2, v2)
,其中k2
是单词,v2
是计数1
。 - Shuffle and Sort:将所有的键值对按照键
k2
进行分组,收集所有相同键的值到一个列表list(v2)
中。 - Reduce 阶段:对每个键
k2
,将对应的list(v2)
中的值进行累加,得到该单词的总出现次数。
通过这个例子,可以清楚地看到 MapReduce 的工作过程:
- Map 阶段:处理输入数据,映射
(k1, v1)
到一系列的(k2, v2)
。 - Reduce 阶段:聚合中间结果,将
(k2, list(v2))
转化为最终结果(k2, v)
。
这个过程非常适合处理大规模的数据集,因为 Map 和 Reduce 阶段都可以并行执行,充分利用分布式计算资源。
示例
本节中,我们给出了一些简单的示例,这些示例都是可以通过 MapReduce 计算实现的有趣程序。
-
分布式“grep”:如果一行文本匹配给定的模式,那么 Map 函数会输出该行。Reduce 作为一个恒等函数,仅将提供的中间数据复制到输出。
-
URL 访问频率计数:Map 函数处理网页请求日志,输出
<URL, 1>
。Reduce 函数对相同URL
的值求和,输出<URL, 总数>
键值对。 -
反转 Web 链接拓扑图:Map 函数在名为
source
的页面中,对每个指向的target
URL 输出一个<target, source>
键值对。Reduce 函数将所有相同target
的source
合并为一个列表,输出<target, list(source)>
键值对。 -
每个主机的词向量统计:词向量是对一系列文档中最重要的词的汇总,其形式为
<词, 词频>
的键值对列表。Map 函数为每篇输入文档输出一个<主机名, 词向量>
键值对(主机名
由文档的 URL 解析而来)。Reduce 函数接收给定主机上所有文章的词向量,将这些词向量相加,丢弃低频词,最终输出<主机名, 词向量>
键值对。 -
倒排索引:Map 函数处理每篇文档,输出一系列
<词, 文档 ID>
的键值对。Reduce 函数接受相同词的所有键值对,按文档 ID
排序,输出<词, list(文档 ID)>
键值对。所有输出的键值对集合组成了一个简单的倒排索引。如果需要跟踪词的位置,可以通过增量计算实现。 -
分布式排序:Map 函数提取每条记录中的键,输出一个
<键, 记录>
的键值对。Reduce 函数不修改中间变量,直接输出所有的键值对。排序计算依赖于 MapReduce 框架中的分区机制和排序特性,以确保数据按照键的顺序输出。
2. 实现
执行概述
输入数据会自动被分割为
M
M
M个分片(split),这样,map函数调用可以在多个机器上分布式执行,每个输入的分片可以在不同机器上并行处理。中间键值对的键空间会通过被分区函数(例如,
h
a
s
h
(
k
e
y
)
m
o
d
R
hash(key) mod R
hash(key)modR)分割为
R
R
R个分区,这样,reduce函数也可以分布式执行。其中分区的数量(
R
R
R)和分区函数由用户指定。
图1展示了在我们的实现中,MapReduce操作的完整工作流。当用户程序调用MapReduce函数时会发生如下的操作(下列序号与图1中序号对应):
-
程序启动:
- 顶部的 User Program 启动后,通过 fork 操作创建一个 Master 节点和多个 Worker 节点
- Master 负责任务的协调和分配
-
输入阶段:
- 左侧显示输入文件被分成多个 splits(split 0 到 split 4)
- 这些 splits 代表了需要处理的原始数据的分片
-
Map 阶段:
- Master 将 map 任务分配给 worker 节点
- Worker 节点读取相应的输入分片(图中标注为(3) read)
- 每个 map worker 处理数据并将结果写入本地磁盘(标注为(4) local write)
- 生成的中间文件存储在 local disks 上
-
中间阶段:
- 中间结果以文件形式存储在各个节点的本地磁盘上
- 这些文件等待被 reduce 阶段的 worker 处理
-
Reduce 阶段:
- Master 为 reduce worker 分配任务
- Reduce worker 通过远程读取(标注为(5) remote read)获取它们需要处理的中间文件数据
- 最后将处理结果写入最终的输出文件(标注为(6) write)
-
输出阶段:
- 最终生成多个输出文件(output file 0, output file 1)
- 这些文件包含了整个MapReduce作业的最终结果
整个流程展示了 MapReduce 的核心特点:
- 并行处理:多个 worker 同时工作
- 数据本地性:map 阶段的数据尽量在本地处理
- 容错性:master 可以重新分配失败的任务
- 可扩展性:worker 节点可以根据需要增加
这种架构设计使得 MapReduce 能够高效地处理大规模数据,成为分布式计算的重要范式。
假设我们要统计一个大型文本库中每个单词出现的次数:
-
数据分片(Split)阶段
- 假设我们有 100GB 的文本文件
- 系统将其分成 M=2000 个分片,每个约 50MB
- 此时会启动一个 master 和多个 worker 进程
-
任务分配阶段
- Master 会将 2000 个 map 任务分配给空闲的 workers
- 同时分配好 R(假设=50)个 reduce 任务
-
Map 处理阶段
# Map 函数示例 def map(document): for word in document.split(): emit(word, 1) # 为每个单词发出(word, 1)的键值对
-
中间结果处理
- 每个 map worker 将产生的键值对(如:{“hello”: 1, “world”: 1})缓存在内存
- 按照分区函数(如 hash(word) mod 50)将数据分成 50 个区域
- 将位置信息报告给 master
-
Reduce 获取与排序
- 每个 reduce worker 负责特定的单词集合
- 例如:reduce-0 可能负责 hash 值为 0 的所有单词
- 从所有 map worker 收集数据后进行排序
# 排序后的数据可能如下: {"hello": [1,1,1,1], "hi": [1,1]}
-
Reduce 处理
# Reduce 函数示例 def reduce(word, counts): sum = 0 for count in counts: sum += count emit(word, sum)
-
完成阶段
- 最终得到 50 个输出文件,每个文件包含一部分单词的统计结果
- 这些文件可以直接用于后续处理,如:
- 作为新的 MapReduce 任务的输入
- 被其他分布式程序使用
关键特点:
-
容错性:如果某个 worker 失败,master 会将任务重新分配给其他 worker
-
本地性优化:master 会尽量将 map 任务分配给拥有对应输入数据的机器,减少网络传输
-
扩展性:
- M 和 R 的值可以远大于机器数量
- 动态负载均衡
- 适应不同规模的计算集群
在我们的MapReduce实现的实际情况中,对 M M M和 R R R的上限进行了限制。如前文所述,master必须做出 O ( M + R ) O(M+R) O(M+R)个调度决策,并在内存中保存 O ( M × R ) O(M \times R) O(M×R)个状态。(内存占用的常数因子比较小: O ( M × R ) O(M \times R) O(M×R)条状态由大约每个map/reduce任务仅一字节的数据组成。)
此外, R R R还经常受用户限制,因为每个reduce任务会生成一个单独的输出文件。在实际情况下,我们更倾向于自定义参数 M M M,这样可以使每个单独的任务的输入数据大概在16MB到64MB(这样可以使前面提到的局部性优化最有效),同时,我们使 R R R是期望使用的worker机器的较小的倍数。我们经常在 2 , 000 2,000 2,000台机器上选择 M = 200 , 000 M=200,000 M=200,000、 R = 5 , 000 R=5,000 R=5,000的参数执行MapReduce计算。
容错
1. Worker 故障处理
- Master 通过定期 ping 来监控 worker 状态
- Worker 无响应时会被标记为故障
- 故障 worker 的任务处理:
- 已完成的 map 任务:重置为"等待中"状态,需要重新执行(因为输出在本地磁盘)
- 已完成的 reduce 任务:不需重新执行(因为输出在全局文件系统)
- 正在执行的任务:重置为"等待中"状态
- Map 任务重新执行时会通知所有 reduce worker
- 系统可以处理大规模故障(如文中提到的 80 台机器同时故障)
2. Master 故障处理
- 采用检查点机制,定期保存状态
- Master 故障时从最近检查点恢复
- 当前实现:Master 故障则终止计算,由客户端决定是否重试
3. 故障时的语义保证
-
确定性函数情况:
- 分布式执行结果与顺序执行相同
- 通过原子提交机制实现:
- Map/Reduce 任务输出先写入临时文件
- Map 完成时向 master 报告临时文件名
- Reduce 完成时原子性重命名为永久文件
-
非确定性函数情况:
- 提供较弱的语义保证
- 同一 reduce 任务的输出等同于顺序执行
- 不同 reduce 任务可能基于不同执行结果
改进
1. 分区函数(Partitioning Function)
- 允许用户自定义分区方式
- 默认使用 hash(key) mod R
- 可根据需求自定义,如按 URL 主机名分区
2. 有序性保证
- 同一分区内的中间键值对按键升序处理
- 便于生成有序输出
- 支持按键的高效随机访问
3. 合并函数(Combiner Function)
- 在 map 端预先合并相同 key 的数据
- 减少网络传输量
- 适用于满足交换律和结合律的操作(如词频统计)
4. 输入输出类型
- 支持多种输入格式(如文本行、键值对序列)
- 可扩展的 reader/writer 接口
- 支持自定义数据源(如数据库、内存数据结构)
5. 附属输出
- 支持生成额外的输出文件
- 通过临时文件和原子重命名确保一致性
- 不支持跨文件的两阶段提交
6. 容错机制
- 可跳过导致确定性崩溃的记录
- 使用信号处理器捕获错误
- master 跟踪并管理问题记录
7. 本地执行支持
- 提供本地顺序执行模式
- 便于调试和测试
- 支持常规调试工具(如 gdb)
8. 状态信息
- master 提供 HTTP 状态页面
- 显示进度、资源使用等信息
- 支持任务执行状态监控
9. 计数器功能
- 支持自定义计数器统计事件
- 自动维护系统级计数器
- 用于监控和验证处理进度
- 处理重复计数问题
(1). 基本用法示例 - 文档处理统计
class DocumentCounters:
def __init__(self):
self.total_docs = GetCounter("total_documents")
self.english_docs = GetCounter("english_documents")
self.chinese_docs = GetCounter("chinese_documents")
self.invalid_docs = GetCounter("invalid_documents")
self.large_docs = GetCounter("large_docs_over_1mb")
def map(doc_id, content):
# 增加总文档计数
counters.total_docs.Increment()
# 检查文档大小
if len(content) > 1024 * 1024: # 1MB
counters.large_docs.Increment()
try:
lang = detect_language(content)
if lang == 'en':
counters.english_docs.Increment()
elif lang == 'zh':
counters.chinese_docs.Increment()
process_document(content)
except InvalidDocumentError:
counters.invalid_docs.Increment()
return
# 任务完成后可以获取统计信息:
# total_documents: 1000000
# english_documents: 600000
# chinese_documents: 350000
# invalid_documents: 50000
# large_docs_over_1mb: 5000
(2). 质量控制示例 - 数据处理验证
class QualityCounters:
def __init__(self):
self.input_records = GetCounter("input_records")
self.output_records = GetCounter("output_records")
self.malformed_records = GetCounter("malformed_records")
self.null_fields = GetCounter("null_required_fields")
self.out_of_range = GetCounter("value_out_of_range")
def map(key, record):
counters.input_records.Increment()
# 验证记录格式
if not is_valid_format(record):
counters.malformed_records.Increment()
return
# 检查必填字段
if has_null_required_fields(record):
counters.null_fields.Increment()
return
# 检查数值范围
if not is_value_in_range(record.value):
counters.out_of_range.Increment()
return
# 处理有效记录
process_record(record)
counters.output_records.Increment()
# 可以通过比较计数器验证数据质量:
# input_records == output_records + malformed_records + null_fields + out_of_range
(3). 性能监控示例 - 处理时间统计
class PerformanceCounters:
def __init__(self):
self.processed_bytes = GetCounter("processed_bytes")
self.cache_hits = GetCounter("cache_hits")
self.cache_misses = GetCounter("cache_misses")
self.slow_operations = GetCounter("slow_operations_over_100ms")
def map(key, data):
start_time = time.time()
# 统计处理的数据量
counters.processed_bytes.Increment(len(data))
# 缓存使用统计
if cache.has(key):
counters.cache_hits.Increment()
else:
counters.cache_misses.Increment()
# 处理数据
result = process_data(data)
# 统计慢操作
if (time.time() - start_time) * 1000 > 100: # 100ms
counters.slow_operations.Increment()
# 可以分析性能指标:
# cache_hit_rate = cache_hits / (cache_hits + cache_misses)
# avg_processing_speed = processed_bytes / total_time
# slow_operation_percentage = slow_operations / total_operations
(4). 业务指标统计示例 - 电商数据分析
class BusinessCounters:
def __init__(self):
self.total_orders = GetCounter("total_orders")
self.total_revenue = GetCounter("total_revenue")
self.cancelled_orders = GetCounter("cancelled_orders")
self.high_value_orders = GetCounter("orders_over_1000")
self.new_customers = GetCounter("new_customers")
def map(order_id, order):
counters.total_orders.Increment()
counters.total_revenue.Increment(order.amount)
if order.amount > 1000:
counters.high_value_orders.Increment()
if order.status == 'CANCELLED':
counters.cancelled_orders.Increment()
if order.is_new_customer:
counters.new_customers.Increment()
# 可以计算关键业务指标:
# average_order_value = total_revenue / total_orders
# cancellation_rate = cancelled_orders / total_orders
# high_value_percentage = high_value_orders / total_orders
3. 总结
目前,我们使用MapReduce做的最重要的工作之一是完全重写了一个索引系统,该系统被用作生成用于Google web搜索服务的数据结构。该索引系统将大量被我们爬虫系统检索到的文档(作为GFS文件存储)作为输入。这些文档的原始内容的数据大小超过20TB。索引进程会运行一系列5~10个MapReduce操作。使用MapReduce(而不是旧版索引系统中ad-hoc分布式传递方案)提供了很多好处:
索引代码更简单、短、便于理解,因为处理容错、分布式和并行的代码被隐藏在了MapReduce库中。例如,计算中的有一个阶段的代码量从3800行C++代码所见到了700行使用MapReduce的代码。
MapReduce库的性能足够好,这让我们可以将概念上不相关的计算分离开,而不是将它们混合在一起,这样可以避免传递过多额外的数据。这使改变索引程序变得非常简单。例如,在我们旧的索引系统中,一处修改会花费几个月的时间,而新的系统仅需要几天就能实现。
索引系统变得更容易操作。大部分因机器故障、缓慢的机器、网络不稳定等引起的问题都被MapReduce库自动处理了,不需要引入额外的操作。此外,向索引集群添加新机器以获得更好的性能变得更加简单。
实验环境
- 操作系统:macos
- IDE:GOLAND
- 语言:GO 1.23
- lab1链接:
Lab1 说明
实验内容
MapReduce 作业的抽象视图 – 词频统计示例
Input1 -> Map -> a,1 b,1
Input2 -> Map -> b,1
Input3 -> Map -> a,1 c,1
| | |
| | -> Reduce -> c,1
| -----> Reduce -> b,2
---------> Reduce -> a,2
运行流程
- 输入已被分割成 M 个片段。
- MapReduce 为每个输入分片调用 Map() 函数,生成键值对列表,这些数据称为“中间数据”。每个 Map() 调用是一个“任务”。
- 当 Map 任务完成后,MapReduce 收集每个键的所有中间值,并将每个键及其对应的值传递给 Reduce 调用。
- 最终输出是一组来自 Reduce() 的键值对。
词频统计代码
Map(d)
将 d 切分成单词
对于每个单词 w
emit(w, "1")
Reduce(k, v[])
emit(len(v[]))
MapReduce 特点
- 扩展性:多台计算机(N台)可以获得 N 倍的吞吐量,Map() 和 Reduce() 函数可并行运行。
- 自动化容错处理:自动处理 Map 和 Reduce 函数代码的分发、任务跟踪、中间数据的传输、负载均衡、崩溃恢复等。
- 设计限制:不允许有交互或状态,数据流只支持 Map/Reduce 模式,不支持实时或流式处理。
实验要求
核心任务
实现一个分布式的 MapReduce 系统,包含两个主要程序:
- coordinator(协调者)
- worker(工作者)
系统架构
- 只有一个 coordinator 进程。
- 允许多个 worker 进程并行执行,worker 通过 RPC 与 coordinator 通信。
- 所有进程运行在同一台机器上。
Worker 工作流程
- 向 coordinator 请求任务。
- 从一个或多个文件读取任务输入。
- 执行任务并将输出写入一个或多个文件。
- 重复请求新任务直到任务完成。
具体要求
-
Map 阶段输出要求:
- 将中间键分成 nReduce 个桶,每个 mapper 创建 nReduce 个中间文件供 reduce 任务使用。
-
输出文件要求:
- 第 X 个 reduce 任务的输出放在 mr-out-X 文件中,每个 Reduce 函数的输出一行,使用 Go 的 “%v %v” 格式化输出 key 和 value。
-
容错处理:
- coordinator 需要检测 worker 是否在 10 秒内完成任务,如果超时,则将任务重新分配给其他 worker。
-
任务完成判断:
- coordinator 需实现 Done() 方法,当 MapReduce 工作完全结束时返回 true,通知 worker 进程退出。
可修改的文件
mr/worker.go
mr/coordinator.go
mr/rpc.go
实验提示(Hints)
-
入门建议:
- 修改
mr/worker.go
的 Worker() 函数,使其发送 RPC 请求给 coordinator 请求任务。 - 修改 coordinator 使其返回尚未开始的 map 任务文件名,worker 可使用该文件名调用应用程序的 Map 函数。
- 修改
-
插件:
- Map 和 Reduce 函数通过 Go plugin 包从 .so 文件加载。修改
mr/
目录下内容时,需重新构建 MapReduce 插件:go build -buildmode=plugin ../mrapps/wc.go
- Map 和 Reduce 函数通过 Go plugin 包从 .so 文件加载。修改
-
文件系统:
- 本实验要求 workers 共享文件系统,同一机器上运行时较简单,不同机器上则需 GFS 等全局文件系统支持。
-
命名约定:
- 中间文件命名格式:
mr-X-Y
,其中 X 是 Map 任务编号,Y 是 reduce 任务编号。
- 中间文件命名格式:
-
JSON 存储建议:
// 写入 key/value 对 enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv) } // 读取 key/value 对 dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
-
任务分配:
- worker 可以使用 ihash(key) 函数(在 worker.go 中)来选择 reduce 任务。
- 可参考
mrsequential.go
实现中的代码:读取 Map 输入文件、中间 key/value 对排序、将 Reduce 输出存储到文件。
-
并发与锁定:
- coordinator 是 RPC 服务器,需要锁定共享数据。可使用 Go 的竞态检测器测试并发:
go run -race
- coordinator 是 RPC 服务器,需要锁定共享数据。可使用 Go 的竞态检测器测试并发:
-
等待机制:
- workers 有时需要等待(如等待最后一个 map 完成后才能开始 reduce),可让 workers 定期询问 coordinator,并使用
time.Sleep()
。
- workers 有时需要等待(如等待最后一个 map 完成后才能开始 reduce),可让 workers 定期询问 coordinator,并使用
-
错误处理:
- coordinator 需要处理 worker 崩溃或卡住的情况,建议等待 10 秒后假设 worker 已死亡,重新分配任务。
-
测试建议:
- 使用
mrapps/crash.go
测试崩溃恢复。 - 使用临时文件并原子重命名处理崩溃的文件写入:
tempFile, _ := ioutil.TempFile("", "temp-") os.Rename(tempFile.Name(), "final-output-file")
- 使用
-
RPC 相关:
- Go RPC 只发送首字母大写的结构体字段,reply 结构体需包含默认值。
- 调用 RPC call() 函数时,reply 结构体应包含所有默认值:
reply := SomeType{} call(..., &reply)
规则说明
-
Map 阶段规则:
- Map 阶段将中间键分成 nReduce 个桶用于 reduce 任务,每个 mapper 为 reduce 任务创建 nReduce 个中间文件。
-
输出文件规则:
- worker 将第 X 个 reduce 任务的输出放在 mr-out-X 文件中。
-
输出格式规则:
- mr-out-X 文件中的每行输出使用 Go 的 “%v %v” 格式,传入 key 和 value。
-
文件修改规则:
- 可修改
mr/worker.go
、mr/coordinator.go
和mr/rpc.go
。测试时需确保代码在原始版本下正常运行。
- 可修改
-
中间输出规则:
- worker 将 Map 的中间输出放在当前目录的文件中供后续 Reduce 任务读取。
-
任务完成状态规则:
main/mrcoordinator.go
中 Done() 方法在任务完成时返回 true,协调器结束任务。
-
进程退出规则:
- worker 如果无法联系到 coordinator,可假设任务已完成并退出。coordinator 也可发送“请退出”的伪任务。
测试要求
必须通过以下测试:
- Word Count 测试
- Indexer 测试
- Map 并行性测试
- Reduce 并行性测试
- Job Count 测试
- Early Exit 测试
- Crash 测试
所有测试包含在 test-mr.sh
脚本中,最终输出需与顺序执行结果一致。
具体实现
问题
- 使用goland中的模板进行build的时候出错
如果直接从main函数进行build不会报错,如果手动从模板进行会报错 - lab题目理解出问题
总结
参考
- MapReduce论文翻译