HDFS API 后记关于 FileSystem 缓存

HDFS API 这篇文章中,简单介绍了一下通过 Hadoop client 的 API 读写 HDFS 的方法。但是在实际使用过程中也发现了一个 FileSystem 的问题。

1. 问题描述

在程序运行中时不时地会抛出 IOException,内容提示 filesystem closed。一开始也很疑惑,因为不确定 org.apache.hadoop.fs.FileSystem 实例的线程安全性,所以每次调用的时候都通过 FileSystem.get(conf) 方法来获取一个新的对象。然后查阅了资料发现 FileSystem.get 方法会得到的并不是一个全新的对象,而是一个缓存过的对象。

源码如下:

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

可以看到 FileSyste.get 方法的确是取缓存的。

2. 解决方案

为了保障线程安全性,我们可以每次调用的时候都创建一个新的实例,FileSystem.newInstance 方法可以满足我们的需求。

package com.avaloninc.hdfsapi.service.impl;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

import com.avaloninc.hdfsapi.service.HdfsService;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @Author: wuzhiyu.
 * @Date: 2018-03-19 14:39.
 * @Description:
 */
@Service
public class HdfsServiceImpl implements HdfsService {
    @Autowired
    private Configuration config;

    @Override
    public String read(String path) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path hdfsPath = new Path(path);
            List<String> lines = IOUtils.readLines(fileSystem.open(hdfsPath));
            return Joiner.on("\n").join(lines);
        }
    }

    @Override
    public void write(String path, InputStream inputStream) throws IOException {
        FileSystem         fileSystem   = null;
        FSDataOutputStream outputStream = null;
        try {
            Path hdfsPath = new Path(path);
            fileSystem = FileSystem.newInstance(config);
            outputStream = fileSystem.create(hdfsPath, true);

            byte[] bytes    = new byte[1024];
            int    numBytes = 0;
            while ((numBytes = inputStream.read(bytes)) > 0) {
                outputStream.write(bytes, 0, numBytes);
            }
        } finally {
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(outputStream);
            IOUtils.closeQuietly(fileSystem);
        }
    }

    @Override
    public boolean rename(String src, String dest) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path srcPath = new Path(src);
            Path destPath = new Path(dest);

            if (!fileSystem.exists(srcPath)) {
                throw new IOException("Path " + src + " do not exists.");
            }

            if (!fileSystem.exists(destPath.getParent())) {
                fileSystem.mkdirs(destPath.getParent());
            }

            return fileSystem.rename(srcPath, destPath);
        }
    }

    @Override
    public boolean delete(String path) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            return fileSystem.delete(new Path(path), true);
        }
    }

    @Override
    public List<String> ls(String path) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path hdfsPath = new Path(path);
            if (!fileSystem.exists(hdfsPath)) {
                throw new IllegalArgumentException(
                    "Path " + path + " do not exist or is not a dir.");
            }

            if (fileSystem.isDirectory(hdfsPath)) {
                return Arrays.stream(fileSystem.listStatus(hdfsPath))
                    .map(FileStatus::getPath)
                    .map(Path::getName)
                    .collect(Collectors.toList());
            } else {
                FileStatus status = fileSystem.getFileStatus(hdfsPath);
                return ImmutableList.of(status.getPath().getName());
            }
        }
    }
}

3. 参考资料

HDFS API

1. 前言

最近在做项目的时候遇到了这样一个需求:WEB 端 API 接受用户上传的文件以供后端服务定时执行任务时使用这些文件或者程序。

一般来说部署的 API 和后端服务都是多点部署的,所以文件的存储必须在多个节点都能访问。简单来说就是需要一个分布式的文件存储服务。

可供选择的方式也有很多,比如:Redis、HDFS 甚至可以把文件直接存储到数据库里。但是考虑到上传的文件可能是 jar 包或者其他大文件以及管理的便利性,最终采用了 HDFS 作为我们的分布式存储方案。

2. 本地环境搭建

在本地环境搭建方面,基本采用了参考文献 1 里的实施方法,主要还是用 brew 来安装,但是为了兼容线上 2.6.0 的版本,在做本地安装的时候还是使用了 2.6.0 的版本。

首先打开远程登录的配置:

  
  sudo systemsetup -setremotelogin on

然后生成公钥和私钥(略),接着将公钥加入到 authrized_keys 里面:

  cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

接着是安装 hadoop,如果直接使用 brew install hadoop 来安装的话默认会安装上 3.0.0,为了安装指定的 2.6.0 还需要一点小技巧。

首先如果 brew 安装的其实是一系列 ruby 脚本,brew install <package> 会转换成对应软件包的安装脚本进行。到这里我们就可以想到在 brew 的提交历史中找到 2.6.0 的安装脚本即可,其对应的 Github 地址是:hadoop.rb

因此安装的时候只要输入:

  brew install https://raw.githubusercontent.com/Homebrew/homebrew-core/ed89a8d0422f29c9bb87e2ea11b3a3f550493294/Formula/hadoop.rb

安装好之后,我们只需要使用 HDFS,因此只做 HDFS 相关的配置。

