【HadoopShuffle原理剖析】基础篇二

news2024/11/27 3:55:54

Shuffle原理剖析

在这里插入图片描述

Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。

Shuffle过程

  • Map端的Shuffle

    Map的输出结果首先被缓存到内存,当缓存区容量到达80%(缓冲区默认100MB),就启动溢写操作。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序和合并(combine),之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自己处理的数据。

  • 在Reduce端的Shuffle过程

    Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行合并排序后交给Reduce处理

作用
  • 保证每一个Reduce任务处理的数据大致是一致的

  • Map任务输出的key相同,一定是相同分区,并且肯定是相同的Reduce处理的,保证计算结果的准确性

  • Reduce任务的数量决定了分区的数量,Reduce任务越多计算处理的并行度也就越高

    Reduce任务的数量(默认为1)可以通过:job.setNumReduceTasks(数量)

特点
  • Map端溢写时,key相同的一定是在相同的分区
  • Map端溢写时,排序减少了Reduce的全局排序的复杂度
  • Map端溢写是,合并(combiner【可选】)减少溢写文件的体积,提高了Reduce任务在Fetch数据时的效率,它是一种MapReduce优化策略
  • Reduce端计算或者输出时,它的数据都是有序的
Shuffle源码追踪
  • MapTask

    在这里插入图片描述

  • ReduceTask

    (略)

    建议阅读

数据清洗

数据清洗指将原始数据处理成有价值的数据的过程,就称为数据清洗。

企业大数据开发的基本流程:

  1. 采集数据(flume、logstash)先保存到MQ(Kafka)中
  2. 将MQ中的暂存数据存放到HDFS中保存
  3. 数据清洗(低价值密度的数据处理)存放到HDFS
  4. 算法干预(MapReduce),计算结果保存到HDFS或者HBase
  5. 计算结果的可视化展示(Echarts、HCharts)
需求

现有某系统某天的Nginx的访问日志,格式如下:

27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90

大数据处理的算法,需要参数客户端的ip地址、请求时间、资源、响应状态码

正则表达式提取数据

Regex Expression主要作用字符串匹配抽取和替换

语法
规则解释
.匹配任意字符
\d匹配任意数字
\D匹配任意非数字
\w配置a-z和A-Z
\W匹配非a-z和A-Z
\s匹配空白符
^匹配字符串的开头
$匹配字符串的末尾
规则的匹配次数
语法解释
*规则匹配0到N次
规则匹配1次
{n}规则匹配N次
{n,m}规则匹配n到m次
+规则匹配1到N次(至少一次)
应用
# 匹配手机号码 11位数值构成
\d{11}

# 邮箱地址校验  @
.+@.+

使用正则表达式提取Nginx访问日志中的四项指标

测试站点:http://regex101.com

分析后得到需要的正则表达式

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"\w*\s(.*)\sHTTP\/1.1"\s(\d{3}).*$
使用MapReduce分布式并行计算框架进行数据清洗

注意: 因为数据清洗不涉及统计计算,所以MapReduce程序通常只有map任务,而没有Reduce任务

job.setNumReduceTasks(0)

实现代码

数据清洗的Mapper

