使用HDFS的时候
final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.get(URI.create(hdfsFile), conf);
final Path path = new Path(hdfsFile);
if (fs.exists(path)) {
final FSDataInputStream is = fs.open(path);
final FileStatus stat = fs.getFileStatus(path);
final byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
is.readFully(0, buffer);
is.close();
fs.close();
return buffer;
} else {
throw new Exception("the file is not found .");
}
在高并发情况下会报错:
java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interrupted while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.16.3.2:52305 remote=/10.16.3.2:59000]. 60000 millis timeout left.; Host Details : local host is: "hadoop-test/10.16.3.2"; destination host is: "hadoop-alone-test":59000;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy109.getListing(Unknown Source)
问题原因为:多线程访问问题,线程A、B同时获取filesystem后使用,线程B使用完后调用了filesystem.close()方法,这个时候线程A还在操作filesystem,所以报错上面种种异常
二、解决办法
禁用FileSystem缓存
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl.disable.cache", "true");
三、问题原因
FileSystem.get源码分析
那么明明使用了两个集群,为什么会使用到Cache呢,分析FileSystem.get源码便知道原因了
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
}
应用在获取FileSystem时,提供了完整的hdfs目录,同时没有设置fs.hdfs.impl.disable.cache为true,所以创建slave集群的filesystem对象时,会使用CACHE.get(uri, conf)获取,Cache内部使用一个HashMap来维护filesystem对象,很容易想到,当HashMap的key相同时,便返回了同一个filesystem对象,那么Cache中的key是什么样的呢,代码如下:
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
}
static class Key {
final String scheme;
final String authority;
final UserGroupInformation ugi;
final long unique; // an artificial way to make a key unique
Key(URI uri, Configuration conf) throws IOException {
this(uri, conf, 0);
}
Key(URI uri, Configuration conf, long unique) throws IOException {
scheme = uri.getScheme()==null ?
"" : StringUtils.toLowerCase(uri.getScheme());
authority = uri.getAuthority()==null ?
"" : StringUtils.toLowerCase(uri.getAuthority());
this.unique = unique;
this.ugi = UserGroupInformation.getCurrentUser();
}
@Override
public int hashCode() {
return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
}
static boolean isEqual(Object a, Object b) {
return a == b || (a != null && a.equals(b));
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Key) {
Key that = (Key)obj;
return isEqual(this.scheme, that.scheme)
&& isEqual(this.authority, that.authority)
&& isEqual(this.ugi, that.ugi)
&& (this.unique == that.unique);
}
return false;
}
@Override
public String toString() {
return "("+ugi.toString() + ")@" + scheme + "://" + authority;
}
}
}
可以看到Key由四个要素构成,其中前2个跟URI相关,我们使用的为一个hdfs://nameservice1,ugi为安全认证的用户,使用的是同一个,unique为0,因此Key相同,第二次获取filesystem对象时,直接返回了第一次创建的filesystem对象,最终造成了应用虽然使用了不同的集群配置文件,但最中获取的是同一个filesystem对象。
解决
fs.hdfs.impl.disable.cache参数本身不建议修改,修改集群的fs.defaultFS,使不同集群的fs.defaultFS不一样
参考:
多个HDFS集群的fs.defaultFS配置一样,造成应用一直连接同一个集群的问题分析 - 远去的列车 - 博客园
Filesystem closed错误排查 - 简书
java.io.IOException: Filesystem closed - 简书
java.io.IOException: Filesystem closed_bitcarmanlee的博客-CSDN博客_filesystem closed