MIT6.824 lab 1 小白实现过程

news2024/11/17 14:54:03

1.总体思路

        构建一个简单的MapReduce系统,Coordinator线程用于分配任务(包括Map任务和Reduce任务),Worker线程向Coordinator线程请求任务,要求所有map任务完成后才可以请求到reduce任务,否则的话这个worker应该处于等待状态,每个任务完成后会通知Coordinator,由Coordinator进行统计。这意味着在Coordinator里要实现两个对外开放的方法,能让Work通过RPC的方式调用这两个方法。

Coordinator中向Worer开放的方法:

DistributeTask(args *TaksArgs, reply *Task)  //主线程发任务的方法

MarkFinished(args *Task, reply *Task) //工作线程通知主线程完成任务的方法

Coordinator统计到所有任务都完成的话就需要关闭worker线程,会返回一个状态是killTask的任务,在worker中是这个状态的会结束线程,否则会循环的请求任务 (在实现的时候可以先从请求Map任务开始,打印出来对不对再试Reduce任务)。

任务是通过通道的方式发送的,可以保证线程安全

Coordinator类型的成员:

type Coordinator struct {
	// Your definitions here.
	ReduceNum int //传入参数决定多少个reducer 也要传入到任务里
	TaskId    int //每一个任务都有一个唯一的id
	ReduceId  int //这个为了生成结果文件命名时候用的

	TaskChannelReduce chan *Task     //使用通道保证线程安全 放reduce的任务通道
	TaskChannelMap    chan *Task     //放map任务的通道
	CoordinatorState  Phase          //目前分配任务处于哪个阶段  MapState、ReduceState、AllDone
	TaskMetaHolder    TaskMetaHolder //里面是一个map 存放任务的元信息 主要用于统计每个阶段的任务是否完成
// map里是 {taskid:taskptr,taskstate,runtime} 这样在统计的时候只需要遍历这个map统计taskstate的情况 

}

Task类

type Task struct {
	TaskType TaskType //任务类型 maptask,reducetask,waitingtask,killtask
	TaskId   int      //任务id

	ReduceNum  int      //输入的reduce num数量 用于hash
	InputFiles []string //传入的文件数组
	ReduceId   int      //标识第几个reduce任务 用于生成输出文件名
}

 2.Worker线程的实现

循环的请求任务,直到请求到的任务类型是KillTask,对于map任务和reduce任务,做完任务后要通知主线程,如果所有map任务没完成请求不到reduce任务 waiting一会儿再去请求

//mrworker.go 整体流程
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	alive := true

	// Your worker implementation here.
	// uncomment to send the Example RPC to the coordinator.
	
	// CallExample()
	for alive {
		task := RequireTask()向coordinator发送rpc请求 获取任务
		//根据请求任务的类型分别做不同的操作
		switch task.TaskType {
		case MapTask:
			DoMap(task, mapf)
			callDone(task)//告诉Coordinator这个任务完成
		case ReduceTask:
			DoReduce(task.ReduceId, task, reducef)
			callDone(task)
		case WaitingTask:
			fmt.Println("get waiting...")
			time.Sleep(time.Second)
		case KillTask:
			fmt.Println("kill worker...")
			alive = false
		}
		time.Sleep(time.Second)

	}

}

RequieTask()调用Coordinator中的DistributeTask()方法,不需要传入参数(随便一个空的),传出参数是请求到的任务

// 通过rpc请求一个map任务
func RequireTask() *Task {
	args := TaksArgs{}
	reply := Task{}
	ok := call("Coordinator.DistributeTask", &args, &reply)
	fmt.Printf("请求任务完成!")
	if ok {
		fmt.Printf("worker get a job id = %d\n", reply.TaskId)
	} else {
		fmt.Printf("call failed!\n")
	}
	return &reply

}

 DoMap()和DoReduce()和mrsequence.go里给的基本一样,不过要注意在这里map生成的中间文件是下面这样的,rm-tmp-X-Y,也就是说map对每一个文件的处理都会输出到reduceNum个不同的文件中,方法是处理一个文件时提供好的ihash()函数,根据处理得到的key使用hash和%,映射到10个文件中,这里是有8个输入的txt文件,10个reduceNum 所以会生成80个中间文件。在做reduce任务的时候 会把所有Y值一样的文件放到一个任务里一起处理。

 对于CallDone()会调用Coordinator中的MarkFinished方法通知,不需要返回值,只需要传入任务告诉那边

