极简的MapReduce实现

news2025/1/15 20:34:12

目录

1. MapReduce概述

2. 极简MapReduce内存版

3. 复杂MapReduce磁盘版

4. MapReduce思想的总结


1. MapReduce概述

       以前写过一篇 MapReduce思想 ,这次再深入一点,简单实现一把单机内存的。MapReduce就是把它理解成高阶函数,需要传入map和reduce所需的函数,即可对一个集合的键值对进行按需变换。

我们按Python的map和reduce逻辑,接收这两个高阶函数所需的函数形态,而不是像Java那样写一个Mapper/Reducer,当然如果需要复杂一点map功能,可以通过闭包来实现。

2. 极简MapReduce内存版

       如果是效仿Python,很容易想到实现MapReduce 就是map + 分组 + reduce。一头一尾的map和reduce都可以直接用py的内置函数,中间分组也容易用groupby实现。因此首先约定map和reduce的自定义函数写法就是直接按内置函数要求来写,只不过map输出是键值对,要求用tuple/list 来区分key和value;reduce时候只对一组的values做自定义聚合,不能操作key。到这里很容易实现严格的MapReduce,你用起来大概就和Spark提供的mapreduce一样了。如果想直接做wordcount好像不太容易,因为一行文本得输出一个词表数组,因此可以宽松一点,实现类似flatMap的效果,也就是即使map只输出一个,也要放进列表里,由框架去展开。

def wc_map(line):
    return [(x,1) for x in line.strip().split()]

reduce就是严格的两个参数和一个返回值形式(其中y表示上一轮迭代的结果,x表示每次便利数组拿到的值,这种严格的reduce函数并容易设计)

def wc_reduce(y, x):
    return y+x

剩下要做的事就是把map出来的数据做 拉平和排序分组,最后对每一组做reduce。

from itertools import groupby
from functools import reduce
def map_reduce(f_map, f_reduce, seq):
    map_result = map(f_map, seq)
    
    flattened = sum(map_result, []) #拉平
    
    shuffled = sorted(flattened, key= lambda x:x[0]) #排序
    
    mr_result = [(key, reduce(f_reduce, (x[1] for x in data))) for key, data in groupby( shuffled, key=lambda x:x[0])] #groupby分组
    return mr_result

3. 复杂MapReduce磁盘版

如果内存不够,就必须不断把数据写到磁盘上,也就是过去大数据面试题最喜欢考察的点。那些题若是有单机版MapReduce,基本都可以直接做出来。大体结构如下图:

其中partition只有一个,就不用单独设计了。过程也简化很多,也就是每当buffer满了,就排序写出到成一个数据块,最后将所有的数据块merge起来。merge过程写起来不是很容易,需要维持所有数据块的句柄数组有序:每次读取所有句柄中最小的那一条,随后数组需要重新按最小值排序,直到所有数据块读取完。

另外就是因为中间数据需要落地,所以这一版的MapReduce框架直接从文件到文件。那么让写map和reduce函数直接对着文件输出,类似Java那样提供一个context的参数呢?我选择允许用yield来输出内容,这样相当于也宽松了输出多个键值对的要求。

def wc_map(line):
    for t in line.strip().split():
         yield (t, 1)

最后同样宽松reduce的写法,不用写那么复杂的两参数一返回,而是像Java那样,拿到一组key和可迭代的values,甚至可以这一组key输出多个结果。

def reduce_func(key , kv_pairs):
    s = 0
    for k,v in kv_pairs :   
        s += v
    yield (key, s)

很显然我们要补充很多东西,要读取文件、要设计缓存、要做merge,因此设计了一个类,过程就不解释了

