大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——shuffle机制

news2024/9/21 22:34:01

3.3.1Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

Untitled

3.3.2Partition分区

1、问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

2、默认Partitioner分区

public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

3、自定义Partitioner步骤

(1)自定义类继承Partitioner,重写getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        //控制分区代码逻辑
				...
        return partition;
    }
}

(2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(ProvincePartitioner.class);

(3)自定义Parttition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);

4、分区总结

(1)如果ReduceTask的数量>getPartition的结果数,则会多昌盛几个空的输出文件part-r000xx;

(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;

(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;

(4)分区号必须从零开始,逐一累加。

5、案例分析

例如:假设自定义分区数为5,则

(1)job.setNumReduceTasks(1);会正常运行,只产生一个输出文件

(2)job.setNumReduceTasks(2); 会报错

(3)job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

3.3.3Partition分区案例实操

1、需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(1)输入数据

1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2	13846544121	192.196.100.2			264	0	200
3 	13956435636	192.196.100.3			132	1512	200
4 	13966251146	192.168.100.1			240	0	404
5 	18271575951	192.168.100.2	www.atguigu.com	1527	2106	200
6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
7 	13590439668	192.168.100.4			1116	954	200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			240	0	200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.atguigu.com	1938	180	500
13 	13560439638	192.168.100.10			918	4938	200
14 	13470253144	192.168.100.11			180	180	200
15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			120	120	200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			1116	954	200

(2)期望输出数据

手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

2、需求分析

Untitled

3、在案例2.4的基础上,增加一个分区类

package com.cuiyf41.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 获取电话号码的前三位
        String preNum = key.toString().substring(0, 3);

        int partition = 4;
        
        // 2 判断是哪个省
        if ("136".equals(preNum)){
            partition = 0;
        }else if ("137".equals(preNum)){
            partition = 1;
        }else if ("138".equals(preNum)){
            partition = 2;
        }else if ("139".equals(preNum)){
            partition = 3;
        }
        return partition;
    }
}

4、在驱动函数中增加自定义数据分区设置和ReduceTask设置

// 8 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);

// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);
package com.cuiyf41.flowsum;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "e:/input/phone_data.txt", "e:/output" };

        // 1 获取配置信息,或者job对象实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的输入原始文件所在目录
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // 如果输出路径存在,则进行删除
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output,true);
        }

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // 8 指定自定义数据分区
        job.setPartitionerClass(ProvincePartitioner.class);

        // 9 同时指定相应数量的reduce task
        job.setNumReduceTasks(5);

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.3.4WritableComparable排序

1、概述

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值时,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定的阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数量超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

2、排序的分类

(1)部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序

(2)全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3)辅助排序(GroupingComparator分组)

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

(4)二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

3、自定义排序WritableComparable

(1)原理分析

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override
public int compareTo(FlowBean o) {

	int result;
		
	// 按照总流量大小,倒序排列
	if (sumFlow > bean.getSumFlow()) {
		result = -1;
	}else if (sumFlow < bean.getSumFlow()) {
		result = 1;
	}else {
		result = 0;
	}

	return result;
}

3.3.5WritableComparable排序案例实操(全排序)

1、需求

根据案例2.3产生的结果再次对总流量进行排序。

(1)输入数据

原始数据 第一次处理后的数据

(2)期望输出数据

13509468723	7335	110349	117684
13736230513	2481	24681	27162
13956435636	132		1512	1644
13846544121	264		0		264
。。。 。。。

2、需求分析

Untitled

3、代码实现

(1)FlowBean对象在在需求1基础上增加了比较功能

package com.cuiyf41.sort;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * 序列化方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        int result;

        // 按照总流量大小,倒序排列
        if (this.sumFlow > o.getSumFlow()) {
            result = -1;
        }else if (this.sumFlow < o.getSumFlow()) {
            result = 1;
        }else {
            result = 0;
        }

        return result;
    }
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
}

(2)编写Mapper类

package com.cuiyf41.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 截取
        String[] fields = line.split("\t");

        // 3 封装对象
        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        k.set(upFlow, downFlow);
        v.set(phoneNum);

        // 4 输出
        context.write(k, v);

    }
}

(3)编写Reducer类

package com.cuiyf41.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // 循环输出,避免总流量相同情况
        for(Text value: values){
            context.write(value, key);
        }
    }
}

(4)编写Driver类