func callDone(task *Task) {
	//告诉coordinatior任务已经完成  将任务状态变成已完成

	reply := Task{}
	ok := call("Coordinator.MarkFinished", &task, &reply)
	if ok {
		fmt.Printf("mark finished,id = %d\n", task.TaskId)
	} else {
		fmt.Println("call markfinished fail!")
	}
}

  3.Coordinator线程的实现

主线程启动的时候就要将map任务制作好放到TaskchannelMap里,然后等待worker的请求,如果TaskchannelMap里面不为空 就说明map任务还没发完一直发map,如果为空了就要取判断所有发出去的map任务是否都完成了(完成的任务worker会通知的),完成了才可以进入下一阶段:制作reduce任务,放到channel里,发reduce任务,否则的话会让任务状态是waiting。同样Reduce任务都发完(channel为空)去判断一下是否reduce都完成,完成的话关闭子线程

//Coordinator.go
// worker通过RPC请求的方法 需要有传入参数和传出参数 !! reply参数会返回给worker
// 分发任务 这是coordinator的方法
func (c *Coordinator) DistributeTask(args *TaksArgs, reply *Task) error {
	mu.Lock()//要加锁
	defer mu.Unlock()
	fmt.Println("coordator get a request from worker:")
	if c.CoordinatorState == MapState {
		if len(c.TaskChannelMap) > 0 {
			*reply = *<-c.TaskChannelMap
			c.TaskMetaHolder.fireTask(reply.TaskId)//用来改变状态从初始的waiting->working
			//放入到meta信息map中
			fmt.Printf("发了一个id = %d 的map任务\n", reply.TaskId)
		} else {
			/*
				这时需要一个中间状态waiting 否则worker还要去做domap
			*/
			reply.TaskType = WaitingTask
			fmt.Println("map task 发完了,等所有map任务结束,该给worker发reduce任务了")
			if c.TaskMetaHolder.checkMapDone() {
				//fmt.Println("map channel里没有任务...")
				//检查一下是否map任务都做完了 做完了的话制作reduce任务
				c.nextState() //制作reduce发生在这时候
			}
			return nil
		}
	} else if c.CoordinatorState == ReduceState {
		if len(c.TaskChannelReduce) > 0 {

			*reply = *<-c.TaskChannelReduce
			c.TaskMetaHolder.fireTask(reply.TaskId)
			fmt.Printf("发了一个id = %d的reduce任务", reply.TaskId)

		} else {
			reply.TaskType = WaitingTask
			fmt.Println("map channel里没有任务...")
			if c.TaskMetaHolder.checkReduceDone() {
				fmt.Println("******************")
				c.nextState()
			}
			return nil
		}
	} else {
		reply.TaskType = KillTask
		fmt.Println("所有任务都完成了...")
	}
	return nil

}

 MarkFinshed()函数会把map里的任务的状态改成done ,这是woker线程调用的

func (c *Coordinator) MarkFinished(args *Task, reply *Task) error {
	c.TaskMetaHolder.MetaMap[args.TaskId].Taskstate = done
	switch args.TaskType {
	case MapTask:

		fmt.Printf("Map task ID[%d] is finished.\n", args.TaskId)
	case ReduceTask:
		fmt.Printf("Reduce task ID[%d] is finished.\n", args.TaskId)
	}
	return nil

}

检测任务完成情况,可以直接根据map里信息的来统计,map 存放任务的元信息:

{taskid:{taskptr,taskstate,runtime}} 需要遍历这个map统计taskstate的情况

// 检测任务完成情况
func (Taskholer *TaskMetaHolder) checkMapDone() bool {
	mapDoneNum := 0
	mapUndoneNum := 0
	for _, v := range Taskholer.MetaMap {
		if v.TaskPtr.TaskType == MapTask {
			if v.Taskstate == done {
				mapDoneNum++
			} else {
				mapUndoneNum++
			}
		}
	}
	fmt.Printf("%d/%d map jobs are done\n", mapDoneNum, mapDoneNum+mapUndoneNum)
	return (mapDoneNum > 0 && mapUndoneNum == 0)

}

func (Taskholer *TaskMetaHolder) checkReduceDone() bool {
	reduceDnoeNum := 0
	reduceUndnoeNum := 0
	for _, v := range Taskholer.MetaMap {
		if v.TaskPtr.TaskType == ReduceTask {
			if v.Taskstate == done {
				reduceDnoeNum++
			} else {
				reduceUndnoeNum++
			}
		}
	}
	fmt.Printf(" %d/%d reduce job are done\n", reduceDnoeNum, reduceUndnoeNum+reduceDnoeNum)
	return (reduceDnoeNum > 0 && reduceUndnoeNum == 0)

}

用一个crash探测线程去探测worker的情况,如果超过10s就任务这个worker crash了,要把它的任务重新放回到channel里等待其他的worker完成

