【Go语言实战】(25) 分布式算法 MapReduce

news2025/1/19 17:33:28

MapReduce

写在前面

身为大数据专业的学生,其实大学我也多多少少接触过mapreduce,但是当时觉得这玩意太老了,觉得这和php一样会被时代淘汰。只能说当时确实太年轻了,没有好好珍惜那时候的学习资源…

现在回过头来看mapreduce,发现技术这东西和语言不一样,技术万变不离其中,而语言只是实现技术的一种方法而已,用什么语言其实并不重要。

原论文地址:MapReduce: Simplified Data Processing on Large Clusters

总览

这次 lab1 的 mapreduce,其实是在 搜索引擎tangseng 的时候,需要用来构建倒排索引。所以会和课程上所要求的不太一样,这里也没有使用rpc调用,而是为了与项目统一,便改用了grpc进行调用。

mapreduce工作原理

这里需要注意几点

  • 不同的Map任务之间不会进行通信
  • 不同的Reduce任务之间也不会发生任何信息交换
  • 所有的数据交换都是通过MapReduce框架自身去实现的

那么如何对 map tasks 和 reduce tasks 进行合理的协调呢?这里我们就要引入两个角色,master 和 worker,在原论文中,对这两者的并没有非常明确的定义,但我们可以摘录并提炼原论文对这两个角色的描述:

master :

  • The master picks idle workers and assigns each one a map task or a reduce task.

worker :

  • The map worker who is assigned a map task reads the contents of the corresponding input split.

  • The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function

这里我们先说一下几个状态枚举值:

  • idle :空闲状态
  • in-progress :进行状态
  • completed :完成状态

这三个枚举值代表着每一个 map task 和 reduce task 的状态,标识着这些 task 是未开始,进行中,还是已完成。

那么 master 其实就是选择空闲的 worker 节点,为每一个空闲的 worker 节点分配 map task 或者 reduce task。而 worker 看似分成了 map worker 和 reduce worker,但其实这两个 worker 都是一样,只是看 master 分配的是 map task 还是 reduce task。这样我们的 map 和 reduce 的数据传送就非常清晰了。

MapReduce整体工作流

接下来,我们来详细讲解一下这几个重要的角色

Worker

首先我们先定义一个 MapReduce 的任务,也就是我们 worker 需要用到参数

type MapReduceTask struct {
	Input         string   `json:"input"`         // 输入的文件
	TaskState     State    `json:"task_state"`    // 状态
	NReducer      int      `json:"n_reducer"`     // reducer 数量
	TaskNumber    int      `json:"task_number"`   // 任务数量
	Intermediates []string `json:"intermediates"` // map 之后的文件存储地址
	Output        string   `json:"output"`        // output的输出地址
}

接着再定义 State 枚举值

type MasterTaskStatus int

const (
	Idle       MasterTaskStatus = iota + 1 // 未开始
	InProgress                             // 进行中
	Completed                              // 已完成
)

接下来我们的 Worker 函数就很简单了

func Worker(ctx context.Context, mapf func(string, string) []*types.KeyValue, reducef func(string, []string) *roaring.Bitmap) {
	// 启动worker
  for {
		task, err := getTask(ctx) // worker从master获取任务
		if err != nil {
			log.LogrusObj.Error("Worker-getTask", err)
			return
		}
		// 拿到task之后,根据task的state,map task交给mapper, reduce task交给reducer
		// 额外加两个state,让 worker 等待 或者 直接退出
		switch task.TaskState {
		case int64(types.Map):
			mapper(ctx, task, mapf)
		case int64(types.Reduce):
			reducer(ctx, task, reducef)
		case int64(types.Wait):
			time.Sleep(5 * time.Second)
		case int64(types.Exit):
			return
		default:
			return
		}
	}
}

至于 mapper 和 reducer 如何实现的,先桥豆麻袋一下,下文在 map 和 reduce 中会给出答案,如何从 master 中拿到 task 呢?这就涉及到 worker 和 master 的通信。本来打算用 RPC 通信的,但为了项目的整体统一,还是用了 gRPC 。

创建一个proto文件

