【hadoop】hadoop streaming

news2025/3/30 15:23:55

API:
https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html(hadoop3)
https://cwiki.apache.org/confluence/display/HADOOP2/HadoopStreaming(hadoop2)
hadoop version查看hadoop版本,下面的测试基于hadoop2环境。

文章目录

  • 1.介绍
  • 2.Demo
  • 3.map任务获取数据来源
  • 4.作业提交主要参数解释
    • 1)输入输出
    • 2)资源分发
    • 3) 作业启动管理
    • 4)分区规则
    • 5)压缩
  • 5.仅map作业

1.介绍

在这里插入图片描述
hadoop streaming可以将任何可执行的文件,如sh脚本,py脚本,嵌入到分布式环境当中执行MR的逻辑,而不必局限于java语言。

hadoop会在每个启动的任务进程中初始化指定的map或reduce脚本并执行,map或reduce任务通过标准输入流读取数据,标准输出流写出数据。

需要注意的是streaming任务默认在map端数据shuffle到reduce端时没有本地合并的过程,也就是MR任务中在map=>reduce过程中,传输的数据格式:key: [value1, value2, value3] ,是key+一个value构成的迭代器。而streaming任务中,map输出的相同key的数据只会被依次相邻的送到同一个reducekey1: value1; key2: value2这样。

2.Demo

对于下面的数据:

(base) map@gzdt-map-poi-yingxiang-offline04 test_streaming$ cat test.dat
1	小王
2	小李
1	张三
3	李四

执行下面命令:

#!/bin/bash
set -e -x -u -o pipefail

hadoop streaming \
  -input afs://xxx/test.dat \
  -output afs://xxx/test.res \
  -jobconf hadoop.job.ugi="xxx,xxx" \
  -jobconf mapred.job.queue.name="xxx" \
  -jobconf mapred.job.tracker="xxx" \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -mapper cat \
  -reducer wc

产出的结果文件:

(base) map@gzdt-map-poi-yingxiang-offline04 test_streaming$ cat test.res
      2       4      18
      1       2       9
      1       2       9

wc是Unix/Linux中的一个命令行工具,用于计算给定文件中的字节数、字数和行数。

wc命令的输出有三个或四个数字:

第一个数字是文件中的行数。
第二个数字是文件中的词数。这里的"词"是指由空格、制表符或换行符分隔的字符串。
第三个数字是文件中的字节数。这是文件的大小,不是字符数。如果文件中包含多字节字符(如UTF-8编码的非ASCII字符),字节数会大于字符数。
第四个字段是输入文件的名称。

如果只输入wc命令而不带任何文件名,wc会从标准输入读取数据,此时不会显示文件名。

3.map任务获取数据来源

一个streaming job中可以指定多个不同的输入路径,不同路径的数据可能会需要不同的处理方式,所以map任务中区分当前数据的来源非常重要。

hadoop在任务启动时会预置一些属性作为进程级别的环境变量:
在这里插入图片描述

圈起来的这个属性在map任务中可以用来获取当前map任务处理的文件块的名字,类似于hdfs://namenode:port/file_path/block_name
在这里插入图片描述

注意点:
1)During the execution of a streaming job, the names of the “mapreduce” parameters are transformed. The dots ( . ) become underscores ( _ ),上面截图中的mapreduce参数值在程序中获取时,需要将.转换为_,也就是mapreduce.map.input.file在程序中通过mapreduce_map_input_file获取。
2)python中该参数被简化,mapreduce_map_input_file写为map_input_file

下面是一个参考的demo:

import sys
import os
import io

# 强制stdout以UTF-8编码输出(python3默认,但最终编码方式受限系统环境)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

# 当前数据块的全路径
path = os.environ['map_input_file']

# path是block的全路径,所以判断全路径包含哪个表名即可
if 'tb1_name' in path:
    for line in sys.stdin.buffer:
        decoded_line = line.decode('utf-8').strip('\n')
        items = decoded_line.split('\t')
        key = items[0]
        value = items[1:]
        print('\t'.join([key, 'tb1_name', value]))  # reduce中通过第二个标志位标识数据来源于哪个表
elif 'tb2_name' in path:
    for line in sys.stdin.buffer:
        decoded_line = line.decode('gb18030').strip('\n')
        items = decoded_line.split('\t')
        key = items[0]
        value = items[1:]
        print('\t'.join([key, 'tb2_name', value]))
else:
    for line in sys.stdin.buffer:
        decoded_line = line.decode('utf-8').strip('\n')
        items = decoded_line.split('\t')
        key = items[0]
        value = items[1:]
        print('\t'.join([key, 'other', value]))

reduce处理:

import sys


def process(datas):
    pass


if __name__ == '__main__':
    key = ''
    datas = []
    for line in sys.stdin:
        items = line.strip('\n').split('\t')
        if key == '' or items[0] == key:  # 相同bid
            key = items[0]
            datas.append(items)
        else:
            process(datas)  # 处理上一批bid
            datas.clear()  # 清空
            key = items[0]
            datas.append(items)  # 添加当前记录

    if len(datas) > 0:  # 处理最后一批bid
        process(datas)

