MapReduce排序

news2024/11/17 5:42:25

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类


1.部分排序 


MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序


2.全排序


最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。(比如wordcount最终将所有数据放到一个文件内再总的进行一次全排序)


3.二次排序


在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。(比如先按照总流量使用降序,如果两个用户总流量相同的情况下再根据上行流量进行降序排列...)。

自定义排序WritableComparable

全排序

需求

根据上次的【流量统计案例】的结果,要求结果按照总流量进行降序排列。(默认是按照key(手机号)进行排序)。

输入数据

13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
13660577991	6960	690	7650
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
13826544101	264	0	264
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548

思路

让FlowBean来作为key(因为hadoop框架中的key必须支持可排序,如果让手机号作为key,结果仍然是按照手机号的字典序进行排序)

实现

1.修改FlowBean继承Writablepmparable接口,泛型的类型为需要排序的类

public class FlowBean implements WritableComparable<FlowBean> {
    //...
}

2.重写compareTo方法

@Override
    public int compareTo(FlowBean o) {
        //按照总流量进行降序排列
        if (this.sumFlow > o.sumFlow){
            return -1;  //降序
        }else if (this.sumFlow < o.sumFlow){
            return 1;   //升序
        }else {
            return 0;
        }
    }

3.Mapper

package com.lyh.mapreduce.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text,FlowBean,Text> {

    private FlowBean outKey = new FlowBean();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1.获取一行数据
        String line = value.toString();

        //2.切割
        String[] words = StringUtils.split(line, '\t');

        //3.封装value
        outValue.set(words[0]);
        outKey.setUpFlow(Long.parseLong(words[1]));//对上一次的运行结果进行排序
        outKey.setDownFlow(Long.parseLong(words[2]));
        outKey.setSumFlow();

        //4.写出
        context.write(outKey,outValue);

    }
}

4.Reducer

public class FlowReducer extends Reducer<FlowBean, Text,Text,FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {

            context.write(value,key);

        }
    }
}

5.Runner类

主要修改MapOutputKeyClass 和 MapOutputValueClass。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowRunner extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new FlowRunner(),args);
    }

    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow compu");

        //2.配置jar包路径
        job.setJarByClass(FlowRunner.class);

        //3.关联mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //5.设置数据输入的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\writable\\output1"));

        //6.设置输出路径-输出目录不可存在
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\writable\\output2"));

        //7.提交job
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

运行结果

全部按照总流量(最后一列)排序

13502468823	7335	110349	117684
13925057413	11058	48243	59301
13726230503	2481	24681	27162
13726238888	2481	24681	27162
18320173382	9531	2412	11943
13560439658	2034	5892	7926
13660577991	6960	690	7650
15013685858	3659	3538	7197
13922314466	3008	3720	6728
15920133257	3156	2936	6092
84138413	4116	1432	5548
13602846565	1938	2910	4848
18211575961	1527	2106	3633
15989002119	1938	180	2118
13560436666	1116	954	2070
13926435656	132	1512	1644
13480253104	180	180	360
13826544101	264	0	264
13719199419	240	0	240
13760778710	120	120	240
13926251106	240	0	240

二次排序

我们发现,刚才的输出文件中有三位用户的总流量相同,我们希望如果用户总流量相同的情况下,我们再根据上行流量去降序排列。

13719199419	240	0	240
13760778710	120	120	240
13926251106	240	0	240

只需要再FlowBean中修改compareTo方法

@Override
    public int compareTo(FlowBean o) {
        //按照总流量进行降序排列
        if (this.sumFlow > o.sumFlow){
            return -1;  //降序
        }else if (this.sumFlow < o.sumFlow){
            return 1;   //升序
        }else {//如果总流量相同
            //按照上行流量降序排列
            if (this.upFlow > o.upFlow){
                return -1;
            }else if (this.upFlow < o.upFlow){
                return 1;
            }else {
                return 0;
            }
        }
    }

运行结果

...
13719199419	240	0	240
13926251106	240	0	240
13760778710	120	120	240

区内排序

需求:根据手机号(前三位数)进行分区,并要求每个分区内的

输入:

13502468823	7335	110349	117684
13925057413	11058	48243	59301
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13820173382	9531	2412	11943
13560439658	2034	5892	7926
13760577991	6960	690	7650
13813685858	3659	3538	7197
13922314466	3008	3720	6728
13920133257	3156	2936	6092
84138413	4116	1432	5548
13502846565	1938	2910	4848
13911575961	1527	2106	3633
13789002119	1938	180	2118
13560436666	1116	954	2070
13926435656	132	1512	1644
13880253104	180	180	360
13826544101	264	0	264
13719199419	240	0	240
13926251106	240	0	240
13760778710	120	120	240