syntax="proto3";
option go_package = "/index_platform;";

message MapReduceTask{
	// @inject_tag:form:"input" uri:"input"
	string input = 1;
	// @inject_tag:form:"task_state" uri:"task_state"
	int64 task_state = 2;
	// @inject_tag:form:"n_reducer" uri:"n_reducer"
	int64 n_reducer = 3;
	// @inject_tag:form:"task_number" uri:"task_number"
	int64 task_number = 4;
	// @inject_tag:form:"intermediates" uri:"intermediates"
	repeated string intermediates = 5;
	// @inject_tag:form:"output" uri:"output"
	string output = 6;
}

message MasterTaskCompletedResp {
	// @inject_tag:form:"code" uri:"code"
	int64 code=1;
	// @inject_tag:form:"message" uri:"message"
	string message=2;
}

service MapReduceService {
	rpc MasterAssignTask(MapReduceTask) returns (MapReduceTask);
	rpc MasterTaskCompleted(MapReduceTask) returns (MasterTaskCompletedResp);
}

定义两个 RPC 函数,MasterAssignTask 用来接受 master 分配的 task MasterTaskCompleted 完成 task 之后,对这个 task 进行标识,意味着该任务结束。

所以我们 worker 接受任务的通信如下

func getTask(ctx context.Context) (resp *mapreduce.MapReduceTask, err error) {
	// worker从master获取任务
	taskReq := &mapreduce.MapReduceTask{}
	resp, err = rpc.MapReduceClient.MasterAssignTask(ctx, taskReq)

	return
}

当完成任务时,通过gRPC发送给master

func TaskCompleted(ctx context.Context, task *mapreduce.MapReduceTask) (reply *mapreduce.MasterTaskCompletedResp, err error) {
	// 通过RPC,把task信息发给master
	reply, err = rpc.MapReduceClient.MasterTaskCompleted(ctx, task)

	return
}

那么 master 是如何分配任务的?接下来我们来介绍一下 master 节点。

Master

我们定义这么一个 Master 服务的结构体

type MasterSrv struct {
	TaskQueue     chan *types.MapReduceTask // 等待执行的task
	TaskMeta      map[int]*types.MasterTask // 当前所有task的信息
	MasterPhase   types.State               // Master的阶段
	NReduce       int                       // Reduce的数量
	InputFiles    []string                  // 输入的文件
	Intermediates [][]string                // Map任务产生的R个中间文件的信息

	mapreduce.UnimplementedMapReduceServiceServer // gRPC服务实现接口
}

那么当我们 New 一个 Master 服务的时候,顺便创建 map tasks 任务

func NewMaster(files []string, nReduce int) *MasterSrv {
	m := &MasterSrv{
		TaskQueue:     make(chan *types.MapReduceTask, int(math.Max(float64(nReduce), float64(len(files))))),
		TaskMeta:      map[int]*types.MasterTask{},
		MasterPhase:   types.Map,
		NReduce:       nReduce,
		InputFiles:    files,
		Intermediates: make([][]string, nReduce),
	}
	m.createMapTask()
	return m
}

创建 map task 任务

func (m *MasterSrv) createMapTask() {
  // 把输入的files都形成一个task元数据塞到queue中
	for idx, filename := range m.InputFiles { 
		taskMeta := types.MapReduceTask{
			Input:      filename,
			TaskState:  types.Map, // map节点
			NReducer:   m.NReduce,
			TaskNumber: idx,
		}
		m.TaskQueue <- &taskMeta
		m.TaskMeta[idx] = &types.MasterTask{
			TaskStatus:    types.Idle, // 状态为 idle ,等待worker节点来领取 task
			TaskReference: &taskMeta,
		}
	}
}

创建 reduce task 任务

func (m *MasterSrv) createReduceTask() {
	m.TaskMeta = map[int]*types.MasterTask{}
	for idx, files := range m.Intermediates {
		taskMeta := types.MapReduceTask{
			TaskState:     types.Reduce, // reduce 阶段
			NReducer:      m.NReduce,
			TaskNumber:    idx,
			Intermediates: files,
		}
		m.TaskQueue <- &taskMeta
		m.TaskMeta[idx] = &types.MasterTask{
			TaskStatus:    types.Idle, // 找到空闲的 worker
			TaskReference: &taskMeta,
		}
	}
}

