HDFS API

1. 前言

最近在做项目的时候遇到了这样一个需求:WEB 端 API 接受用户上传的文件以供后端服务定时执行任务时使用这些文件或者程序。

一般来说部署的 API 和后端服务都是多点部署的,所以文件的存储必须在多个节点都能访问。简单来说就是需要一个分布式的文件存储服务。

可供选择的方式也有很多,比如:Redis、HDFS 甚至可以把文件直接存储到数据库里。但是考虑到上传的文件可能是 jar 包或者其他大文件以及管理的便利性,最终采用了 HDFS 作为我们的分布式存储方案。

2. 本地环境搭建

在本地环境搭建方面,基本采用了参考文献 1 里的实施方法,主要还是用 brew 来安装,但是为了兼容线上 2.6.0 的版本,在做本地安装的时候还是使用了 2.6.0 的版本。

首先打开远程登录的配置:

  
  sudo systemsetup -setremotelogin on

然后生成公钥和私钥(略),接着将公钥加入到 authrized_keys 里面:

  cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

接着是安装 hadoop,如果直接使用 brew install hadoop 来安装的话默认会安装上 3.0.0,为了安装指定的 2.6.0 还需要一点小技巧。

首先如果 brew 安装的其实是一系列 ruby 脚本,brew install <package> 会转换成对应软件包的安装脚本进行。到这里我们就可以想到在 brew 的提交历史中找到 2.6.0 的安装脚本即可,其对应的 Github 地址是:hadoop.rb

因此安装的时候只要输入:

  brew install https://raw.githubusercontent.com/Homebrew/homebrew-core/ed89a8d0422f29c9bb87e2ea11b3a3f550493294/Formula/hadoop.rb

安装好之后,我们只需要使用 HDFS,因此只做 HDFS 相关的配置。

修改 /usr/local/Cellar/hadoop/2.6.0/libexec/etc/hadoop/core-site.xml 的内容:

  
   <configuration>
      <property>
          <name>hadoop.tmp.dir</name>
          <value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
          <description>A base for other temporary directories.</description>
      </property>
      <property>
          <name>fs.default.name</name>
          <value>hdfs://localhost:8020</value>
      </property>
  </configuration>

然后初始化 namenode

  
  cd /usr/local/Cellar/hadoop/2.6.0/libexec/bin
  ./hdfs namenode -format

最后通过 /usr/local/Cellar/hadoop/2.6.0/sbin/start-dfs.sh 启动,而 /usr/local/Cellar/hadoop/2.6.0/sbin/stop-dfs.sh 负责关闭 HDFS。

3. API 使用实例

这里代码不做赘述,只是简单放一下基本读写的方法:

  package com.avaloninc.hdfsapi.service;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.List;
  
  public interface HdfsService {
      String read(String path) throws IOException;
  
      void write(String path, InputStream inputStream) throws IOException;
  
      boolean rename(String src, String dest) throws IOException;
  
      boolean delete(String path) throws IOException;
  
      List<String> ls(String path) throws IOException;
  }
  

