[每周一更]-(第133期):Go中MapReduce架构思想的使用场景

news2025/3/16 15:58:19

在这里插入图片描述

文章目录

      • **MapReduce 工作流程**
      • Go 中使用 MapReduce 的实现方式:
      • **Go MapReduce 的特点**
      • **哪些场景适合使用 MapReduce?**
      • 使用场景
        • 1. 数据聚合
        • 2. 数据过滤
        • 3. 数据排序
        • 4. 数据转换
        • 5. 数据去重
        • 6. 数据分组
        • 7. 数据统计
        • 8.**统计文本中单词出现次数**
          • **代码实现**
      • MapReduce vs. 扇入/扇出
        • 示例1:爬取多个网页
        • 示例2:多个 goroutine 计算结果,并聚合
      • 参考
      • 注意事项

新年开工,2025重新出发

为什么需要 MapReduce

在 Go 中,虽然没有内置的 MapReduce 框架,但我们可以利用 Go 的并发特性(如 goroutines 和 channels)来实现 MapReduce。

在 Go 语言中,MapReduce 是一种编程模型,用于处理和生成大规模数据集。它将任务分解为两个主要阶段:Map(映射)和 Reduce(归约),并通过并行处理提高效率。MapReduce 模型最初由 Google 提出,广泛应用于大数据处理、分布式计算等领域。

它的核心思想是将问题分解成多个较小的子问题并行处理,然后将结果合并。MapReduce 分为两个主要步骤:

  1. Map 阶段:将输入数据映射到中间结果。这个阶段将输入数据拆分成小块,分配给不同的处理单元,并对每个数据项应用一个映射函数。
  2. Reduce 阶段:将 Map 阶段的中间结果进行合并。通常是通过聚合或汇总中间结果,生成最终输出。

MapReduce 工作流程

  1. 输入数据:将大规模数据分成多个小块。
  2. Map(映射):对数据进行并行处理,并生成中间结果。
  3. Shuffle(洗牌,可选):对中间结果进行归类,按 key 组织数据。
  4. Reduce(归约):合并和处理 Map 阶段的中间结果,得出最终结果。

Go 中使用 MapReduce 的实现方式:

Go 提供了 goroutine 和 channel,这使得它非常适合实现并行计算的场景。一个简单的 Go 实现通常会使用以下步骤:

  1. Map:通过 goroutine 处理每个数据块。
  2. Shuffle(可选):将中间结果通过 channel 或其他方式传递到 Reduce 阶段。
  3. Reduce:聚合结果,得到最终输出。

通过 Go 的并发模型,可以利用多个 CPU 核心实现 MapReduce 的并行计算。

Go MapReduce 的特点

  1. 高并发
    • 通过 goroutine 并行执行 Map 和 Reduce 操作,提升计算效率。
    • Go 的 goroutine 轻量级,支持大规模并发执行 Map 任务,不会像 Java 线程那样占用大量内存。
  2. 无锁数据传输
    • channel 作为数据流通管道,避免手动加锁,提高代码可读性和安全性。
    • Go 提供了 sync.WaitGroupsync.Map 等并发工具,可以更简单地管理 MapReduce 任务。
  3. 适用于大规模数据处理
    • 适合处理日志分析、数据聚合、分布式计算等任务。

哪些场景适合使用 MapReduce?

场景Map 阶段Reduce 阶段
日志分析读取大量日志,提取关键字段统计访问次数、错误率等
搜索引擎索引解析网页,提取关键词统计关键词出现次数
基因数据分析解析 DNA 序列,计算某个基因的出现频率归并统计结果,得出全局基因分布
机器学习计算训练数据的特征训练模型,计算最终的回归参数
推荐系统计算用户的浏览、点击数据归并计算得到推荐结果
并行图像处理处理图像的每个区域合并所有区域结果,生成完整图像

