MIT6.5840-2023-Lab1: MapReduce

news2025/1/22 9:02:33

前置知识

MapReduce:Master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
Map阶段:被分配了 map 任务的 worker 程序读取相关的输入数据片段,生成并输出中间 k/v 对,并缓存在内存中。
Reduce阶段:所有 map 任务结束,reduce 程序使用 RPC 从 map worker 所在主机的磁盘上读取缓存数据,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起,reduce 进行操作后输出为文件。
image.png

实验内容

实现一个分布式 MapReduce,由两个程序(coordinator 和 worker)组成。只有一个 coordinator 和一个或多个并行执行的 worker 。在真实系统中, worker 会运行在多台不同的机器上,但在本 lab 中将在一台机器上运行所有 worker 。 worker 将通过 RPC 与 coordinator 通话。每个 worker 进程都会向 coordinator 请求 task,从一个或多个文件中读取 task 输入,执行 task,并将 task 输出写入一个或多个文件。 coordinator 应该注意到,如果某个 worker 在合理的时间内(本 lab 使用 10 秒)没有完成任务,就会将相同的 task 交给另一个 worker。
rpc举例:https://pdos.csail.mit.edu/6.824/notes/kv.go
lab内容:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
Impl:mr/coordinator.go、mr/worker.go、mr/rpc.go
总结一下
对于 Coordiantor:

  • map 任务初始化;
  • rpc handler:回应worker分配任务请求、回应worker任务完成通知;
  • 自身状态控制,处理 map/reduce 阶段,还是已经全部完成;
  • 任务超时重新分配;

对于 Worker:

  • 给 coordinator 发送 rpc 请求分配任务;
  • 给 coordinator 发送 rpc 通知任务完成;
  • 自身状态控制,准确来说是 coordinator 不需要 worker 工作时,通知 worker 结束运行;

具体code见:https://github.com/BeGifted/MIT6.5840-2023

实验环境

OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64

概要设计

所设计的Coordinator、Task、rpc消息格式:

type WorkerArgs struct {
	WorkerId    int
	WorkerState int // init\done\fail
	Task        *Task
}

type WorkerReply struct {
	WorkerId    int
	WorkerState int // init\done\fail
	Task        *Task
}

type Coordinator struct {
	// Your definitions here.
	MapTaskChan    chan *Task
	ReduceTaskChan chan *Task
	NumReduce      int // reduce num
	NumMap         int // map num
	NumDoneReduce  int // reduce done num
	NumDoneMap     int // map done num
	State          int // map\reduce\done
	mu             sync.Mutex
	Timeout        time.Duration
	MapTasks       map[int]*Task
	ReduceTasks    map[int]*Task
}

type Task struct {
	TaskId    int
	TaskType  int // map\reduce
	TaskState int // int\run\done
	NReduce   int // nReduce
	StartTime time.Time

	Input []string
}

const (
	StateMap    = 0
	StateReduce = 1
	StateDone   = 2
)

const (
	TaskStateInit = 0
	TaskStateRun  = 1
	TaskStateDone = 2
)

const (
	TaskTypeMap    = 0
	TaskTypeReduce = 1
)

const (
	WorkerStateInit = 0
	WorkerStateDone = 1
	WorkerStateFail = 2
)

(TODO)WorkerState 出现 fail 的原因主要是在文件无法打开或读取上,如果是在处理 map 任务时出现 fail,那只有可能是原文件丢失了;如果是 reduce 任务时出现 fail,表示中间文件丢失,需要运行某个特定的 map 任务重新生成,然后再重新开始该 reduce 任务。当然,不实现这个也不会影响 test。

主要流程

创建 Coordinator

创建 Coordinator 并且初始化,将需要处理的数据片段放入 MapTaskChan 信道。这里将单个文件视作一个数据片段进行处理,也就是说有 len(files) 个 map 任务。

func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{}

    // Your code here.
    c.NumMap = len(files)
    c.NumReduce = nReduce
    c.MapTaskChan = make(chan *Task, len(files))
    c.ReduceTaskChan = make(chan *Task, nReduce)
    c.MapTasks = make(map[int]*Task)
    c.ReduceTasks = make(map[int]*Task)
    c.NumDoneMap = 0
    c.NumDoneReduce = 0
    c.State = StateMap
    c.Timeout = time.Duration(time.Second * 10)
    for i, file := range files {
        input := []string{file}
        task := Task{
            TaskId:    i,
            TaskType:  TaskTypeMap,
            TaskState: TaskStateInit,
            Input:     input,
            NReduce:   nReduce,
            StartTime: time.Now(),
        }
        c.MapTaskChan <- &task
        c.MapTasks[i] = &task
    }

    c.server()
    return &c
}

