13.hadoop系列之MapReduce排序实践

news2025/1/11 11:04:11

本文我们学习MapReduce的全排序、二次排序以及区内排序

1.MapReduce概述

  • MapTask和ReduceTask均会对数据按照key进行排序。该操作属于hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要
  • 默认排序是按照字典顺序排序,通过快速排序实现
  • 对于MapTask,它会将处理结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后(默认80%),对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完后,对磁盘上的所有文件进行归并排序
  • 对于ReduceTask,会从每个MapTask上远程拷贝相应数据文件,如果文件大小超过阈值,则溢写磁盘上,否则存储到内存中;如果磁盘上文件数据达到阈值,则进行归并排序生成更大文件,如果内存中文件大小或数据超过阈值,则合并后溢写到磁盘
  • 当所有数据拷贝完后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

2.全排序

最终输出结果只有一个,且文件内部有序。

// 我们主要实现WritableComparable接口重写compareTo方法即可
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow; // 上行流量
    private long downFlow; // 下行流量
    private long totalFlow; // 总流量

    public FlowBean() {}

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", totalFlow=" + totalFlow +
                '}';
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getTotalFlow() {
        return totalFlow;
    }

    public void setTotalFlow() {
        this.totalFlow = this.upFlow + this.downFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // 按照总流量倒序排序
        if (this.totalFlow > o.totalFlow) {
            return -1;
        }
        if (this.totalFlow < o.totalFlow) {
            return 1;
        }
        return 0;
    }
}
// 注意我们将FlowBean作为键用于排序
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean keyOut = new FlowBean();
    private Text valueOut = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(" ");
        String phone = split[1];
        String up = split[3];
        String down = split[4];

        valueOut.set(phone);
        keyOut.setUpFlow(Long.parseLong(up));
        keyOut.setDownFlow(Long.parseLong(down));
        keyOut.setTotalFlow();

        context.write(keyOut, valueOut);
    }
}
public class FlowReduce extends Reducer<FlowBean, Text, Text, FlowBean> {