package com.cuiyf41.flowsum;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();

        // 2 切割字段
        String[] fields = line.split("\t");

        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];

        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
//        FlowBean v = new FlowBean(upFlow, downFlow);
        v.set(upFlow, downFlow);

        // 4 写出
        context.write(k, v);
    }
}

3.3.6WritableComparable排序案例实操(区内排序)

1.需求

要求每个省份手机号输出的文件中按照总流量内部排序。

2.需求分析

基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

Untitled

3.案例实操

(1)增加自定义分区类

package com.cuiyf41.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {

        // 1 获取手机号码前三位
        String preNum = value.toString().substring(0, 3);

        int partition = 4;

        // 2 根据手机号归属地设置分区
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}

(2)在驱动类中添加分区类

// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);

// 设置Reducetask个数
job.setNumReduceTasks(5);

3.3.7Combiner合并

1)概述

(1)Combiner是MR程序中Mapper和Reducer之外的一个组件。

(2)Combiner组件的父类就是Reducer。

(3)Combiner和Reducer的区别在于运行的维值

Combiner是在每个MapTask所在的节点运行

Reducer是接收全局所有Mapper的输出结果

(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。

(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv应该跟Reducer的输入kv类型保持一致。

Mapper Reducer

3 5 7 →(3+5+7)/3 = 5 (3+5+7+2+6)/5 = 23/5 等于(5+4)/2 = 9/2

2 6 →(2+6)/2 = 4

2)自定义Combiner实现步骤

(a)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        // 1 汇总操作
		int count = 0;
		for(IntWritable v :values){
			count += v.get();
		}

        // 2 写出
		context.write(key, new IntWritable(count));
	}
}

(b)在Job驱动类中设置:

job.setCombinerClass(WordcountCombiner.class);

3.3.8Combiner合并案例实操

1、需求

统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。

(1)数据输入

(2)期望输出数据

期望:Combine输入数据多,输出时经过合并,输出数据降低。

2、需求分析

Untitled

3.案例实操-方案一

1)增加一个WordcountCombiner类继承Reducer

package com.cuiyf41.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 1 汇总
        int sum = 0;

        for(IntWritable value :values){
            sum += value.get();
        }

        v.set(sum);

        // 2 写出
        context.write(key, v);
    }
}

2)在WordcountDriver驱动类中指定Combiner

// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);

4.案例实操-方案二

1)将WordcountReducer作为Combiner在WordcountDriver驱动类中指定

// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordcountReducer.class);

3.3.9GroupingComparator分组(辅助排序)

1)概述

对Reduce阶段的数据根据某一个或几个字段进行分组。

分组排序步骤:

(1)自定义类继承WritableComparator

(2)重写compare()方法

@Override
public int compare(WritableComparable a, WritableComparable b) {
		// 比较的业务逻辑
		return result;
}

(3)创建一个构造将比较对象的类传给父类

protected OrderGroupingComparator() {
		super(OrderBean.class, true);
}

3.3.10GroupingComparator分组案例实操

1、需求

有如下订单数据

Untitled

现在需要求出每一个订单中最贵的商品。

(1)输入数据

0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000001	Pdt_02	33.8
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4

(2)期望输出数据

1	222.8
2	722.4
3	232.8

2、需求分析

(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。

(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如图4-18所示。

Untitled

3、代码实现

(1)定义订单信息OrderBean类

package com.cuiyf41.order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private int order_id; // 订单id号
    private double price; // 价格

    public OrderBean() {
        super();
    }

    public OrderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }

    // 二次排序
    @Override
    public int compareTo(OrderBean o) {
        int result;

        if (order_id > o.getOrder_id()) {
            result = 1;
        } else if (order_id < o.getOrder_id()) {
            result = -1;
        } else {
            // 价格倒序排序
            result = price > o.getPrice() ? -1 : 1;
        }

        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();

    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

(2)编写OrderSortMapper类

package com.cuiyf41.order;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();

        // 2 截取
        String[] fields = line.split("\t");

        // 3 封装对象
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4 写出
        context.write(k, NullWritable.get());
    }
}

(3)编写OrderSortGroupingComparator类

package com.cuiyf41.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        int result;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            result = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            result = -1;
        } else {
            result = 0;
        }

        return result;
    }
}

(4)编写OrderSortReducer类

package com.cuiyf41.order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

