Hadoop中MapReduce过程中Shuffle过程实现自定义排序

news2024/12/28 20:51:41

文章目录

  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
    • 一、引言
    • 二、实现WritableComparable接口
      • 1、自定义Key类
    • 三、使用Job.setSortComparatorClass方法
      • 2、设置自定义排序器
      • 3、自定义排序器类
    • 四、使用示例
    • 五、总结

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

在这里插入图片描述

一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。

二、实现WritableComparable接口

1、自定义Key类

首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。

package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    @Override
    public String toString(){
        return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";
    }

    @Override
    public int compareTo(Employee o) {
        // 多个列的排序:select * from emp order by deptno,sal;
        // 首先按照deptno排序
        if(this.deptno > o.getDeptno()){
            return 1;
        }else if(this.deptno < o.getDeptno()){
            return -1;
        }
        // 如果deptno相等,按照sal排序
        if(this.sal >= o.getSal()){
            return 1;
        }else{
            return -1;
        }
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 序列化
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // 反序列化
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }
}

三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。

package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomSort {
    public static class Map extends Mapper<Object, Text, Employee, IntWritable> {
        private static Employee emp = new Employee();
        private static IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            emp.setEmpno(Integer.parseInt(line[0]));
            emp.setEname(line[1]);
            emp.setJob(line[2]);
            emp.setMgr(Integer.parseInt(line[3]));
            emp.setHiredate(line[4]);
            emp.setSal(Integer.parseInt(line[5]));
            emp.setComm(Integer.parseInt(line[6]));
            emp.setDeptno(Integer.parseInt(line[7]));
            context.write(emp, one);
        }
    }

    public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {
        @Override
        protected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CustomSort");
        job.setJarByClass(CustomSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Employee.class);
        job.setOutputValueClass(IntWritable.class);
        // 设置自定义排序器
        job.setSortComparatorClass(EmployeeComparator.class);
        
        Path in = new Path("hdfs://localhost:9000/mr/in/customsort");
        Path out = new Path("hdfs://localhost:9000/mr/out/customsort");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3、自定义排序器类

package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EmployeeComparator extends WritableComparator {
    protected EmployeeComparator() {
        super(Employee.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Employee e1 = (Employee) w1;
        Employee e2 = (Employee) w2;
        // 首先按照deptno排序
        int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());
        if (deptCompare != 0) {
            return deptCompare;
        }
        // 如果deptno相等,按照sal排序
        return Integer.compare(e1.getSal(), e2.getSal());
    }
}

四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator

五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。


版权声明:本博客内容为原创,转载请保留原文链接及作者信息。

参考文章

  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2267100.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

科技云报到:人工智能时代“三大件”:生成式AI、数据、云服务

科技云报到原创。 就像自行车、手表和缝纫机是工业时代的“三大件”。生成式AI、数据、云服务正在成为智能时代的“新三大件”。加之全球人工智能新基建加速建设&#xff0c;成为了人类社会数字化迁徙的助推剂&#xff0c;让新三大件之间的耦合越来越紧密。从物理世界到数字世…

Windows 11 中部署 Linux 项目

一、总体思路 在 Windows 11 中部署 Linux 项目&#xff0c;主要是借助 Windows Subsystem for Linux&#xff08;WSL&#xff09;来实现。在WSL中新建基于Linux的项目虚拟环境&#xff0c;以供WIN下已克隆的项目使用。WSL 允许在 Windows 系统上运行原生的 Linux 二进制可执行…

【ETCD】【实操篇(十五)】etcd集群成员管理:如何高效地添加、删除与更新节点

etcd 是一个高可用的分布式键值存储&#xff0c;广泛应用于存储服务发现、配置管理等场景。为了确保集群的稳定性和可扩展性&#xff0c;管理成员节点的添加、删除和更新变得尤为重要。本文将指导您如何在etcd集群中处理成员管理&#xff0c;帮助您高效地维护集群节点。 目录 …

数据结构与算法Python版 平衡二叉查找树AVL

文章目录 一、平衡二叉查找树二、AVL树测试三、AVL树-算法分析 一、平衡二叉查找树 平衡二叉查找树-AVL树的定义 AVL树&#xff1a;在key插入时一直保持平衡的二叉查找树。可以利用AVL树实现抽象数据类型映射Map。与二叉查找树相比&#xff0c;AVL树基本上与二叉查找树的实现…

【Redis】Redis 安装与启动

在实际工作中&#xff0c;大多数企业选择基于 Linux 服务器来部署项目。本文演示如何使用 MobaXterm 远程连接工具&#xff0c;在 CentOS 7 上安装和启动 Redis 服务&#xff08;三种启动方式&#xff0c;包括默认启动、指定配置启动和开机自启&#xff09;。在安装之前&#x…

通过Js动态控制Bootstrap模态框-弹窗效果

目的&#xff1a;实现弹出窗、仅关闭弹窗之后才能操作&#xff08;按ESC可退出&#xff09;。自适应宽度与高度、当文本内容太多时、添加滚动条效果。 效果图 源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8">…

el-table合并单元行后的多选框选中问题

问题描述 合并单元行以后&#xff0c;首列的多选框也会合并&#xff0c;此时选中该多选框其实是只选中了合并单元行的第一行的多选框&#xff0c;其他的都未被选中。 解决方案 原本想着手动去修改表头的半选状态和全选状态 &#xff0c;但是没有找到相关方法&#xff0c;后面觉…

电脑缺失libcurl.dll怎么解决?详解电脑libcurl.dll文件丢失问题

一、libcurl.dll文件丢失的原因 libcurl.dll是一个用于处理URL传输的库文件&#xff0c;广泛应用于各种基于网络的应用程序。当这个文件丢失时&#xff0c;可能会导致相关应用程序无法正常运行。以下是libcurl.dll文件丢失的一些常见原因&#xff1a; 软件安装或卸载不完整&a…

图文教程:使用PowerDesigner导出数据库表结构为Word/Html文档

1、第一种情况-无数据库表&#xff0c;但有数据模型 1.1 使用PowerDesigner已完成数据建模 您已经使用PowerDesigner完成数据库建模&#xff0c;如下图&#xff1a; 1.2 Report配置和导出 1、点击&#xff1a;Report->Reports&#xff0c;如下图&#xff1a; 2、点击&…

UE--如何用 Python 调用 C++ 及蓝图函数

前言 先讲下如何用 Python 调用 C 函数吧。 详细可见我的上篇文章 最关键的一点就是得在函数上加一个宏&#xff1a;UFUNCTION(BlueprintCallable) UFUNCTION(BlueprintCallable) static bool GetOrCreatePackage(const FString& PackagePath, UPackage*& OutPackag…

小程序租赁系统开发的优势与实践探索

内容概要 小程序租赁系统开发正在引起广泛关注&#xff0c;特别是在数字化快速发展的今天。很多企业开始意识到&#xff0c;小程序不仅能为他们带来更多的客户&#xff0c;还能极大地提高管理效率。借助小程序&#xff0c;用户在租赁时可以更加方便地浏览和选择产品&#xff0…

闲谭Scala(3)--使用IDEA开发Scala

1. 背景 广阔天地、大有作为的青年&#xff0c;怎么可能仅仅满足于命令行。 高端大气集成开发环境IDEA必须顶上&#xff0c;提高学习、工作效率。 开整。 2. 步骤 2.1 创建工程 打开IDEA&#xff0c;依次File-New-Project…&#xff0c;不好意思我的是中文版&#xff1a;…

富芮坤FR800X系列之PWM输出程序应用设计

文章目录 前言1.设计背景2.简介3.如何设计控制调光的接口呢4.硬件设计5.软件设计5.1.软件流程图5.2.软件代码 6.小结 前言 版权归作者所有、未经允许、请勿转载。 读者对象&#xff1a; 本文档主要适用以下工程师&#xff1a; 嵌入式系统工程师 单片机软件工程师 IOT固…

node-js Express防盗链

什么是防盗连 一个简单的说明&#xff0c;假如在前端img标签想要引用图片网站上的图片&#xff0c;当你将图片地址放到img标签上想要显示的时候你发现&#xff0c;图片显示不了&#xff0c;这说明网站采用了防盗链。 怎么实现的呢 在请求头中一般会有 Referer&#xff0c;它…

使用ArcGIS/ArcGIS pro绘制六边形/三角形/菱形渔网图

在做一些尺度分析时&#xff0c;经常会涉及到对研究区构建不同尺度的渔网进行分析&#xff0c;渔网的形状通常为规则四边形。构建渔网的方法也很简单&#xff0c;使用ArcGIS/ArcGIS Pro工具箱中的【创建渔网/CreateFishnet】工具来构建。但如果想构建其他形状渔网进行相关分析&…

RabbitMQ工作模式(详解 工作模式:简单队列、工作队列、公平分发以及消息应答和消息持久化)

文章目录 十.RabbitMQ10.1 简单队列实现10.2 Work 模式&#xff08;工作队列&#xff09;10.3 公平分发10.4 RabbitMQ 消息应答与消息持久化消息应答概念配置 消息持久化概念配置 十.RabbitMQ 10.1 简单队列实现 简单队列通常指的是一个基本的消息队列&#xff0c;它可以用于…

nexus docker安装

#nexus docker 安装 docker pull sonatype/nexus3 mkdir -p /data/nexus-data docker run -itd -p 8081:8081 --privilegedtrue --name nexus3 \ -v /data/nexus-data:/var/nexus-data --restartalways docker.io/sonatype/nexus3 #访问 http://192.168.31.109:8081/ 用户名&am…

ADC(二):外部触发

有关ADC的基础知识请参考标准库入门教程 ADC&#xff08;二&#xff09;&#xff1a;外部触发 1、TIM1的CC1事件触发ADC1DMA重装载2、TIM3的TRGO事件(的更新事件)触发ADC1DMA重装载3、TIM3的TRGO事件(的捕获事件)触发ADC1DMA重装载4、优化TIM3的TRGO事件(的捕获事件)触发ADC1D…

【产品应用】一体化无刷电机在旋转等离子喷枪中的应用

在现代工业制造与加工领域&#xff0c;等离子喷枪凭借其高温、高速的等离子射流&#xff0c;能够实现高效的材料表面处理、切割以及焊接等工艺&#xff0c;在众多行业中发挥着关键作用。而一体化无刷电机的应用&#xff0c;更是为等离子喷枪的性能提升和稳定运行注入了强大动力…

ElasticSearch - 深入解析 Elasticsearch Composite Aggregation 的分页与去重机制

文章目录 Pre概述什么是 composite aggregation&#xff1f;基本结构after 参数的作用问题背景&#xff1a;传统分页的重复问题after 的设计理念响应示例 after 如何确保数据不重复核心机制Example步骤 1: 创建测试数据创建索引插入测试数据 步骤 2: 查询第一页结果查询第一页返…