    private FlowBean valueOut = new FlowBean();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flowSort");
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

观察输出结果,可以看到已按结果降序输出
在这里插入图片描述

3.二次排序

在自定义排序过程中,如果compareTo中判断条件为两个即为二次排序

public int compareTo(FlowBean o) {
        // 按照总流量倒序排序
        if (this.totalFlow > o.totalFlow) {
            return -1;
        }
        if (this.totalFlow < o.totalFlow) {
            return 1;
        }
        // 按照下行流量倒序排序
        if (this.downFlow > o.downFlow) {
            return -1;
        }
        if (this.downFlow < o.downFlow) {
            return 1;
        }
        return 0;
}

在这里插入图片描述

4.区内排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序

// 注意FlowBean导入为排序的呢个类,最好新建包
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // Text是手机号
        String phone = text.toString().substring(0, 3);
        // 注意分区号需要连续,从0开始分区
        int partition;
        if ("136".equals(phone)) {
            partition = 0;
        } else if ("137".equals(phone)) {
            partition = 1;
        } else if ("138".equals(phone)) {
            partition = 2;
        } else if ("139".equals(phone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
public class FlowPartitionerDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flowPartitionerSort");
        job.setJarByClass(FlowPartitionerDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 关联自定义分区类
        job.setPartitionerClass(ProvincePartitioner.class);
        // 设置ReduceTask任务数,保持与分区数一致
        job.setNumReduceTasks(5);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

在这里插入图片描述
欢迎关注公众号算法小生与我沟通交流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/351937.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【蓝桥杯嵌入式】第十三届蓝桥杯嵌入式国赛程序设计试题以及详细题解

文章目录原题展示原题分析详细题解LED模块按键模块串口LCD模块模拟电压读取(ADC)脉冲输入输出文章福利原题展示 原题分析 本届国赛试题主要包含LCD、LED、按键、EEPROM、串口、模拟电压输入、脉冲输入输出七大部分&#xff0c;其中前面三个部分是蓝桥杯嵌入式的“亲儿子”(必考…

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka1 安装 Kafka2 创建 Topic3 Java 创建 Topic4 SpringBoot 项目4.1 pom.xml4.2 application.yml4.3 KafkaApplication.java4.4 CustomizePartitioner.java4.5 KafkaInitialConfig.java4.6 SendMessageController.java5 测试1 安装 Kafka Docker 安装 Kafk…

数据库原理及应用期末复习汇总(附某高校期末真题试卷)

文章目录《数据库原理及应用》试题1一、选择题&#xff08;共35分&#xff09;二、填空&#xff08;每空1分&#xff0c;共20分&#xff09;三、T-SQL综合题(共35分)四、综合应用题(共10分)《数据库原理及应用》试题2一、选择题&#xff08;共35分&#xff09;二、填空&#xf…

handler解析(5)常见面试题

目录 1.请大致讲下handler的工作原理 2.handler.postDelay原理 3.一个线程有几个Looper?几个Handler&#xff1f; 4. Handler内存泄漏原因&#xff1f;以及解决方案 5.为何主线程可以new Handler如果想要在子线程中new Handler要做些什么准备&#xff1f; 6.消息退出是调…

R语言广义可加模型在空气环境污染方面的应用(1)

粉丝私信我希望复制一篇文章的图片&#xff0c;图片来源于文章&#xff1a;Wu C, Yan Y, Chen X, Gong J, Guo Y, Zhao Y, Yang N, Dai J, Zhang F, Xiang H. Short-term exposure to ambient air pollution and type 2 diabetes mortality: A population-based time series st…

中频采样和IQ采样的比较和转换

一、什么是中频采样&#xff0c;什么是IQ采样 射频接收系统通常使用数字信号处理算法进行信号解调和分析&#xff0c;因此需要使用ADC对信号进行采样。根据采样频率的不同&#xff0c;可以分为射频直接采样、中频采样、IQ采样。射频采样和中频采样只需要一路ADC&#xff0c;采…

搜索引擎ES相关问题

一、什么是倒排索引&#xff1f;有什么好处&#xff1f;索引&#xff1a; 从ID到内容。倒排索引&#xff1a; 从内容到ID。好处&#xff1a; 比较适合做关键字检索。 可以控制数据的总量。提高查询效率。搜索引擎为什么比MySQL查询快&#xff1f; lucence文章 -》 term ->排…

element-ui中el-table点击其他自定义按钮展开table中某一行

element-ui中el-table点击其他自定义按钮展开table中某一行 在日常开发中&#xff0c;我们遇见了会有点击某些按钮&#xff0c;使得表格行展开的需求&#xff0c;这时候去查看文档 element-ui&#xff08;table&#xff09; 这里官方提供了示例为在行最左侧有一个展开合并ico…

JAVA开发测试(jmeter如何测试性能与估算)

对C的业务网站或应用&#xff0c;进行性能测试来评估使用服务器情况是必不可少的一项工作。 一、测试工具&#xff1a; Apache JMeter 可以用于对服务器、网络或对象模拟巨大的负载&#xff0c;来自不同压力类别下测试它们的强度和分析整体性能&#xff0c;是Apache组织开发的…

CCF-CSP真题《202212-1 现值计算》思路+python满分题解

想查看其他题的真题及题解的同学可以前往查看&#xff1a;CCF-CSP真题附题解大全 试题编号&#xff1a;202212-1试题名称&#xff1a;现值计算时间限制&#xff1a;1.0s内存限制&#xff1a;512.0MB问题描述&#xff1a; 问题描述 评估一个长期项目的投资收益&#xff0c;资金的…

中点BH算法对任意斜率的直线扫描转换方法

作者&#xff1a;非妃是公主 专栏&#xff1a;《计算机图形学》 博客地址&#xff1a;https://blog.csdn.net/myf_666 个性签&#xff1a;顺境不惰&#xff0c;逆境不馁&#xff0c;以心制境&#xff0c;万事可成。——曾国藩 文章目录专栏推荐专栏系列文章序一、算法原理二、…

六“元”数智增长模型,企业元宇宙时代的经营新范式

摘要&#xff1a;在中国传统哲学里&#xff0c;“元”表示最基本的、最根本的东西;在企业管理经营中&#xff0c;将“元”解释为企业的核心竞争力或者基础能力;元宇宙下&#xff0c;“元”就代表数智化下的新场景&#xff0c;来支撑企业的各种业务创新。 一、元宇宙下的“元” …

分享IDEA通过插件 【一键自动生成】 在线api接口文档

开发写代码已经很辛苦&#xff0c;相信每个开发人员都不想写接口文档&#xff0c;但是不写又不行。尤其现在开发的项目偏向于前后端分离&#xff0c;在没有接口的情况下&#xff0c;前后端很难对接联调&#xff0c;测试也无法很好的测试。现在IDEA的插件仓库里有款插件&#xf…

qt 内存泄漏处理办法

windows 版本windows msvc版本可以使用vld检测可以得到内存泄漏点的调用堆栈&#xff0c;如果可以的话&#xff0c;还可以得到其所在文件及行号&#xff1b;可以得到泄露内存的完整数据&#xff1b;可以设置内存泄露报告的级别。缺点&#xff1a;1.只针对 Visual C &#xff08…

VUE -- defineExpose

defineExpose定义demo定义 defineExpose定义&#xff1a;用于组件通信中父级组件调用操作子组建方法和响应式属性参数能力 在使用definExpose前需要了解两个拷贝对象函数 对象copy&#xff1a;shallowReactive 与 数据 copy&#xff1a;shallowRef 这两个都是vue包里面的 简…

图片文字识别OCR调研-中文

直接看效果对比 tesseract-ocr 该识别引擎最新版本tesseract4添加了支持神经网络&#xff08;LSTM&#xff09;的&#xff0c;该引擎专注于线条识别&#xff0c; 同时也保留了Tesseract OCR 引擎&#xff0c;该引擎通过识别字符模式来工作。 我们需求端的后台语言是go&#x…

时尚高级实用,零跑C01满足各种用车需求

零跑C01在新能源车市场上销量可观且口碑较好&#xff0c;为什么消费者会相中这个国产车全域自主研发的新能源车呢&#xff1f;下面的介绍会给出答案。就其外观而言&#xff0c;零跑C01的外观定位于中大型轿车&#xff0c;在外观设计上充分考虑到美学观念。零跑给出了七个车身颜…

扬帆优配|日均客运量恢复,民航业加速复苏,外资买入2股超亿元

春运民航客运量康复至疫情前七成。 2月16日&#xff0c;民航局举行2月例行新闻发布会。会上介绍&#xff0c;自1月7日至2月15日&#xff0c;春运40天&#xff0c;民航运送旅客5523万人次&#xff0c;日均客运量138万人次&#xff0c;同比去年春运添加39%&#xff0c;康复至2019…

Lesson5.1---Python 之 NumPy 简介和创建数组

一、NumPy 简介 NumPy&#xff08;Numerical Python&#xff09;是 Python 的一种开源的数值计算扩展。这种工具可用来存储和处理大型矩阵&#xff0c;比 Python 自身的嵌套列表&#xff08;nested list structure&#xff09;结构要高效的多&#xff08;该结构也可以用来表示…

【贰】嵌入式系统的分类

随手拍拍&#x1f481;‍♂️&#x1f4f7; 日期: 2022.08.31 地点: 杭州 介绍: 2022.08.31下午一点&#xff0c;在闷热的学校里实在是待不下去了&#xff0c;跑到了门口的钱塘江边散了一会儿步&#x1f6b6;正值盛夏&#xff0c;八月即将完结&#xff0c;日子越过越快&#x1…