常见使用场景:

  • 大规模数据处理: MapReduce 适用于批量处理大量数据,例如日志分析。

  • 并发数据处理: 在需要并发处理的场景中,例如查询数据库,MapReduce 可以将任务拆分成并发请求,从而减少处理时间并提高性能。处理结果可以被聚合起来。

  • 分布式数据处理和合并: MapReduce 用于以分布式方式处理和合并数据。大型数据集被分成较小的部分,由不同的机器或线程处理,然后合并。

使用场景

1. 数据聚合

场景:统计日志文件中不同状态码的出现次数。

拆解

  • Map阶段:读取日志文件,提取状态码,生成键值对(状态码, 1)。
  • Reduce阶段:汇总相同状态码的计数,生成最终结果(状态码, 总次数)。
func mapFunc(line string) map[string]int {
    parts := strings.Split(line, " ")
    statusCode := parts[8] // 假设状态码在第9个字段
    return map[string]int{statusCode: 1}
}

func reduceFunc(statusCode string, counts []int) int {
    return sum(counts)
}
2. 数据过滤

场景:从大量数据中筛选出符合特定条件的记录。

拆解

  • Map阶段:检查每条记录是否满足条件,满足则输出(记录, 1)。
  • Reduce阶段:汇总符合条件的记录。
func mapFunc(record Record) map[Record]int {
    if record.Age > 30 {
        return map[Record]int{record: 1}
    }
    return nil
}

func reduceFunc(record Record, counts []int) Record {
    return record
}
3. 数据排序

场景:对大规模数据集进行排序。

拆解

  • Map阶段:将数据分片并局部排序。
  • Reduce阶段:合并各分片的排序结果。
func mapFunc(data []int) []int {
    sort.Ints(data)
    return data
}

func reduceFunc(sortedSlices [][]int) []int {
    return mergeSortedSlices(sortedSlices)
}
4. 数据转换

场景:将数据从一种格式转换为另一种格式。

拆解

  • Map阶段:将原始数据转换为目标格式。
  • Reduce阶段:合并转换后的数据。
func mapFunc(input InputType) OutputType {
    return transform(input)
}

func reduceFunc(outputs []OutputType) OutputType {
    return combine(outputs)
}
5. 数据去重

场景:去除数据集中的重复记录。

拆解

  • Map阶段:将每条记录作为键输出(记录, 1)。
  • Reduce阶段:合并相同记录,输出唯一记录。
func mapFunc(record Record) map[Record]int {
    return map[Record]int{record: 1}
}

func reduceFunc(record Record, counts []int) Record {
    return record
}
6. 数据分组

场景:按某个字段对数据进行分组。

拆解

  • Map阶段:根据分组字段生成键值对(分组字段, 记录)。
  • Reduce阶段:将相同分组字段的记录合并。
func mapFunc(record Record) map[string]Record {
    return map[string]Record{record.GroupField: record}
}

func reduceFunc(groupField string, records []Record) []Record {
    return records
}
7. 数据统计

场景:计算数据集的平均值、最大值、最小值等统计信息。

拆解

  • Map阶段:计算局部统计信息。
  • Reduce阶段:合并局部统计信息,生成全局统计结果。
func mapFunc(data []int) Stat {
    return calculateLocalStat(data)
}

func reduceFunc(stats []Stat) Stat {
    return combineStats(stats)
}
8.统计文本中单词出现次数
  • 同步 Map 阶段
    • 通过 sync.WaitGroup 确保所有 mapFunction 任务完成后才关闭 mapChannel,避免 Reduce 过早读取导致数据丢失。
  • 使用 go func() 异步关闭 channel
    • mapWG.Wait() 结束后,关闭 mapChannel,确保 Reduce 读取完整数据。
  • Reduce 处理改进
    • reduceFunction 直接从 channel 读取数据,并合并为最终的 map[string]int 结果。
代码实现
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Map 阶段:统计部分数据中的单词频率
func mapFunction(text string, out chan<- map[string]int, wg *sync.WaitGroup) {
	defer wg.Done()
	wordCount := make(map[string]int)
	words := strings.Fields(text)
	for _, word := range words {
		wordCount[word]++
	}
	out <- wordCount
}

