目录
- 1. WritableComparable
- 1.1 Writable
- 1.2 Comparable
- 1.3 IntWritable
- 2. 自定义序列化数据类型RectangleWritable
- 3. 矩形面积计算
- 3.1 Map
- 3.2 Reduce
- 4. 代码和结果
- 4.1 pom.xml中依赖配置
- 4.2 工具类util
- 4.3 矩形面积计算
- 4.4 结果
- 参考
本文引用的Apache Hadoop源代码基于Apache许可证 2.0,详情请参阅 Apache许可证2.0。
1. WritableComparable
自定义序列化数据类型需要继承org.apache.hadoop.io.WritableComparable
,该类的源代码如下。从源代码中可以看出WritableComparable的父类有两个:org.apache.hadoop.io.Writable
和java.lang.Comparable
。
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A {@link Writable} which is also {@link Comparable}.
*
* <p><code>WritableComparable</code>s can be compared to each other, typically
* via <code>Comparator</code>s. Any type which is to be used as a
* <code>key</code> in the Hadoop Map-Reduce framework should implement this
* interface.</p>
*
* <p>Note that <code>hashCode()</code> is frequently used in Hadoop to partition
* keys. It's important that your implementation of hashCode() returns the same
* result across different instances of the JVM. Note also that the default
* <code>hashCode()</code> implementation in <code>Object</code> does <b>not</b>
* satisfy this property.</p>
*
* <p>Example:</p>
* <blockquote><pre>
* public class MyWritableComparable implements
* WritableComparable{@literal <MyWritableComparable>} {
* // Some data
* private int counter;
* private long timestamp;
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public int compareTo(MyWritableComparable o) {
* int thisValue = this.value;
* int thatValue = o.value;
* return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
* }
*
* public int hashCode() {
* final int prime = 31;
* int result = 1;
* result = prime * result + counter;
* result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
* return result
* }
* }
* </pre></blockquote>
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
1.1 Writable
Writable中主要有两个方法:void write(DataOutput out)
和void readFields(DataInput in)
,其中readFields负责序列化读入,而write负责序列化写出。由于Writable是WritableComparable的父类,因此自定义序列化数据类型必须实现这两个方法。
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A serializable object which implements a simple, efficient, serialization
* protocol, based on {@link DataInput} and {@link DataOutput}.
*
* <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
* framework implements this interface.</p>
*
* <p>Implementations typically implement a static <code>read(DataInput)</code>
* method which constructs a new instance, calls {@link #readFields(DataInput)}
* and returns the instance.</p>
*
* <p>Example:</p>
* <blockquote><pre>
* public class MyWritable implements Writable {
* // Some data
* private int counter;
* private long timestamp;
*
* // Default constructor to allow (de)serialization
* MyWritable() { }
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public static MyWritable read(DataInput in) throws IOException {
* MyWritable w = new MyWritable();
* w.readFields(in);
* return w;
* }
* }
* </pre></blockquote>
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException any other problem for write.
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException any other problem for readFields.
*/
void readFields(DataInput in) throws IOException;
}
1.2 Comparable
Comparable是WritableComparable的另一个父类,它的源代码如下。它只有一个方法int compareTo(T var1)
,该方法负责比较两个T类型的大小(一般在继承Comparable的子类中)。自定义序列化数据类型也必须实现这个方法。
// Source code is decompiled from a .class file using FernFlower decompiler.
package java.lang;
public interface Comparable<T> {
int compareTo(T var1);
}
1.3 IntWritable
IntWritable是官方定义的序列化数据类型,自定义序列化数据类型时可以参考它的源代码,其中除了上面提到的write,readFields和compareTo方法之外,还建议实现toString方法(public String toString()
)。
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** A WritableComparable for ints. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class IntWritable implements WritableComparable<IntWritable> {
private int value;
public IntWritable() {}
public IntWritable(int value) { set(value); }
/**
* Set the value of this IntWritable.
* @param value input value.
*/
public void set(int value) { this.value = value; }
/**
* Return the value of this IntWritable.
* @return value of this IntWritable.
*/
public int get() { return value; }
@Override
public void readFields(DataInput in) throws IOException {
value = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(value);
}
/** Returns true iff <code>o</code> is a IntWritable with the same value. */
@Override
public boolean equals(Object o) {
if (!(o instanceof IntWritable))
return false;
IntWritable other = (IntWritable)o;
return this.value == other.value;
}
@Override
public int hashCode() {
return value;
}
/** Compares two IntWritables. */
@Override
public int compareTo(IntWritable o) {
int thisValue = this.value;
int thatValue = o.value;
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
@Override
public String toString() {
return Integer.toString(value);
}
/** A Comparator optimized for IntWritable. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int thisValue = readInt(b1, s1);
int thatValue = readInt(b2, s2);
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
}
static { // register this comparator
WritableComparator.define(IntWritable.class, new Comparator());
}
}
2. 自定义序列化数据类型RectangleWritable
自定义序列化数据类型时候,如果额外实现了public int hashCode()
、public boolean equals(Object o)
和public String toString()
,则该自定义序列化数据类型可以作为Mapper输出的键或值、Reducer输入和输出的键或值(写入结果时会调用toString方法来写入该类型),而且还可以使用MapReduce自带的哈希分区。此外,实现hashCode方法时,要产生一个基于该类型内属性值的哈希函数;而实现compareTo方法,要避免该类型的不同实例实际不同而被比较后得到的结果是相同的影响。
下面是RectangleWritable的代码,该类的功能是处理矩形的长和宽。同时我认为长为6,宽为3和长为3,宽为6是两种不同的矩形,基于这个想法实现了compareTo方法。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.io.WritableComparable;
public class RectangleWritable implements WritableComparable<RectangleWritable> {
private int length, width;
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getWidth() {
return width;
}
public void setWidth(int width) {
this.width = width;
}
public RectangleWritable() {
super();
}
public RectangleWritable(int length, int width) {
super();
setLength(length);
setWidth(width);
}
@Override
public String toString() {
return String.format("%d\t%d", getLength(), getWidth());
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(getLength());
out.writeInt(getWidth());
}
@Override
public int hashCode() {
return Objects.hash(getLength(), getWidth());
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof RectangleWritable))
return false;
RectangleWritable other = (RectangleWritable) o;
return other.getLength() == getLength() && other.getWidth() == getWidth();
}
@Override
public void readFields(DataInput in) throws IOException {
this.length = in.readInt();
this.width = in.readInt();
}
@Override
public int compareTo(RectangleWritable o) {
int res = Integer.compare(getLength(), o.getLength());
return res == 0 ? Integer.compare(getWidth(), o.getWidth()) : res;
}
}
3. 矩形面积计算
输入文本如下。
9 9
3 27
7 8
1 1
3 6
6 3
3.1 Map
3.2 Reduce
4. 代码和结果
4.1 pom.xml中依赖配置
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.6</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
4.2 工具类util
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class util {
public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
URI add = new URI(uri);
return FileSystem.get(add, conf);
}
public static void removeALL(String uri, Configuration conf, String path) throws Exception {
FileSystem fs = getFileSystem(uri, conf);
if (fs.exists(new Path(path))) {
boolean isDeleted = fs.delete(new Path(path), true);
System.out.println("Delete Output Folder? " + isDeleted);
}
}
public static void showResult(String uri, Configuration conf, String path) throws Exception {
FileSystem fs = getFileSystem(uri, conf);
String regex = "part-r-";
Pattern pattern = Pattern.compile(regex);
if (fs.exists(new Path(path))) {
FileStatus[] files = fs.listStatus(new Path(path));
for (FileStatus file : files) {
Matcher matcher = pattern.matcher(file.getPath().toString());
if (matcher.find()) {
System.out.println(file.getPath() + ":");
FSDataInputStream openStream = fs.open(file.getPath());
IOUtils.copyBytes(openStream, System.out, 1024);
openStream.close();
}
}
}
}
}
4.3 矩形面积计算
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class App {
public static class MyMapper extends Mapper<LongWritable, Text, RectangleWritable, NullWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splitStr = value.toString().split(" ");
RectangleWritable keyOut = new RectangleWritable(Integer.parseInt(splitStr[0]), Integer.parseInt(splitStr[1]));
context.write(keyOut, NullWritable.get());
}
}
public static class MyReducer extends Reducer<RectangleWritable, NullWritable, RectangleWritable, IntWritable> {
public void reduce(RectangleWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
IntWritable area = new IntWritable(key.getLength() * key.getWidth());
context.write(key, area);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] myArgs = {
"file:///home/developer/CodeArtsProjects/CalRectangleArea/rectangle.txt",
"hdfs://localhost:9000/user/developer/CalRectangleArea/output"
};
util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
Job job = Job.getInstance(conf, "CalRectangleArea");
job.setJarByClass(App.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(RectangleWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(RectangleWritable.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < myArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(myArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));
int res = job.waitForCompletion(true) ? 0 : 1;
if (res == 0) {
System.out.println("计算结果为:");
util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
}
System.exit(res);
}
}
4.4 结果
参考
吴章勇 杨强著 大数据Hadoop3.X分布式处理实战