分布式系统---MapReduce实现(Go语言)

news2024/11/17 17:38:45

一、说明

  • 本次实验是基于MIT-6.824的课程,详情请参见官网主页
  • 下载源代码

二、MapReduce原理

2.1 经典的分布式模型

MapReduce是经典的分布式模型。通过Map函数和Reduce函数实现。

分布式计算,就是利用多台机器,完成一个任务。关于分布式计算,几个经典的例子就是单词词频统计。假设现在有1000MB的文本文件需要进行词频统计,如果只有1台机器,处理此大文件可能需要10s。

如果现在有10台机器,每台机器负责处理100MB的数据,处理完之后(并行),再进行汇总,那效率将会极大提升。

2.2 MapReduce的过程

  • MapReduce模型中,有一个master,负责分配任务,有多个worker,负责map任务和reduce任务。当map处理好任务后,输出中间结果{key, value}。一般来说,每个reduce会负责固定key的任务。reduce拿到中间结果继续处理,最后再整合输出。
    在这里插入图片描述

三. Go语言实现

3.1 Worker进行和Coordinate进行RPC通信

  • RPC的参数
type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

需要注意,遍历要大写,因为在Go语言中大写首字母代表public。

  • RPC服务器的接口
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

客户端能够远程调用该函数,并得到结果

  • 客户端远程调用
func CallExample() {
	args := ExampleArgs{}
	args.X = 99
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	call("Coordinator.Example", &args, &reply)

	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

3.2 一些变量的定义

  • 我们这里使用了状态机的方式,设置了TaskMap,TaskReduce,TaskWait,TaskEnd几种状态,分别表示Map任务,Reduce任务,目前还有任务正在执行需要等待,任务全部完成
const (
	TaskMap      = 1  // Map任务
	TaskReduce   = 2  // Reduece任务
	TaskWait     = 3  // 暂时无任务
	TaskEnd      = 4  // 所有任务已完成
	FixedTimeOut = 15 // 等待15s,Worker没回复,就说明出错了
)

var Debug bool = false

// Map任务
type MapTask struct {
	FileName string // 需要处理的文件
	MapID    int    // 当前map的编号
	NReduce  int    // 需要分成几块
}

type ReduceTask struct {
	FileName string // 该任务属于哪一个文件
	MapID    int    // 哪个map的输出结果
	ReduceID int    // 当前reduce的编号
}

// Debug模式下才需要打印
func Dprintf(format string, data ...interface{}) {
	if Debug {
		fmt.Printf(format, data...)
	}
}

3.2 Coordinator

  • Coorinator负责整个MapReduce过程的管理,因此需要记录任务相关的信息
type Coordinator struct {
	// Your definitions here.
	mutex       sync.Mutex
	mapTaskQ    []MapTask
	redTaskQ    []ReduceTask // [{0-0}, {0-1}, {0-2}, ..., {0-9}, {1-0},...{1-9}]
	mapTaskingQ []MapTask    // 正在执行map任务
	redTaskingQ []ReduceTask // 正在执行reduce任务
	nReduce     int
	isDone      bool
}

mutex:因为程序涉及到多线程,因此需要加锁
mapTaskQ:记录目前的Map任务,实际上就是文件名
redTaskQ:记录当前的Reduce任务,因为每个map会输出对应nReduce的中间文件,实验给的提示是通过ihash(key)函数,将不同的key映射到不同的reduce上,因此每个reduce应该会涉及到处理多个中间文件
mapTaskingQ:正在执行的map任务,当目前还有正在执行的map任务时,但是又有闲置的map机器在请求任务,此时coordinator应该让其等待,因为可能执行任务的map机器出现故障,此时就需要找到新的map继续执行该任务。此外,如果当目前还有正在执行的map任务,也不应该进入到reduce阶段
redTaskingQ:正在执行的reduce任务,理由和上面类似,都是为了防止发生故障

  • Coordinator初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.mapTaskingQ = make([]MapTask, 0)
	c.nReduce = nReduce

	// 每个文件对应一个任务
	for i, file := range files {
		task := MapTask{
			FileName: file,
			MapID:    i,
			NReduce:  nReduce,
		}

		c.mapTaskQ = append(c.mapTaskQ, task)
	}

	c.isDone = false

	Dprintf("Master working...\n")

	go c.tasking2task()

	c.server()
	return &c
}

