I recently ran into the following requirement on a project: the web API accepts files uploaded by users, and backend services later use those files or programs when running scheduled tasks.
Since both the API and the backend services are typically deployed across multiple nodes, the uploaded files must be accessible from every node. In short, we need a distributed file storage service.
There are plenty of options: Redis, HDFS, or even storing the files directly in a database. But considering that the uploads may be jar packages or other large files, and for ease of management, we ultimately chose HDFS as our distributed storage solution.
2. Setting Up a Local Environment
For the local environment I basically followed the approach from reference 1 and installed with brew; however, to stay compatible with the 2.6.0 version running in production, I installed 2.6.0 locally as well.
First, enable remote login:
sudo systemsetup -setremotelogin on
Then generate a key pair (omitted here) and append the public key to authorized_keys:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Next, install hadoop. Running brew install hadoop directly installs 3.0.0 by default, so a small trick is needed to install 2.6.0 specifically.
The key observation is that brew formulas are really just a set of ruby scripts: brew install <package> runs the install script for the corresponding package. So all we need is to find the 2.6.0 install script in brew's commit history; the corresponding file on GitHub is hadoop.rb. The install command is then simply:
brew install https://raw.githubusercontent.com/Homebrew/homebrew-core/ed89a8d0422f29c9bb87e2ea11b3a3f550493294/Formula/hadoop.rb
After installation, since we only need HDFS, we only configure the HDFS-related parts.
Edit /usr/local/Cellar/hadoop/2.6.0/libexec/etc/hadoop/core-site.xml as follows:
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:8020</value>
  </property>
</configuration>
Then format the NameNode:
cd /usr/local/Cellar/hadoop/2.6.0/libexec/bin
./hdfs namenode -format
Finally, start HDFS with /usr/local/Cellar/hadoop/2.6.0/sbin/start-dfs.sh, and stop it with /usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh.
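With the daemons running, it is worth a quick check that the NameNode is actually reachable at hdfs://localhost:8020 before wiring it into an application. Below is a minimal smoke test, assuming the hadoop-client dependency from the next section is on the classpath (the class name and the choice to list the root directory are just for illustration):
package com.avaloninc.hdfsapi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSmokeTest {

  public static void main(String[] args) throws Exception {
    // Point the client at the pseudo-distributed HDFS configured in core-site.xml.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:8020");

    try (FileSystem fs = FileSystem.get(conf)) {
      // List the root directory; a freshly formatted HDFS will simply print nothing.
      for (FileStatus status : fs.listStatus(new Path("/"))) {
        System.out.println(status.getPath());
      }
    }
  }
}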
3. API Usage Example
I won't belabor the code here; below is a simple interface with the basic read and write methods:
package com.avaloninc.hdfsapi.service;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

public interface HdfsService {
  String read(String path) throws IOException;

  void write(String path, InputStream inputStream) throws IOException;

  boolean rename(String src, String dest) throws IOException;

  boolean delete(String path) throws IOException;

  List<String> ls(String path) throws IOException;
}
And the implementation class:
package com.avaloninc.hdfsapi.service.impl;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.avaloninc.hdfsapi.service.HdfsService;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class HdfsServiceImpl implements HdfsService {

  // The Hadoop Configuration is assumed to be provided as a Spring bean
  // (see the configuration sketch after this class).
  @Autowired
  private Configuration config;

  @Override
  public String read(String path) throws IOException {
    try (FileSystem fileSystem = FileSystem.get(config)) {
      Path hdfsPath = new Path(path);
      List<String> lines = IOUtils.readLines(fileSystem.open(hdfsPath));
      return Joiner.on("\n").join(lines);
    }
  }

  @Override
  public void write(String path, InputStream inputStream) throws IOException {
    FileSystem fileSystem = null;
    FSDataOutputStream outputStream = null;
    try {
      Path hdfsPath = new Path(path);
      fileSystem = FileSystem.get(config);
      // Overwrite the target file if it already exists.
      outputStream = fileSystem.create(hdfsPath, true);

      // Copy the upload to HDFS in 1 KB chunks.
      byte[] bytes = new byte[1024];
      int numBytes = 0;
      while ((numBytes = inputStream.read(bytes)) > 0) {
        outputStream.write(bytes, 0, numBytes);
      }
    } finally {
      IOUtils.closeQuietly(inputStream);
      IOUtils.closeQuietly(outputStream);
      IOUtils.closeQuietly(fileSystem);
    }
  }

  @Override
  public boolean rename(String src, String dest) throws IOException {
    try (FileSystem fileSystem = FileSystem.get(config)) {
      Path srcPath = new Path(src);
      Path destPath = new Path(dest);
      if (!fileSystem.exists(srcPath)) {
        throw new IOException("Path " + src + " does not exist.");
      }
      if (!fileSystem.exists(destPath.getParent())) {
        fileSystem.mkdirs(destPath.getParent());
      }
      return fileSystem.rename(srcPath, destPath);
    }
  }

  @Override
  public boolean delete(String path) throws IOException {
    try (FileSystem fileSystem = FileSystem.get(config)) {
      // Recursive delete: directories are removed together with their contents.
      return fileSystem.delete(new Path(path), true);
    }
  }

  @Override
  public List<String> ls(String path) throws IOException {
    try (FileSystem fileSystem = FileSystem.get(config)) {
      Path hdfsPath = new Path(path);
      if (!fileSystem.exists(hdfsPath)) {
        throw new IllegalArgumentException("Path " + path + " does not exist.");
      }
      if (fileSystem.isDirectory(hdfsPath)) {
        // For a directory, return the names of its direct children.
        return Arrays.stream(fileSystem.listStatus(hdfsPath))
            .map(FileStatus::getPath)
            .map(Path::getName)
            .collect(Collectors.toList());
      } else {
        // For a regular file, return just its own name.
        FileStatus status = fileSystem.getFileStatus(hdfsPath);
        return ImmutableList.of(status.getPath().getName());
      }
    }
  }
}
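The listing above assumes a Hadoop Configuration bean is available for injection; the original code does not show where it comes from. One possible sketch, assuming a plain Spring Java-config class (the class name, bean method, and hard-coded address are my own, matching the fs.default.name value from section 2; in a real project the address would come from application properties):
package com.avaloninc.hdfsapi.config;

import org.apache.hadoop.conf.Configuration;
import org.springframework.context.annotation.Bean;

// Fully qualified Spring annotation to avoid the name clash with Hadoop's Configuration.
@org.springframework.context.annotation.Configuration
public class HadoopConfig {

  @Bean
  public Configuration hadoopConfiguration() {
    Configuration conf = new Configuration();
    // Same address as fs.default.name in core-site.xml.
    conf.set("fs.defaultFS", "hdfs://localhost:8020");
    // HdfsServiceImpl closes the FileSystem after every call, so disable the shared
    // FileSystem cache to avoid handing an already-closed instance to the next caller.
    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
    return conf;
  }
}
Whether to disable the FileSystem cache is a trade-off: keeping a single cached FileSystem open is cheaper, but then the service methods should not close it after every call.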
Unit test:
package com.avaloninc.hdfsapi.service;

import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class HdfsServiceTest extends TestCase {

  @Autowired
  private HdfsService hdfsService;

  @Test
  public void test() throws IOException {
    String sql = "select * \nfrom test.test_tb";
    String path = "/user/wuzhiyu";
    String fileName = "test.sql";
    String srcPath = path + "/" + fileName;

    InputStream inputStream = new ByteArrayInputStream(sql.getBytes());
    hdfsService.write(srcPath, inputStream);

    List<String> list = hdfsService.ls(path);
    assertTrue(list.contains(fileName));
    System.out.println("list = " + list);

    String content = hdfsService.read(srcPath);
    assertEquals(sql, content);
    System.out.println("content = " + content);

    String newFileName = "test_rename.sql";
    String newPath = path + "/" + newFileName;
    assertTrue(hdfsService.rename(srcPath, newPath));

    list = hdfsService.ls(path);
    assertTrue(list.contains(newFileName));
    System.out.println("list = " + list);

    assertTrue(hdfsService.delete(newPath));
  }
}
Maven dependency for the Hadoop client:
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.6.0</version>
</dependency>