MIT6.5840 Lab 1: MapReduce(6.824)

news2025/1/20 16:29:01

结果

介绍
在本实验中,您将构建一个MapReduce系统。您将实现一个调用应用程序Map和Reduce函数并处理文件读写的工作进程,以及一个将任务分发给工作进程并处理失败的工作进程的协调进程。您将构建类似于MapReduce论文的东西。(注意:本实验使用“coordinator”代替论文中的“master”。)

mrsequential.go的逻辑就是从写好的代码(例如mrapps/wc.go)编译成的动态库(wc.so)中提取出map和reduce两个函数,再利用map来处理数据得到中间结果,reduce拿中间结果进一步处理得到最终结果。

现在要在分布式的环境下执行这个过程,也就是通过协调进程去把任务分发到worker上,这个任务可能是map可能是reduce,

Your Job (moderate/hard)

实现一个分布式MapReduce,它由两个程序组成,协调器和工作器。只有一个协调进程和一个或多个并行执行的工作进程。在一个真实的系统中,工人会在一堆不同的机器上运行,但在这个实验中,你将在一台机器上运行它们。工作人员将通过RPC与协调器对话。每个工作进程将在一个循环中向协调器请求一个任务,从一个或多个文件中读取任务的输入,执行任务,将任务的输出写入一个或多个文件,然后再次向协调器请求一个新任务。协调器应该注意到,如果一个工人没有在合理的时间内完成任务(在本实验中,使用10秒),并将相同的任务交给另一个工人。协调器和工作器的“主”例程位于main/mrcoordinato.go 和 main/mrworker.go不要更改这些文件。您应该将您的实现放在 mr/coordinator.go, mr/worker.go, and mr/rpc.go

实验要求:

  1. nReduce对应的Reduce数及输出的文件数,也要作为MakeCoordinator()方法的参数;
  2. Reduce任务的输出文件的命名为mr-out-X,这个X就是来自nReduce;
  3. mr-out-X的输出有个格式要求,参照main/mrsequential.go,"%v %v" 格式;
  4. Map输出的中间值要放到当前目录的文件中,Reduce任务从这些文件来读取;
  5. 当Coordinator.go的Done()方法返回true,MapReduce的任务就完成了;
  6. 当一个任务完成,对应的worker就应该终止,这个终止的标志可以来自于call()方法,若它去给Master发送请求,得到终止的回应,那么对应的worker进程就可以结束了。

实验提示:

  1. 修改mr/worker.go的Worker(),发送RPC请求给coordinator要任务。然后修改Coordinator将还没有被Map执行的文件作为响应返回给worker。然后worker读取文件并执行Map方法函数,就如示例文件 mrsequential.go;
  2. Map和Reduce函数加载来自插件wc.go,如果改了这些东西需要使用命令重新编译生成新的.so文件,尽量不要动这些东西;
  3. 中间文件的命名方式推荐为mr-X-Y,X对应Map任务Id,Y对应的Reduce任务Id;
  4. 为顺利存储中间数据,采用json,以便读取;
  5. worker 的 map 部分可以使用ihash(key)函数(在worker.go 中)为给定的键选择 reduce 任务;
  6. Coordinator作为一个 RPC 服务器,将是并发的;不要忘记锁定共享数据;
  7. 在所有Map任务完成后,Reduce任务才会开始,所以对应的worker可能会需要等待,那么可以使用time.sleep()或其他方法;
  8. worker可能挂掉或其他原因崩了,Coordinator在这个实验中等待10s,超过时间将会分配给其他的worker;
  9. 您可以使用 ioutil.TempFile 创建一个临时文件,并使用 os.Rename 对其进行原子重命名;
  10. test-mr.sh 运行子目录 mr-tmp 中的所有进程,因此如果出现问题并且您想查看中间文件或输出文件,请查看那里。您可以修改 test-mr.sh 以在测试失败后退出,这样脚本就不会继续测试(并覆盖输出文件)。
RPC通信 

项目中需要使用rpc的地方是worker向coordinator索要任务或发送任务完成情况,先探究rpc是如何通信的,在mr/coordinato.go中,注册rpc的函数为

func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

