一、说明
- 本次实验是基于MIT-6.824的课程,详情请参见官网主页
- 下载源代码
二、MapReduce原理
2.1 经典的分布式模型
MapReduce是经典的分布式模型。通过Map函数和Reduce函数实现。
分布式计算,就是利用多台机器,完成一个任务。关于分布式计算,几个经典的例子就是单词词频统计。假设现在有1000MB的文本文件需要进行词频统计,如果只有1台机器,处理此大文件可能需要10s。
如果现在有10台机器,每台机器负责处理100MB的数据,处理完之后(并行),再进行汇总,那效率将会极大提升。
2.2 MapReduce的过程
- MapReduce模型中,有一个master,负责分配任务,有多个worker,负责map任务和reduce任务。当map处理好任务后,输出中间结果{key, value}。一般来说,每个reduce会负责固定key的任务。reduce拿到中间结果继续处理,最后再整合输出。
三. Go语言实现
3.1 Worker进行和Coordinate进行RPC通信
- RPC的参数
type ExampleArgs struct {
X int
}
type ExampleReply struct {
Y int
}
需要注意,遍历要大写,因为在Go语言中大写首字母代表public。
- RPC服务器的接口
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
return nil
}
客户端能够远程调用该函数,并得到结果
- 客户端远程调用
func CallExample() {
args := ExampleArgs{}
args.X = 99
reply := ExampleReply{}
// send the RPC request, wait for the reply.
call("Coordinator.Example", &args, &reply)
// reply.Y should be 100.
fmt.Printf("reply.Y %v\n", reply.Y)
}
3.2 一些变量的定义
- 我们这里使用了状态机的方式,设置了
TaskMap
,TaskReduce
,TaskWait
,TaskEnd
几种状态,分别表示Map任务,Reduce任务,目前还有任务正在执行需要等待,任务全部完成
const (
TaskMap = 1 // Map任务
TaskReduce = 2 // Reduece任务
TaskWait = 3 // 暂时无任务
TaskEnd = 4 // 所有任务已完成
FixedTimeOut = 15 // 等待15s,Worker没回复,就说明出错了
)
var Debug bool = false
// Map任务
type MapTask struct {
FileName string // 需要处理的文件
MapID int // 当前map的编号
NReduce int // 需要分成几块
}
type ReduceTask struct {
FileName string // 该任务属于哪一个文件
MapID int // 哪个map的输出结果
ReduceID int // 当前reduce的编号
}
// Debug模式下才需要打印
func Dprintf(format string, data ...interface{}) {
if Debug {
fmt.Printf(format, data...)
}
}
3.2 Coordinator
- Coorinator负责整个MapReduce过程的管理,因此需要记录任务相关的信息
type Coordinator struct {
// Your definitions here.
mutex sync.Mutex
mapTaskQ []MapTask
redTaskQ []ReduceTask // [{0-0}, {0-1}, {0-2}, ..., {0-9}, {1-0},...{1-9}]
mapTaskingQ []MapTask // 正在执行map任务
redTaskingQ []ReduceTask // 正在执行reduce任务
nReduce int
isDone bool
}
mutex
:因为程序涉及到多线程,因此需要加锁
mapTaskQ
:记录目前的Map任务,实际上就是文件名
redTaskQ
:记录当前的Reduce任务,因为每个map会输出对应nReduce的中间文件,实验给的提示是通过ihash(key)函数,将不同的key映射到不同的reduce上,因此每个reduce应该会涉及到处理多个中间文件
mapTaskingQ
:正在执行的map任务,当目前还有正在执行的map任务时,但是又有闲置的map机器在请求任务,此时coordinator应该让其等待,因为可能执行任务的map机器出现故障,此时就需要找到新的map继续执行该任务。此外,如果当目前还有正在执行的map任务,也不应该进入到reduce阶段
redTaskingQ
:正在执行的reduce任务,理由和上面类似,都是为了防止发生故障
- Coordinator初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here.
c.mapTaskingQ = make([]MapTask, 0)
c.nReduce = nReduce
// 每个文件对应一个任务
for i, file := range files {
task := MapTask{
FileName: file,
MapID: i,
NReduce: nReduce,
}
c.mapTaskQ = append(c.mapTaskQ, task)
}
c.isDone = false
Dprintf("Master working...\n")
go c.tasking2task()
c.server()
return &c
}
实际上就是将文件转换为map任务
- AskTask Worker请求任务的RPC调用
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
c.mutex.Lock()
defer c.mutex.Unlock()
// 还有map任务
if len(c.mapTaskQ) > 0 {
reply.TaskType = TaskMap
reply.MapTask = c.assignMapTask()
return nil
}
// 有正在执行的map任务
if len(c.mapTaskingQ) > 0 {
reply.TaskType = TaskWait
Dprintf("some MapTasks are not completed, please wait...\n")
return nil
}
// 有reduce任务
if len(c.redTaskQ) > 0 {
reply.TaskType = TaskReduce
redTasks := c.assignRedTask()
reply.RedTasks = append(reply.RedTasks, redTasks...)
return nil
}
// 有正在执行的reduce任务
if len(c.redTaskingQ) > 0 {
reply.TaskType = TaskWait
Dprintf("some ReduceTasks are not completed, please wait...\n")
return nil
}
// reduce任务也处理完了
reply.TaskType = TaskEnd
c.isDone = true
Dprintf("ALL tasks done,msg: closing -> worker...\n")
return nil
}
- 分配Map任务
func (c *Coordinator) assignMapTask() MapTask {
task := c.mapTaskQ[0]
c.mapTaskQ = append(c.mapTaskQ[:0], c.mapTaskQ[1:]...)
c.mapTaskingQ = append(c.mapTaskingQ, task)
Dprintf("assign MapTask, fileName..%v, mapId..%v, mReduce..%v\n",
task.FileName, task.MapID, task.NReduce)
return task
}
- 分配reduce任务
// 分配reduce任务
func (c *Coordinator) assignRedTask() []ReduceTask {
redTasks := make([]ReduceTask, 0)
reduceId := c.redTaskQ[0].ReduceID
// 取第一个reduce task
for i := 0; i < len(c.redTaskQ); {
if c.redTaskQ[i].ReduceID != reduceId {
i++
continue
}
task := c.redTaskQ[i]
c.redTaskingQ = append(c.redTaskingQ, task)
redTasks = append(redTasks, task)
c.redTaskQ = append(c.redTaskQ[:i], c.redTaskQ[i+1:]...)
Dprintf("assign ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
task.FileName, task.MapID, task.ReduceID)
}
return redTasks
}
- worker完成,回复coordinator
func (c *Coordinator) TaskDone(args *TaskDoneReply, reply *ExampleReply) error {
c.mutex.Lock()
defer c.mutex.Unlock()
switch args.TaskType {
case TaskMap:
c.mapTaskingQDeleter(args.MapTask, args.RedTasks)
break
case TaskReduce:
c.redTaskingQDeleter(args.RedTasks)
break
default:
break
}
return nil
}
- map任务完成,从mapTaskingQ中移除,同时还应该将map处理好的中间结果,存放到redTaskQ
func (c *Coordinator) mapTaskingQDeleter(mapTask MapTask, redTasks []ReduceTask) {
Dprintf("MapTask done, fileName..%v, mapId..%v, nReduce..%v\n",
mapTask.FileName, mapTask.MapID, mapTask.NReduce)
// 找到 mapTask.MapID的任务,然后剔除
for i := 0; i < len(c.mapTaskingQ); i++ {
if c.mapTaskingQ[i].MapID == mapTask.MapID {
c.mapTaskingQ = append(c.mapTaskingQ[:i], c.mapTaskingQ[i+1:]...)
break
}
}
// 将该map的结果放到reduceTask
// reTasks = [{i-0},...,{i-9}]
for _, v := range redTasks {
c.redTaskQ = append(c.redTaskQ, v)
Dprintf("add ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
v.FileName, v.MapID, v.ReduceID)
}
}
- reduce任务完成,从redTaskingQ移除
func (c *Coordinator) redTaskingQDeleter(redTasks []ReduceTask) {
for i := 0; i < len(redTasks); i++ {
task := redTasks[i]
Dprintf("ReduceTask done, fileName..%v, mapId..%v, reduceId..%v\n",
task.FileName, task.MapID, task.ReduceID)
for j := 0; j < len(c.redTaskingQ); {
if c.redTaskingQ[j].ReduceID == task.ReduceID {
c.redTaskingQ = append(c.redTaskingQ[:j], c.redTaskingQ[j+1:]...)
continue
}
j++
}
}
}
- coordinator还需要定期检查是否有map或者reduce宕机,如果有宕机,应该重新由空闲的机器处理。这里我们采用设置一个等待时间
FixedTimeOut=15
,如果超过这个时间正在执行的任务还没有完成,应该将他们移动到任务队列中
func (c *Coordinator) tasking2task() {
for {
time.Sleep(FixedTimeOut * time.Second)
c.mutex.Lock()
// defer c.mutex.Unlock()
// 还有正在执行的map任务,将它添加到待完成任务队列 mapTaskQ
if len(c.mapTaskingQ) != 0 {
for i, _ := range c.mapTaskingQ {
c.mapTaskQ = append(c.mapTaskQ, c.mapTaskingQ[i])
}
c.mapTaskingQ = []MapTask{}
}
if len(c.redTaskingQ) != 0 {
Dprintf("redTaskingQ..%v\n", c.redTaskingQ)
for i, _ := range c.redTaskingQ {
c.redTaskQ = append(c.redTaskQ, c.redTaskingQ[i])
}
c.redTaskingQ = []ReduceTask{}
}
c.mutex.Unlock()
}
}
3.3 Worker
- 因为我们是单机,所以这里需要模拟多机,我们会让Worker一直请求任务,知道所有任务处理完成
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := CallAskTask()
switch reply.TaskType {
case TaskMap: // Map任务
doMapTask(mapf, reply.MapTask)
break
case TaskReduce: // Reduce任务
doReduceTask(reducef, reply.RedTasks)
break
case TaskWait: // 此时没有任务
time.Sleep(1 * time.Second)
break
case TaskEnd: // 所有任务全部完成
Dprintf("任务全部完成,关机...\n")
return
default:
fmt.Println("reply.TaskType: ", reply.TaskType)
Dprintf("Unknown fault\n")
break
}
}
}
- 请求任务
func CallAskTask() AskTaskReply {
args := AskTaskArgs{}
reply := AskTaskReply{}
call("Coordinator.AskTask", &args, &reply)
return reply
}
- 处理map任务
func doMapTask(mapf func(string, string) []KeyValue, mapTask MapTask) {
Dprintf("doing MapTask, fileName..%v, mapId..%v, nReduce..%v\n",
mapTask.FileName, mapTask.MapID, mapTask.NReduce)
fileName := mapTask.FileName
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open %v", fileName)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot open %v", fileName)
}
file.Close()
// 通过map函数,统计每个单词
kvs := mapf(fileName, string(content))
sort.Sort(ByKey(kvs))
// 将所有的单词都放到对应的 reduces[idx]中 idx = [0,...,NReduce)
reduces := make([][]KeyValue, mapTask.NReduce)
for _, kv := range kvs {
idx := ihash(kv.Key) % mapTask.NReduce
reduces[idx] = append(reduces[idx], kv)
}
// reduce任务
redTasks := []ReduceTask{} // [{i-0}, ... ,{i-9}]
for idx, reduce := range reduces {
// reduce任务
redTask := ReduceTask{
FileName: mapTask.FileName,
MapID: mapTask.MapID,
ReduceID: idx,
}
redTasks = append(redTasks, redTask)
output := "mr-" + strconv.Itoa(redTask.MapID) + "-" + strconv.Itoa(redTask.ReduceID)
file, err = os.Create(output)
if err != nil {
log.Fatalf("cannot create %v", output)
}
defer file.Close()
enc := json.NewEncoder(file)
for _, kv := range reduce {
enc.Encode(&kv)
}
}
Dprintf("MapTask to ReduceTask done, fileName..%v, mapId..%v\n", mapTask.FileName, mapTask.MapID)
CallTaskDone(TaskMap, mapTask, redTasks)
}
- 处理reduce任务
func doReduceTask(reducef func(string, []string) string, redTasks []ReduceTask) {
intermediate := []KeyValue{}
for _, v := range redTasks {
Dprintf("doing ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
v.FileName, v.MapID, v.ReduceID)
fileName := "mr-" + strconv.Itoa(v.MapID) + "-" + strconv.Itoa(v.ReduceID)
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open %v", fileName)
}
defer file.Close()
// 反序列化JSON格式文件
dec := json.NewDecoder(file)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
//log.Print("read file done...")
break
}
intermediate = append(intermediate, KeyValue{kv.Key, kv.Value})
}
}
sort.Sort(ByKey(intermediate))
oname := "mr-out-" + strconv.Itoa(redTasks[0].ReduceID)
ofile, _ := os.Create(oname)
defer ofile.Close()
i := 0
for i < len(intermediate) {
// 找到每一组相同的key,统一放到一个slice
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
// ["1","1",...,"1"]
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
// 执行Reduce函数,计算key出现的次数
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
Dprintf("ReduceTask done, fileName..%v, mapId..%v\n", redTasks[0].FileName, redTasks[0].MapID)
CallTaskDone(TaskReduce, MapTask{}, redTasks)
}
- 任务处理完,需要回应coordinator
func CallTaskDone(taskType int, mapTask MapTask, redTasks []ReduceTask) {
args := TaskDoneReply{}
args.TaskType = taskType
args.MapTask = mapTask
args.RedTasks = redTasks
reply := ExampleReply{}
call("Coordinator.TaskDone", &args, &reply)
}