接口实现类:

  package com.avaloninc.hdfsapi.service.impl;
  
  import com.google.common.base.Joiner;
  import com.google.common.collect.ImmutableList;
  
  import com.avaloninc.hdfsapi.service.HdfsService;
  import org.apache.commons.io.IOUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.Arrays;
  import java.util.List;
  import java.util.stream.Collectors;
  
  @Service
  public class HdfsServiceImpl implements HdfsService {
      @Autowired
      private Configuration config;
  
      @Override
      public String read(String path) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              Path hdfsPath = new Path(path);
              List<String> lines = IOUtils.readLines(fileSystem.open(hdfsPath));
              return Joiner.on("\n").join(lines);
          }
      }
  
      @Override
      public void write(String path, InputStream inputStream) throws IOException {
          FileSystem         fileSystem   = null;
          FSDataOutputStream outputStream = null;
          try {
              Path hdfsPath = new Path(path);
              fileSystem = FileSystem.get(config);
              outputStream = fileSystem.create(hdfsPath, true);
  
              byte[] bytes    = new byte[1024];
              int    numBytes = 0;
              while ((numBytes = inputStream.read(bytes)) > 0) {
                  outputStream.write(bytes, 0, numBytes);
              }
          } finally {
              IOUtils.closeQuietly(inputStream);
              IOUtils.closeQuietly(outputStream);
              IOUtils.closeQuietly(fileSystem);
          }
      }
  
      @Override
      public boolean rename(String src, String dest) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              Path srcPath = new Path(src);
              Path destPath = new Path(dest);
  
              if (!fileSystem.exists(srcPath)) {
                  throw new IOException("Path " + src + " do not exists.");
              }
  
              if (!fileSystem.exists(destPath.getParent())) {
                  fileSystem.mkdirs(destPath.getParent());
              }
  
              return fileSystem.rename(srcPath, destPath);
          }
      }
  
      @Override
      public boolean delete(String path) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              return fileSystem.delete(new Path(path), true);
          }
      }
  
      @Override
      public List<String> ls(String path) throws IOException {
          try (FileSystem fileSystem = FileSystem.get(config)) {
              Path hdfsPath = new Path(path);
              if (!fileSystem.exists(hdfsPath)) {
                  throw new IllegalArgumentException(
                      "Path " + path + " do not exist or is not a dir.");
              }
  
              if (fileSystem.isDirectory(hdfsPath)) {
                  return Arrays.stream(fileSystem.listStatus(hdfsPath))
                      .map(FileStatus::getPath)
                      .map(Path::getName)
                      .collect(Collectors.toList());
              } else {
                  FileStatus status = fileSystem.getFileStatus(hdfsPath);
                  return ImmutableList.of(status.getPath().getName());
              }
          }
      }
  }

单元测试:

  package com.avaloninc.hdfsapi.service;
  
  import junit.framework.TestCase;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;
  
  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.List;
  
  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class HdfsServiceTest extends TestCase {
  
      @Autowired
      private HdfsService hdfsService;
  
      @Test
      public void test() throws IOException {
          String      sql         = "select * \nfrom test.test_tb";
          String      path        = "/user/wuzhiyu";
          String      fileName    = "test.sql";
          String      srcPath     = path + "/" + fileName;
          InputStream inputStream = new ByteArrayInputStream(sql.getBytes());
          hdfsService.write(srcPath, inputStream);
  
          List<String> list = hdfsService.ls(path);
          assertTrue(list.contains(fileName));
          System.out.println("list = " + list);
  
          String content = hdfsService.read(srcPath);
          assertEquals(sql, content);
          System.out.println("content = " + content);
  
          String newFileName = "test_rename.sql";
          String newPath = path + "/" + newFileName;
          assertTrue(hdfsService.rename(srcPath, newPath));
  
          list = hdfsService.ls(path);
          assertTrue(list.contains(newFileName));
          System.out.println("list = " + list);
  
          assertTrue(hdfsService.delete(newPath));
      }
  }

引用 Hadoop 客户端:

  
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>2.6.0</version>
          </dependency>

4. 参考文献

@Configuration、@Bean 与重载

1. 问题描述

记录一下之前遇到一个问题:在不同的场景下需要用到两个配置不同的 Client 实例。

实现方法:通过 @Configuration 注解来生命配置类,然后用 @Bean("clientx") 标记的方法来返回实例,每个返回 bean 的方法都通过 @Value 注解获取具体参数。

但是即使申明了不同的 name,自动注入的时候还是传入了同一个值。

2. 代码实例

首先构造一个 Client 类:

  
  package com.avaloninc.domain;
  
  import lombok.Data;
  
  @Data
  public class Client {
    private String endPoint;
    private String accessKey;
    private String accessSecret;
    private String region;
  }

然后给定我们的配置类:

  
  package com.avaloninc.factory;
  
  import com.avaloninc.domain.Client;
  import lombok.extern.slf4j.Slf4j;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  
  @Configuration
  @Slf4j
  public class ClientConfiguration {
  
    @Bean("client1")
    public Client getClient(@Value("${endPoint.beijing}") String endPoint) {
      log.info("client1 with one parameter is called.");
      Client client = new Client();
      client.setEndPoint(endPoint);
      return client;
    }
  
    @Bean("client2")
    public Client getClient(@Value("${endPoint.shanghai}") String endPoint,
                            @Value("${region}") String region) {
      log.info("client2 with two parameters is called.");
      Client client = new Client();
      client.setEndPoint(endPoint);
      client.setRegion(region);
      return client;
    }
  }