这里使用的是unix套接字,它用于本地进程之间的通信,通常比网络套接字更高效,因为数据不需要通过网络协议栈,在同一台机器上的进程之间通信,Worker 进程可以通过套接字文件连接到 Coordinator 进行 RPC 调用,使用 HTTP 协议来组织和传递数据。

整体流程概括为:Worker 的 RPC 请求通过 HTTP 协议发送->请求通过 Unix 套接字传输到 Coordinator->Coordinator 的 HTTP 服务处理请求,并返回响应。

在worker中调用rpc的方法如下,传入rpc方法名,参数和返回值。

func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
worker部分

work的工作就是处理map任务和reduce任务,并在处理完成后反馈结果,那么在mr/worker.go中有,其中executeMapTask和executeReduceTask分别用来处理map和reduce任务,处理完成后会调用notifyTaskComplete反馈任务结果,函数的实现可以参考mrsequential.go。

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		task := requestTask()
		switch task.TaskType {
		case MapTask:
			executeMapTask(task, mapf)
		case ReduceTask:
			executeReduceTask(task, reducef)
		case NoTask:
			log.Println("No task available, sleeping...")
			time.Sleep(1 * time.Second)
		}
	}

}

func executeMapTask(task *TaskRep, mapf func(string, string) []KeyValue) {
	filename := task.FileName
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	kva := mapf(filename, string(content))

	intermediate := make(map[int][]KeyValue)
	for _, kv := range kva {
		reduceTaskNum := ihash(kv.Key) % task.ReduceCount
		intermediate[reduceTaskNum] = append(intermediate[reduceTaskNum], kv)
	}

	for reduceTaskNum, kvs := range intermediate {
		tempFile, _ := ioutil.TempFile("", "mr-temp-*")
		enc := json.NewEncoder(tempFile)
		for _, kv := range kvs {
			enc.Encode(&kv)
		}
		tempFile.Close()
		finalName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceTaskNum)
		os.Rename(tempFile.Name(), finalName)
	}

	notifyTaskComplete(task.TaskID, MapTask)
}

func executeReduceTask(task *TaskRep, reducef func(string, []string) string) {
	intermediate := make(map[string][]string)

	// 遍历所有 MapTask 的任务 ID
	for mapTaskID := 0; mapTaskID < task.MapTaskCount; mapTaskID++ {
		filename := fmt.Sprintf("mr-%d-%d", mapTaskID, task.TaskID)
		file, err := os.Open(filename)
		if err != nil {
			// 文件不存在可能是因为 MapTask 失败,忽略
			continue
		}
		// 解码中间文件内容
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate[kv.Key] = append(intermediate[kv.Key], kv.Value)
		}
		file.Close()
	}

	// 生成最终输出文件
	outputFile, _ := os.Create(fmt.Sprintf("mr-out-%d", task.TaskID))
	for key, values := range intermediate {
		result := reducef(key, values)
		fmt.Fprintf(outputFile, "%v %v\n", key, result)
	}
	outputFile.Close()

	notifyTaskComplete(task.TaskID, ReduceTask)
}

func notifyTaskComplete(taskID int, taskType int) {
	req := TaskCompleteReq{TaskID: taskID, TaskType: taskType}
	reply := TaskCompleteRep{}
	call("Coordinator.TaskComplete", &req, &reply)
}
coordinator部分

对于coordinator.go,首先需要定义任务的种类,这里想到worker要知道是map还是reduce任务,要处理的文件名称,并且写入文件时需要有map和reduce的id,处理时间需要在10s内,那么定义如下Task结构体,任务的类型和状态都用枚举数,任务在coordinactor实例初始化的时候就塞到实例的...task字段内,这里要注意输入的file有多少个,就有多少个map任务,而reduce任务的数量和nReduce有关。任务超时的检查我是用轮询机制,每隔一秒轮询所有任务如果任务状态为正在运行并且时间超时那么把它状态初始化。 

结构体中的字段并不是一下就能全部想出来,也是需要在写处理函数的过程中看需要哪些字段才决定。

type Task struct {
	TaskType    int
	FileName    string
	TaskID      int
	ReduceCount int
	Status      int
	StartTime   time.Time
}

const (
	MapTask = iota
	ReduceTask
	NoTask
)