实际上就是将文件转换为map任务

  • AskTask Worker请求任务的RPC调用
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// 还有map任务
	if len(c.mapTaskQ) > 0 {
		reply.TaskType = TaskMap
		reply.MapTask = c.assignMapTask()

		return nil
	}

	// 有正在执行的map任务
	if len(c.mapTaskingQ) > 0 {
		reply.TaskType = TaskWait
		Dprintf("some MapTasks are not completed, please wait...\n")

		return nil
	}

	// 有reduce任务
	if len(c.redTaskQ) > 0 {
		reply.TaskType = TaskReduce
		redTasks := c.assignRedTask()
		reply.RedTasks = append(reply.RedTasks, redTasks...)

		return nil
	}

	// 有正在执行的reduce任务
	if len(c.redTaskingQ) > 0 {
		reply.TaskType = TaskWait
		Dprintf("some ReduceTasks are not completed, please wait...\n")

		return nil
	}

	// reduce任务也处理完了
	reply.TaskType = TaskEnd
	c.isDone = true

	Dprintf("ALL tasks done,msg: closing -> worker...\n")

	return nil
}
  • 分配Map任务
func (c *Coordinator) assignMapTask() MapTask {
	task := c.mapTaskQ[0]
	c.mapTaskQ = append(c.mapTaskQ[:0], c.mapTaskQ[1:]...)
	c.mapTaskingQ = append(c.mapTaskingQ, task)

	Dprintf("assign MapTask, fileName..%v, mapId..%v, mReduce..%v\n",
		task.FileName, task.MapID, task.NReduce)

	return task
}
  • 分配reduce任务
// 分配reduce任务
func (c *Coordinator) assignRedTask() []ReduceTask {
	redTasks := make([]ReduceTask, 0)
	reduceId := c.redTaskQ[0].ReduceID

	// 取第一个reduce task
	for i := 0; i < len(c.redTaskQ); {
		if c.redTaskQ[i].ReduceID != reduceId {
			i++
			continue
		}

		task := c.redTaskQ[i]
		c.redTaskingQ = append(c.redTaskingQ, task)
		redTasks = append(redTasks, task)
		c.redTaskQ = append(c.redTaskQ[:i], c.redTaskQ[i+1:]...)
		Dprintf("assign ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
			task.FileName, task.MapID, task.ReduceID)
	}

	return redTasks
}
  • worker完成,回复coordinator
func (c *Coordinator) TaskDone(args *TaskDoneReply, reply *ExampleReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	switch args.TaskType {
	case TaskMap:
		c.mapTaskingQDeleter(args.MapTask, args.RedTasks)
		break
	case TaskReduce:
		c.redTaskingQDeleter(args.RedTasks)
		break
	default:
		break
	}

	return nil
}
  • map任务完成,从mapTaskingQ中移除,同时还应该将map处理好的中间结果,存放到redTaskQ
func (c *Coordinator) mapTaskingQDeleter(mapTask MapTask, redTasks []ReduceTask) {
	Dprintf("MapTask done, fileName..%v, mapId..%v, nReduce..%v\n",
		mapTask.FileName, mapTask.MapID, mapTask.NReduce)

	// 找到 mapTask.MapID的任务,然后剔除
	for i := 0; i < len(c.mapTaskingQ); i++ {
		if c.mapTaskingQ[i].MapID == mapTask.MapID {
			c.mapTaskingQ = append(c.mapTaskingQ[:i], c.mapTaskingQ[i+1:]...)
			break
		}
	}

	// 将该map的结果放到reduceTask
	// reTasks = [{i-0},...,{i-9}]
	for _, v := range redTasks {
		c.redTaskQ = append(c.redTaskQ, v)
		Dprintf("add ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
			v.FileName, v.MapID, v.ReduceID)
	}
}
  • reduce任务完成,从redTaskingQ移除
func (c *Coordinator) redTaskingQDeleter(redTasks []ReduceTask) {
	for i := 0; i < len(redTasks); i++ {
		task := redTasks[i]
		Dprintf("ReduceTask done, fileName..%v, mapId..%v, reduceId..%v\n",
			task.FileName, task.MapID, task.ReduceID)
	
		for j := 0; j < len(c.redTaskingQ); {
			if c.redTaskingQ[j].ReduceID == task.ReduceID {
				c.redTaskingQ = append(c.redTaskingQ[:j], c.redTaskingQ[j+1:]...)
				continue
			}
			j++
		}
	}
}
  • coordinator还需要定期检查是否有map或者reduce宕机,如果有宕机,应该重新由空闲的机器处理。这里我们采用设置一个等待时间FixedTimeOut=15,如果超过这个时间正在执行的任务还没有完成,应该将他们移动到任务队列中