func (c *Coordinator) CrashHandler() {
	//Crash探测线程 超过10s没完成的任务加回队列中
	for {
		fmt.Println("**********探测**********")
		time.Sleep(time.Second * 2) //2s探测一次
		mu.Lock()
		// defer mu.Unlock() // 不可以 会死锁?
		if c.CoordinatorState == AllDone {
			mu.Unlock()
			break
		}
		for _, v := range c.TaskMetaHolder.MetaMap {
			if v.Taskstate == working && time.Since(v.StartTime) > 9*time.Second {
				fmt.Printf("task id = [%d] is crash, time = %d", v.TaskPtr.TaskId, time.Since(v.StartTime))

				switch v.TaskPtr.TaskType{
				case MapTask:
					//放回channel
					c.TaskChannelMap <- v.TaskPtr
					v.Taskstate = waiting

				case ReduceTask:
					c.TaskChannelReduce <- v.TaskPtr
					v.Taskstate = waiting 
				}
			}

		}
		mu.Unlock()

	}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/611768.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Springboot +spring security,基于默认数据库模型实现授权

一.简介 上一篇文章中讲解了如何基于内存模型来实现授权&#xff0c;在这种模型里&#xff0c;用户的信息是保存在内存中的。但是&#xff0c;保存在内存中的信息&#xff0c;是无法持久化的&#xff0c;也就是程序一旦关闭&#xff0c;或者断电等情况发生&#xff0c;内存中的…

0基础学习VR全景平台篇第36篇:场景功能-导览

大家好&#xff0c;欢迎观看蛙色VR官方系列——后台使用课程&#xff01; 本期为大家带来蛙色VR平台&#xff0c;场景管理—导览功能操作。 功能位置示意 一、本功能将用在哪里&#xff1f; 导览&#xff0c;指给VR漫游作品预先设置好路线&#xff0c;并且可以自定义路线的旋…

DMBOK知识梳理for CDGA/CDGP——第三章数据治理

关 注gzh“大数据食铁兽” 回复“知识点”获取《DMBOK知识梳理for CDGA/CDGP》常考知识点&#xff08;第三章数据治理&#xff09; 第三章 数据治理 第三章在是CDGA|CDGP考试的重点考核章节之一&#xff0c;知识点比较密集&#xff0c;本章重点为语境关系图及数据治理概念…

初心不改凌云志 热血浇灌信仰花 《凭栏一片风云起》湖北卫视热力开播

浮光灼夏 御风而行&#xff0c; 由著名导演金琛执导&#xff0c; 胡一天、章若楠、王劲松 张晞临、张赫、林子璐领衔主演&#xff0c; 高伟光特邀出演的 年代战争剧《凭栏一片风云起》&#xff0c; 将于今晚19:30起&#xff0c; 登陆【湖北卫视】长江剧场。 电视剧《凭栏…

音乐人解密:究竟是如何一步一步成为音乐人的?

音乐人解密&#xff1a;究竟是如何一步一步成为音乐人的&#xff1f; 音乐是人类伟大的产物&#xff0c;近些年来越来越多的人都开始尝试学习音乐&#xff0c;成为一名音乐人。而艺术高考等途径也为许多想要学习音乐、成为职业歌手或者编曲师的人群提供了途径。然而想要成为一名…

初识EasyUI

2.1何为EasyUI. EasyUI的全称是“JQuery EasyUI”&#xff0c;是一种基于jQuery、Angular、Vue和React的用户界面的插件的集合&#xff0c;EasyUI的目标就是帮助web开发者更轻松的打造出功能丰富并且美观的UI界面。开发者不需要编写复杂的javascript&#xff0c;也不需要对css样…

【Protobuf速成指南】Win/Centos7下Protobuf安装教程

文章目录 安装教程一、Windows1.1 下载编译器1.2 配置PATH1.3 其他依赖项 二、Centos72.1 安装必要的工具2.2 下载安装包2.3 安装 安装教程 以版本为V21.11为例说明 一、Windows 1.1 下载编译器 下载地址&#xff1a;链接&#xff0c;一直往下翻找到 V21.11版本 win用户根据…

火爆全网,最全性能测试从0到1进阶总结,高阶内卷学习路线...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 例如&#xff1a;…

ArduPilot飞控开源代码之滤波设置

ArduPilot飞控开源代码之滤波设置 1. 源由2. 原理3. 调优3.1 ACC低通滤波 INS_ACCEL_FILTER3.2 GRYO低通滤波 INS_GYRO_FILTER3.3 陷波滤波 INS_HNTCH_ENABLE & INS_HNTC2_ENABLE 4. 总结5. 参考资料 1. 源由 对于飞控传感器来来说&#xff0c;振动噪声也是数据。 单纯从数…

yolov3

文章目录 前言一、主干网络darknet53二、从特征获取预测结果 前言 本文主要讲解yolov3的基本知识&#xff0c;如有错误请指出。 本文主要来自 博客1 博客2 一、主干网络darknet53 53是因为有53层。 1、darknet53没有使用pooling 来进行下采样&#xff0c;而是用一个33&…

电脑数据隐藏原因有哪些?电脑里隐藏的数据怎么恢复

电脑里隐藏的数据怎么恢复&#xff1f;电脑中的数据很容易被隐藏&#xff0c;这时候很多人可能会感到焦急和无助。不过不用担心&#xff0c;本文将为大家介绍三种方法&#xff0c;让你轻松找回被隐藏的数据&#xff01; ※电脑数据隐藏原因有哪些 电脑数据可能会隐藏&#xf…

100天精通Golang:全面掌握Go语言的旅程

&#x1f337; 博主 libin9iOak带您 Go to Golang Language.✨ &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &#x1f30a; 《I…

QLoRA:量化 LLM 的高效微调

此 repo 支持论文“QLoRA&#xff1a;量化 LLM 的高效微调”&#xff0c;旨在使对 LLM 研究的访问民主化。 QLoRA 使用bitsandbytes进行量化&#xff0c;并与 Hugging Face 的PEFT和transformers库集成。QLoRA 由华盛顿大学 UW NLP 小组的成员开发。 概述 我们介绍了 QLoRA&…

volatile - (C语言)

volatile关键字和const一样都是一种类型修饰符&#xff0c;用它修饰过的变量表示可以被某些编译器未知的因素更改&#xff0c;比如操作系统、硬件或者是其它线程等。 该关键字是不希望被编译器优化&#xff0c;从而达到稳定访问内存的目的。 示例代码&#xff1a; #include&…

FlinkUI和Flink常见问题解决

Flink 系统架构/Flink 作业提交运行的原理 我们编写的代码,对应着在Flink集群上执行的一个作业;所以我们在本地执行代码, 其实是idea开发环境中根据引入的依赖,先模拟启动一个Flink集群,然后把我们代码中定义好的操作,作为"作业",(job要打包好)然后将作业提…

双出口网络链路和设备双冗余案例

1、AR3模拟联通和电信运营商 2、2台防火墙vrrphrp双冗余&#xff0c;下联局域网vrrp 10.3.0.3地址&#xff0c;上联两条外线每条外线都分别vrrp虚拟一个地址1.1.1.1.和2.2.2.2.1。 3、防火墙外线地址和运营商给的外线地址不在同一个网段&#xff0c;每条都用vrrp冗余链路&#…

安装第三方库时的问题—复现带setup.py的项目

目录 题目分析&#xff1a; 正片开始&#xff1a; 题目分析&#xff1a; 事情的经过大致是这样&#xff1a; 今天在github上拿到一个处理时间序列的迁移学习项目的复现代码&#xff0c;项目文件如下所示&#xff1a; 或者我们来关注一下tl4sm这个文件夹里的东西&#xff1…

Tomcat的部署和优化(生命中的全部偶然,其实都是命中注定)

文章目录 一、Tomcat简介二、Tomcat 的构成三、Tomcat 功能组件结构四、Tomcat 请求过程五、Tomcat 服务部署六、Tomcat 虚拟主机配置七、Tomcat优化1.Tomcat 配置文件参数优化2.JVM优化 一、Tomcat简介 Tomcat 是 Java 语言开发的&#xff0c;Tomcat 服务器是一个免费的开放源…

【shiro】问题记录--为什么refreshToken方法走不下去

一、前言 最近做Jwt token续签的时候&#xff0c;在很多博客和下载的代码中&#xff0c;都是在JWTFilter中进行token的刷新&#xff0c;于是就按照了网上的代码进行尝试&#xff0c;代码如下&#xff1a; 1. 代码 在JWTFilter中的isAccessAllowed方法 目的&#xff1a;就是想…

STM32单片机RS485远程PID直流电机调速系统光电传感器

实践制作DIY- GC0137-RS485远程PID直流电机调速系统 基于STM32单片机设计-RS485远程PID直流电机调速系统 二、功能介绍&#xff1a; 主机&#xff1a;STM32F103C系列最小系统LCD1602直流电机光电测速MX15系列驱动模块4*4矩阵键盘RS485收发电路 从机&#xff1a;STM32F103C系…