运行 Worker

Worker 主要处理两类任务:map 和 reduce。这两类任务通过 rpc 与 Coordinator 通信获取。
map 任务处理:

if task.TaskType == TaskTypeMap {
    filename := task.Input[0]
    intermediate := []KeyValue{}
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("cannot open %v", filename)
        continue
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", filename)
        continue
    }
    file.Close()
    // log.Println("mapf")
    // log.Println(task.TaskId)
    kva := mapf(filename, string(content))
    intermediate = append(intermediate, kva...)

    // sort.Sort(ByKey(intermediate))

    ReduceSplit := make(map[int][]KeyValue)
    for _, kv := range intermediate {
        ReduceSplit[ihash(kv.Key)%task.NReduce] = append(ReduceSplit[ihash(kv.Key)%task.NReduce], kv)
    }

    for i := 0; i < task.NReduce; i++ {
        oname := fmt.Sprintf("mr-%d-%d.tmp", task.TaskId, i)
        ofile, _ := os.Create(oname)
        enc := json.NewEncoder(ofile)
        for _, kv := range ReduceSplit[i] {
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatalf("cannot encode %v", kv)
                break
            }
        }
        ofile.Close()
    }

    // Task Done
    args.Task = task
    TaskDone(&args)
}

reduce 任务处理:

if task.TaskType == TaskTypeReduce {
    var kva ByKey
    for _, filename := range task.Input {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("cannot open %v", filename)
            file.Close()
            continue
        }

        dec := json.NewDecoder(file)
        for {
            var kv KeyValue
            if err := dec.Decode(&kv); err != nil {
                break
            }
            kva = append(kva, kv)
        }
        file.Close()
    }

    sort.Sort(kva)

    i := 0
    oname := fmt.Sprintf("mr-out-%d", task.TaskId)
    ofile, _ := os.Create(oname)
    for i < len(kva) {
        j := i + 1
        for j < len(kva) && kva[j].Key == kva[i].Key {
            j++
        }
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, kva[k].Value)
        }
        output := reducef(kva[i].Key, values)

        fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
        i = j
    }

    // Task Done
    args.Task = task
    TaskDone(&args)
}

WorkerHandler

给 worker 分配 map/reduce 任务,取决于阶段任务是否全部完成,当阶段任务全部完成,coordinator 的状态也需要更新。这个过程全局加锁。
处理 map 阶段:

if c.State == StateMap {
    select {
    case reply.Task = <-c.MapTaskChan:
        reply.Task.StartTime = time.Now()
        reply.Task.TaskState = TaskStateRun
    default:
        for _, mapTask := range c.MapTasks {
            if mapTask.TaskState == TaskStateRun && time.Since(mapTask.StartTime) > c.Timeout {
                mapTask.StartTime = time.Now()
                reply.Task = mapTask
                return nil
            }
        }
    }
}

处理 reduce 阶段:

if c.State == StateReduce {
    select {
    case reply.Task = <-c.ReduceTaskChan:
        reply.Task.StartTime = time.Now()
        reply.Task.TaskState = TaskStateRun
    default:
        for _, reduceTask := range c.ReduceTasks {
            if reduceTask.TaskState == TaskStateRun && time.Since(reduceTask.StartTime) > c.Timeout {
                reduceTask.StartTime = time.Now()
                reply.Task = reduceTask
                return nil
            }
        }
    }
}

需要注意的是,除了这两个阶段外还有 StateDone 阶段,即 reduce 任务都执行完毕了,coordinator 还没完全回收,此时 worker 还在请求分配任务,这时候就应该通知 worker 停止。

DoneHandler

coordinator 处理任务完成的通知。全程加锁。在这里更新 task/coordinator 状态。