from operator import itemgetter
import itertools
import sys
class MapReduce:

    class Buffer:
        def __init__(self, buffer_size = 80000):
            self.buffer_size = buffer_size
            self.buffer = []
            self.current_size = 0
            
        def add_data(self, kv) -> bool :
            self.current_size += len(str(kv))
            self.buffer.append(kv)
            return True if self.current_size  > self.buffer_size else False          

        def spill(self, file_path):
            if self.buffer:
                self.buffer.sort(key = lambda x:x[0]) 
                with open(file_path, mode='w') as f:
                    for kv in self.buffer:
                        f.write(str(kv) + '\n')
                self.buffer = []
                self.current_size = 0
    
    def __init__(self, buffer_size,temp_dir):
        self.buffer = self.Buffer(buffer_size)
        self.temp_dir = temp_dir
        self.block_id = 0



    def __fetch_kvs_from_map(self, map_f, infile):
        with open(infile, mode='r', encoding='utf-8') as f:
            for line in f:
                for k,v in map_f(line):
                    yield (k,v)

    def __run_map(self, map_f, infile):
        block_files = [x for x in os.listdir(self.temp_dir) if x.startswith('block.') and x.split('.',1)[1].isdigit() ]
        for block in block_files:
            os.remove(f'{self.temp_dir}/{block}')
        for kv in self.__fetch_kvs_from_map(map_f, infile):
            if self.buffer.add_data(kv):
                self.buffer.spill(f'{self.temp_dir}/block.{self.block_id}')
                self.block_id += 1
        self.buffer.spill(f'{self.temp_dir}/block.{self.block_id}')


    def __merge_sort_from_files(self):
        block_files = [x for x in os.listdir(self.temp_dir) if x.startswith('block.') and x.split('.',1)[1].isdigit() ]
        block_files.sort(key = lambda filename: int(filename.split('.',1)[1]))
        ffs = [open(f'{self.temp_dir}/{block}', 'r') for block in block_files]
        first_kvs = [ eval(f.readline()) for f in ffs ]

        shuffled_files = [[fk, ff]  for fk, ff in  zip(first_kvs, ffs)]
        shuffled_files.sort(key = lambda x:x[0][0], reverse=True) 


        with open(f'{self.temp_dir}/final_one.dat', mode='w') as fw:
            while len(shuffled_files ) > 1:
                first_keys = [x[0][0] for x in shuffled_files]
                min_key = first_keys[-1]
                min_idx_bound = first_keys.index(min_key)
                sffs = shuffled_files[min_idx_bound:]
                for sff in sffs:
                    fw.write(str(sff[0]) + '\n')
                    n = sff[1].readline()
                    if n :
                        sff[0] = eval(n)
                    else:
                        sff[0] = ''
                        sff[1].close()
                shuffled_files = sorted(filter(lambda x:x[0], shuffled_files), key = lambda x:x[0][0], reverse=True)

            if shuffled_files:
                fw.write(str(shuffled_files[0][0]) + '\n')  #already exist
                for line in shuffled_files[0][1]:
                    fw.write(line)
                shuffled_files[0][1].close()

    def __run_reduce(self, reduce_f, outfile):
        def read_mapper_output(file):
            for line in file:
                yield eval(line.rstrip())
        with open(f'{self.temp_dir}/final_one.dat', encoding='utf-8') as f , \
                open(f'{self.temp_dir}/{outfile}', encoding='utf-8',mode='w') as fw:
            stdin_generator=read_mapper_output(f)
            for key, kv_pairs in itertools.groupby(stdin_generator, itemgetter(0) ):
                for key,result in reduce_f(key, kv_pairs):
                    fw.write(f"{key}\t{result}\n")
    
    def run_mrjob(self, map_f, reduce_f, infile, outfile):
        #====map
        self.__run_map(map_f, infile)
        #====shuffle===
        self.__merge_sort_from_files()
        #====reduce
        self.__run_reduce(reduce_f, outfile)


def map_func(x):
    for t in x.strip().split():
         yield (t, 1)

def reduce_func(key , kv_pairs):
    s = 0
    for k,v in kv_pairs :   
        s += v
    yield (key, s)


if __name__ == "__main__":
    mr = MapReduce(30, "./")

    mr.run_mrjob(map_func, reduce_func, "words.txt", "result.txt")

代码明显复杂了很多,其中reduce读取有序文件还借鉴了Hadoop streaming的处理方式。

4. MapReduce思想的总结

       这个系列到这算完结了,为了讲解MapReduce我都是先讲解高阶函数map和reduce,深入思考以后发现这些个东西确实还有一些值得补充的地方。但应该不止于此,对于没有完全学过函数式编程的我来说就纯个人观点一下。

写map有什么用?在你写了很多独立地循环处理逻辑以后, 你发现你可以把循环与处理分离了。反过来更多使用map会习惯这种分离的思维,甚至提升这种分离。 这种分离意味着一种共性的抽象。