func (c *Coordinator) tasking2task() {
	for {
		time.Sleep(FixedTimeOut * time.Second)
		c.mutex.Lock()
		// defer c.mutex.Unlock()

		// 还有正在执行的map任务,将它添加到待完成任务队列 mapTaskQ
		if len(c.mapTaskingQ) != 0 {
			for i, _ := range c.mapTaskingQ {
				c.mapTaskQ = append(c.mapTaskQ, c.mapTaskingQ[i])
			}

			c.mapTaskingQ = []MapTask{}
		}

		if len(c.redTaskingQ) != 0 {
			Dprintf("redTaskingQ..%v\n", c.redTaskingQ)
			for i, _ := range c.redTaskingQ {
				c.redTaskQ = append(c.redTaskQ, c.redTaskingQ[i])
			}

			c.redTaskingQ = []ReduceTask{}
		}
		c.mutex.Unlock()
	}
}

3.3 Worker

  • 因为我们是单机,所以这里需要模拟多机,我们会让Worker一直请求任务,知道所有任务处理完成
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		reply := CallAskTask()

		switch reply.TaskType {
		case TaskMap: // Map任务
			doMapTask(mapf, reply.MapTask)
			break
		case TaskReduce: // Reduce任务
			doReduceTask(reducef, reply.RedTasks)
			break
		case TaskWait: // 此时没有任务
			time.Sleep(1 * time.Second)
			break
		case TaskEnd: // 所有任务全部完成
			Dprintf("任务全部完成,关机...\n")
			return
		default:
			fmt.Println("reply.TaskType: ", reply.TaskType)
			Dprintf("Unknown fault\n")
			break
		}
	}
}
  • 请求任务
func CallAskTask() AskTaskReply {
	args := AskTaskArgs{}
	reply := AskTaskReply{}

	call("Coordinator.AskTask", &args, &reply)

	return reply
}
  • 处理map任务
func doMapTask(mapf func(string, string) []KeyValue, mapTask MapTask) {
	Dprintf("doing MapTask, fileName..%v, mapId..%v, nReduce..%v\n",
		mapTask.FileName, mapTask.MapID, mapTask.NReduce)

	fileName := mapTask.FileName

	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}

	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}

	file.Close()

	// 通过map函数,统计每个单词
	kvs := mapf(fileName, string(content))
	sort.Sort(ByKey(kvs))

	// 将所有的单词都放到对应的 reduces[idx]中 idx = [0,...,NReduce)
	reduces := make([][]KeyValue, mapTask.NReduce)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % mapTask.NReduce
		reduces[idx] = append(reduces[idx], kv)
	}

	// reduce任务
	redTasks := []ReduceTask{} // [{i-0}, ... ,{i-9}]

	for idx, reduce := range reduces {

		// reduce任务
		redTask := ReduceTask{
			FileName: mapTask.FileName,
			MapID:    mapTask.MapID,
			ReduceID: idx,
		}

		redTasks = append(redTasks, redTask)

		output := "mr-" + strconv.Itoa(redTask.MapID) + "-" + strconv.Itoa(redTask.ReduceID)
		file, err = os.Create(output)
		if err != nil {
			log.Fatalf("cannot create %v", output)
		}
		defer file.Close()

		enc := json.NewEncoder(file)

		for _, kv := range reduce {
			enc.Encode(&kv)
		}

	}

	Dprintf("MapTask to ReduceTask done, fileName..%v, mapId..%v\n", mapTask.FileName, mapTask.MapID)

	CallTaskDone(TaskMap, mapTask, redTasks)
}
  • 处理reduce任务
func doReduceTask(reducef func(string, []string) string, redTasks []ReduceTask) {

	intermediate := []KeyValue{}

	for _, v := range redTasks {
		Dprintf("doing ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
			v.FileName, v.MapID, v.ReduceID)
		fileName := "mr-" + strconv.Itoa(v.MapID) + "-" + strconv.Itoa(v.ReduceID)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatalf("cannot open %v", fileName)
		}
		defer file.Close()

		// 反序列化JSON格式文件
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			err := dec.Decode(&kv)
			if err != nil {
				//log.Print("read file done...")
				break
			}

			intermediate = append(intermediate, KeyValue{kv.Key, kv.Value})
		}
	}

	sort.Sort(ByKey(intermediate))

	oname := "mr-out-" + strconv.Itoa(redTasks[0].ReduceID)
	ofile, _ := os.Create(oname)
	defer ofile.Close()

	i := 0
	for i < len(intermediate) {
		// 找到每一组相同的key,统一放到一个slice
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}

		// ["1","1",...,"1"]
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}

		// 执行Reduce函数,计算key出现的次数
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	Dprintf("ReduceTask done, fileName..%v, mapId..%v\n", redTasks[0].FileName, redTasks[0].MapID)

	CallTaskDone(TaskReduce, MapTask{}, redTasks)
}
  • 任务处理完,需要回应coordinator