func (c *Coordinator) DoneHandler(args *WorkerArgs, reply *WorkerReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	task := args.Task
	if task.TaskType == TaskTypeMap {
		if task.TaskState == TaskStateRun {
			task.TaskState = TaskStateDone
			c.MapTasks[task.TaskId].TaskState = TaskStateDone
			c.NumDoneMap++
		}
	} else if task.TaskType == TaskTypeReduce {
		if task.TaskState == TaskStateRun {
			task.TaskState = TaskStateDone
			c.ReduceTasks[task.TaskId].TaskState = TaskStateDone
			c.NumDoneReduce++
		}
	}

	if c.State == StateMap {
		if c.NumDoneMap == c.NumMap {
			c.State = StateReduce
			for i := 0; i < c.NumReduce; i++ {
				input := []string{}
				for j := 0; j < c.NumMap; j++ {
					input = append(input, fmt.Sprintf("mr-%d-%d.tmp", j, i))
				}
				task := Task{
					TaskId:    i,
					TaskType:  TaskTypeReduce,
					TaskState: TaskStateInit,
					NReduce:   c.NumReduce,
					StartTime: time.Now(),
					Input:     input,
				}
				c.ReduceTaskChan <- &task
				c.ReduceTasks[i] = &task
			}
		}
	} else if c.State == StateReduce {
		if c.NumDoneReduce == c.NumReduce {
			c.State = StateDone
		}
	}
	return nil
}

实验结果

bash test-mr-many.sh 10

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1290164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LeetCode刷题-链表】--92.反转链表II

92.反转链表II /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* ListNode(int val, ListNode next) { this.val val; this.next next; }* }*/ cla…

Java第二十一章网络通信

一、网络程序设计基础 1、局域网与互联网 为了实现两台计算机的通信&#xff0c;必须用一个网络线路连接两台计算机&#xff0c;如下图所示。 2、网络协议 1.IP协议 IP指网际互连协议&#xff0c;Internet Protocol的缩写&#xff0c;是TCP/IP体系中的网络层协议。设计IP的目的…

基于NIQE算法的图像无参考质量评价算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 空域NSS特征提取 4.2 图像块选取 4.3 MVG模型 4.4 NIQE指标 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022a 3.部分核心程序 clc; clear; close all; …

Python推导式详细讲解

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在Python中&#xff0c;推导式是一种简洁而强大的语法特性&#xff0c;可以用来创建列表、集合、字典等数据结构。本文将深入探讨Python中的三种推导式&#xff1a;列表推导式、集合推导式和字典推导式&#xff…

pytorch中的transpose用法

注意&#xff1a;维数从0开始&#xff0c;0维 1维2维…,负数代表从右往左数&#xff0c;-1代表第一维&#xff0c;以此类推 import torch import numpy as np# 创建一个二维数组 arr torch.tensor([[[1, 2],[3, 4]],[[5, 6],[7, 8]]]) print("原始数组&#xff1a;"…

resnet 图像分类的项目

1. 项目文件 文件下载资源&#xff1a;resnet 图像分类的项目代码 本章利用reset34 作图像分类&#xff0c;包括计算训练集和测试集的loss、accuracy曲线&#xff0c;对训练好的网络在训练集测试集上求混淆矩阵 data 文件为训练集测试集&#xff0c;图像按照文件夹摆放inferenc…

电源滤波器如何检测?ATECLOUD-POWER电源自动测试软件如何助力?

电源滤波器常用来对电源中的纹波和干扰信号进行滤波&#xff0c;从而确保元器件不受损坏&#xff0c;是保证系统稳定性的重要方法。因此电源滤波器测试是非常重要的&#xff0c;通过检测来评估其质量、性能和稳定性&#xff0c;从而使电源滤波器可以稳定工作&#xff0c;进行滤…

HarmonyOS 修改App的默认加载的界面(ArkTS版本)(十七)

根据鸿蒙系统APP的应用生命周期结构&#xff08;鸿蒙4.0开发笔记之ArkTS语法基础之应用生命周期&#xff09;来看。 1、首先在roject/entry/src/main/ets/entryability/EntryAbility.ts文件中找到UI加载函数&#xff1a;onWindowStageCreate(…){…}&#xff0c;然后找到windo…

Jupyter Notebook工具

Jupyter Notebook 是一个交互式的笔记本环境&#xff0c;允许用户以网页形式编写和分享代码、文本、图像以及其它多媒体内容。它支持超过 40 种编程语言&#xff0c;最常用的是 Python。 以下是 Jupyter Notebook 工具的一些特点和用法&#xff1a; 1. 特点&#xff1a; 交互式…