只需要重写一个分区类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<FlowBean,Text> {


    @Override
    public int getPartition(FlowBean flowBean, Text text, int partitioner) {

        //获取手机号
        String line = text.toString();
        String phone = line.substring(0,3);

        if (phone.equals("135")){
            partitioner = 0;
        }else if (phone.equals("137")){
            partitioner = 1;
        }else if (phone.equals("138")){
            partitioner = 2;
        }else if (phone.equals("139")){
            partitioner = 3;
        }else {
            partitioner = 0;
        }
        return partitioner;
    }
}

运行结果

生成4个分区,分别存储135、137、138、139开头的手机号。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/546829.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

25 SQL ——标量子查询

create table dept(id int primary key auto_increment,name varchar(15))comment 部门;insert into dept(id, name) values (1,研发部),(2,市场部),(3,财务部),(4,销售部),(5,总经办),(6,人事部);create table staff (id int primary key auto_increment commentID,name …

Java课设部署教程

这里我只演示使用IDEA软件或Eclipse两种常用的Java编译器的导入项目的教程&#xff01; IDEA部署教程 把下载的压缩包解压&#xff0c;解压后就是源码&#xff0c;打开IDEA&#xff0c;导入项目【源码】 选择源码所在的位置&#xff0c;点击ok即可导入 下面就是把源码导入到I…

(三)人工智能应用--深度学习原理与实战--神经网络的工作原理

机器学习是将输入(比如图像)映射到目标(比如标签“猫”)&#xff0c;并建立映射规则(即模型)。在深度学习中&#xff0c;神经网络通过一系列数据变换层来实现这种输入到目标的映射&#xff0c;本章节我们具体来看这种学习过程是如何实现的。 学习内容 1、理解层(Layer)及权重(…

【王道·计算机网络】第四章 网络层

一、 概述和功能 1.1 网络层功能 主要任务&#xff1a;把分组从源端传到目的端&#xff0c;为分组交换网上的不同主机提供通信服务传输单位&#xff1a;数据报功能&#xff1a; 路由选择与分组转发&#xff0c;即选择最佳路径异构网络互联(依靠路由器)拥塞控制(所有结点都来不…

基于PyQt5的图形化界面开发——自制ssh工具

基于PyQt5的图形化界面开发——自制ssh工具 0. 前言1. 第三方库的安装2. ssh原理3. 完整代码4. 演示效果5. 其他PyQt文章 0. 前言 本节我们使用PyQt5来制作一个简单的ssh小工具。 操作系统&#xff1a;Windows10 专业版 开发环境&#xff1a;Pycahrm Comunity 2022.3 Pytho…

设计模式中的UML基础

目录 1、UML概述 2、UML的用途 3、UML的构成 4、UML图 5、UML类图 5.1、类的构成 5.2、类与类之间的关系 6、绘制UML图的软件工具 VC常用功能开发汇总&#xff08;专栏文章列表&#xff0c;欢迎订阅&#xff0c;持续更新...&#xff09;https://blog.csdn.net/chenlycl…

java线程组

文章目录 1. 简介2. 线程对象关联线程组&#xff1a;一级关联3. 线程对象关联线程组&#xff1a;多级关联4. 自动归属属性5. 获取根线程组 1. 简介 为了方便某些具有相同功能的线程进行管理&#xff0c;我们可以把线程归属到某一个线程组。线程组中可以有线程对象、线程&#…

【三维激光扫描】实验03:点云着色渲染模式详解

在SiScan软件中,点云的着色模式有:高程彩色、倾斜度、反射强度、自有颜色、点云测站、分隔片、分类7中,本文进行详细讲解。 文章目录 一、高程彩色二、倾斜度三、反射强度四、自有颜色五、分隔片一、高程彩色 高程彩色模式是按点云数据的Z值起算,颜色渐变显示。 二、倾斜度…

【AIGC】10、Chinese CLIP | 专为中文图文匹配设计

文章目录 一、背景二、方法2.1 基础内容2.2 数据集2.3 预训练方法2.4 模型尺寸 三、效果 论文&#xff1a;Chinese CLIP: Contrastive Vision-Language Pretraining in Chinese 代码&#xff1a;https://github.com/OFA-Sys/Chinese-CLIP 出处&#xff1a;阿里达摩院 时间&a…

061:cesium设置棋盘图材质(material-5)

第061个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中设置棋盘材质,请参考源代码,了解CheckerboardMaterialProperty的应用。 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共89行)相关API参考:专栏目标…

