【大数据】MapReduce实战

news2024/11/25 3:05:07

文章目录

    • @[toc]
      • Word Count
        • Mapper
        • Reducer
        • run.sh
        • 本地调试
      • 基于白名单的Word Count
        • Mapper
        • Reducer
        • run.sh
        • 本地调试
      • 文件分发
        • -file
          • Mapper
          • Reducer
          • run.sh
        • -cacheFile
          • Mapper
          • Reducer
          • run.sh
        • -cacheArchive
          • Mapper
          • Reducer
          • run.sh
      • 杀死MapReduce Job
      • 排序
      • 压缩文件
      • mr_ip_lib_python本地调试

因上努力

个人主页:丷从心·

系列专栏:大数据

果上随缘


Word Count

1

Mapper
import re
import sys

p = re.compile(r'\w+')

for line in sys.stdin:
    word_list = line.strip().split(' ')

    for word in word_list:
        if len(p.findall(word)) < 1:
            continue

        word = p.findall(word)[0].lower()

        print('\t'.join([word, '1']))
Reducer
import sys

cur_word = None
cur_cnt = 0

for line in sys.stdin:
    word, val = line.strip().split('\t')

    if cur_word == None:
        cur_word = word

    if cur_word != word:
        print('\t'.join([cur_word, str(cur_cnt)]))

        cur_word = word
        cur_cnt = 0

    cur_cnt += int(val)

print('\t'.join([cur_word, str(cur_cnt)]))
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_wordcount"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py" \
	-reducer "python red.py" \
	-file ./map.py \
	-file ./red.py
本地调试
cat the_man_of_property.txt | python map.py | sort -k1 | python red.py > result.txt
cat result.txt | sort -k2 -rn | head

基于白名单的Word Count

Mapper
import sys


def get_white_list_word(white_list_file):
    white_list_file = open(white_list_file, 'r')

    white_list_word = set()
    for word in white_list_file:
        word = word.strip()

        if word != '':
            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_file):
    white_list_word = get_white_list_word(white_list_file)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_white_list_word_count"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func white_list" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=3" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-file ./map.py \
	-file ./red.py \
	-file ./white_list
本地调试
cat the_man_of_property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reducer_func > result.txt
cat the_man_of_property.txt | grep -o 'against' | wc -l

文件分发

-file
Mapper
import sys


def get_white_list_word(white_list_file):
    white_list_file = open(white_list_file, 'r')

    white_list_word = set()
    for word in white_list_file:
        word = word.strip()

        if word != '':
            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_file):
    white_list_word = get_white_list_word(white_list_file)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func white_list" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=3" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-file ./map.py \
	-file ./red.py \
	-file ./white_list
-cacheFile
Mapper
import sys


def get_white_list_word(white_list_file):
    white_list_file = open(white_list_file, 'r')

    white_list_word = set()
    for word in white_list_file:
        word = word.strip()

        if word != '':
            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_file):
    white_list_word = get_white_list_word(white_list_file)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_cachefile_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func WL" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=2" \
	-jobconf  "mapred.job.name=mr_cachefile_broadcast" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-cacheFile "hdfs://master:9000/white_list#WL" \
	-file "./map.py" \
	-file "./red.py"
-cacheArchive
Mapper
import os
import sys


def get_cachefile_list(white_list_dir):
    cachefile_list = []

    if os.path.isdir(white_list_dir):
        for cachefile in os.listdir(white_list_dir):
            cachefile = open(white_list_dir + '/' + cachefile)

            cachefile_list.append(cachefile)

    return cachefile_list


def get_white_list_word(white_list_dir):
    white_list_word = set()

    for cachefile in get_cachefile_list(white_list_dir):
        for word in cachefile:
            word = word.strip()

            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_dir):
    white_list_word = get_white_list_word(white_list_dir)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func WLD" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=2" \
	-jobconf  "mapred.job.name=mr_cachearchive_broadcast" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-cacheArchive "hdfs://master:9000/white_list_dir.tgz#WLD" \
	-file "./map.py" \
	-file "./red.py"