// Reduce 阶段:合并多个 map 结果
func reduceFunction(in <-chan map[string]int) map[string]int {
	result := make(map[string]int)
	for partialMap := range in {
		for word, count := range partialMap {
			result[word] += count
		}
	}
	return result
}

func main() {
	// 输入数据
	texts := []string{
		"hello world",
		"go is great",
		"hello go",
		"map reduce in go",
		"go go go",
	}

	// 创建 channel 传输 map 结果
	mapChannel := make(chan map[string]int, len(texts))
	var mapWG sync.WaitGroup

	// 启动多个 Map 任务
	for _, text := range texts {
		mapWG.Add(1)
		go mapFunction(text, mapChannel, &mapWG)
	}

	// 确保所有 map 任务完成后再关闭 channel
	go func() {
		mapWG.Wait()
		close(mapChannel)
	}()

	// Reduce 阶段:合并 map 结果
	result := reduceFunction(mapChannel)

	// 输出最终结果
	fmt.Println("Word Count Result:", result)
}

MapReduce vs. 扇入/扇出

历史文章:[每周一更]-(第24期):Go的并发模型,提到过Go 并发模式:扇入、扇出,这里简单对比一下

MapReduce 和 Go 的 扇入(Fan-in)/扇出(Fan-out) 在并发模型上是类似的,但它们的侧重点和应用场景有所不同

  • 如果只是单机并发任务(如 API 调用、爬虫),用 扇入/扇出

  • 如果要处理大数据(如日志分析、搜索索引),用 MapReduce

特性MapReduce扇入(Fan-in)/扇出(Fan-out)
核心思想拆分任务并行计算,再归并结果并行处理任务,聚合结果到一个 channel
Map 阶段 / 扇出并发执行多个子任务启动多个 goroutine 处理任务
Reduce 阶段 / 扇入归并多个子任务的结果读取多个 goroutine 结果并处理
数据流动方式Map → Reduce多个 goroutine → 单个 channel
适用场景大规模数据计算(如日志分析、搜索引擎索引)并发任务管理(如爬虫、API 并发请求)
是否涉及分布式适用于分布式计算主要用于单机并发任务
示例1:爬取多个网页
package main

import (
	"fmt"
	"net/http"
	"sync"
)

var urls = []string{
	"https://golang.org",
	"https://go.dev",
	"https://gophercises.com",
}

// 扇出:启动多个 goroutine 并发爬取网页
func fetch(url string, wg *sync.WaitGroup) {
	defer wg.Done()
	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("Fetched:", url, "Status:", resp.Status)
}

func main() {
	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go fetch(url, &wg)
	}
	wg.Wait()
	fmt.Println("All requests finished!")
}
示例2:多个 goroutine 计算结果,并聚合
package main

import (
	"fmt"
	"sync"
)

func worker(id int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	out <- id * id // 计算平方并发送
}

func main() {
	out := make(chan int, 5)
	var wg sync.WaitGroup

	// 扇出:启动多个 goroutine
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go worker(i, out, &wg)
	}

	// 等待所有任务完成后关闭 channel
	go func() {
		wg.Wait()
		close(out)
	}()

	// 扇入:聚合所有 goroutine 的结果
	sum := 0
	for result := range out {
		sum += result
	}

	fmt.Println("Total Sum:", sum) // 计算最终结果
}

参考

  • go-zero中介绍MapReduce使用场景:
    • 介绍原理:go-zero/core/mr/readme-cn.md at master · zeromicro/go-zero
    • 示例:zero-examples/mapreduce at main · zeromicro/zero-examples

注意事项

  • 数据并行性: MapReduce适合数据并行处理的任务,即任务可以分解为多个独立的子任务。
  • 数据规模: 对于小规模数据,MapReduce可能引入不必要的开销,应根据数据规模选择合适的处理方式。
  • 实时性要求: MapReduce不适合实时处理要求很高的任务,因为它通常用于批处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2295649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QML初识