func CallTaskDone(taskType int, mapTask MapTask, redTasks []ReduceTask) {
	args := TaskDoneReply{}
	args.TaskType = taskType
	args.MapTask = mapTask
	args.RedTasks = redTasks
	reply := ExampleReply{}
	call("Coordinator.TaskDone", &args, &reply)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/503313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法第一天力扣---2651. 计算列车到站时间

1.题目要求&#xff1a; 给你一个正整数 arrivalTime 表示列车正点到站的时间&#xff08;单位&#xff1a;小时&#xff09;&#xff0c;另给你一个正整数 delayedTime 表示列车延误的小时数。 返回列车实际到站的时间。 注意&#xff0c;该问题中的时间采用 24 小时制。 示…

让ChatGPT猜你喜欢——ChatGPT后面的推荐系统

Chat GPT的大热&#xff0c;让人们的视线又一次聚焦于“人工智能”领域。通过与用户持续对话的形式&#xff0c;更加丰富的数据会不断滚动“雪球”&#xff0c;让Chat GPT的回答变得越来越智能&#xff0c;越来越接近用户最想要的答案。ChatGPT能否颠覆当下的推荐系统范式&…

第三章 灰度变换与空间滤波

第三章 灰度变换与空间滤波 3.1背景知识 ​ 空间域指图像平面本身。变换域的图像处理首先把一幅图像变换到变换域&#xff0c;在变换域中进行处理&#xff0c;然后通过反变换把处理结果返回到空间域。空间域处理主要分为灰度变换与空间滤波。 3.1.1 灰度变换和空间滤波基础 …

cmcc_simplerop

1,三连 2&#xff0c;IDA分析 溢出点&#xff1a; 偏移&#xff1a;0x144(错误) 这里动态重新测试了一下偏移&#xff1a; 正确偏移&#xff1a;0x20 3&#xff0c;找ROP 思路&#xff1a; 1、找系统调用号 2、ROPgadget找寄存器 3、写入/bin/sh ROPgadget --binary simpler…

7-2使用Redis构建任务队列

目录 7-2使用Redis构建任务队列 第1关&#xff1a;先进先出任务队列 1、rpush/lpush命令&#xff1a;rpush(name,values[values…]) 2、blpop&#xff1a;blpop(keys, timeout)和 lpop/rpop&#xff1a;lpop(name) 删并返回删除值 3、lpushx/rpushx&#xff1a;lpushx(name…

使用CKKS全同态求近似倒数(近似乘法逆元)

求倒数的算法 两个数互为倒数&#xff0c;是说这两个数乘起来等1.比如a和b互为倒数&#xff0c;那么ab1. 5的倒数是0.2&#xff0c;我们可以很简单的求出来&#xff0c;但是如何在密文域中求一个数的倒数呢&#xff1f; 文章《An investigation of complex operations with …

C#自适应布局

注意事项&#xff1a;不要在Form1中添加任何布局&#xff0c;页面背景不设置图片 步骤&#xff1a; 1、在项目中添加AutoWindowsSize.cs类&#xff0c;内容如下&#xff1a; using System; using System.Collections.Generic; using System.ComponentModel; using System.Da…

2.2 掌握 NumPy 矩阵与通用函数

2.2 掌握 NumPy 矩阵与通用函数 2.2.1 创建NumPy矩阵创建NumPy矩阵矩阵的运算矩阵的属性 2.2.2 掌握ufunc函数1、常用的ufunc函数运算2、ufunc函数的广播机制 2.2.1 创建NumPy矩阵 创建NumPy矩阵 1、使用mat函数创建矩阵&#xff1a; matr1 np.mat(“1 2 3;4 5 6;7 8 9”) 2…

casbin轻量级的基于配置的授权框架

简介 Casbin是一个强大的、高效的开源访问控制框架&#xff0c;其权限管理机制支持多种访问控制模型。 Casbin提供了一个执行者 根据提供给执行者的策略和模型文件验证传入的请求。再根据对应的配置授权策略&#xff0c;验证请求判断释放那些行动。 在 Casbin 中, 访问控制模…

由于找不到vcomp140.dll无法继续执行代码,解决方法全攻略

如何解决找不到vcomp140.dll错误&#xff1f;在使用某些软件或者游戏的时候&#xff0c;你可能会遇到下面的错误提示&#xff1a;“由于找不到vcomp140.dll&#xff0c;无法继续执行代码”。这个错误提示通常表示你的电脑缺少一个或多个DLL文件&#xff0c;而这些文件是软件和游…

「字节跳动测试开发面经」一二三面+hr面+超级全资料+复习资料

​ 说在前面&#xff0c;面试时最好不要虚报工资。本来字节跳动是很想去的&#xff0c;几轮面试也通过了&#xff0c;最后没offer&#xff0c;自己只想到几个原因&#xff1a; 1、虚报工资&#xff0c;比实际高30%&#xff1b; 2、有更好的人选&#xff0c;这个可能性不大&am…

【Linux】软件包管理器 yum和编辑器-vim的基本使用

文章目录 一、yum背景知识1.商业生态2.开源生态3.Linux软件生态本土化 二、yum的基本使用1.什么是软件包2.查看软件包3.安装软件4.卸载软件5.rzsz 三、vim的基本使用1.vim的基本概念2.vim的基本操作3.vim命令模式命令集4.vim末(底)行模式命令集5.操作总结 四、简单vim配置1.vim…

C++学习day--10 条件判断、分支

1、if语句 if 语句的三种形态 形态1&#xff1a;如果。。。那么。。。 #include <iostream> using namespace std; int main( void ) { int salary; cout << " 你月薪多少 ?" ; cin >> salary; if (salary < 20000) { cout <&…

浅谈整除分块

例题一 ∑ i 1 n ⌊ n i ⌋ \sum_{i1}^n \lfloor\frac n i\rfloor\\ i1∑n​⌊in​⌋ 首先很容易想到直接求解&#xff0c;对于较大的数据&#xff0c; O ( n ) O(n) O(n)做法无法通过。 注意到函数 y ⌊ n x ⌋ y\lfloor\dfrac n x\rfloor y⌊xn​⌋的图像如下&#xff1a…

Hive语言

一、Hive的DDL语言&#xff08;数据库、数据表的增删改查操作) 二、Hive的DQL语言&#xff08;数据库查询语言&#xff09; 2.1Hive七子句 聚合函数&#xff1a;count()、sum()、max()、min()、avg()可以单独使用。(缩写&#xff1a;cs mm a) 2.1.1 分区查询与分区裁剪 SELEC…

OpenGL(九)——颜色

目录 一、前言 二、简单光源 三、光照场景 3.1 创建光源 3.2 光源顶点着色器 3.3 光源片段着色器 3.4 物体片段着色器 3.5 光源位置 一、前言 我们看到的物体颜色是通过光照在物体&#xff0c;然后反射到人眼成像&#xff0c;具体而言是物体不能吸收的颜色。如白光照射…

C++学习day--09 字符串比较、运算符

1、项目练习 第 1 节 项目需求、项目实现 项目实现&#xff1a; #include <iostream> #include <Windows.h> #include <string> using namespace std; int main( void ) { string name; string pwd; std::cout << " 请输入账号&am…

GPT-4的免费使用方法分享

目录 方法1&#xff1a;使用Ora.sh的LLM应用 方法2&#xff1a;使用https://steamship.com 方法3&#xff1a;使用https://nat.dev 方法4&#xff1a;http://tdchat.vip 方法5&#xff1a;使用Poe网站或App 方法6&#xff1a;使用 Opencat App 方法7:使用https://Huggin…

uniApp实现公农日历相互转换、公历、农历、阳历、阴历、calendar

文章目录 效果图1、组件1.1、html部分1.2、JavaScript部分1.3、style部分 2、使用组件3、总结 效果图 1、组件 1.1、html部分 <template><view v-if"isCalendar" class"calendar_box"><view v-show"!isTime" class"btn_ca…

Linux服务器使用supervisorctl命令部署Java服务详解

我们公司采用supervisorctl命令运行Java -jar包&#xff0c;觉得还是很方便的&#xff0c;此篇文章教你如何使用supervisorctl从零部署Java服务 安装jdk 首先肯定是下载安装Java的运行环境 jdk 下载地址&#xff1a;https://www.oracle.com/java/technologies/downloads/#jav…