MasterAssignTask 等待 worker 来领取 task

func (m *MasterSrv) MasterAssignTask(ctx context.Context, req *mapreduce.MapReduceTask) (reply *mapreduce.MapReduceTask, err error) {
	mu.Lock()
	defer mu.Unlock()
	task := &types.MapReduceTask{
		Input:         req.Input,
		TaskState:     types.State(req.TaskState),
		NReducer:      int(req.NReducer),
		TaskNumber:    int(req.TaskNumber),
		Intermediates: req.Intermediates,
		Output:        req.Output,
	}
	if len(m.TaskQueue) > 0 {
		// 如果queue中还有任务的话就发出去
		*task = *<-m.TaskQueue
		m.TaskMeta[task.TaskNumber].TaskStatus = types.InProgress // 修改worker的状态为进行中
		m.TaskMeta[task.TaskNumber].StartTime = time.Now() // 记录task的启动时间
	} else if m.MasterPhase == types.Exit {
		*task = types.MapReduceTask{
			TaskState: types.Exit,
		}
	} else {
		// 没有task就让worker等待
		*task = types.MapReduceTask{TaskState: types.Wait}
	}
	
  // 返回该任务的状态,因为发出去就是给task了,这个状态已经改变了,worker可以工作了
	reply = &mapreduce.MapReduceTask{
		Input:         task.Input,
		TaskState:     int64(task.TaskState),
		NReducer:      int64(task.NReducer),
		TaskNumber:    int64(task.TaskNumber),
		Intermediates: task.Intermediates,
		Output:        task.Output,
	}

	return
}

那么如果 task 把任务都做完了,master 应该怎么回应呢?

func (m *MasterSrv) MasterTaskCompleted(ctx context.Context, req *mapreduce.MapReduceTask) (resp *mapreduce.MasterTaskCompletedResp, err error) {
	resp = new(mapreduce.MasterTaskCompletedResp)
	resp.Code = e.ERROR
	resp.Message = "map finish successfully"
	// 更新task状态
	if req.TaskState != int64(m.MasterPhase) || m.TaskMeta[int(req.TaskNumber)].TaskStatus == types.Completed {
		// 因为worker写在同一个文件这次盘上对于重复的结果要丢弃
		return
	}
	m.TaskMeta[int(req.TaskNumber)].TaskStatus = types.Completed
	err = m.processTaskResult(req) // always success haha and hope u so :)
	if err != nil {
		resp.Code = e.ERROR
		resp.Message = "map finish failed"
		return
	}

	return
}

处理任务的结果,如果是 map 完成后就变成 reduce 阶段,reduce 之后就是 all done. 😃

// processTaskResult 处理任务结果
func (m *MasterSrv) processTaskResult(task *mapreduce.MapReduceTask) (err error) {
	switch task.TaskState {
	case int64(types.Map):
		// 收集intermediate信息
		for reduceTaskId, filePath := range task.Intermediates {
			m.Intermediates[reduceTaskId] = append(m.Intermediates[reduceTaskId], filePath)
		}
		if m.allTaskDone() {
			// 获取所有的map task后,进入reduce阶段
			m.createReduceTask()
			m.MasterPhase = types.Reduce
		}
	case int64(types.Reduce):
		if m.allTaskDone() {
			// 获得所有的reduce task后,进去exit阶段
			m.MasterPhase = types.Exit
		}
	}

	return
}

介绍完master之后,我们具体来看一下map的具体行为。

Map

在 map 中,我们抽离出一个 mapper,具体的map函数可根据实际情况进行修改,然后将map function传入mapper中进行实际的map动作,我们读取每一个文件,然后把输出的结果都放到 intermediates 中,并且根据 task 所设定的 NReducer 也就是 reducer 数 进行hash ,将结果均匀分到每个中间文件中。