目录 一、关于QML 二、布局定位和锚点 1.布局定位 2.锚点详解 三、数据绑定 1.基本概念 2.绑定方法 3.数据模型绑定 四、附加属性及信号 1.附加属性 2.信号 一、关于QML QML是Qt框架中的一种声明式编程语言&#xff0c;用于描述用户界面的外观和行为&#xff1b;Qu…

查询已经运行的 Docker 容器启动命令

一、导语 使用 get_command_4_run_container 查询 docker 容器的启动命令 获取镜像 docker pull cucker/get_command_4_run_container 查看容器命令 docker run --rm -v /var/run/docker.sock:/var/run/docker.sock cucker/get_command_4_run_container 容器id或容器名 …

协议_CAN协议

物理层特征 信号传输原理&#xff1a; CAN控制器根据CAN_L和CAN_H上的电位差来判断总线电平&#xff0c;总线电平分为显性电平&#xff08;CAN_H与CAN_L压差 2v&#xff09;、隐性电平&#xff08;CAN_H与CAN_L压差 0v&#xff09;&#xff0c;发送方通过总线电平的变化&am…

QT修仙之路2-2 对话框 尚欠火候

警告对话框 相关代码 错误对话框 相关代码 消息对话框 相关代码 询问对话框 相关代码 相关代码 警告对话框 QMessageBox::warning(this,"错误","账号密码不能为空",QMessageBox::Ok);错误对话框 QMessageBox msgBox(QMessageBox::Critical,"错误…

NFT Insider #168:The Sandbox 推出新春{金蛇礼服}套装;胖企鹅合作 LINE Minini

引言&#xff1a;NFT Insider 由 NFT 收藏组织 WHALE Members、BeepCrypto 联合出品&#xff0c; 浓缩每周 NFT 新闻&#xff0c;为大家带来关于 NFT 最全面、最新鲜、最有价值的讯息。每期周报将从 NFT 市场数据&#xff0c;艺术新闻类&#xff0c;游戏新闻类&#xff0c;虚拟…

什么是deepseek?

AI国产免费开源强大 DeepSeek 是由国内团队开发的一款开源人工智能工具库&#xff0c;专注于提供高效易用的 AI 模型训练与推理能力。它既包含预训练大语言模型&#xff08;如 DeepSeek-R1 系列&#xff09;&#xff0c;也提供配套工具链&#xff0c;助力开发者快速实现 AI 应用…

容器服务基础

1.腾讯云容器服务 使用该服务&#xff0c;开发者将无需安装、运维、扩展您的集群管理基础设施&#xff0c;只需进行简单的API调用&#xff0c;便可启动和停止 Docker 应用程序&#xff0c;查询集群的完整状态&#xff0c;以及使用各种云服务。 创建集群--创建工作负载/创建ingr…

C++基础知识(二)之数据类型、指针和内存、数组

六、C数据类型 1、sizeof运算符 sizeof运算符用于求数据类型或变量占用的内存空间。 用于数据类型&#xff1a;sizeof(数据类型) 用于变量&#xff1a;sizeof(变量名) 或 sizeof 变量名 注意&#xff1a; 在32位和64位操作系统中&#xff0c;同一种数据类型占用的内存空间…

LLMs之DeepSeek r1:Logic-RL的简介、安装和使用方法、案例应用之详细攻略

LLMs之DeepSeek r1&#xff1a;Logic-RL的简介、安装和使用方法、案例应用之详细攻略 目录 Logic-RL的简介 1、Logic-RL的特点 2、性能 Logic-RL 的安装和使用方法 1、安装 2、使用方法 数据准备 基础模型 指令模型 训练执行 实现细节 Logic-RL的案例应用 Logic-RL…

【神经网络框架】非局部神经网络

一、非局部操作的数学定义与理论框架 1.1 非局部操作的通用公式 非局部操作(Non-local Operation)是该研究的核心创新点,其数学定义源自经典计算机视觉中的非局部均值算法(Non-local Means)。在深度神经网络中,非局部操作被形式化为: 其中: 1.2 与传统操作的对比分析…

22.[前端开发]Day22-CSS单位-CSS预处理器-移动端视口

1 CSS常见单位详解 CSS中的单位 CSS中的绝对单位&#xff08; Absolute length units &#xff09; CSS中的相对单位&#xff08; Relative length units &#xff09; 1.em: 相对自己的font-size&#xff1b;如果自己没有设置, 那么会继承父元素的font-size 2.如果font-size中…

URL调用本地Ollama模型

curl http://192.168.2.247:11434/api/generate -d "{ \"model\": \"deepseek-r1:8b\", \"prompt\": \"Who r u?\" ,\"stream\":false}" 连续对话

【python】matplotlib(animation)

文章目录 1、matplotlib.animation1.1、FuncAnimation1.2、修改 matplotlib 背景 2、matplotlib imageio2.1、折线图2.2、条形图2.3、散点图 3、参考 1、matplotlib.animation 1.1、FuncAnimation matplotlib.animation.FuncAnimation 是 Matplotlib 库中用于创建动画的一个…

ubuntu24.04安装布置ros

最近换电脑布置机器人环境&#xff0c;下了24.04&#xff0c;但是网上的都不太合适&#xff0c;于是自己试着布置好了&#xff0c;留作有需要的人一起看看。 文章目录 目录 前言 一、确认 ROS 发行版名称 二、检查你的 Ubuntu 版本 三、安装正确的 ROS 发行版 四、对于Ubuntu24…

接入 deepseek 实现AI智能问诊

1. 准备工作 注册 DeepSeek 账号 前往 DeepSeek 官网 注册账号并获取 API Key。 创建 UniApp 项目 使用 HBuilderX 创建一个新的 UniApp 项目&#xff08;选择 Vue3 或 Vue2 模板&#xff09;。 安装依赖 如果需要在 UniApp 中使用 HTTP 请求&#xff0c;推荐使用 uni.requ…

网络爬虫js逆向之异步栈跟栈案例

【注意&#xff01;&#xff01;&#xff01;】 前言&#xff1a; 1. 本章主要讲解js逆向之异步栈跟栈的知识&#xff08;通过单步执行调试&#xff09; 2. 使用关键字搜定位加密入口 3. 本专栏通过多篇文章【文字案例】的形式系统化进行描述 4. 本文章全文进行了脱敏处理 5. 详…

机器学习 - 需要了解的条件概率、高斯分布、似然函数

似然函数是连接数据与参数的桥梁&#xff0c;通过“数据反推参数”的逆向思维&#xff0c;成为统计推断的核心工具。理解它的关键在于区分“参数固定时数据的概率”与“数据固定时参数的合理性”&#xff0c;这种视角转换是掌握现代统计学和机器学习的基础。 一、在学习似然函…

【Spring】什么是Spring?

什么是Spring&#xff1f; Spring是一个开源的轻量级框架&#xff0c;是为了简化企业级开发而设计的。我们通常讲的Spring一般指的是Spring Framework。Spring的核心是控制反转(IoC-Inversion of Control)和面向切面编程(AOP-Aspect-Oriented Programming)。这些功能使得开发者…

[笔记] 汇编杂记(持续更新)

文章目录 前言举例解释函数的序言函数的调用栈数据的传递 总结 前言 举例解释 // Type your code here, or load an example. int square(int num) {return num * num; }int sub(int num1, int num2) {return num1 - num2; }int add(int num1, int num2) {return num1 num2;…

开放式TCP/IP通信

一、1200和1200之间的开放式TCP/IP通讯 第一步&#xff1a;组态1214CPU&#xff0c;勾选时钟存储器 第二步&#xff1a;防护与安全里面连接机制勾选允许PUT/GET访问 第三步&#xff1a;添加PLC 第四步&#xff1a;点击网络试图&#xff0c;选中网口&#xff0c;把两个PLC连接起…