const (
	Pending   = iota //任务已准备好进行处理,并将由一个空闲的工作器接收
	Active           //任务正在被工作器处理
	Retry            //工作器无法处理任务,任务正在等待将来重试
	Completed        //任务已成功处理
)

type Coordinator struct {
	mu          sync.Mutex
	mapTasks    []Task // 所有 Map 任务
	reduceTasks []Task // 所有 Reduce 任务
	nReduce     int    // Reduce 任务数量
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		nReduce:     nReduce,
		mapTasks:    make([]Task, len(files)),
		reduceTasks: make([]Task, nReduce),
	}

	for i, file := range files {
		c.mapTasks[i] = Task{TaskType: MapTask, FileName: file, TaskID: i, Status: Pending}
	}

	for i := 0; i < nReduce; i++ {
		c.reduceTasks[i] = Task{TaskType: ReduceTask, TaskID: i, Status: Pending}
	}

	// Your code here.
	c.server()
	go c.monitorTimeouts()
	return &c
}

func (c *Coordinator) monitorTimeouts() {
	for {
		time.Sleep(time.Second)
		c.mu.Lock()
		for i := range c.mapTasks {
			if c.mapTasks[i].Status == Active && time.Since(c.mapTasks[i].StartTime) > TaskTimeout {
				c.mapTasks[i].Status = Pending
			}
		}
		for i := range c.reduceTasks {
			if c.reduceTasks[i].Status == Active && time.Since(c.reduceTasks[i].StartTime) > TaskTimeout {
				c.reduceTasks[i].Status = Pending
			}
		}
		c.mu.Unlock()
	}
}

Done方法很简单,所有map任务和reduce任务都是已完成的状态就代表Done

func (c *Coordinator) Done() bool {
	ret := c.allMapTasksDone() && c.allReduceTasksDone()
	return ret
}

func (c *Coordinator) allMapTasksDone() bool {
	for _, task := range c.mapTasks {
		if task.Status != Completed {
			return false
		}
	}
	return true
}

func (c *Coordinator) allReduceTasksDone() bool {
	for _, task := range c.reduceTasks {
		if task.Status != Completed {
			return false
		}
	}
	return true
}

接下来是分发任务的逻辑和任务完成后的回调函数,分发任务注意map任务全部完成了才可以开始reduce任务

//分发任务
func (c *Coordinator) AssignTask(req *TaskReq, reply *TaskRep) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 分配 Map 任务
	for i, task := range c.mapTasks {
		if task.Status == Pending {
			reply.TaskType = MapTask
			reply.FileName = task.FileName
			reply.TaskID = task.TaskID
			reply.ReduceCount = c.nReduce
			c.mapTasks[i].Status = Active
			c.mapTasks[i].StartTime = time.Now()
			return nil
		}
	}
	// 检查是否可以分配 Reduce 任务
	if c.allMapTasksDone() {
		for i, task := range c.reduceTasks {
			if task.Status == Pending {
				reply.TaskType = ReduceTask
				reply.TaskID = task.TaskID
				reply.ReduceCount = c.nReduce
				reply.MapTaskCount = len(c.mapTasks)
				c.reduceTasks[i].Status = Active
				c.reduceTasks[i].StartTime = time.Now()
				return nil
			}
		}
	}
	// 没有任务可分配
	reply.TaskType = NoTask
	return nil
}

//worker完成任务后会回调这个函数
func (c *Coordinator) TaskComplete(req *TaskCompleteReq, reply *TaskCompleteRep) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if req.TaskType == MapTask {
		c.mapTasks[req.TaskID].Status = Completed
	} else if req.TaskType == ReduceTask {
		c.reduceTasks[req.TaskID].Status = Completed
	}

	reply.Success = true
	return nil
}
rpc部分

在rpc.go中,定义worker要调用的rpc方法(要任务,报告任务完成情况)的参数和返回值就行

type TaskReq struct {
}

type TaskRep struct {
	TaskType     int    // 任务类型:Map、Reduce
	FileName     string // Map 任务的输入文件名
	TaskID       int    // 任务编号
	MapTaskCount int    //map任务数量
	ReduceCount  int    // 传入的reducer的数量,用于hash
	Status       int
	StartTime    time.Time
}

type TaskCompleteReq struct {
	TaskID   int
	TaskType int
}