Ubuntu安装过程记录

软件准备 硬件 Acer电脑&#xff0c;AMD a6-440m芯片 64g优盘一个&#xff0c;实际就用了不到5g。 Ubuntu &#xff1a;官网 下载Ubuntu桌面系统 | Ubuntu 下载桌面版Ubuntu 22.04.3 LTS LTS属于稳定版 u盘系统盘制作软件 Rufus &#xff1a;Rufus - 轻松创建 USB 启动…

寻找峰值00

题目链接 寻找峰值 题目描述 注意点 数组可能包含多个峰值&#xff0c;在这种情况下&#xff0c;返回 任何一个峰值 所在位置即可对于所有有效的 i 都有 nums[i] ! nums[i 1]可以假设 nums[-1] nums[n] -∞ 解答思路 可以根据二分查找保证在O(log n)的时间复杂度找到峰…

西工大计算机学院计算机系统基础实验一(函数编写1~10)

还是那句话&#xff0c;千万不要慌&#xff0c;千万不要着急&#xff0c;耐下性子慢慢来&#xff0c;一步一个脚印&#xff0c;把基础打的牢牢的&#xff0c;一样不比那些人差。回到实验本身&#xff0c;自从​​​​​​按照西工大计算机学院计算机系统基础实验一&#xff08;…

Qt-Q_OBJECT宏使用与“无法解析的外部符号qt_metacall/metaObject/qt_metacast“

有时候我们编写Qt类的时候&#xff0c;修改代码时直接加上Q_OBJECT宏&#xff0c;然后直接构建&#xff0c;会报如下错误&#xff1a; 这里的几个函数的声明是由Q_OBJECT宏引入的&#xff0c;而其对应的实现是由moc实现的&#xff0c;如果我们更新了代码但是没有执行qmake&…

自媒体创作辅助工具有哪些?四款必备图文工具推荐

自媒体创作需要有哪些辅助工具&#xff0c;今天我们要探讨的话题是自媒体图文工具&#xff0c;这是我们在打造引人入胜内容时的得力助手。在这个信息过剩的时代&#xff0c;图文内容已经成为自媒体传播的核心。为了使我们的内容脱颖而出&#xff0c;我们需要一些专业的工具来提…

大话数据结构-查找-多路查找树

注&#xff1a;本文同步发布于稀土掘金。 7 多路查找树 多路查找树&#xff08;multi-way search tree&#xff09;&#xff0c;其每个结点的孩子可以多于两个&#xff0c;且每一个结点处可以存储多个元素。由于它是查找树&#xff0c;所有元素之间存在某种特定的排序关系。 …

回溯-组合总和

LeetCode链接 本题k相当于树的深度&#xff0c;9&#xff08;因为整个集合就是9个数&#xff09;就是树的宽度。 例如 k 2&#xff0c;n 4的话&#xff0c;就是在集合[1,2,3,4,5,6,7,8,9]中求 k&#xff08;个数&#xff09; 2, n&#xff08;和&#xff09; 4的组合。 …

分享一个Python网络爬虫数据采集利器

前言 你是否曾为获取重要数据而感到困扰&#xff1f;是否因为数据封锁而无法获取所需信息&#xff1f;是否因为数据格式混乱而头疼&#xff1f;现在&#xff0c;所有这些问题都可以迎刃而解。让我为大家介绍一款强大的数据收集平台——亮数据Bright Data。 作为世界领先的数据…

单周爆售150w+,“不是羽绒服买不起,而是军大衣更有性价比”

拼多多收盘市值超过阿里&#xff0c;成在美中概股市值第一。 截至美股收盘&#xff08;11月30日&#xff09;&#xff0c;拼多多收盘市值超过阿里巴巴&#xff0c;成为在美中概股中的市值第一股。拼多多收涨4.03%&#xff0c;报147.44美元&#xff0c;市值1959亿美元&#xff…

数据中心的操作系统——kubernets

操作系统的功能和模块与 Kubernetes 的功能和模块做了一个对比&#xff1a; Kubernetes 作为数据中心的操作系统还是主要管理数据中心里面的四种硬件资源&#xff1a;CPU、内存、存储、网络。 对于 CPU 和内存这两种计算资源的管理&#xff0c;我们可以通过 Docker 技术完成。…