func mapper(ctx context.Context, task *mapreduce.MapReduceTask, mapf func(string, string) []*types.KeyValue) {
	// 从文件名读取content
	content, err := os.ReadFile(task.Input)
	if err != nil {
		log.LogrusObj.Error("mapper", err)
		return
	}
	// 将content交给mapf,缓存结果
	intermediates := mapf(task.Input, string(content))

	// 缓存后的结果会写到本地磁盘,并切成R份
	// 切分方式是根据key做hash
	buffer := make([][]*types.KeyValue, task.NReducer)
	for _, intermediate := range intermediates {
		slot := ihash(intermediate.Key) % task.NReducer
		buffer[slot] = append(buffer[slot], intermediate)
	}
	mapOutput := make([]string, 0)
	for i := 0; i < int(task.NReducer); i++ {
		mapOutput = append(mapOutput, writeToLocalFile(int(task.TaskNumber), i, &buffer[i]))
	}
	// R个文件的位置发送给master
	task.Intermediates = mapOutput
	_, err = TaskCompleted(ctx, task) // 完成后,给master发送消息,map阶段结束
	if err != nil {
		fmt.Println("mapper-TaskCompleted", err)
	}

	return
}

具体的 Map方法,由于是用于搜索引擎,所以这里是建立倒排索引

func Map(filename string, contents string) (res []*types.KeyValue) {
	res = make([]*types.KeyValue, 0)
	lines := strings.Split(contents, "\r\n") // 分行
	var inputData *model.InputData
	for _, line := range lines[1:] {
		docStruct, _ := doc2Struct(line) // 字符串转 doc struct
		tokens, err := analyzer.GseCutForBuildIndex(docStruct.DocId, docStruct.Body)
		if err != nil {
			return
		}
		for _, v := range tokens {
      res = append(res, &types.KeyValue{Key: v.Token, Value: cast.ToString(v.DocId)}) // token:docId 倒排索引
		}
	}

	return
}

至此map就已经完成了,是不是很简单,其实具体的map和reduce并不难,难的是如何平衡调度,接下来我们来看看reduce是如何怎么的。

Reduce

和map一样,我们抽离出一个reducer,然后把具体的 reduce 传进去,当然还有一个shuffle过程,这里进行排序会减少后面的reduce计算。可以少计算几次。

func reducer(ctx context.Context, task *mapreduce.MapReduceTask, reducef func(string, []string) *roaring.Bitmap) {
	// 先从filepath读取intermediate的KeyValue
	intermediate := *readFromLocalFile(task.Intermediates)
	// 根据kv排序 shuffle 过程
	sort.Sort(types.ByKey(intermediate))

	dir, _ := os.Getwd()
	outName := fmt.Sprintf("%s/mr-tmp-%d.%s",
		dir, task.TaskNumber, consts.InvertedBucket)
	invertedDB := storage.NewInvertedDB(outName)
	output := roaring.NewBitmap()
	var outByte []byte

	i := 0
	for i < len(intermediate) {
		// 将相同的key放在一起分组合并
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		var values []string
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		// 交给reducef,拿到结果
		output = reducef(intermediate[i].Key, values)

		// 落倒排索引库
		outByte, _ = output.MarshalBinary()
		_ = invertedDB.StoragePostings(intermediate[i].Key, outByte)
		i = j
	}

	task.Output = outName
	_, err := TaskCompleted(ctx, task) // 完成后,给master发送消息,reduce阶段结束
	if err != nil {
		fmt.Println("reducer-TaskCompleted", err)
		return
	}
}

具体的Reduce,其实就是把相同的key的value聚合在一起。比如

after map:

{"apple":1}
{"apple:"2}
{"poizon":3}

after reduce:

{"apple":{1,2}}
{"poizon":{3}}

具体实现如下所示:

func Reduce(key string, values []string) *roaring.Bitmap {
	docIds := roaring.New()
	for _, v := range values {
		docIds.AddInt(cast.ToInt(v))
	}
	return docIds
}

最终 output 输出

output

以上就是我对6.824这个课程的lab1的所有理解了,并且运用到了 tangseng 搜索引擎中。

具体代码实现地址在 https://github.com/CocaineCong/tangseng/app/mapreduce 中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1064368.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