type TaskCompleteRep struct {
	Success bool
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243154.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kafka进阶_1.生产消息

文章目录 一、Controller选举二、生产消息2.1、创建待发送数据2.2、创建生产者对象&#xff0c;发送数据2.3、发送回调2.3.1、异步发送2.3.2、同步发送 2.4、拦截器2.5、序列化器2.6、分区器2.7、消息可靠性2.7.1、acks 02.7.2、acks 1(默认)2.7.3、acks -1或all 2.8、部分重…

SpringBoot多环境配置的实现

前言 开发过程中必然使用到的多环境案例&#xff0c;通过简单的案例分析多环境配置的实现过程。 一、案例 1.1主配置文件 spring:profiles:active: prod server:port: 80801.2多环境配置文件 开发环境 blog:domain: http://localhost:8080测试环境 blog:domain: https:/…

鸿蒙HarmonyOS 地图定位到当前位置 site查询等操作

应用服务Map使用 地图定位 地点查询及导航 周边查询 点位标记定义等 地图定位 前提地图已经能正常显示&#xff0c;若不能显示请大家参考之前的那篇如何显示地图的博文 地图相关的api 位置效果图&#xff1a; module.json5配置权限 "requestPermissions": [{&…

AntFlow 0.11.0版发布,增加springboot starter模块,一款设计上借鉴钉钉工作流的免费企业级审批流平台

AntFlow 0.11.0版发布,增加springboot starter模块,一款设计上借鉴钉钉工作流的免费企业级审批流平台 传统老牌工作流引擎比如activiti,flowable或者camunda等虽然功能强大&#xff0c;也被企业广泛采用&#xff0c;然后也存着在诸如学习曲线陡峭&#xff0c;上手难度大&#x…

Timeline动画「硬切」的问题

1&#xff09;Timeline动画「硬切」的问题 2&#xff09;移动平台纹理压缩格式选择ASTC&#xff0c;美术出图还需遵守POT吗 3&#xff09;如何去掉DOTS Unity.Entities.Graphics创建的BatchRendererGroup的UI相机回调 4&#xff09;Timeline播放动画会产生位移的问题 这是第409…

《设计模式》创建型模式总结

目录 创建型模式概述 Factory Method: 唯一的类创建型模式 Abstract Factory Builder模式 Prototype模式 Singleton模式 最近在参与一个量化交易系统的项目&#xff0c;里面涉及到用java来重构部分vnpy的开源框架&#xff0c;因为是框架的搭建&#xff0c;所以会涉及到像…

【论文阅读】主动推理:作为感知行为的理论

文章目录 主动推理&#xff1a;作为感知行为的理论摘要1.引言2. 主动推理的概念和历史根源3. 主动推理的规范视角—以及它的发展历程 未完待续 主动推理&#xff1a;作为感知行为的理论 Active inference as a theory of sentient behavior 摘要 这篇文章综述了主动推理的历…

React--》如何高效管理前端环境变量:开发与生产环境配置详解

在前端开发中&#xff0c;如何让项目在不同环境下表现得更为灵活与高效&#xff0c;是每个开发者必须面对的挑战&#xff0c;从开发阶段的调试到生产环境的优化&#xff0c;环境变量配置无疑是其中的关键。 env配置文件&#xff1a;通常用于管理项目的环境变量&#xff0c;环境…

【工具插件类教学】在 Unity 中使用 iTextSharp 实现 PDF 文件生成与导出

目录 一、准备工作 1. 安装 iTextSharp 2. 准备资源文件 二、创建 ExportPDFTool 脚本 1、初始化 PDF 文件,设置字体 2、添加标题、内容、表格和图片 三、使用工具类生成 PDF 四、源码地址 在 Unity 项目中,我们有时会需要生成带有文本、表格和图片的 PDF 文件,以便…

【AlphaFold3】开源本地的安装及使用

文章目录 安装安装DockerInstalling Docker on Host启用Rootless Docker 安装 GPU 支持安装 NVIDIA 驱动程序安装 NVIDIA 对 Docker 的支持 获取 AlphaFold 3 源代码获取基因数据库获取模型参数构建将运行 AlphaFold 3 的 Docker 容器 参考 AlphaFold3: https://github.com/goo…

[JAVA]MyBatis框架—获取SqlSession对象

SqlSessionFactory作为MyBatis框架的核心接口有三大特性 SqlSessionFactory是MyBatis的核心对象 用于初始化MyBatis&#xff0c;创建SqlSession对象 保证SqlSessionFactory在应用中全局唯一 1.SqlSessionFactory是MyBatis的核心对象 假设我们要查询数据库的用户信息&#x…

ArkTS学习笔记:ArkTS起步

ArkTS是HarmonyOS的主力应用开发语言&#xff0c;基于TypeScript扩展&#xff0c;强化了静态检查和分析&#xff0c;旨在提升程序稳定性和性能。它采用静态类型&#xff0c;禁止运行时改变对象布局&#xff0c;并对UI开发框架能力进行扩展&#xff0c;支持声明式UI描述和自定义…

JAVA 之 JDBC

JDBC概述 基本介绍 1.JDBC为访问不同的数据库提供了统一的接口&#xff0c;为使用者屏蔽了细节问题。 2.Java程序员使用JDBC,可以连接任何提供了JDBC驱动程序的数据库系统&#xff0c;从而完成对数据库的各种操作。 3.JDBC的基本原理[ 重要 ] 4.模拟JDBC com.lmbc.myjdbc…

用 Python 从零开始创建神经网络(五):损失函数(Loss Functions)计算网络误差

用损失函数&#xff08;Loss Functions&#xff09;计算网络误差 引言1. 分类交叉熵损失&#xff08;Categorical Cross-Entropy Loss&#xff09;2. 分类交叉熵损失类&#xff08;The Categorical Cross-Entropy Loss Class&#xff09;展示到目前为止的所有代码3. 准确率计算…

Redis做分布式锁

&#xff08;一&#xff09;为什么要有分布式锁以及本质 在一个分布式的系统中&#xff0c;会涉及到多个客户端访问同一个公共资源的问题&#xff0c;这时候我们就需要通过锁来做互斥控制&#xff0c;来避免类似于线程安全的问题 因为我们学过的sychronized只能对线程加锁&…

阿里云引领智算集群网络架构的新一轮变革

阿里云引领智算集群网络架构的新一轮变革 云布道师 11 月 8 日~ 10 日在江苏张家港召开的 CCF ChinaNet&#xff08;即中国网络大会&#xff09;上&#xff0c;众多院士、教授和业界技术领袖齐聚一堂&#xff0c;畅谈网络未来的发展方向&#xff0c;聚焦智算集群网络的创新变…

预处理(1)(手绘)

大家好&#xff0c;今天给大家分享一下编译器预处理阶段&#xff0c;那么我们来看看。 上面是一些预处理阶段的知识&#xff0c;那么明天给大家讲讲宏吧。 今天分享就到这里&#xff0c;谢谢大家&#xff01;&#xff01;

ZYNQ程序固化——ZYNQ学习笔记7

一、ZYNQ启动过程 二、 SD卡启动实操 1、对ZYNQ进行配置添加Flash 2、添加SD卡 3、重新生成硬件信息 4、创建vitis工程文件 5、勾选板级支持包 6、对系统工程进行整体编译&#xff0c;生成两个Debug文件&#xff0c;如图所示。 7、插入SD卡&#xff0c;格式化为 8、考入BOOT.…

FPGA实现PCIE采集电脑端视频转SFP光口万兆UDP输出,基于XDMA+GTX架构,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的PCIE方案10G Ethernet Subsystem实现万兆以太网物理层方案 3、PCIE基础知识扫描4、工程详细设计方案工程设计原理框图电脑端视频PCIE视频采集QT上位机XDMA配置及使用XDMA中断模块FDMA图像缓存UDP视频组包发送UDP协议栈MAC…

Mongo数据库集群搭建

目录 1、Mongo集群优势 1.1 高可用性 1.2 水平扩展性 1.3 高性能 1.4 灵活的架构设计 1.5 数据安全 1.6 管理与监控 2、下载指定操作系统版本包 3、部署和验证工作 3.1 准备配置文件及依赖 3.2 启动第一个节点 3.3 部署更多的节点 3.4 初始化副本集 3.5 设置管理…