1.总体思路
构建一个简单的MapReduce系统,Coordinator线程用于分配任务(包括Map任务和Reduce任务),Worker线程向Coordinator线程请求任务,要求所有map任务完成后才可以请求到reduce任务,否则的话这个worker应该处于等待状态,每个任务完成后会通知Coordinator,由Coordinator进行统计。这意味着在Coordinator里要实现两个对外开放的方法,能让Work通过RPC的方式调用这两个方法。
Coordinator中向Worer开放的方法:
DistributeTask(args *TaksArgs, reply *Task) //主线程发任务的方法
MarkFinished(args *Task, reply *Task) //工作线程通知主线程完成任务的方法
Coordinator统计到所有任务都完成的话就需要关闭worker线程,会返回一个状态是killTask的任务,在worker中是这个状态的会结束线程,否则会循环的请求任务 (在实现的时候可以先从请求Map任务开始,打印出来对不对再试Reduce任务)。
任务是通过通道的方式发送的,可以保证线程安全
Coordinator类型的成员:
type Coordinator struct {
// Your definitions here.
ReduceNum int //传入参数决定多少个reducer 也要传入到任务里
TaskId int //每一个任务都有一个唯一的id
ReduceId int //这个为了生成结果文件命名时候用的
TaskChannelReduce chan *Task //使用通道保证线程安全 放reduce的任务通道
TaskChannelMap chan *Task //放map任务的通道
CoordinatorState Phase //目前分配任务处于哪个阶段 MapState、ReduceState、AllDone
TaskMetaHolder TaskMetaHolder //里面是一个map 存放任务的元信息 主要用于统计每个阶段的任务是否完成
// map里是 {taskid:taskptr,taskstate,runtime} 这样在统计的时候只需要遍历这个map统计taskstate的情况
}
Task类
type Task struct {
TaskType TaskType //任务类型 maptask,reducetask,waitingtask,killtask
TaskId int //任务id
ReduceNum int //输入的reduce num数量 用于hash
InputFiles []string //传入的文件数组
ReduceId int //标识第几个reduce任务 用于生成输出文件名
}
2.Worker线程的实现
循环的请求任务,直到请求到的任务类型是KillTask,对于map任务和reduce任务,做完任务后要通知主线程,如果所有map任务没完成请求不到reduce任务 waiting一会儿再去请求
//mrworker.go 整体流程
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
alive := true
// Your worker implementation here.
// uncomment to send the Example RPC to the coordinator.
// CallExample()
for alive {
task := RequireTask()向coordinator发送rpc请求 获取任务
//根据请求任务的类型分别做不同的操作
switch task.TaskType {
case MapTask:
DoMap(task, mapf)
callDone(task)//告诉Coordinator这个任务完成
case ReduceTask:
DoReduce(task.ReduceId, task, reducef)
callDone(task)
case WaitingTask:
fmt.Println("get waiting...")
time.Sleep(time.Second)
case KillTask:
fmt.Println("kill worker...")
alive = false
}
time.Sleep(time.Second)
}
}
RequieTask()调用Coordinator中的DistributeTask()方法,不需要传入参数(随便一个空的),传出参数是请求到的任务
// 通过rpc请求一个map任务
func RequireTask() *Task {
args := TaksArgs{}
reply := Task{}
ok := call("Coordinator.DistributeTask", &args, &reply)
fmt.Printf("请求任务完成!")
if ok {
fmt.Printf("worker get a job id = %d\n", reply.TaskId)
} else {
fmt.Printf("call failed!\n")
}
return &reply
}
DoMap()和DoReduce()和mrsequence.go里给的基本一样,不过要注意在这里map生成的中间文件是下面这样的,rm-tmp-X-Y,也就是说map对每一个文件的处理都会输出到reduceNum个不同的文件中,方法是处理一个文件时提供好的ihash()函数,根据处理得到的key使用hash和%,映射到10个文件中,这里是有8个输入的txt文件,10个reduceNum 所以会生成80个中间文件。在做reduce任务的时候 会把所有Y值一样的文件放到一个任务里一起处理。
对于CallDone()会调用Coordinator中的MarkFinished方法通知,不需要返回值,只需要传入任务告诉那边
func callDone(task *Task) {
//告诉coordinatior任务已经完成 将任务状态变成已完成
reply := Task{}
ok := call("Coordinator.MarkFinished", &task, &reply)
if ok {
fmt.Printf("mark finished,id = %d\n", task.TaskId)
} else {
fmt.Println("call markfinished fail!")
}
}
3.Coordinator线程的实现
主线程启动的时候就要将map任务制作好放到TaskchannelMap里,然后等待worker的请求,如果TaskchannelMap里面不为空 就说明map任务还没发完一直发map,如果为空了就要取判断所有发出去的map任务是否都完成了(完成的任务worker会通知的),完成了才可以进入下一阶段:制作reduce任务,放到channel里,发reduce任务,否则的话会让任务状态是waiting。同样Reduce任务都发完(channel为空)去判断一下是否reduce都完成,完成的话关闭子线程
//Coordinator.go
// worker通过RPC请求的方法 需要有传入参数和传出参数 !! reply参数会返回给worker
// 分发任务 这是coordinator的方法
func (c *Coordinator) DistributeTask(args *TaksArgs, reply *Task) error {
mu.Lock()//要加锁
defer mu.Unlock()
fmt.Println("coordator get a request from worker:")
if c.CoordinatorState == MapState {
if len(c.TaskChannelMap) > 0 {
*reply = *<-c.TaskChannelMap
c.TaskMetaHolder.fireTask(reply.TaskId)//用来改变状态从初始的waiting->working
//放入到meta信息map中
fmt.Printf("发了一个id = %d 的map任务\n", reply.TaskId)
} else {
/*
这时需要一个中间状态waiting 否则worker还要去做domap
*/
reply.TaskType = WaitingTask
fmt.Println("map task 发完了,等所有map任务结束,该给worker发reduce任务了")
if c.TaskMetaHolder.checkMapDone() {
//fmt.Println("map channel里没有任务...")
//检查一下是否map任务都做完了 做完了的话制作reduce任务
c.nextState() //制作reduce发生在这时候
}
return nil
}
} else if c.CoordinatorState == ReduceState {
if len(c.TaskChannelReduce) > 0 {
*reply = *<-c.TaskChannelReduce
c.TaskMetaHolder.fireTask(reply.TaskId)
fmt.Printf("发了一个id = %d的reduce任务", reply.TaskId)
} else {
reply.TaskType = WaitingTask
fmt.Println("map channel里没有任务...")
if c.TaskMetaHolder.checkReduceDone() {
fmt.Println("******************")
c.nextState()
}
return nil
}
} else {
reply.TaskType = KillTask
fmt.Println("所有任务都完成了...")
}
return nil
}
MarkFinshed()函数会把map里的任务的状态改成done ,这是woker线程调用的
func (c *Coordinator) MarkFinished(args *Task, reply *Task) error {
c.TaskMetaHolder.MetaMap[args.TaskId].Taskstate = done
switch args.TaskType {
case MapTask:
fmt.Printf("Map task ID[%d] is finished.\n", args.TaskId)
case ReduceTask:
fmt.Printf("Reduce task ID[%d] is finished.\n", args.TaskId)
}
return nil
}
检测任务完成情况,可以直接根据map里信息的来统计,map 存放任务的元信息:
{taskid:{taskptr,taskstate,runtime}} 需要遍历这个map统计taskstate的情况
// 检测任务完成情况
func (Taskholer *TaskMetaHolder) checkMapDone() bool {
mapDoneNum := 0
mapUndoneNum := 0
for _, v := range Taskholer.MetaMap {
if v.TaskPtr.TaskType == MapTask {
if v.Taskstate == done {
mapDoneNum++
} else {
mapUndoneNum++
}
}
}
fmt.Printf("%d/%d map jobs are done\n", mapDoneNum, mapDoneNum+mapUndoneNum)
return (mapDoneNum > 0 && mapUndoneNum == 0)
}
func (Taskholer *TaskMetaHolder) checkReduceDone() bool {
reduceDnoeNum := 0
reduceUndnoeNum := 0
for _, v := range Taskholer.MetaMap {
if v.TaskPtr.TaskType == ReduceTask {
if v.Taskstate == done {
reduceDnoeNum++
} else {
reduceUndnoeNum++
}
}
}
fmt.Printf(" %d/%d reduce job are done\n", reduceDnoeNum, reduceUndnoeNum+reduceDnoeNum)
return (reduceDnoeNum > 0 && reduceUndnoeNum == 0)
}
用一个crash探测线程去探测worker的情况,如果超过10s就任务这个worker crash了,要把它的任务重新放回到channel里等待其他的worker完成
func (c *Coordinator) CrashHandler() {
//Crash探测线程 超过10s没完成的任务加回队列中
for {
fmt.Println("**********探测**********")
time.Sleep(time.Second * 2) //2s探测一次
mu.Lock()
// defer mu.Unlock() // 不可以 会死锁?
if c.CoordinatorState == AllDone {
mu.Unlock()
break
}
for _, v := range c.TaskMetaHolder.MetaMap {
if v.Taskstate == working && time.Since(v.StartTime) > 9*time.Second {
fmt.Printf("task id = [%d] is crash, time = %d", v.TaskPtr.TaskId, time.Since(v.StartTime))
switch v.TaskPtr.TaskType{
case MapTask:
//放回channel
c.TaskChannelMap <- v.TaskPtr
v.Taskstate = waiting
case ReduceTask:
c.TaskChannelReduce <- v.TaskPtr
v.Taskstate = waiting
}
}
}
mu.Unlock()
}
}