package com.baizhi.dataclean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * @param key
     * @param value   nginx访问日志中的一行记录(原始数据)
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"\\w*\\s(.*)\\sHTTP\\/1.1\"\\s(\\d{3}).*$";
        String line = value.toString();

        final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
        final Matcher matcher = pattern.matcher(line);

        while (matcher.find()) {
            // 四项关键指标  ip 请求时间 请求资源 响应状态码
            String clientIp = matcher.group(1);
            // yyyy-MM-dd HH:mm:ss
            String accessTime = matcher.group(2);
            String accessResource = matcher.group(3);
            String status = matcher.group(4);

            // 30/May/2013:17:38:21 +0800
            // 30/05/2013:17:38:21
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
            try {
                Date date = sdf.parse(accessTime);
                SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String finalDate = sdf2.format(date);
                context.write(new Text(clientIp + " " + finalDate + " " + accessResource + " " + status), null);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
}

初始化类

package com.baizhi.dataclean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class DataCleanApplication {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration(), "data clean");
        job.setJarByClass(DataCleanApplication.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.setInputPaths(job,new Path("file:///E:/access.log"));
        TextOutputFormat.setOutputPath(job,new Path("file:///E:/final"));

        job.setMapperClass(DataCleanMapper.class);

        // 注意:数据清洗通常只有map任务而没有reduce
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}

数据倾斜

数据分区默认策略

数据倾斜指大量的key相同的数据交由一个reduce任务统计计算,造成”闲的闲死,忙的忙死“这样的现象。不符合分布式并行计算的设计初衷的。

现象
  • 某一个reduce运行特别耗时
  • Reduce任务内存突然溢出
解决方案
  • 增大Reduce任务机器JVM的内存(硬件的水平扩展)
  • 增加Reduce任务的数量,每个Reduce任务只负责极少部分的数据处理,并且Reduce任务的数量增加提高了数据计算的并行度

Reduce任务的正确数量: 0.95或者1.75 * (NodeManage数量 * 每个节点最大容器数量)

  • 自定义分区规则Partitioner
package com.baizhi.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;


/**
 * 自定义分区规则
 */
public class CustomPartitioner extends Partitioner<Text, LongWritable> {

    /**
     * @param key
     * @param value
     * @param i     numReduceTasks
     * @return 分区序号
     */
    public int getPartition(Text key, LongWritable value, int i) {
        if (key.toString().equals("CN-GD")) return 0;
        else if (key.toString().equals("CN-GX")) return 1;
        else if (key.toString().equals("CN-HK")) return 2;
        else if (key.toString().equals("JP-TY")) return 3;
        else return 4;
    }
}
  • 合适使用Combiner,将key相同的value进行整合合并

在combiner合并时,v必须得能支持迭代计算,并且不能够影响Reduce任务的输入

combiner通常就是Reducer任务

// 优化策略:combiner合并操作
job.setCombinerClass(MyReducer.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1961883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[论文笔记]思维链提示的升级版——回退提示

引言 今天又带来一篇提示策略的论文笔记&#xff1a;TAKE A STEP BACK: EVOKING REASONING VIA ABSTRACTION IN LARGE LANGUAGE MODELS。 作者提出了回退提示(STEP-BACK PROMPTING)技术&#xff0c;使大模型能够进行抽象&#xff0c;从包含具体细节的实例中推导出高层次的概念…

centos7 docker空间不足

今天在使用docker安装镜像的时候&#xff0c;出现报错 查看原因&#xff0c;发现是分区空间不足导致的 所以考虑进行扩容 首先在vmware扩容并没有生效 因为只是扩展的虚拟空间&#xff0c;并不支持扩展分区大小&#xff0c;下面对分区进行扩容 参考&#xff1a; 分区扩容 主…

【echarts】echarts-liquidfill 水球图

echarts-liquidfill3兼容echarts5 echarts-liquidfill2兼容echarts4 npm install echarts npm install echarts-liquidfill设置水球图背景色和内边框样式 var option {series: [{type: liquidFill,data: [0.6, 0.5, 0.4, 0.3],backgroundStyle: {borderWidth: 5,//边框宽度bo…

怎样看待AI就业冲击?

技术进步对于就业的影响&#xff0c;从工业革命开始就是社会的焦点和研究的关注点。具有“卢德主义”性质的运动和思潮&#xff0c;曾经以各种面貌反复出现。不过&#xff0c;无论是从原因穷究结果&#xff0c;还是从本质看到表象&#xff0c;AI就业冲击这一次来得真的不同以往…

申请美区 Apple ID 完整步骤图解,轻松免费创建账户

苹果手机在下载一些软件时需要我们登录其 Apple ID 才能下载&#xff0c;但是由于一些限制国内的 Apple ID 在 App Store 中有一些限制不能下载某些软件&#xff0c;如何解决这个问题&#xff1f;那就是申请一个美区 Apple ID&#xff0c;怎么申请国外苹果账户呢&#xff1f;下…

国家超算互联网平台:模型服务体验与本地部署推理实践

目录 前言一、平台显卡选用1、显卡选择2、镜像选择3、实例列表4、登录服务器 二、平台模型服务【Stable Diffusion WebUI】体验1、模型运行2、端口映射配置3、体验测试 三、本地模型【Qwen1.5-7B-Chat】推理体验1、安装依赖2、加载模型3、定义提示消息4、获取model_inputs5、生…

typescript中interface常见3种用法

文章目录 函数类型对象类型【自命名】&#xff1a; (函数)对象类型 函数类型 作用&#xff1a;声明一个函数接口&#xff1a;可用于类型声明 | 不可implements 对象类型 作用&#xff1a;声明对象具备哪些实例接口&#xff1a;可用于类型 | 可implements 【自命名】&…

【C#】ThreadPool的使用

1.Thread的使用 Thread的使用参考&#xff1a;【C#】Thread的使用 2.ThreadPool的使用 .NET Framework 和 .NET Core 提供了 System.Threading.ThreadPool 类来帮助开发者以一种高效的方式管理线程。ThreadPool 是一个线程池&#xff0c;它能够根据需要动态地分配和回收线程…

DATE_ADD、DATE_SUB Function - Mysql

DATE_ADD、DATE_SUB Function - SQL DATE_ADD() 和 DATE_SUB() 用于在日期或日期时间上增加或减少指定的时间间隔。 1. DATE_ADD() DATE_ADD() 函数用于向指定的日期或日期时间值添加一个时间间隔。 DATE_ADD(date, INTERVAL expr unit)date: 要添加时间间隔的日期或日期时间…

【Lampiao靶场渗透】

文章目录 一、IP地址获取 二、信息收集 三、破解SSH密码 四、漏洞利用 五、提权 一、IP地址获取 netdiscover -i eth0 Arp-scan -l Nmap -sP 192.168.78.0/24 靶机地址&#xff1a;192.168.78.177 Kali地址&#xff1a;192.168.78.128 二、信息收集 nmap -sV -p- 192.…

实战:ElasticSearch 索引操作命令(补充)

四.ElasticSearch 操作命令 4.1 集群信息操作命令 4.1.1 查询集群状态 &#xff08;1&#xff09;使用 Postman 客户端直接向 ES 服务器发 GET 请求 http://hlink1:9200/_cat/health?v &#xff08;2&#xff09;使用服务端进行查询 curl -XGET "hlink1:9200/_cat/h…

装饰大师——装饰模式(Python实现)

大家好&#xff0c;今天我们继续来讲结构型设计模式&#xff0c;上一期我们介绍了组合模式&#xff0c;这个模式特别适合用于处理树形结构的问题&#xff0c;它能够让我们像处理单个对象一样来处理对象组合。 装饰模式&#xff08;Decorator Pattern&#xff09;是一种结构型设…

最新彩虹自助下单代发卡码知识付费商城多模板系统完整版去授权源码V6.9

最新彩虹的知识付费商城源码&#xff0c;后台可以选择多套模板&#xff0c;完整版去授权,支持对接多个资源网站&#xff0c;不怕无资源 推荐用宝塔上传后直接访问即可根据提示安装。 后面用户名/密码&#xff1a;admin/123456 PHP推荐使用7.0及以上版本 V6.9 1.修复SQL注入…

k8s 部署RuoYi-Vue-Plus之ingress域名解析

可参看https://blog.csdn.net/weimeibuqieryu/article/details/140798925 搭建ingress 1.创建Ingress对象 ingress-ruoyi.yaml其中host替换为你对应域名&#xff0c;需要解析域名到服务器, 同时为后端服务添加了二级域名解析 api. 访问http://xxx.xyz/就能访问前端&#xff0…

应急靶场(11):【玄机】日志分析-apache日志分析

题目 提交当天访问次数最多的IP&#xff0c;即黑客IP黑客使用的浏览器指纹是什么&#xff0c;提交指纹的md5查看index.php页面被访问的次数&#xff0c;提交次数查看黑客IP访问了多少次&#xff0c;提交次数查看2023年8月03日8时这一个小时内有多少IP访问&#xff0c;提交次数 …

【Redis 初阶】Redis 常见数据类型(Set、Zset、渐进式遍历、数据库管理)

一、Set 集合 集合类型也是保存多个字符串类型的元素的&#xff08;可以使用 json 格式让 string 也能存储结构化数据&#xff09;&#xff0c;但和列表类型不同的是&#xff0c;集合中&#xff1a; 元素之间是无序的。&#xff08;此处的 “无序” 是和 list 的有序相对应的…

重载云台摄像机如何通过国标28181接入到统一视频接入平台(视频国标接入平台)

目录 一、国标GB/T 28181介绍 1、国标GB/T28181 2、内容和特点 二、重载云台摄像机 1、定义 2、结构与设计 3、功能和优势 4、特点 5、应用场景 二、接入准备工作 1、确定网络环境 &#xff08;1&#xff09;公网接入 &#xff08;2&#xff09;专网传输 2、检查重…

软件测试基础1--功能测试

1、什么是软件测试&#xff1f; 软件是控制计算机硬件运行的工具。 软件测试&#xff1a;使用技术手段验证软件是否满足使用需求&#xff0c;为了发现软件功能和需求不相符合的地方&#xff0c;或者寻找实际输出和预期输出之间的差异。 软件测试的目的&#xff1a;减少软件缺陷…

golang JSON序列化

JSON JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。 易于人阅读和编写。同时也易于机器解析和生成。 它基于JavaScript Programming Language, Standard ECMA-262 3rd Edition - December 1999的一个子集。 json历史 [外链图片转存失败,源站可能有防盗链机…