然后单元测试:

  
  package com.avaloninc.factory;
  
  import com.avaloninc.domain.Client;
  import junit.framework.TestCase;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Qualifier;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;
  
  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class BeanFactoryTest extends TestCase {
  
    @Autowired
    @Qualifier("client1")
    private Client client1;
  
    @Autowired
    @Qualifier("client2")
    private Client client2;
  
    @Test
    public void test() {
      assertEquals(client1, client2);
    }
  }

单元测试完成运行我们得到日志:

  
  2018-03-18 00:35:36.611  INFO 90857 --- [           main] c.a.factory.ClientConfigurationTest      : Starting ClientConfigurationTest on MacBookPro.lan with PID 90857 (started by wuzhiyu in /Users/wuzhiyu/Projects/manuscripts/spring-bean-factory-overload)
  2018-03-18 00:35:36.612  INFO 90857 --- [           main] c.a.factory.ClientConfigurationTest      : No active profile set, falling back to default profiles: default
  2018-03-18 00:35:36.692  INFO 90857 --- [           main] o.s.w.c.s.GenericWebApplicationContext   : Refreshing org.springframework.web.context.support.GenericWebApplicationContext@561b6512: startup date [Sun Mar 18 00:35:36 CST 2018]; root of context hierarchy
  2018-03-18 00:35:37.520  INFO 90857 --- [           main] c.avaloninc.factory.ClientConfiguration  : client2 with two parameters is called.
  2018-03-18 00:35:37.523  INFO 90857 --- [           main] c.avaloninc.factory.ClientConfiguration  : client2 with two parameters is called.
  2018-03-18 00:35:38.011  INFO 90857 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.web.context.support.GenericWebApplicationContext@561b6512: startup date [Sun Mar 18 00:35:36 CST 2018]; root of context hierarchy

可以看到两个参数的方法被调用了两次,所以实际上两个不同名字的 bean 却拥有完全相同的内容。

有同事建议我试试 @Resource 注解来注入 bean。他的理由是 @Autowired 一般是通过类型来匹配 bean。所以加了如下代码:

  
  
    @Bean(name = "client3")
    public Client getClient(@Value("${endPoint.hangzhou}") String endPoint,
                            @Value("${accessKey}") String accessKey,
                            @Value("${accessSecret}") String accessSecret) {
      log.info("client3 with three parameters is called.");
      Client client = new Client();
      client.setEndPoint(endPoint);
      client.setAccessKey(accessKey);
      client.setAccessSecret(accessSecret);
      return client;
    }

以及修改单元测试:

  
    @Resource(name = "client3")
    private Client client3;

结果依然类似:

  
  2018-03-18 00:45:07.757  INFO 90937 --- [           main] o.s.w.c.s.GenericWebApplicationContext   : Refreshing org.springframework.web.context.support.GenericWebApplicationContext@385e9564: startup date [Sun Mar 18 00:45:07 CST 2018]; root of context hierarchy
  2018-03-18 00:45:08.585  INFO 90937 --- [           main] c.avaloninc.factory.ClientConfiguration  : client3 with three parameters is called.
  2018-03-18 00:45:08.589  INFO 90937 --- [           main] c.avaloninc.factory.ClientConfiguration  : client3 with three parameters is called.
  2018-03-18 00:45:08.590  INFO 90937 --- [           main] c.avaloninc.factory.ClientConfiguration  : client3 with three parameters is called.
  2018-03-18 00:45:09.078  INFO 90937 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.web.context.support.GenericWebApplicationContext@385e9564: startup date [Sun Mar 18 00:45:07 CST 2018]; root of context hierarchy

3. 转机

但是通过日志发现,两个场景每个 bean 在构造的时候都采用了最后一个方法。在这里做了一个假设:Spring 在构造 bean 的时候采用了反射的方式,而且可能因为某些原因对于重载函数只使用最后一个同名的函数。