杀死MapReduce Job

hadoop job -kill job_1715841477049_0001

排序

  • k e y key key的字符进行排序

压缩文件

  • 压缩文件不可切分,一个压缩文件占用一个 M a p Map Map

mr_ip_lib_python本地调试

$ head -20 ip_lib.txt | column -t
$ cat ip_lib.txt | awk '{print $1,$NF}' | head | column -t

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1689724.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flask Response 对象

文章目录 创建 Response 对象设置响应内容设置响应状态码设置响应头完整的示例拓展设置响应的 cookie重定向响应发送文件作为响应 总结 Flask 是一个 Python Web 框架&#xff0c;用于快速开发 Web 应用程序。在 Flask 中&#xff0c;我们使用 Response 对象来构建 HTTP 响应。…

东软联合福建省大数据集团打造“数据要素×医疗健康”服务新模式

5月23日&#xff0c;东软集团与福建省大数据集团有限公司在福州签订战略合作协议。 据「TMT星球」了解&#xff0c;双方将在健康医疗数据要素价值领域展开合作&#xff0c;通过大数据服务&#xff0c;赋能商业保险公司的产品设计和保险两核&#xff0c;打造“数据要素医疗健康…

微软开源多模态大模型Phi-3-vision,微调实战来了

节前&#xff0c;我们组织了一场算法岗技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、今年参加社招和校招面试的同学。 针对大模型& AIGC 技术趋势、大模型& AIGC 落地项目经验分享、新手如何入门算法岗、该如何准备面试攻略、面试常考点等热门话题进行了…

Qt 报错总结 No suitable kits found

目录 “No suitable kits found” 解决 解决方法参考&#xff1a; chatGPT辅助解决QT构建报错error: multiple target patterns 我的解决方法&#xff1a;把语言设置为空 “No suitable kits found” 解决 没有找到合适的kits套件&#xff0c;在安装Qt Creator时没有安装Min…

AGI技术与原理浅析:曙光还是迷失?

前言&#xff1a;回顾以往博客文章&#xff0c;最近一次更新在2020-07&#xff0c;内容以机器学习、深度学习、CV、Slam为主&#xff0c;顺带夹杂个人感悟。笔者并非算法科班出身&#xff0c;本科学制药、研究生学金融&#xff0c;最原始的算法积累都来源于网络&#xff0c;当时…

系统架构师考试(十)

SaaS为在线客服 PaaS为二次开发&#xff0c;比如低代码平台 IaaS 硬件开发 B 是基础设施作为服务 软件架构的概念 架构风格 数据流风格 网络报文是在计算机网络中通过网络传输的数据单元&#xff0c;它是网络通信的基本单位。网络报文包含了发送方和接收方之间传输的数据&…

【Crypto】摩丝

文章目录 一、摩斯解题感悟 一、摩斯 很明显莫尔斯密码 iloveyou还挺浪漫 小小flag&#xff0c;拿下 解题感悟 莫尔斯密码这种题还是比较明显的

在Windows10中重命名文件和文件夹的6种方法,有你熟悉和不熟悉的

序言 你可以通过多种方式在Windows 10上重命名文件。如果每次你想更改文件名时仍右键单击并选择“重命名”,那么我们有一些技巧可以加快更改速度。 使用文件资源管理器重命名文件和文件夹 Windows 10的文件资源管理器是一个功能强大的工具。你知道吗,有四种不同的方法可以…

从零入门激光SLAM(二十一)——看不懂FAST-LIO?进来

大家好呀&#xff0c;我是一个SLAM方向的在读博士&#xff0c;深知SLAM学习过程一路走来的坎坷&#xff0c;也十分感谢各位大佬的优质文章和源码。随着知识的越来越多&#xff0c;越来越细&#xff0c;我准备整理一个自己的激光SLAM学习笔记专栏&#xff0c;从0带大家快速上手激…

【Vue】input框自动聚焦且输入验证码后跳至下一位