(5)编写OrderSortDriver类

package com.cuiyf41.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args  = new String[]{"e:/input/inputorder" , "e:/output1"};

        // 1 获取配置信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 设置jar包加载路径
        job.setJarByClass(OrderDriver.class);

        // 3 加载map/reduce类
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4 设置map输出数据key和value类型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 设置最终输出数据的key和value类型
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 设置输入数据和输出数据路径
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // 如果输出路径存在,则进行删除
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output,true);
        }
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // 8 设置reduce端的分组
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023春季露营投影怎么选?轻薄投影极米Z6X Pro值得推荐

近年来&#xff0c;露营经济在多重因素的共同助推下快速发展&#xff0c;精致露营的攻略开始占据小红书、微博、朋友圈等各类社交平台&#xff0c;吸引着更多用户种草并加入到露营大军中&#xff0c;而露营经济的强势“破圈”给家用智能投影带来了更多的发展契机。凭借着小巧的…

探访上汽通用武汉奥特能超级工厂

上汽通用汽车在电动化和智能网联化新技术领域投入了700亿大洋&#xff0c;武汉奥特能超级工厂就是其中一个重点项目。这个工厂已经投产&#xff0c;将成为上汽通用汽车的新能源生产基地&#xff0c;加速奥特能平台车型的推出。 最近别克推出了Electra E5&#xff0c;它是别克第…

新品BCM6755A1KFEBG/MT7921LE/MT7921AU WiFi芯片

博通在WiFi市场具有相当的实力。在WiFi6上有下面这几个解决方案&#xff1a;型号&#xff1a;BCM6755 BCM6755A1KFEBG类型&#xff1a;四核1.5GHz CPU封装&#xff1a;BGA批次&#xff1a;新BCM6755和BCM6750还是A7架构&#xff0c;更多的用在中低端型号上。BCM6755和BCM6750 C…

Spark 广播变量累加器

广播变量 场景描述&#xff1a;一份数据存在Driver中&#xff0c;但是每个Executor都需要一份。 常规模式下&#xff0c;Driver会给每个分区都发送一份数据。如果在Executor中存在多个分区的情况&#xff0c;那么一个Executor会获得多份数据。 Executor是进程&#xff0c;task…

微信小程序阻止页面返回(包滑动、自动返回键)

这个场景还是挺有意思的&#xff0c;比如某多多&#xff0c;只要你点左上角的返回 好家伙&#xff0c;满满又 花不了 的优惠券就来了&#xff0c;让你拥有一种消费最划算的感觉。 如果你的场景比较简单&#xff0c;只是对左上角的返回进行监听&#xff0c;只需要关闭自带的导航…

16_FreeRTOS队列集

目录 队列集 队列集相关API函数介绍 队列集使用流程 实验源码 队列集 一个队列只允许任务间传递的消息为同一种数据类型,如果需要在任务间传递不同数据类型的消息时,那么就可以使用队列集! 作用:用于对多个队列或信号量进行“监听”其中不管哪一个消息到来&#xff0c;都…

JVM学习笔记四:运行时数据区之虚拟机栈

目录 概述 StackOverflowError测试案例 栈运行原理 栈帧的内部结构 改变栈帧大小的StackOverflowError测试案例 局部变量表 局部变量槽 操作数栈 动态链接 静态链接 动态链接 早期绑定 晚期绑定 方法返回地址 概述 与程序计数器一样&#xff0c;Java虚拟机栈也是…

4665: 求前n项和

描述给定序列&#xff1a;求前n项之和。输入输入数据有多组&#xff0c;第一行为数据的组数t&#xff08;1<t<15&#xff09;。每组数据有一行&#xff0c;每行为一个正整数n&#xff08;n<1000000&#xff09;。输出每组输出前n项的和&#xff0c;保留4位小数。样例输…

【编程入门】应用市场(安卓版)

背景 前面已输出多个系列&#xff1a; 《十余种编程语言做个计算器》 《十余种编程语言写2048小游戏》 《17种编程语言10种排序算法》 《十余种编程语言写博客系统》 《十余种编程语言写云笔记》 《N种编程语言做个记事本》 目标 为编程初学者打造入门学习项目&#xff0c;使…

Jmeter常用断言之BeanShell断言详解

