[MIT6.5840]MapReduce

news2025/1/12 17:38:26

MapReduce

Lab 地址

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文地址

https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf

工作原理

简单来讲,MapReduce是一种分布式框架,可以用来处理大规模数据。该框架抽象了两个接口,分别是MapReduce函数:
在这里插入图片描述

凡是符合这个模式的算法都可以使用该框架来实现并行化,执行流程如下图所示。

在这里插入图片描述

整个框架分为Master和Worker,Master负责分配mapreduce任务,Worker负责向Master申请任务并执行。执行流程如下:

Map阶段:

  • 输入是大文件分割后的一组小文件,通常大小为16~64MB。
  • Worker向Master申请任务,假设得到map任务in0。
  • Worker开始执行map任务,将文件名和文件内容作为参数传入map函数中,得到kv list.
  • 最后Worker将kv list分割成reduceNum份(超参数),要求使得具有相同key的kv对在一份中。可以通过hash值%reduceNum实现分割,然后输出到文件中,下图的0-*

Reduce阶段:

  • 输入当前reduce的序号id,从map阶段的输出中选出*-id的文件,也就是将hash值%reduceNum值相同的kv对取出,这样可以保证具有相同key的kv对只用一次处理。
  • 将所有的kv对根据键值排序,使得相同key的kv对能够连续排列,方便合并。
  • 之后合并相同key的kv对,然后将每个key和其对应的value list输入reduce函数,得到合并的结果,再将其输出到文件中。

在这里插入图片描述

本文介绍了大致思想,详细内容请参考原论文。

代码详解

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import (
  "fmt"
  "os"
  "strconv"
)

const (
  MAP    = "MAP"
  REDUCE = "REDUCE"
  DONE   = "DONE"
)

//
// example to show how to declare the arguments
// and reply for an RPC.
//

type ApplyArgs struct {
  WorkerID     int
  LastTaskType string
  LastTaskID   int
}

type ReplyArgs struct {
  TaskId    int
  TaskType  string
  InputFile string
  MapNum    int
  ReduceNum int
}

// Add your RPC definitions here.

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
  s := "/var/tmp/5840-mr-"
  s += strconv.Itoa(os.Getuid())
  return s
}
// 构造文件名
func tmpMapResult(workerID int, taskID int, reduceId int) string {
  return fmt.Sprintf("tmp-worker-%d-%d-%d", workerID, taskID, reduceId)
}

func finalMapResult(taskID int, reduceID int) string {
  return fmt.Sprintf("mr-%d-%d", taskID, reduceID)
}

func tmpReduceResult(workerID int, reduceId int) string {
  return fmt.Sprintf("tmp-worker-%d-out-%d", workerID, reduceId)
}

func finalReduceResult(reduceID int) string {
  return fmt.Sprintf("mr-out-%d", reduceID)
}

worker.go

package mr

import (
  "fmt"
  "hash/fnv"
  "io"
  "log"
  "net/rpc"
  "os"
  "sort"
  "strings"
)

// Map functions return a slice of KeyValue.
type KeyValue struct {
  Key   string
  Value string
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
  h := fnv.New32a()
  h.Write([]byte(key))
  return int(h.Sum32() & 0x7fffffff)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
  reducef func(string, []string) string) {

  // Your worker implementation here.

  id := os.Getegid()
  // log.Printf("worker %d start working", id)
  lastTaskId := -1
  lastTaskType := ""

loop:
  for {
    args := ApplyArgs{
      WorkerID:     id,
      LastTaskType: lastTaskType,
      LastTaskID:   lastTaskId,
    }

    reply := ReplyArgs{}

    ok := call("Coordinator.ApplyForTask", &args, &reply)
    if !ok {
      fmt.Printf("call failed!\n")
      continue
    }
    // log.Printf("reply: %v", reply)
    lastTaskId = reply.TaskId
    lastTaskType = reply.TaskType
    switch reply.TaskType {
    case "":
      // log.Println("finished")
      break loop
    case MAP:
      // log.Printf("worker %d get map task %d", id, reply.TaskId)
      doMapTask(id, reply.TaskId, reply.InputFile, reply.ReduceNum, mapf)
    case REDUCE:
      // log.Printf("worker %d get reduce task %d", id, reply.TaskId)
      doReduceTask(id, reply.TaskId, reply.MapNum, reducef)
    }
  }
  // uncomment to send the Example RPC to the coordinator.
  // CallExample()

}

// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
  // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
  sockname := coordinatorSock()
  c, err := rpc.DialHTTP("unix", sockname)
  if err != nil {
    log.Fatal("dialing:", err)
  }
  defer c.Close()

  err = c.Call(rpcname, args, reply)
  if err == nil {
    return true
  }

  fmt.Println(err)
  return false
}

func doMapTask(id int, taskId int, filename string, reduceNum int, mapf func(string, string) []KeyValue) {
  file, err := os.Open(filename)
  if err != nil {
    log.Fatalf("%s 文件打开失败! ", filename)
    return
  }
  content, err := io.ReadAll(file)
  if err != nil {
    log.Fatalf("%s 文件内容读取失败! ", filename)
  }
  file.Close()
  kvList := mapf(filename, string(content)) // kv list
  hashedKvList := make(map[int]ByKey)
  for _, kv := range kvList {
    hashedKey := ihash(kv.Key) % reduceNum
    hashedKvList[hashedKey] = append(hashedKvList[hashedKey], kv)
  }

  for i := 0; i < reduceNum; i++ {
    outFile, err := os.Create(tmpMapResult(id, taskId, i))
    if err != nil {
      log.Fatalf("can not create output file: %e", err)
      return
    }
    for _, kv := range hashedKvList[i] {
      fmt.Fprintf(outFile, "%v\t%v\n", kv.Key, kv.Value)
    }
    outFile.Close()
  }
  // log.Printf("worker %d finished map task\n", id)
}

func doReduceTask(id int, taskId int, mapNum int, reducef func(string, []string) string) {
  var kvList ByKey
  var lines []string
  for i := 0; i < mapNum; i++ {
    mapOutFile := finalMapResult(i, taskId)
    file, err := os.Open(mapOutFile)
    if err != nil {
      log.Fatalf("can not open output file %s: %e", mapOutFile, err)
      return
    }
    content, err := io.ReadAll(file)
    if err != nil {
      log.Fatalf("file read failed %s: %e", mapOutFile, err)
      return
    }
    lines = append(lines, strings.Split(string(content), "\n")...)
  }
  for _, line := range lines {
    if strings.TrimSpace(line) == "" {
      continue
    }
    split := strings.Split(line, "\t")
    kvList = append(kvList, KeyValue{Key: split[0], Value: split[1]})
  }
  sort.Sort(kvList)
  outputFile := tmpReduceResult(id, taskId)
  file, err := os.Create(outputFile)
  if err != nil {
    log.Fatalf("can not create output file: %e", err)
    return
  }

  for i := 0; i < len(kvList); {
    j := i + 1
    key := kvList[i].Key
    var values []string
    for j < len(kvList) && kvList[j].Key == key {
      j++
    }
    for k := i; k < j; k++ {
      values = append(values, kvList[k].Value)
    }
    res := reducef(key, values)
    fmt.Fprintf(file, "%v %v\n", key, res)
    i = j
  }
  file.Close()
  // log.Printf("worker %d finished reduce task", id)
}

coordinator.go

package mr

import (
  "fmt"
  "log"
  "math"
  "net"
  "net/http"
  "net/rpc"
  "os"
  "sync"
  "time"
)

type Task struct {
  id        int
  inputFile string
  worker    int
  taskType  string
  deadLine  time.Time
}

type Coordinator struct {
  // Your definitions here.
  mtx        sync.Mutex
  inputFile  []string
  reduceNum  int
  mapNum     int
  taskStates map[string]Task
  todoList   chan Task
  stage      string
}

// Your code here -- RPC handlers for the worker to call.

// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) ApplyForTask(args *ApplyArgs, reply *ReplyArgs) error {
  // process the last task
  if args.LastTaskID != -1 {
    taskId := createTaskId(args.LastTaskID, args.LastTaskType)
    c.mtx.Lock()
    if task, ok := c.taskStates[taskId]; ok && task.worker != -1 { // 排除过期任务
      // log.Printf("worker %d finish task %d", args.WorkerID, task.id)
      if args.LastTaskType == MAP {
        for i := 0; i < c.reduceNum; i++ {
          err := os.Rename(tmpMapResult(task.worker, task.id, i), finalMapResult(task.id, i))
          if err != nil {
            log.Fatalf("can not rename %s: %e", tmpMapResult(task.worker, task.id, i), err)
          }
        }
      } else if args.LastTaskType == REDUCE {
        err := os.Rename(tmpReduceResult(task.worker, task.id), finalReduceResult(task.id))
        if err != nil {
          log.Fatalf("can not rename %s: %e", tmpReduceResult(task.worker, task.id), err)
        }
      }
      delete(c.taskStates, taskId)
      if len(c.taskStates) == 0 {
        c.shift()
      }
    }
    c.mtx.Unlock()
  }
  // assign the new task
  task, ok := <-c.todoList
  if !ok {
    return nil
  }
  reply.InputFile = task.inputFile
  reply.MapNum = c.mapNum
  reply.ReduceNum = c.reduceNum
  reply.TaskId = task.id
  reply.TaskType = task.taskType
  task.worker = args.WorkerID
  task.deadLine = time.Now().Add(10 * time.Second)
  // log.Printf("assign %s task %d to worker %d", task.taskType, task.id, args.WorkerID)
  c.mtx.Lock()
  c.taskStates[createTaskId(task.id, task.taskType)] = task
  c.mtx.Unlock()
  return nil
}

// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
  rpc.Register(c)
  rpc.HandleHTTP()
  //l, e := net.Listen("tcp", ":1234")
  sockname := coordinatorSock()
  os.Remove(sockname)
  l, e := net.Listen("unix", sockname)
  if e != nil {
    log.Fatal("listen error:", e)
  }
  go http.Serve(l, nil)
}
// 改变当前的状态
func (c *Coordinator) shift() {
  // 加锁状态
  if c.stage == MAP {
    // log.Printf("Map Task finished")
    c.stage = REDUCE
    // 分配reduce task
    for i := 0; i < c.reduceNum; i++ {
      task := Task{
        id:       i,
        worker:   -1,
        taskType: REDUCE,
      }
      c.todoList <- task
      c.taskStates[createTaskId(i, REDUCE)] = task
    }
  } else if c.stage == REDUCE {
    close(c.todoList)
    c.stage = DONE
  }
}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
  // Your code here.
  c.mtx.Lock()
  defer c.mtx.Unlock()
  return c.stage == DONE
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
  c := Coordinator{
    mtx:        sync.Mutex{},
    inputFile:  files,
    reduceNum:  nReduce,
    mapNum:     len(files),
    taskStates: make(map[string]Task),
    todoList:   make(chan Task, int(math.Max(float64(nReduce), float64(len(files))))),
    stage:      MAP,
  }

  for i, file := range files {
    task := Task{
      id:        i,
      inputFile: file,
      worker:    -1,
      taskType:  MAP,
    }
    c.todoList <- task
    c.taskStates[createTaskId(i, MAP)] = task
  }
  // 回收任务
  go c.collectTask()
  c.server()
  return &c
}

func createTaskId(id int, taskType string) string {
  return fmt.Sprintf("%d-%s", id, taskType)
}
// worker执行过期后回收任务
func (c *Coordinator) collectTask() {
  for {
    time.Sleep(500 * time.Millisecond)
    c.mtx.Lock()
    if c.stage == DONE {
      c.mtx.Unlock()
      return
    }
    for _, task := range c.taskStates {
      if task.worker != -1 && time.Now().After(task.deadLine) {
        // task is expired
        task.worker = -1
        // log.Printf("task %d is expired", task.id)
        c.todoList <- task
      }
    }
    c.mtx.Unlock()
  }
}

运行说明

mrcoordinator