聊聊分布式架构——RPC通信原理

目录 RPC通信的基本原理 RPC结构 手撸简陋版RPC 知识点梳理 1.Socket套接字通信机制 2.通信过程的序列化与反序列化 3.动态代理 4.反射 思维流程梳理 码起来 服务端时序图 服务端—Api与Provider模块 客户端时序图 RPC通信的基本原理 RPC&#xff08;Remote Proc…

【算法练习Day13】二叉树的层序遍历翻转二叉树对称二叉树

​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;练题 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录 二叉树的层序遍历翻转二叉树…

安装Ubuntu提示:系统找不到指定的文件。

今天我删除Ubuntu后重新下载&#xff0c;发现报错&#xff0c;错误信息如下&#xff1a; 这是因为系统没有卸载干净而导致的。 解决办法&#xff1a; 第一步&#xff1a; ##查询当前已安装的系统 wsl.exe --list --all 执行结果&#xff1a; 第二步&#xff1a; ##注销当前…

【GSEP202303 C++】1级 长方形面积

[GSEP202303 一级] 长方形面积 题目描述 小明刚刚学习了如何计算长方形面积。他发现&#xff0c;如果一个长方形的长和宽都是整数&#xff0c;它的面积一定也是整数。现在&#xff0c;小明想知道如果给定长方形的面积&#xff0c;有多少种可能的长方形&#xff0c;满足长和宽…

BF算法详解(JAVA语言实现)

目录 BF算法的介绍 图解 JAVA语言实现 BF算法的时间复杂度 BF算法的介绍 BF算法&#xff0c;即暴力(Brute Force)算法&#xff0c;是普通的模式匹配算法&#xff0c;BF算法的思想就是将目标串S的第一个字符与模式串T的第一个字符进行匹配&#xff0c;若相等&#xff0c;则继…

C++设计模式-桥接(Bridge)

目录 C设计模式-桥接&#xff08;Bridge&#xff09; 一、意图 二、适用性 三、结构 四、参与者 五、代码 C设计模式-桥接&#xff08;Bridge&#xff09; 一、意图 将抽象部分与它的实现部分分离&#xff0c;使它们都可以独立地变化。 二、适用性 你不希望在抽象和它…

[笔记] Microsoft Windows网络编程《三》网际协议

文章目录 前言3.1 IPv43.1.1 寻址3.1.1.1 单播3.1.1.2 多播(组播)3.1.1.3 广播 3.1.2 IPv4 管理协议&#xff08;ARP&#xff0c;ICMP&#xff0c;IGMP&#xff09;ARPICMPIGMP 3.1.3 Winsock 中的IPv4 寻址 3.2 IPv63.2.1 寻址3.2.1.1 单播链接——本地地址站点——本地地址&a…

ipa文件怎么把应用上架到苹果ios系统下载的App Store商城

注册为苹果开发者&#xff1a;首先&#xff0c;您需要注册为苹果开发者。前往苹果开发者网站&#xff08;https://developer.apple.com/&#xff09;&#xff0c;点击"Enroll"按钮&#xff0c;并按照相关步骤注册和付费&#xff08;开发者账号需要年度费用&#xff0…

【Java 进阶篇】使用 JDBCTemplate 执行 DQL 语句详解

在前面的文章中&#xff0c;我们已经学习了如何使用 Spring 的 JDBCTemplate 执行 DML&#xff08;Data Manipulation Language&#xff09;操作&#xff0c;包括插入、更新和删除操作。现在&#xff0c;让我们来深入了解如何使用 JDBCTemplate 执行 DQL&#xff08;Data Query…

SpringCloud Alibaba - Seata 四种分布式事务解决方案(TCC、Saga)+ 实践部署(下)

目录 一、Seata 分布式解决方案 1.1、TCC 模式 1.1.1、TCC 模式理论 对比 TCC 和 AT 模式的一致性和隔离性 TC 的工作模型 1.2.2、TCC 模式优缺点 1.2.3、TCC 模式注意事项&#xff1a;空回滚 1.2.4、TCC 模式注意事项&#xff1a;业务悬挂 1.2.5、实现 TCC 模式 案例…