场景:PC端 样式: <div class="verification-code-input"><input v-model="code[index]" v-for="(_, index) in 5" :key="index" type="text" maxlength="1" @input="handleInput(index)" …

数据库—— MySQL数据库安装

一、MySQL数据库定义 MySQL是一种开源关系型数据库管理系统&#xff0c;它使用SQL语言进行数据的管理。通过MySQL&#xff0c;可以创建数据库、表格、插入、查询、更新和删除数据等操作。MySQL支持多种操作系统&#xff0c;并且被广泛应用于Web应用程序开发中。MySQL以其高性能…

【HTML】制作一个跟随鼠标的流畅线条引导页界面(可直接复制源码)

目录 前言 HTML部分 CSS部分 JS部分 效果图 总结 前言 无需多言&#xff0c;本文将详细介绍一段HTML代码&#xff0c;图中线条可跟随鼠标移动&#xff0c;具体内容如下&#xff1a; 开始 首先新建一个HTML的文本&#xff0c;文本名改为[index.html]&#xff0c;创建好后右…

(Oracle)SQL优化基础(三):看懂执行计划顺序

往期内容&#xff1a; &#xff08;Oracle&#xff09;SQL优化基础&#xff08;一&#xff09;&#xff1a;获取执行计划 &#xff08;Oracle&#xff09;SQL优化基础&#xff08;二&#xff09;&#xff1a;统计信息 获取到执行计划后&#xff0c;对于新手朋友来讲可能不知道…

win11安装docker运行Open-Webui 界面化展示 ollama大模型

1.OpenWeb UI运行需要docker 环境下载docker Get Started | Docker 2.需要命令提示符docker -v 查询是否安装成功&#xff1b; 查询docker详情docker version 3.github拉取open-webUi镜像Package open-webui GitHub 复制命令运行在命令提示符&#xff1b; 等待下载完成 4.到…

[图解]SysML和EA建模住宅安全系统-07 to be块定义图

1 00:00:01,970 --> 00:00:05,040 入侵者这里有个∞ 2 00:00:05,530 --> 00:00:07,000 说明它下面已经有子图了 3 00:00:07,010 --> 00:00:08,080 我们看看里面子图 4 00:00:10,200 --> 00:00:17,000 这里&#xff0c;我们看位置 5 00:00:19,030 --> 00:00:…

find 几招在 Linux 中高效地查找目录

1. 介绍 在 Linux 操作系统中&#xff0c;查找目录是一项常见的任务。无论是系统管理员还是普通用户&#xff0c;都可能需要查找特定的目录以执行各种操作&#xff0c;如导航文件系统、备份数据、删除文件等。Linux 提供了多种命令和工具来帮助我们在文件系统中快速找到目标目…

MyBatis-Plus介绍及Spring Boot 3集成指南

我们每个Java开发者都在使用springbootmybatis开发时&#xff0c;我们经常发现自己需要为每张数据库表单独编写XML文件&#xff0c;并且为每个表都需要编写一套增删改查的方法&#xff0c;较为繁琐。为了解决这一问题&#xff0c;MyBatis-Plus应运而生。在本文中&#xff0c;我…

http协议报文头部结构解释

http协议报文头部结构 请求报文 报文解释 请求报文由三部分组成&#xff1a;开始行、首部行、实体主体 开始行&#xff1a;请求方法&#xff08;get、post&#xff09;url版本 CRLE 方法描述GET请求指定页面信息&#xff0c;并返回实体主体HEAD类似get要求&#xff0c;只不…

【论文阅读】Rank-DETR(NIPS‘23)

paper:https://arxiv.org/abs/2310.08854 code:https://github.com/LeapLabTHU/Rank-DETR

图片、视频画质增强变清晰工具分享(免费)

生活中可能会修一下模糊图片那么这就有一款用来修图片的管理工具&#xff0c;也有可能会修一下模糊的视频&#xff0c;在吾爱上有大佬开发了这么一款工具&#xff0c;免费的&#xff0c;不需要开任何VIP&#xff0c;我试了一下&#xff0c;好用&#xff0c;分享出来&#xff0c…