cd src/main/
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrcoordinator.go pg-*.txt
 

mrworker

cd src/main/
go run mrworker.go wc.so

测试结果

bash test-mr.sh

在这里插入图片描述

MIT6.5840 课程Lab完整项目

https://github.com/Joker0x00/MIT-6.5840-Lab/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1957029.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

英伟达最强劲敌Groq一招绝杀GPU,反超GPT-4o mini2倍,AI大佬Karpathy:直接飞升AGI!

Llama 3.1 405B被吐槽太笨重? 英伟达对手AI新星Groq一招绝杀:上LPU直接速度翻倍,直接让Llama 3.1飞升AGI! Meta 最新发布的 Llama 3.1 405B 的开源让AI圈不平静了! 追捧者感慨"GPT-4o的能力已握在手中”,而批评者反驳说,大体量消耗这么多算力,有些结果跑得还不如GP…

【内网】安装wget

先是去RPM Search 下载了wget-1.24.5-2.1.x86_64.rpm这个包&#xff0c;结果安装的时候报一堆错 [rootlocalhost ~]# rpm -ivh wget-1.24.5-2.1.x86_64.rpm warning: wget-1.24.5-2.1.x86_64.rpm: Header V3 RSA/SHA512 Signature, key ID 29b700a4: NOKEY error: Failed dep…

不同WEB下的的ApplicationContext的选择

依赖 ApplicationContext类型选择 默认情况下&#xff0c;spring通过选择的web端的框架来选择使用哪个ApplicationContext子类&#xff0c;默认情况下我们一般使用spring mvc框架&#xff0c;这个时候AC的实现类为 org.springframework.boot.web.servlet.context.AnnotationC…

docker安装mysql8自动备份脚本

引用&#xff1a;https://blog.csdn.net/leovnay/article/details/140585094 # 创建两个卷 docker volume ls docker volume create mysqlData docker volume create mysqlSQL# 运行容器 docker run -d --namemysql8 -p 3306:3306 -e MYSQL_ROOT_PASSWORDxxx -e TZAsia/Shangh…

Java小抄|Java中的List与Map转换

文章目录 1 List<User> 转Map<User.id,User>2 基础类型的转换&#xff1a;List < Long> 转 Map<Long,Long> 1 List 转Map<User.id,User> Map<Long, User> userMap userList.stream().collect(Collectors.toMap(User::getId, v -> v, …

自闭症儿童上小学教育方法:个性化关怀,引领全面发展

在教育的征程中&#xff0c;为自闭症儿童提供适合他们的小学教育方法至关重要。这些孩子如同独特的星星&#xff0c;需要我们用个性化的关怀去照亮他们的成长之路&#xff0c;引领他们实现全面发展。 个性化关怀是自闭症儿童小学教育的核心。每个孩子都是独一无二的&#xff0c…

钡铼技术PLC网关:实现PLC数据无缝对接MQTT协议

MQTT 协议概述 MQTT 是用于物联网的标准消息传递协议。它被设计为一种非常轻量级的发布/订阅消息传送&#xff0c;非常适合以较小的代码占用量和网络带宽连接远程设备。 PLC网关是一种专门设计用于连接可编程逻辑控制器&#xff08;PLC&#xff09;与其它网络设备或系统的中间…

元器件基础学习笔记——二极管基础

一、二极管基础 二极管是用半导体材料(硅、硒、锗等)制成的一种电子器件&#xff0c;具有单向导电性&#xff0c;是现代电子技术的基石。它在电子电路中扮演着至关重要的角色&#xff0c;通过与电阻、电容、电感等元器件的合理连接&#xff0c;能够实现整流、检波、限幅、稳压等…

python实现GUI版图片锐化小工具

目录 效果展示代码脚本代码 效果展示 锐化前&#xff1a; 锐化后代码 sharpen_img.py import tkinter as tk from tkinter import filedialog from PIL import Image, ImageTk,ImageFilter import os class ImageViewerApp:def __init__(self, root):self.root rootself.r…

空气净化器CE认证简介