4.作业提交主要参数解释

1)输入输出

-input:输入文件路径,可以有多个。
-output:输出文件路径。
-inputformat:负责定义如何读取输入数据,并决定如何将数据分割成多个块,每个块由一个map任务处理。默认org.apache.hadoop.mapred.TextInputFormat普通文本读取。

2)资源分发

-file:将本地文件分发到hadoop集群的所有MR任务的当前工作目录(task级别),使任务脚本能够访问这些文件。当前工作目录表明每启动一个map或reduce任务都会拷贝一份副本。常用来分发map和reduce的任务脚本以及一些资源文件,如模型权重文件。模型资源文件和任务脚本处于同一个目录,任务脚本中可以直接./xxx相对路径读取资源文件。
-cacheArchive:将归档文件分发到所有任务节点(worker级别)并自动解压,节点上的MR任务启动时不会拷贝副本到自己的工作目录,会通过符号链接共享同一个节点上的文件。适合用来分发一些比较大的文件,如python环境包。

eg:-cacheArchive $HADOOP_PYTHON3#python3,如果hdfs上python包解压缩后的结构为python3/bin/python3,则在mapper和reducer参数中指定python环境时-mapper ./python3/python3/bin/python3 mapper.py,关于python解释器层级路径的解释:#python3,会在当前task工作目录中创建一个名为python3的文件夹,并将python环境的tar包拉到这个目录下解压。

3) 作业启动管理

-mapper: 指定mapper脚本。
-reducer: 指定reducer脚本。
-jobconf mapred.map.tasks:map任务数量,不严格依赖指定,跟输入数据的分块有关。
-jobconf mapred.reduce.tasks:reduce任务数量,等于任务最终输出包含的文件个数。
-jobconf hadoop.job.ugi:提交作业用户的身份,主要影响作业对HDFS或YARN的访问权限。
-jobconf mapred.job.queue.name:资源队列。
-jobconf mapred.job.priority:任务优先级。

4)分区规则

-partitioner:分区规则,org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner基于指定序号的字段为分区键。
-jobconf stream.num.map.output.key.fields:指定map输出的key,如-jobconf stream.num.map.output.key.fields=1指定按照map输出的第一个字段作为key。
-jobconf num.key.fields.for.partition:决定如何分区,默认按照完整的key,即上面参数配置。如果上面配置为符合key,该参数可以指定按照key中的部分字段分区。也就是说,如果配置了这个参数,指定的字段决定了map输出送往哪个reduce,而上面配置的key决定了送到同一个reduce的数据的先后顺序。(map在shuffle数据到reduce之前会对数据按照key排序)

5)压缩

-jobconf mapred.output.compress:输出结果是否开启压缩,true、false。
-jobconf mapred.output.compression.codec:指定压缩编解码器,Gzip压缩org.apache.hadoop.io.compress.GzipCodec
-jobconf mapred.output.compression.type:压缩类型,=BLOCK块压缩,适合列式存储,=RECORD记录压缩,适合行式存储。

5.仅map作业

https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Specifying_Map-Only_Jobs
在hadoop3版本中指定参数-D mapreduce.job.reduces=0,hadoop2版本中指定-jobconf mapred.reduce.tasks=0

-reducer参数不可省,即使不需要reduce任务,否则实际测试中会报错如下。可以随意指定,例如-reducer cat

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2322556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity-RectTransform设置UI width

不知道有没人需要这样的代码,就是.sizeDelta //不确定是不是英文翻译的原因,基本很难理解,sizeDeltaSize,//未必完全正确,但这么写好像总没错过 //image 在一个UnityEngine.UI.Image 的数组内foreach (var image in l…

【现代深度学习技术】现代卷积神经网络04:含并行连接的网络(GoogLeNet)

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上,结合当代大数据和大算力的发展而发展出来的。深度学习最重…

链表-LeetCode

这里写目录标题 1 排序链表1.1 插入法 O(n)1.2 归并排序 1 排序链表 1.1 插入法 O(n) /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullpt…

【STL】vector介绍(附部分接口模拟实现)

文章目录 1.介绍2.使用2.1 vector的构造2.2 vector空间相关接口2.2.1 size()2.2.2 capacity()2.2.3 empty()2.2.4 resize()2.2.5 reserve() 2.3 vector的增删查改2.3.1 push_back()2.3.2 insert()2.3.3 pop_back()2.3.4 erase()2.3.5 swap()2.3.6 operator[]注:关于…

一周掌握Flutter开发--8. 调试与性能优化(上)

文章目录 8. 调试与性能优化核心技能8.1 使用 Flutter DevTools 分析性能8.2 检查 Widget 重绘(debugPaintSizeEnabled)8.3 解决 ListView 卡顿(ListView.builder itemExtent) 其他性能优化技巧8.4 减少 build 方法的调用8.5 使用…

游戏引擎学习第182天