假设之后小心求证一下,再次修改一下代码。首先增加新的构造方法:

  
    @Bean("client4")
    public Client getClientFour(@Value("${endPoint.shanghai}") String endPoint,
                                @Value("${region}") String region,
                                @Value("${accessKey}") String accessKey,
                                @Value("${accessSecret}") String accessSecret) {
      log.info("client4 with four parameters is called.");
      Client client = new Client();
      client.setEndPoint(endPoint);
      client.setRegion(region);
      client.setAccessKey(accessKey);
      client.setAccessSecret(accessSecret);
      return client;
    }

然后在单元测试中注入:

  
    @Resource(name = "client4")
    private Client client4;

通过日志我们发现新的构造方法构造的实例与之前的发生了区别:

  
  2018-03-18 00:57:52.552  INFO 91033 --- [           main] o.s.w.c.s.GenericWebApplicationContext   : Refreshing org.springframework.web.context.support.GenericWebApplicationContext@8c3b9d: startup date [Sun Mar 18 00:57:52 CST 2018]; root of context hierarchy
  2018-03-18 00:57:53.369  INFO 91033 --- [           main] c.avaloninc.factory.ClientConfiguration  : client3 with three parameters is called.
  2018-03-18 00:57:53.372  INFO 91033 --- [           main] c.avaloninc.factory.ClientConfiguration  : client3 with three parameters is called.
  2018-03-18 00:57:53.373  INFO 91033 --- [           main] c.avaloninc.factory.ClientConfiguration  : client3 with three parameters is called.
  2018-03-18 00:57:53.374  INFO 91033 --- [           main] c.avaloninc.factory.ClientConfiguration  : client4 with four parameters is called.
  2018-03-18 00:57:53.811  INFO 91033 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.web.context.support.GenericWebApplicationContext@8c3b9d: startup date [Sun Mar 18 00:57:52 CST 2018]; root of context hierarchy

4. 结论

虽然没有阅读 Spring 的源码,但是大致可以想见 Spring 在构造实例时对于返回不同 name 的 bean 的重载方法处理的时候只用最后一个同名方法。也就是说只用了函数的名字而不是完整的函数签名。

因此,下次在配置类中返回相同类型不同名字的实例时还是避免使用相同的函数名!

5. 参考文章

类之间的关系

一般来说,类之间的关系有:依赖、关联、聚集、组合、泛化等。

1. 依赖(dependency)

假如有两个元素 X 和 Y,如果修改 Y 元素的定义会导致 X 元素的修改,那么 X 依赖 Y。Java 中依赖关系常常表现为一个类 X 中某个方法的参数类型为 Y,那么称 X 依赖于 Y。如下图所示:
dependency

2. 关联(association)

关联是对具有相同的结构特性、行为特性、关系和语义的链 (link) 的描述。关联表示的是类与类之间的关系,链表示的是对象与对象之间的关系。
Association

如上图所示,常见的关联关系有单向关联和双向关联。单向关联在 Java 中可以理解为类 Earth 中有一个类型为 Satelite 的变量 moon。而双向关联可以理解为两个类中互有对方类型的变量,如上图中 Father 类中有一个 Son 类型的变量,而 Son 类中有一个 Father 类型的变量。

关联本身也可以有特性,可以通过关联类进一步进行描述。关联还可以加上限定和约束。关联按照种类还可以分为:自返关联(递归关联)、二元关联和 N 元关联。

关联也是一种依赖关系,但是在有关联关系的情况下我们只需要画出关联关系而不需要画出依赖关系。

3. 聚集(aggregation)

聚集是一种特殊形式的关联。聚集表示类之间整体与部分的关系,对应于语义“包含”、“组成”表述等。
aggregation

4. 组合(composition)

组合同样表示类之间整体与部分的关系,但是组合关系中整体和部分有着相同生命周期
Composition

5. 泛化(generalization)

泛化定义了一般元素和特殊元素之间的分类关系,在 Java 中表现为继承(父类)或者实现(接口)。
Generalization