MySQL数据库基础回顾与复习一

MySQL数据库 一、原理定义概念 定义 数据库(Database)是按照数据结构来组织、存储和管理数据的建立在计算机存储设备上的仓库 数据库是长期储存在计算机内、有组织的、可共享的数据集合 分类&#xff1a; &#xff08;1&#xff09;非结构化数据&#xff1a; 数据相对来讲没…

Spring Cloud Gateway网关中各个过滤器的作用与介绍

文章目录 1. Route To Request URL Filter&#xff08;路由过滤器&#xff09;2. Gateway Filter&#xff08;全局过滤器&#xff09;3. Pre Filter&#xff08;前置过滤器&#xff09;4. Post Filter&#xff08;后置过滤器&#xff09;5. Error Filter&#xff08;错误过滤器…

【刷题笔记10.6】LeetCode:汉明距离

LeetCode&#xff1a;汉明距离 一、题目描述 两个整数之间的汉明距离是指这两个数字对应二进制位不同的位置的数目。 给你两个整数x 和 y&#xff0c;计算并返回他们之间的汉明距离。 二、分析及代码实现 对于汉明距离问题我们其实可以将其转换为&#xff1a;计算x 和 y按…

U盘作为启动盘安装苹果OS X操作系统

如何制作 macOS USB启动盘&#xff1f;如何创建可引导的 macOS 安装器&#xff1f;接下来就为大家带来可引导的苹果电脑 macOS 系统U盘启动盘制作教程。U盘是我们在工作和生活中的好帮手&#xff0c;能储存和传递数据文件&#xff0c;重要的是&#xff0c;U盘还可以制作成苹果电…

leetcode - 365周赛

一&#xff0c;2873.有序三元组中的最大值 I ​ 该题的数据范围小&#xff0c;直接遍历&#xff1a; class Solution {public long maximumTripletValue(int[] nums) {int n nums.length;long ans 0;for(int i0; i<n-2; i){for(int ji1; j<n-1; j){for(int kj1; k<…

矩阵键盘的扫描原理与基础应用

基础知识 原理图 首先需要先将 J5 跳帽放到1和2之间。 表示选择的是矩阵键盘。 简化原理图 扫描原理&#xff1a; 以左上角按键为例。 先向 R1 输出低电平&#xff0c;向 R2&#xff0c;R3&#xff0c;R4 输出高电平。 再然后向 C1&#xff0c;C2&#xff0c;C3&#xff…

在Linux中软链接和硬链接的区别是什么?

2023年10月6日&#xff0c;周五晚上 目录 软链接(SymbolicLink):硬链接(HardLink):区别: 软链接(SymbolicLink): 软链接本身只是一个指向其他文件或目录的指针,不占用任何磁盘空间。软链接的修改或删除不会影响原文件。软链接可以指向不同文件系统中的文件。 硬链接(HardLink…

Cookie和Session详解以及结合生成登录效果

目录 引言 1.Cookie中的数据从哪来数据长啥样&#xff1f; 2.Cookie有什么作用&#xff1f; 3.cookie与session的工作关联&#xff1f; 4.Cookie到哪去&#xff1f; 5.Cookie如何存&#xff1f; 6.Session 7.Cookie与Session的关联与区别 8.通过代码理解 8.1 相关代码 8.2…

c++学习之 继承的方式

在C中&#xff0c;继承方式&#xff08;或继承访问权限&#xff09;有三种&#xff1a;public、protected 和 private&#xff0c;它们决定了派生类&#xff08;子类&#xff09;对基类&#xff08;父类&#xff09;成员的访问权限&#xff0c;它们之间的区别如下&#xff1a; …

局部放电发生因素与局部放电试验的重要性

局部放电发生的几个因素&#xff1a;   ①电场过于集中于某点&#xff1b;   ②固体介质有气泡&#xff0c;有害杂质未除净&#xff1b;   ③油中含水、含气、有悬浮微粒&#xff1b;   ④不同的介质组合中&#xff0c;在界面处有严重的电场畸变。   局部放电试验的重…