修改 /usr/local/Cellar/hadoop/2.6.0/libexec/etc/hadoop/core-site.xml 的内容:

  
   <configuration>
      <property>
          <name>hadoop.tmp.dir</name>
          <value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
          <description>A base for other temporary directories.</description>
      </property>
      <property>
          <name>fs.default.name</name>
          <value>hdfs://localhost:8020</value>
      </property>
  </configuration>

然后初始化 namenode

  
  cd /usr/local/Cellar/hadoop/2.6.0/libexec/bin
  ./hdfs namenode -format

最后通过 /usr/local/Cellar/hadoop/2.6.0/sbin/start-dfs.sh 启动,而 /usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh 负责关闭 HDFS。

3. API 使用实例

这里代码不做赘述,只是简单放一下基本读写的方法:

  package com.avaloninc.hdfsapi.service;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.List;
  
  public interface HdfsService {
      String read(String path) throws IOException;
  
      void write(String path, InputStream inputStream) throws IOException;
  
      boolean rename(String src, String dest) throws IOException;
  
      boolean delete(String path) throws IOException;
  
      List<String> ls(String path) throws IOException;
  }
  

接口实现类:

  package com.avaloninc.hdfsapi.service.impl;
  
  import com.google.common.base.Joiner;
  import com.google.common.collect.ImmutableList;
  
  import com.avaloninc.hdfsapi.service.HdfsService;
  import org.apache.commons.io.IOUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.Arrays;
  import java.util.List;
  import java.util.stream.Collectors;
  
  @Service
  public class HdfsServiceImpl implements HdfsService {
      @Autowired
      private Configuration config;
  
      @Override
      public String read(String path) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              Path hdfsPath = new Path(path);
              List<String> lines = IOUtils.readLines(fileSystem.open(hdfsPath));
              return Joiner.on("\n").join(lines);
          }
      }
  
      @Override
      public void write(String path, InputStream inputStream) throws IOException {
          FileSystem         fileSystem   = null;
          FSDataOutputStream outputStream = null;
          try {
              Path hdfsPath = new Path(path);
              fileSystem = FileSystem.get(config);
              outputStream = fileSystem.create(hdfsPath, true);
  
              byte[] bytes    = new byte[1024];
              int    numBytes = 0;
              while ((numBytes = inputStream.read(bytes)) > 0) {
                  outputStream.write(bytes, 0, numBytes);
              }
          } finally {
              IOUtils.closeQuietly(inputStream);
              IOUtils.closeQuietly(outputStream);
              IOUtils.closeQuietly(fileSystem);
          }
      }
  
      @Override
      public boolean rename(String src, String dest) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              Path srcPath = new Path(src);
              Path destPath = new Path(dest);
  
              if (!fileSystem.exists(srcPath)) {
                  throw new IOException("Path " + src + " do not exists.");
              }
  
              if (!fileSystem.exists(destPath.getParent())) {
                  fileSystem.mkdirs(destPath.getParent());
              }
  
              return fileSystem.rename(srcPath, destPath);
          }
      }
  
      @Override
      public boolean delete(String path) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              return fileSystem.delete(new Path(path), true);
          }
      }
  
      @Override
      public List<String> ls(String path) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              Path hdfsPath = new Path(path);
              if (!fileSystem.exists(hdfsPath)) {
                  throw new IllegalArgumentException(
                      "Path " + path + " do not exist or is not a dir.");
              }
  
              if (fileSystem.isDirectory(hdfsPath)) {
                  return Arrays.stream(fileSystem.listStatus(hdfsPath))
                      .map(FileStatus::getPath)
                      .map(Path::getName)
                      .collect(Collectors.toList());
              } else {
                  FileStatus status = fileSystem.getFileStatus(hdfsPath);
                  return ImmutableList.of(status.getPath().getName());
              }
          }
      }
  }

单元测试:

  package com.avaloninc.hdfsapi.service;
  
  import junit.framework.TestCase;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;
  
  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.List;
  
  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class HdfsServiceTest extends TestCase {
  
      @Autowired
      private HdfsService hdfsService;
  
      @Test
      public void test() throws IOException {
          String      sql         = "select * \nfrom test.test_tb";
          String      path        = "/user/wuzhiyu";
          String      fileName    = "test.sql";
          String      srcPath     = path + "/" + fileName;
          InputStream inputStream = new ByteArrayInputStream(sql.getBytes());
          hdfsService.write(srcPath, inputStream);
  
          List<String> list = hdfsService.ls(path);
          assertTrue(list.contains(fileName));
          System.out.println("list = " + list);
  
          String content = hdfsService.read(srcPath);
          assertEquals(sql, content);
          System.out.println("content = " + content);
  
          String newFileName = "test_rename.sql";
          String newPath = path + "/" + newFileName;
          assertTrue(hdfsService.rename(srcPath, newPath));
  
          list = hdfsService.ls(path);
          assertTrue(list.contains(newFileName));
          System.out.println("list = " + list);
  
          assertTrue(hdfsService.delete(newPath));
      }
  }

引用 Hadoop 客户端:

  
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>2.6.0</version>
          </dependency>

4. 参考文献