空气净化器中有多种不同的技术和介质&#xff0c;使它能够向用户提供清洁和安全的空气。由于空气净化器本身就和我们的生活息息相关。因此对于产品本身的安全性是消费者首先需要考虑的&#xff0c;另一方面就是其在净化空气上的效率和效果。如今国内的空气净化器随着工艺上的不…

ts 下使用 interactjs 的时候,事件类型该如何定义 InteractEvent

ts 下使用 interactjs 的时候&#xff0c;事件类型该如何定义 InteractEvent 一、问题 interactjs 是一个很好用的给元素添加拖动事件的插件&#xff0c;它可以实现如下的效果。 其官网是 https://interactjs.io/ vitetsvue3 项目中用到了 interactjs 这个库&#xff0c;但在…

42度酒和52度酒哪个好?

我们平时在聚会的时候都会喝酒&#xff0c;而在买酒时通常会看到超市或者白酒专卖店里的白酒大多都是52度或者是42度的&#xff0c;而喝酒的人当中大多对白酒没有一定的了解&#xff0c;所以在接到买酒任务的时候&#xff0c;当看到一款酒有两种度数的时候&#xff0c;就有些拿…

元宇宙营销,能够持续下去吗?

Photo by Oberon Copeland veryinformed.com on Unsplash 一场完美风暴让一些行业观察人士怀疑&#xff0c;元宇宙这个曾经营销界最喜欢的闪亮对象&#xff0c;是否正在维持生命。像ChatGPT这样的生成式人工智能(AI)已经接管了技术炒作周期&#xff1b;关键平台的参与度微乎其…

为什么要加密源代码?六款好用的源代码加密软件推荐

在当今数字化时代&#xff0c;源代码是许多企业和开发人员最重要的资产之一。无论是保护知识产权、维护竞争优势&#xff0c;还是确保应用程序的安全性&#xff0c;加密源代码都是至关重要的措施。以下将详细探讨为什么需要加密源代码&#xff0c;并推荐六款好用的源代码加密软…

手把手教你暗通道先验去雾算法

0&#xff0c;流程 暗通道先验去雾算法&#xff08;Dark Channel Prior, DCP&#xff09;是一种基于图像的去雾技术&#xff0c;由Kaiming He等人在2009年提出。这种算法利用了大气散射模型&#xff0c;通过估计大气光和图像的传输图来去除雾的影响。以下是暗通道先验去雾算法…

PLC网关:开启工业4.0时代的智能工厂之路

PLC即可编程逻辑控制器&#xff0c;是工业自动化领域的核心设备&#xff0c;广泛应用于各个工业领域。从PLC问世至今&#xff0c;一直表现出强大的生命力和高速增长态势&#xff0c;2020年全球PLC市场的销售量已经达到了百亿RMB级别。 随着行业智能化、数字化推广&#xff0c;…

Docker从入门到实践教程(电子版)

前言 Docker 是个伟大的项目&#xff0c;它彻底释放了虚拟化的威力&#xff0c;极大降低了云计算资源供应的成本&#xff0c;同时让应用的 分发、测试、部署和分发都变得前所未有的高效和轻松&#xff01; 本电子书既适用于具备基础 Linux 知识的 Docker 初学者&#xff0c;也…

hot100-5-普通数组

53最大子数组和 56合并区间 238除自身以外数组的乘积 用前缀乘积和后缀乘积 41缺失的第一个正数 189轮转数组

文本编辑三剑客(awk)

awk作为和sed、grep同级的文本处理命令&#xff0c;也又强大的文本分析功能&#xff0c;同样&#xff0c;它的原理并不困难&#xff0c;但操作很多且很杂&#xff0c;可以通过不同的需求进行自定义搭配。 awk工作原理 awk和另外两个命令的工作原理又不相同&#xff0c;当用户…

关于使用Postman在请求https网址没有响应,但是用浏览器有响应的问题解决

一、问题描述 使用postman调用正式环境的公共接口&#xff0c;无需鉴权&#xff0c;但是产生了返回状态码200&#xff0c;但是data中却无数据&#xff0c;如下 {"code": "200","message": "操作成功","data": {"qr_c…