reduce被设计出来的意义何在? 不使用全局变量,而是部分变量传递,以完成全局功能的设计。全局功能线性拆分成N个独立的有部分依赖的功能总和。其本质也是一种分离或拆分思想。反过来使用reduce实现功能,习惯这种等价的整体拆分成部分的思维,强化这种拆分,意味着进行带依赖的共性的提炼

最后概括一下MapReduce的学习意义:证明了对任务的工序拆分能有效实现并行加速。即对任务抽象拆分出同质(独立或依赖)的步骤,这些同质工作能并行/自动化。而工序拆分,靠的是前面高阶函数训练出来的思维。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1091858.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蓝桥杯每日一题2023.10.14

年号字串 - 蓝桥云课 (lanqiao.cn) 题目描述 我们发现每个字母都与26紧密相关&#xff0c;其%26的位置就是最后一个字母&#xff0c;由于最开始将0做为了1故在写答案时需要注意细节问题 #include<bits/stdc.h> using namespace std; char s[] "ABCDEFGHIJKLMNOPQ…

电源集成INN3270C-H215-TL、INN3278C-H114-TL、INN3278C-H215-TL简化了反激式电源转换器的设计和制造。

一、概述 InnoSwitch™3-CP系列IC极大地简化了反激式电源转换器的设计和制造&#xff0c;特别是那些需要高效率和/或紧凑尺寸的产品。InnoSwitch3-CP系列将初级和次级控制器以及安全额定反馈集成到单个IC中。 InnoSwitch3-CP系列器件集成了多种保护功能&#xff0c;包括线路过…

【git篇】git的使用

文章目录 1. Git介绍与安装1. Git简介2. 下载安装程序3. 设置用户名和邮箱 2. Git的基本使用1. 创建版本库2. 文件管理1. 提交文件2. 查看状态3. 查看提交日志4. 版本回退 3. 原理解析1. Git区的划分2. 撤销修改3. 删除文件 4. 分支管理1. 基本原理2. 创建分支3. 合并分支4. 删…

处理死锁策略2

一、避免死锁-动态策略 1.概述 安全序列-能使每个进程才能顺利完成的分配资源的序列&#xff0c;可有多种&#xff0c;此时系统处于安全状态下&#xff0c;系统一定不会发生死锁。 不安全状态-找不到一个安全序列时&#xff0c;系统处于不安全状态下&#xff0c;系统可能会发…

BuyVM 挂载存储块

发布于 2023-07-13 on https://chenhaotian.top/linux/buyvm-mount-block-storage/ BuyVM 挂载存储块 参考&#xff1a; https://zhujitips.com/2653https://www.pigji.com/898.html 1 控制台操作 存储块购买完毕后&#xff0c;进入后台管理界面&#xff0c;进入对应 VPS …

Qt工具开发,该不该跳槽?

Qt工具开发&#xff0c;该不该跳槽? 就这样吧&#xff0c;我怕你跳不动。 嵌入式UI&#xff0c;目前趋势是向着LVGL发展。QT已经在淘汰期了。很多项目还在用&#xff0c;但技术上已经落后。QT短期内不会全面淘汰&#xff0c;但退位让贤的大趋势已经很清楚了。 最近很多小伙伴…

整理了六大类兼职平台,看看有适合你的吗

现代人已经不再仅仅依赖于一份全职工作&#xff0c;他们通过兼职来为自己赚取额外的收入&#xff0c;同时也能更加自由地安排自己的时间。而如今&#xff0c;互联网兼职平台应运而生&#xff0c;为我们创造了更多的选择。今天我将为你介绍六大类兼职平台&#xff0c;相信其中一…

多输入多输出 | MATLAB实现PSO-RBF粒子群优化径向基神经网络多输入多输出预测

多输入多输出 | MATLAB实现PSO-RBF粒子群优化径向基神经网络多输入多输出预测 目录 多输入多输出 | MATLAB实现PSO-RBF粒子群优化径向基神经网络多输入多输出预测预测效果基本介绍程序设计往期精彩参考资料 预测效果 基本介绍 Matlab实现PSO-RBF粒子群优化径向基神经网络多输入…

二维码怎么做列表?点击可跳转其他内容

最近很多小伙伴在问&#xff0c;在用二维码展示内容时&#xff0c;怎么设置一个列表&#xff0c;点击每条内容或者单个图片&#xff0c;就可以跳转到对应的详情页面查看内容&#xff0c;而且二维码内容还能够随时编辑或者修改。那么想要做到上面的这种效果&#xff0c;可以用二…