回顾和今天的计划 昨天的进展令人惊喜,原本的调试系统已经被一个新的系统完全替换,新系统不仅能完成原有的所有功能,还能捕获完整的调试信息,包括时间戳等关键数据。这次的替换非常顺利,效果很好。 今天的重点是在此基…

C语言_数据结构_二叉树

【本节目标】 树的概念及结构 二叉树的概念及结构 二叉树的顺序结构及实现 二叉树的链式结构及实现 1. 树的概念及结构 1.1 树的概念 树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。把它叫做树是因为…

Compare全目录文件比较内容(项目中用到过)

第一步:找到“会话”——“会话设置” 会话设置弹框信息 第二步:选择“比较”tab标签 比较内容:选中二进制比较 第三步:选中所有文件 第四步:右键选中“比较内容” 第五步:选中“基于规则的比较”

3.26[a]paracompute homework

5555 负载不平衡指多个线程的计算量差异显著,导致部分线程空转或等待,降低并行效率。其核心矛盾在于任务划分的静态性与计算动态性不匹配,尤其在处理不规则数据或动态任务时尤为突出。以稀疏矩阵的向量乘法为例,假设其非零元素分…

视觉大模型CLIP论文精读

论文:Learning Transferable Visual Models From Natural Language Supervision 代码:https://github.com/openai/CLIP 摘要 最先进的计算机视觉系统是针对预测一组固定的、预先确定的对象类别进行训练的。这种受限的监督形式限制了它们的通用性和可用…

链表的创建:头插法与尾插法详解(数据结构)

C 链表的创建:头插法与尾插法详解 链表(Linked List)是一种重要的数据结构,适用于插入和删除操作频繁的场景。本文介绍 两种常见的链表构建方法: 尾插法(Append / Tail Insertion):…

深入解析 Java 类加载机制及双亲委派模型

🔍 Java的类加载机制是确保应用程序正确运行的基础,特别是双亲委派模型,它通过父类加载器逐层加载类,避免冲突和重复加载。但在某些特殊场景下,破坏双亲委派模型会带来意想不到的效果。本文将深入解析Java类加载机制、…

MySQL数据库精研之旅第四期:解锁库操作高阶技能

专栏:MySQL数据库成长记 个人主页:手握风云 目录 一、查看所有表 1.1. 语法 二、创建表 2.1. 语法 2.2. 示例 2.3. 表在磁盘上对应的⽂件 三、查看表结构 3.1. 语法 3.2. 示例 四、修改表 4.1. 语法 4.2. 示例 五、删除表 5.1. 语法 5.2.…

【DevOps】DevOps and CI/CD Pipelines

DevOps 是一种将开发与运维实践相结合的模式,旨在缩短软件开发周期并交付高质量软件。 DevOps 是什么? 开发团队与运维团队之间的协作 • 持续集成与持续交付(CI/CD) • 流程自动化 • 基础设施即代码(IaC)…

VS自定义静态库并在其他项目中使用

1、VS创建一个空项目或者静态库项目 2、右键项目 属性 修改生成文件类型 3、生成解决方案 4、复制.h文件和.lib文件作为静态库 5、创建一个新项目 测试使用新生成的静态库 在新项目UseStaticLib中加一个新文件夹lib,lib中放入上面的.h和.lib文件。 6、vs中右…

力扣32.最长有效括号(栈)

32. 最长有效括号 - 力扣&#xff08;LeetCode&#xff09; 代码区&#xff1a; #include<stack> #include<string> /*最长有效*/ class Solution { public:int longestValidParentheses(string s) {stack<int> st;int ans0;int ns.length();st.push(-1);fo…

vue3 项目中预览 word(.docx)文档方法

vue3 项目中预览 word&#xff08;.docx&#xff09;文档方法 通过 vue-office/docx 插件预览 docx 文档通过 vue-office/excel 插件预览 excel 文档通过 vue-office/pdf 插件预览 pdf 文档 安装插件 npm install vue-office/docx vue-demi示例代码 <template><Vu…

DHCP(Dynamic Host Configuration Protocol)原理深度解析

目录 一、DHCP 核心功能 二、DHCP 工作流程&#xff08;四阶段&#xff09; 三、关键技术机制 1. 中继代理&#xff08;Relay Agent&#xff09; 2. Option 82&#xff08;中继信息选项&#xff09; 3. 租期管理 4. 冲突检测 四、DHCP 与网络架构交互 1. MLAG 环境 2.…

创建login.api.js步骤和方法

依次创建 login.api.js、home.api.js...... login.api.js、home.api.js 差不多 导入到 main.js main.js 项目中使用

基于springboot二手交易平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 人类现已迈入二十一世纪&#xff0c;科学技术日新月异&#xff0c;经济、资讯等各方面都有了非常大的进步&#xff0c;尤其是资讯与网络技术的飞速发展&#xff0c;对政治、经济、军事、文化等各方面都有了极大的影响。 利用电脑网络的这些便利&#xff0c;发展一套二手交…