BeanShell断言可以使用beanshell脚本来执行断言检查&#xff0c;可以用于更复杂的个性化需求&#xff0c;使用更灵活&#xff0c;功能更强大&#xff0c;但是要能够熟练使用beanshell脚本 在这里除了可以使用beanshell的内置变量外&#xff0c;主要通过 Failure 和 FailureMess…

es 7.8.0 linux 集群

1. 下载es linux版本的数据包 地址: https://www.elastic.co/cn/downloads/past-releases#elasticsearch 解压: 解压 tar -xzvf xxx 2. 我是在一个服务器上测试的,实际上是不同的服务器 所以复制了三份,模拟多节点 进去之后主要是修改elasticsearch.yml 内容如下 节点一…

关于在VM上的windows server 2022系统安装

目录 1、windows serer 2022安装的准备工作 1&#xff09;下载系统 2&#xff09;寻找对应系统密钥 3&#xff09;配置server系统开机配置项&#xff08;可能会出现sconfig配置界面&#xff09; 2、开始安装server系统 1、windows serer 2022安装的准备工作 1&#xff09;…

Dropout

目录一、Dropout出现的原因二、什么是Dropout&#xff1f;三、为什么Dropout解决过拟合?3.1 取平均的作用3.2 减少神经元间复杂的共适应关系四、实现Dropout—— pytorchexample 1example 2example 3设置dropout参数技巧一、Dropout出现的原因 在机器学习的模型中 如果模型的…

处理窗口的常用API函数及窗口处理经验总结(附源码)

目录 1、检测窗口状态 2、将窗口前置显示 2.1、将窗口拉到最前面显示 2.2、将窗口置顶显示 2.3、将窗口设置到指定窗口的上面 3、将不显示的窗口强行显示出来 4、获取窗口的信息 5、通过窗口信息去查找窗口 5.1、调用GetClassName接口去比对窗口的类名 5.2、调用Find…

清理bib文件(删除重复项,仅保留tex中引用的条目)

在写latex文件的过程中&#xff0c;经常会遇到添加了一堆文献的bibtex到bib文件中&#xff0c;有时候文章一长同一篇文献用不同的cite-key引用了多次&#xff0c;同时也会有一些文献最后并没被正文引用&#xff0c;这就需要对bib文件进行清理。 删除重复项 可以用JabRef 在J…

45岁当打之年再创业,剑指中国版ChatGPT,这位美团联合创始人能否圆梦?

文 BFT机器人 “即便只有一个人&#xff0c;我也要出发。” 这是45岁的前美团联合创始人王慧文再次冲上创业沙场的“征战”宣言&#xff0c;这一次他的梦想是“组队拥抱新时代&#xff0c;打造中国OpenAI”。 01 当打之年&#xff0c; AI新梦再起航 “我的人工智能宣言&…

视频投票和图文投票之间的差异投票链接制作平台微擎投票

“我的舞台我的梦”网络评选投票_线上小程序的投票方式_视频投票的功能_在线投票程序用户在使用微信投票的时候&#xff0c;需要功能齐全&#xff0c;又快捷方便的投票小程序。而“活动星投票”这款软件使用非常的方便&#xff0c;用户可以随时使用手机微信小程序获得线上投票服…

Spring中的拦截器

这里写目录标题基本概念HandlerInterceptor拦截器HandlerInterceptor讲解MethodInterceptor拦截器二者的区别基本概念 在web开发中&#xff0c;拦截器是经常用到的功能。它可以帮我们预先设置数据以及统计方法的执行效率等等。 Spring中拦截器主要分两种&#xff0c;一个是Han…

【学习总结】激光雷达与相机外参标定:代码(cam_lidar_calibration)

前段时间尝试了一款激光雷达和相机标定的代码&#xff0c;总结了博客&#xff1a; 【学习总结】激光雷达与相机外参标定&#xff1a;原理与代码 但总觉得那个代码太差劲&#xff0c;而且精度不行&#xff0c;于是又找了些新的代码&#xff0c;体验比之前的好很多&#xff0c;在…

【自然语言处理】主题建模:Top2Vec(理论篇)

主题建模&#xff1a;Top2Vec&#xff08;理论篇&#xff09;Top2Vec 是一种用于 主题建模 和 语义搜索 的算法。它自动检测文本中出现的主题&#xff0c;并生成联合嵌入的主题、文档和词向量。 算法基于的假设&#xff1a;许多语义相似的文档都可以由一个潜在的主题表示。首先…