在 HDFS API 中遇到的 filesystem closed 问题
1. 问题描述
在程序运行中时不时地会抛出 IOException,内容提示 filesystem closed。一开始也很疑惑:因为不确定 org.apache.hadoop.fs.FileSystem 实例是否线程安全,所以每次调用的时候都通过 FileSystem.get(conf) 方法来获取一个新的对象。后来查阅资料发现,FileSystem.get 方法得到的并不是一个全新的对象,而是一个缓存过的对象。
源码如下:
/** Returns the FileSystem for this URI's scheme and authority. The scheme
* of the URI determines a configuration property name,
* <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
* The entire URI is passed to the FileSystem instance's initialize method.
*/
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
// Only when fs.<scheme>.impl.disable.cache is true is the cache bypassed and a
// fresh instance created.
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
// Default path: the instance comes from the shared CACHE, so different callers asking
// for the same filesystem can receive the same object; if one of them closes it, the
// others are left holding a closed instance.
return CACHE.get(uri, conf);
}
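可以看到,除非将 fs.{scheme}.impl.disable.cache 设置为 true,FileSystem.get 方法的确是从缓存中取得实例的。因此,多个调用方拿到的可能是同一个对象:只要其中一处调用了 close(),其他仍在使用该对象的地方就会抛出 filesystem closed 异常。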
2. 解决方案
为了保障线程安全性(更确切地说,是避免共享的缓存实例被其他调用方关闭),我们可以在每次调用时都创建一个新的实例:FileSystem.newInstance 方法每次都会返回一个新的、不经过缓存的对象,可以满足我们的需求。相应地,这个实例需要由调用方在用完后自行关闭,下面的代码在每个方法结束前都会关闭它。
package com.avaloninc.hdfsapi.service.impl;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.avaloninc.hdfsapi.service.HdfsService;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
 * HDFS file operations (read, write, rename, delete, list) built on {@link FileSystem}.
 *
 * <p>Every method obtains its own instance through {@link FileSystem#newInstance(Configuration)}
 * instead of the cached {@code FileSystem.get(conf)}, and closes it before returning. Because the
 * instance is private to the call, closing it cannot close an instance some other caller is still
 * using.
 *
 * @Author: wuzhiyu.
 * @Date: 2018-03-19 14:39.
 * @Description: HDFS read/write/rename/delete/ls service backed by per-call FileSystem instances.
 */
public class HdfsServiceImpl implements HdfsService {

  // NOTE(review): nothing in this class assigns config; it is presumably injected from outside
  // (the file imports Spring's @Autowired/@Service) -- confirm, otherwise every method would pass
  // null to FileSystem.newInstance.
  private Configuration config;

  /**
   * Reads a whole HDFS file as text.
   *
   * <p>The bytes are decoded as UTF-8, split into lines by {@link IOUtils#readLines} and re-joined
   * with {@code "\n"}, so the original line terminators are normalized and a trailing terminator is
   * not preserved.
   *
   * @param path HDFS path of the file to read
   * @return the file's lines joined with {@code "\n"}
   * @throws IOException if the file cannot be opened or read
   */
  public String read(String path) throws IOException {
    Path hdfsPath = new Path(path);
    // The opened stream is a resource too; it was previously never closed.
    try (FileSystem fileSystem = FileSystem.newInstance(config);
        InputStream in = fileSystem.open(hdfsPath)) {
      // Decode explicitly: the single-argument readLines uses the JVM's platform charset,
      // which would make the result depend on the host this runs on.
      List<String> lines = IOUtils.readLines(in, StandardCharsets.UTF_8);
      return Joiner.on("\n").join(lines);
    }
  }

  /**
   * Copies everything from {@code inputStream} into an HDFS file, overwriting an existing file.
   *
   * <p>This method takes ownership of {@code inputStream} and always closes it, whether or not the
   * write succeeds.
   *
   * @param path HDFS path of the file to create or overwrite
   * @param inputStream source of the bytes to write; closed by this method
   * @throws IOException if the file cannot be created or written, including a failure reported
   *     while closing the HDFS output stream
   */
  public void write(String path, InputStream inputStream) throws IOException {
    try {
      Path hdfsPath = new Path(path);
      // try-with-resources (not closeQuietly) so an exception from outputStream.close(), which
      // can flush buffered data, reaches the caller instead of silently hiding a failed write.
      try (FileSystem fileSystem = FileSystem.newInstance(config);
          FSDataOutputStream outputStream = fileSystem.create(hdfsPath, true)) {
        IOUtils.copy(inputStream, outputStream);
      }
    } finally {
      IOUtils.closeQuietly(inputStream);
    }
  }

  /**
   * Renames (moves) {@code src} to {@code dest}, creating the destination's parent directory first
   * if it does not exist.
   *
   * @param src existing HDFS path to move
   * @param dest target HDFS path
   * @return the result of {@link FileSystem#rename(Path, Path)}
   * @throws IOException if {@code src} does not exist or an HDFS call fails
   */
  public boolean rename(String src, String dest) throws IOException {
    try (FileSystem fileSystem = FileSystem.newInstance(config)) {
      Path srcPath = new Path(src);
      Path destPath = new Path(dest);
      if (!fileSystem.exists(srcPath)) {
        throw new IOException("Path " + src + " do not exists.");
      }
      // getParent() is null when dest is the root directory; there is no parent to create then.
      Path destParent = destPath.getParent();
      if (destParent != null && !fileSystem.exists(destParent)) {
        fileSystem.mkdirs(destParent);
      }
      return fileSystem.rename(srcPath, destPath);
    }
  }

  /**
   * Deletes an HDFS path; directories are deleted recursively.
   *
   * @param path HDFS path to delete
   * @return the result of {@link FileSystem#delete(Path, boolean)}
   * @throws IOException if the HDFS call fails
   */
  public boolean delete(String path) throws IOException {
    try (FileSystem fileSystem = FileSystem.newInstance(config)) {
      return fileSystem.delete(new Path(path), true);
    }
  }

  /**
   * Lists an HDFS path.
   *
   * @param path HDFS path to list
   * @return the names (last path component) of the directory's entries, or a single-element list
   *     with the file's own name when {@code path} is a file
   * @throws IllegalArgumentException if {@code path} does not exist
   * @throws IOException if an HDFS call fails
   */
  public List<String> ls(String path) throws IOException {
    try (FileSystem fileSystem = FileSystem.newInstance(config)) {
      Path hdfsPath = new Path(path);
      if (!fileSystem.exists(hdfsPath)) {
        throw new IllegalArgumentException(
            "Path " + path + " do not exist or is not a dir.");
      }
      if (fileSystem.isDirectory(hdfsPath)) {
        return Arrays.stream(fileSystem.listStatus(hdfsPath))
            .map(FileStatus::getPath)
            .map(Path::getName)
            .collect(Collectors.toList());
      } else {
        FileStatus status = fileSystem.getFileStatus(hdfsPath);
        return ImmutableList.of(status.getPath().getName());
      }
    }
  }
}