EEPROM、FLASH电路设计

ROM是一种掉电不丢失数据的存储器&#xff0c;EEPROM是ROM的升级版&#xff0c;他支持带电擦除&#xff0c;可以修改存储器内的内容。 而我们还会提到FLASH&#xff0c;是EEPROM的升级&#xff0c;他们二者的区别在于FLASH按扇区操作&#xff0c;EEPROM则按字节操作&#xff0…

【Rust笔记】浅聊 Rust 程序内存布局

浅聊Rust程序内存布局 内存布局看似是底层和距离应用程序开发比较遥远的概念集合&#xff0c;但其对前端应用的功能实现颇具现实意义。从WASM业务模块至Nodejs N-API插件&#xff0c;无处不涉及到FFI跨语言互操作。甚至&#xff0c;做个文本数据的字符集转换也得FFI调用操作系统…

Studio One6.5中文版本版下载及功能介绍

Studio One是一款专业的音乐制作软件&#xff0c;由美国PreSonus公司开发。该软件提供了全面的音频编辑和混音功能&#xff0c;包括录制、编曲、合成、采样等多种工具&#xff0c;可用于制作各种类型的音乐&#xff0c;如流行音乐、电子音乐、摇滚乐等。 Studio One的主要特点…

说明书MS2721A频谱分析仪7.1GHz

安立Anritsu MS2721A 频谱分析仪 MS2721A 是 Anritsu 的 7.1 GHz 频谱分析仪。频谱分析仪测量已知和未知信号的频谱功率。频谱分析仪收集信息&#xff0c;例如输入信号与其频率相比的幅度。作为频率分析仪&#xff0c;频谱分析仪的主要用途是记录和分析电输入信号以及其他信号的…

想要精通算法和SQL的成长之路 - 滑动窗口和大小根堆

想要精通算法和SQL的成长之路 - 滑动窗口和大小根堆 前言一. 大小根堆二. 数据流的中位数1.1 初始化1.2 插入操作1.3 完整代码 三. 滑动窗口中位数3.1 在第一题的基础上改造3.2 栈的remove操作 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 大小根堆 先来说下大小根堆是什…

Rust初接触

一、什么是Rust Rust 是由 Mozilla 开发的多范式编程语言&#xff0c;专注于性能和安全性。 Rust 以其先进的安全并发能力而闻名&#xff0c; 它的语法类似于 C&#xff0c;但它提供了更快的速度和内存安全性&#xff0c;但不使用垃圾收集器。 Rust 最初是为 Mozilla Firefox …

Linux bash: ipconfig: command not found解决方法

安装完centos7运行ifconfig命令发现找不到 安装相关工具 yum install net-tools.x86_64 无脑yes即可

Jenkins UI 自动化持续化集成测试

一&#xff1a;安装jenkins 环境 在官网下载msi 直接安装即可 二&#xff1a;设置全局变量 设置allure 路径 三&#xff1a;创建项目 1、创建自由风格项目 2、如果项目在本地&#xff0c;且本地服务器是windows &#xff0c;找到Jenkins安装根目录&#xff0c;寻找config…

Spring Cloud Pipelines 入门实践

文章目录 1. 前言2. Spring Cloud Pipelines 是做什么的2.1. 预定义的流程2.2. 集成测试和契约测试2.3.部署策略 4. Spring Cloud Pipelines的使用示例4.1. 创建一个Spring Boot应用4.2. 将代码托管到GitHub仓库4.3. 添加Spring Cloud Pipelines依赖4.4. 配置Spring Cloud Pipe…

基于Python简单实现接口自动化测试(详解)

一、简介 本文从一个简单的登录接口测试入手&#xff0c;一步步调整优化接口调用姿势&#xff0c;然后简单讨论了一下接口测试框架的要点&#xff0c;最后介绍了一下我们目前正在使用的接口测试框架pithy。期望读者可以通过本文对接口自动化测试有一个大致的了解。 二、引言 …

统信UOS 1060系统增量备份

原文链接&#xff1a;统信UOS 1060系统增量备份 hello&#xff0c;大家好啊&#xff0c;今天给大家带来关于统信UOS 1060系统备份还原的系列内容的第三篇文章&#xff0c;系统增量备份&#xff0c;我们可以将系统增量备份到u盘中&#xff0c;后面需要的话&#xff0c;可以进行还…