第09章_子查询

第09章_子查询 子查询指一个查询语句嵌套在另一个查询语句内部的查询&#xff0c;这个特性从MySQL 4.1开始引入。 SQL 中子查询的使用大大增强了 SELECT 查询的能力&#xff0c;因为很多时候查询需要从结果集中获取数据&#xff0c;或者需要从同一个表中先计算得出一个数据结果…

SeaweedFs使用-环境准备

SeaweedFs使用-环境准备 1.下载go语言包实现go语言环境2.下载SeaweedFs文件3.安装SeaweedFs SeaweedFs是一款开源的分布式存储软件&#xff0c;在存储大量小文件方面有更好的优化&#xff0c;比较适合存储web项目的图片等文件。 1.下载go语言包实现go语言环境 因为SeaweedFs是…

实战TCP三次握手

开篇 在几乎所有的后端开发面试题中&#xff0c;TCP三次握手绝对是最被面试官青睐的题目之一。但是这个东西&#xff0c;平时开发中看不见&#xff0c;摸不着&#xff0c;对于很多人来说&#xff0c;是纯理论的知识&#xff0c;玄之又玄。但是为了应对面试&#xff0c;又不得不…

《JavaEE》网络编程TCP/IP五层协议万字详解

文章目录 TCP/IP五层协议栈应用层xml &#xff08;可读性比较好 但是运行效率不高&#xff09;json&#xff08;可读性好 但是运行效率不高&#xff09;prtobuffer(可读性不好 但是运行效率很高) 传输层UDP TCPTCP数据解读32位序号32位确认序号4位首部的长度保留位6位标志位字段…

C语言实现学生管理系统

学习完C语言之后&#xff0c;我们可以通过简单写一个学生管理系统来检验自己学的怎么样。很多计算机系大学生都会学到C语言&#xff0c;对于C语言课程的设计作业可能会感到困难&#xff0c;该篇博客的核心点就是带领读者单独完成学生管理系统&#xff0c;此篇博客附有整个学生管…

【C++ 入坑指南】(11)指针

文章目录 一、概念定义和使用二、空指针 & 野指针2.1 空指针2.2 野指针2.3 小结 三、const 修饰的指针四、指针 和 数组五、指针和函数六、实例 学习 C 的指针既简单又有趣。通过指针&#xff0c;可以简化一些 C 编程任务的执行&#xff0c;还有一些任务&#xff0c;如动态…

数据全生命周期管理

数据存储 时代"海纳百川&#xff0c;有容乃大"意味结构化、半结构和非结构化多样化的海量的 &#xff0c;也意味着批数据和流数据多种数据形式的存储和计算。面对不同数据结构、数据形式、时效性与性能要求和存储与计算成本等因素考虑&#xff0c;应该使用适合的存储…

组合预测模型 | ARIMA-CNN-LSTM时间序列预测(Python)

组合预测模型 | ARIMA-CNN-LSTM时间序列预测&#xff08;Python&#xff09; 目录 组合预测模型 | ARIMA-CNN-LSTM时间序列预测&#xff08;Python&#xff09;预测结果基本介绍程序设计参考资料 预测结果 基本介绍 ARIMA-CNN-LSTM是一种结合了传统时间序列模型和深度学习模型的…

chatgpt赋能Python-python3的下载

Python 3-您在编程路上不可或缺的伙伴 如果您正在寻找一种流行的编程语言&#xff0c;那么Python 3就是一个不错的选择。Python 3作为一种高级编程语言&#xff0c;可以轻松地创建各种应用程序和网站。它是最受欢迎的编程语言之一&#xff0c;就是因为它易于学习和使用。 Pyt…

本地部署 VisualGLM-6B

本地部署 VisualGLM-6B 1. 什么是 VisualGLM-6B2. Github 地址3. 安装 Miniconda34. 创建虚拟环境5. 安装 VisualGLM-6B6. 启动 VisualGLM-6B7. 访问 VisualGLM-6B8. API部署9. 命令行部署 1. 什么是 VisualGLM-6B VisualGLM-6B 是一个开源的&#xff0c;支持图像、中文和英文…