RestTemplate 与 Gzip Content-Encoding

1. 问题描述

最近做一个针对 Yarn Application 进行错误诊断的需求,需要从 Resource Manager 获取 Application 的运行信息,比如:

 GET /ws/v1/cluster/apps/application_1561545353229_936285 HTTP/1.1
 Host: yarn.xxx.com
 Accept: */*
 accept-encoding: gzip, deflate

这个接口使用 Postman 可以得到对应的结果:

 {
     "app": {
         "id": "application_1561545353229_936285",
         "user": "bp_growth",
         "name": "moses:278648",
         "queue": "root.bp_growth_dev",
         "state": "FINISHED",
         "finalStatus": "FAILED",
         "progress": 100,
         "trackingUI": "History",
         "trackingUrl": "http://yarn-rm02.tc.rack.xxx.com:8088/proxy/application_1561545353229_936285/",
         "diagnostics": "Task failed task_1561545353229_936285_m_000008\nJob failed as tasks failed. failedMaps:1 failedReduces:0\n",
         "clusterId": 1561545353229,
         "applicationType": "MAPREDUCE",
         "applicationTags": "",
         "startedTime": 1562740151124,
         "finishedTime": 1562740195218,
         "elapsedTime": 44094,
         "amContainerLogs": "http://data1117.tc.rack.xxx.com:8042/node/containerlogs/container_e93_1561545353229_936285_01_000001/bp_growth",
         "amHostHttpAddress": "data1117.tc.rack.xxx.com:8042",
         "allocatedMB": -1,
         "allocatedVCores": -1,
         "reservedMB": -1,
         "reservedVCores": -1,
         "runningContainers": -1,
         "memorySeconds": 1583688,
         "vcoreSeconds": 721,
         "preemptedResourceMB": 0,
         "preemptedResourceVCores": 0,
         "numNonAMContainerPreempted": 0,
         "numAMContainerPreempted": 0,
         "logAggregationStatus": "TIME_OUT"
     }
 }

但是使用 RestTemplate 去请求的时候却会得到异常:

 org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Illegal character ((CTRL-CHAR, code 31)): only regular white space (\r, \n, \t) is allowed between tokens; nested exception is com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 31)): only regular white space (\r, \n, \t) is allowed between tokens
  at [Source: (PushbackInputStream); line: 1, column: 2]

原因是 Response 的结果使用了 Gzip 进行压缩。

2. 解决方案

在网上找到的最简单的解决方案来自:How to parse gzip encoded response with RestTemplate from Spring-Web

简单来说就是在构造 RestTemplate 的时候指定 ClientHttpRequestFactory。

Maven 项目引入一个依赖:

 <dependency>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
 </dependency>

由于我同时使用了 SpringBoot,所以不需要指定版本。

具体使用时如:

   @Test
   public void test() {
 
     HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(
         HttpClientBuilder.create().build());
     RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory);
 
     Map forObject = restTemplate.getForObject("http://yarn.xxx.com/ws/v1/cluster/apps/application_1561545353229_936285", Map.class);
     System.out.println("forObject = " + forObject);
   }

即席查询平台的设计与实现

1. 即席查询平台简介

  • 即席查询平台是数据平台针对即席查询(Ad-Hoc)场景推出的一个解决方案。

  • 用户侧提供 SQL 的查询、结果数据的生命周期管理。

  • 运维测统一表权限、Hadoop 组账号、Yarn 队列等。

2. ZUE 的定位与同类产品的比较

离线数据开发平台的技术演进:

  • 石器时代:Hive 客户端直连

  • 青铜时代:beeline 客户端 + Hive Server

  • 英雄时代:HUE + Hive Server

  • 军团时代:ZUE + Moses + Hive Server

开发工具几个阶段的比较:

Hive Cli Beeline HUE ZUE
交互方式 终端 终端 WEB WEB
SQL 支持 全类型 全类型 全类型 部分语句
表权限控制 table.in 元数据与权限
组账号管理 支持 支持
异步查询 不支持 不支持 不支持 支持
查询结果缓存 不支持 不支持 弱支持 支持
资源隔离与限制 无限制 无限制 无限制 有限制

2.1 SQL 的支持

  • Hive Cli、Beeline、HUE 是全功能的客户端工具,提供了完整的 Hive Query 语法的支持以及 HDFS 的管理命令

  • ZUE 限制了可以执行的 SQL 子句类型,把 ZUE 定位为 DML 的平台,将 DDL 划分给元数据管理系统负责。

  • ZUE 仅支持 SELECTCREATE TABLE ASEXPLAIN 等少数语句,并且禁止通过 ZUE 向线上生产表写入数据。

  • 元数据针对 DDL 提供了更加丰富的业务规范与控制,如业务划分、层级划分、表权限控制、变更历史、Location 限制等等。

2.2 表权限控制

  • Hive Cli 和 Beeline 都不具备表权限的功能。

  • HUE 本身不带读写权限的控制,table.in 作为一种 Hack 的方案先天不足(黑名单)。

  • ZUE 用词法解析获取读写的表,结合元数据做权限的控制。

  • 元数据提供了完整的权限申请工作流。

2.3 组账号管理

  • Hive Cli 和 Beeline 没有统一的组账号管理方式,默认以个人账号提交。

  • HUE 的组账号与个人账号等价,无法灵活的切换组账号。

  • Hive Cli、Beeline 和 HUE 大量个人账号的存在导致许多数据出现 owner 的权限问题。

  • ZUE 收敛组账号,从属于业务的数据由统一的组账号进行读写,避免权限问题的发生。

2.4 异步查询

  • Hive Cli、Beeline 和 HUE 的查询操作都是同步,一个客户端会话同一时间只能进行一个查询。关闭客户端或者会话都会导致查询被取消。

  • ZUE 采取异步查询的方式,在同一个窗口可以提交多个查询,多个查询可以并发运行。

2.5 查询结果缓存

  • Hive Cli 和 Beeline 只能手动将查询结果导出保存,否则同一条 SQL 必须重跑才能看到结果。

  • HUE 的查询结果只能在会话过程中查看,一旦页面关闭结果也就丢失了。

  • ZUE 的查询结果缓存 24 小时,重复查看和下载不需要耗费新的计算资源。

2.6 资源隔离与限制

  • Hive Cli、Beeline 和 HUE 无法对用户的查询进行队列的限制,默认提交至 default 队列。

  • Hive Cli、Beeline 和 HUE 运行时可以任意指定队列,无法针对业务线进行资源隔离和计费。

  • ZUE 针对业务线进行队列的限定,保证业务线之间资源的隔离与安全。

3. 功能与架构设计

3.1 HUE 的功能架构缺陷

  • HUE 虽然支持了很多功能,但是本质和 Beeline 并无不同,都是 Thrfit 接口中会话的一种可视化呈现。

  • Thrfit 的会话存在于 HiveServer 中,因此用户的请求必须通过一致性哈希路由到同一台服务器。缺陷在于:1)如果前端请求的路由策略不正确,请求到了其他服务器则该会话的上下文丢失(SET xxx)并且该会话下进行中的查询终止;2)如果某台服务器上 HUE 重启,意味着该服务器上的 Thrfit 会话终结,该服务器上的查询全部自动终止。

  • HUE 的水平扩展实质只能是多个单机节点对总体查询和负载进行分片,单机长时间保持状态。

  • HUE 查询请求量提升时更容易出现线程问题,导致整体不可使用。

  • HUE 虽然界面干净、交互友好,但是本质是适合小团队使用的单机系统。

3.2 ZUE 的整体设计架构

  • 将有状态与无状态的部分分离。无状态接口以容器方式部署,可伸缩性好,迭代升级易维护。长时间保持状态与 HiveServer 保持连接的部分抽象出查询中心模块,部署在物理机上。

  • Moses 查询中心通过暴露 RESTFul 的接口提供异步查询的能力,查询完成之后回调请求方的接口。

  • Moses 查询中心每个结点对等,通过持久化与 HiveServer 的会话信息实现服务重启过程中会话不丢失,不影响用户查询。

  • 使用 HDFS 和 Redis 来缓存查询的结果,缓存生命周期结束后自动回收存储空间。

4. 实现方案一些细节

4.1 语法限制与表权限

  • 对 SQL 进行词法解析,得到抽象语法树。

  • 对抽象语法树进行后续遍历获得语句的类型以及读写的表。

4.1.1 词法分析实例

举例:

 insert overwrite table tb.tb_2 select * from tb.tb_1;

解析成抽象语法树,采用后续遍历进行打印可以得到结果:

 nil
    TOK_QUERY
       TOK_FROM
          TOK_TABREF
             TOK_TABNAME
                tb
                tb_1
       TOK_INSERT
          TOK_DESTINATION
             TOK_TAB
                TOK_TABNAME
                   tb
                   tb_2
          TOK_SELECT
             TOK_SELEXPR
                TOK_ALLCOLREF
    <EOF>

几种 token 代表的含义:

  • TOK_TAB:写入目标表 tb.tb_2

  • TOK_TABREF:查询数据来源表 tb.tb_1

  • TOK_INSERT:语句的类型。

4.1.2 其他工具与缺陷

Hive 自己提供了一个血缘解析的工具:org.apache.hadoop.hive.ql.tools.LineageInfo

缺陷:

  1. 缺少上下文的支持,比如上文使用 USE <db> 语句,那么解析出来的输入表只包含表名,缺少库名。

  2. 反引号转义符需要另行清理。

  3. 对于 CTE 语句中的别名无法处理。

4.2 异步查询

  • 与 HUE 的区别:将用户会话与 Thrift 会话解耦,WEB 交互无状态,封装 Thrfit 会话。

  • 从 JDBC 获得的启示:底层原理同 Beeline 一致,对 Thrift 进行封装,通过覆盖特定的会话配置支持会话恢复重连,查询中心作为一个查询中间件可重启升级不影响用户查询。

     config.put("hive.server2.session.check.interval", "1h");
     config.put("hive.server2.close.session.on.disconnect", "false");
     config.put("hive.server2.idle.session.timeout", "24h");
     config.put("hive.server2.idle.operation.timeout", "24h");
     private synchronized TOperationHandle submitQuery(String sql) throws TException {
         log.info("Start to submit sql of task {} with content:\n{}", this.taskMeta.getTaskId(), sql);
         TExecuteStatementReq execReq = new TExecuteStatementReq(this.sessionHandle, sql);
         execReq.setRunAsync(true);
 
         TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
         log.info("execResp = " + execResp);
         this.checkStatus(execResp.getStatus());
 
         return execResp.getOperationHandle();
     }

4.3 结果缓存

结果缓存分为三种:

  1. 较复杂 SQL 或者大数据量的查询结果通过改写 SQL 将结果以 Avro 的形式存储在 HDFS 上,定时回收存储空间。

  2. 简单 SQL 如单表查询直接将结果缓存于 Redis 中,设置缓存的过期时间。

  3. 使用 CTAS 创建的临时表,定时从临时库中清除。

5. 本季度的迭代方向

  • 更友好的故障排查(Yarn Application 日志)与异常诊断。

  • 更多元化的大数据计算引擎的集成(如 Presto)。

6. FAQ

  1. code 2 如何排查

  2. Method Not Found

  3. job counter 不准

  4. read timeout

Spring 与 Auto-Configuration

1. 简述

最近在捣鼓一个类似 Spring Boot starter 的小项目:把常见的 Web 开发所使用的组件或者技术方案封装在一起,通过一个 Maven 引用快速地将项目搭建起来。

2. 遇到的问题

遇到了一个问题:我在被引用项目中用 @Configuration 注解标注了一个类,在开发项目中这个类却没有被实例化。

该类代码如下所示:

  package com.avaloninc.web.aop.config;
  
  import com.avaloninc.web.aop.filter.RequestBodyWrapperFilter;
  import com.avaloninc.web.aop.interceptor.RequestAuditInterceptor;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
  import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
  
  /**
   * @Author: wuzhiyu.
   * @Date: 2019-02-22 15:15:18.
   * @Description:
   */
  @Configuration
  @ConditionalOnProperty(value = "log.request.audit.enable", havingValue = "true")
  public class RequestAuditConfiguration extends WebMvcConfigurerAdapter {
  
    private final String   logPartSeparator;
    private final String[] uriWhiteList;
  
    @Autowired
    public RequestAuditConfiguration(@Value("${log.request.audit.separator:'||'}") String logPartSeparator,
                                     @Value("${log.request.audit.whitelist:''}") String[] uriWhiteList) {
      this.logPartSeparator = logPartSeparator;
      this.uriWhiteList = uriWhiteList;
    }
  
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
      registry.addInterceptor(new RequestAuditInterceptor(logPartSeparator, uriWhiteList));
    }
  
    @Bean
    public RequestBodyWrapperFilter getRequestBodyWrapperFilter() {
      return new RequestBodyWrapperFilter();
    }
  }

3. 解决方案

定下心来想了一下,被引用项目的配置类全名为:com.avaloninc.web.aop.config.RequestAuditConfiguration ,而开发项目中以 @SpringBootApplication 修饰的启动类全名为 com.avaloninc.web.demo.Main

第一个想法是:会不会是因为包名的问题导致 Spring 没有去扫描 com.avaloninc.web.aop 包下面的 Bean 呢?

尝试了解决办法:在启动类上加上注解 @ComponentScan("com.avaloninc"),果然好使!

但是转念一想,Spring Boot 的各种 starter 中定义的 Bean 也不会和我们自己的项目同属于一个包下面啊,为什么它们没有这个问题呢?

于是找了一篇自定义 starter 的文章1看了看了一下。大部分没什么特别,但是其中提到了一个 spring.factories 文件唤醒了我记忆中关于 Spring Boot 初始化过程的一点记忆。所以果断在被引用项目中也加了这个文件 src/main/resources/META-INF/spring.factories,内容为:

  org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.avaloninc.web.aop.config.RequestAuditConfiguration

果然好使!

当然还是参考官方文档2来的更加详细准确。

4. 参考文献

Java 8 DateTimeFormatter 踩坑

1. 问题描述

本月是 2019 年的第一个月,除了迎接新年之外同样也迎来了跨年带来的关于时间参数的 bug。

废话不多说,进入正题。之前写了一个任务调度系统,支持以周为粒度进行任务的调度。针对任务的每次运行都会产生一个批次号,一般以 任务名_<年份><周数> 的格式编号(比如 job_201850 ),但是在 2018 年 12 月 31 日却发现了自然周的序号发生了跳跃。

2. 分析出问题的代码

查看了一下代码,问题出在 DateTime 格式化上,原来的 DateTimeFormatter 的定义如下:

DateTimeFormatter.ofPattern("yyyyww")

预想的是 yyyy 代表年,ww 代表自然周,对于 2018 年 12 月 31 日原本预计得到的结果是 201853,但是此处 ww 得到的结果竟然是 01。

百思不得其解之下上网查了一下,发现原来错的不是 ww,而是 yyyy

一般来说 yyyy 是和自然年的月份、日期搭配的,对于和自然周的搭配是要用 YYYY 的。代码如下:

@Test
public void testForWeek() {
    DateTimeFormatter rightFormatter = DateTimeFormatter.ofPattern("YYYYww");
    DateTimeFormatter wrongFormatter = DateTimeFormatter.ofPattern("yyyyww");

    LocalDateTime now = LocalDateTime.of(2019, 1, 7, 0, 30, 0);
    System.out.println(now);
    String right = rightFormatter.format(now);
    String wrong = wrongFormatter.format(now);
    System.out.println("right = " + right);
    System.out.println("wrong = " + wrong);

    now = LocalDateTime.of(2018, 12, 31, 0, 30, 0);
    System.out.println(now);
    right = rightFormatter.format(now);
    wrong = wrongFormatter.format(now);
    System.out.println("right = " + right);
    System.out.println("wrong = " + wrong);
}

输出结果:

2019-01-07T00:30
right = 201902
wrong = 201902
2018-12-31T00:30
right = 201901
wrong = 201801

也就是说 2018 年 12 月 31 日实际上算是 2019 年的第一周,这也算刷新了我的认知!

3. 后续订正

今天又出幺蛾子了!

今天是 2019-04-28 星期日,按理来说今天应该是 2019 年的第十七周的最后一天。可是今天用 YYYYww 格式化日期得到的却是 201918,代码如下:

  @Test
  public void testForDateTimeFormatter() {

    LocalDateTime now = LocalDateTime.now();
    DateTimeFormatter week = DateTimeFormatter.ofPattern("YYYYww");

    LocalDateTime yesterday = now.plusDays(-1);
    System.out.println("yesterday = " + yesterday);
    System.out.println(yesterday.getDayOfWeek().getValue());
    System.out.println(week.format(yesterday));
    System.out.println(this.getWeekFormat(yesterday));

    System.out.println("now = " + now);
    System.out.println(now.getDayOfWeek().getValue());
    System.out.println(week.format(now));
    System.out.println(this.getWeekFormat(now));

    LocalDateTime tomorrow = now.plusDays(1);
    System.out.println("tomorrow = " + tomorrow);
    System.out.println(tomorrow.getDayOfWeek().getValue());
    System.out.println(week.format(tomorrow));
    System.out.println(this.getWeekFormat(tomorrow));
  }

  private String getWeekFormat(LocalDateTime localDateTime) {
    int year = localDateTime.get(IsoFields.WEEK_BASED_YEAR);
    int weekNum = localDateTime.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR);
    return year + String.format("%02d", weekNum);
  }

输出的内容为:

yesterday = 2019-04-27T10:00
6
201917
201917
now = 2019-04-28T10:00
7
201918
201917
tomorrow = 2019-04-29T10:00
1
201918
201918

也就是说按照 YYYYww 格式化的时候在星期日就会进行周号的递增!

正确格式化的的方式应该是:

  private String getWeekFormat(LocalDateTime localDateTime) {
    int year = localDateTime.get(IsoFields.WEEK_BASED_YEAR);
    int weekNum = localDateTime.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR);
    return year + String.format("%02d", weekNum);
  }

Spring Bean Validation

1. 前言

平时写接口的时候难免有要对外开放接口调用,如果接口请求体的结构比较简单还好,但是不可避免会遇到一些包含嵌套对象的请求体。

这个时候最烦的就是做参数校验了,一堆的 Objects.isNull 或者 Strings.isNullOrEmpty。冗长的一堆很不雅观而且基本不太可能复用。

最近突然发现 JSR303 (Bean Validation) 的存在,结合 @ControllerAdvice 定制全局异常可以大大减轻参数校验的负担呢。

简单来说,从参数校验的内容来看可以分为两种:

  • 业务无关的参数校验:NotNull、NotEmpty 等等。

  • 业务相关的校验:ID 值的有效性等。

对于业务无关的参数校验,如果校验不通过则会抛出相应的异常。对于与业务有关的参数我们可以在校验不通过后抛出运行时异常。

这些异常可以在 @ControllerAdvice 注解的类中统一处理。

2. 代码实例

2.1 依赖引入

pom.xml 文件中首先引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-validation</artifactId>
</dependency>

2.2 简单使用

我们首先定义一个接口:

@GetMapping("jobs")
public Response<String> get(@Valid GetRequest request) {
    return Responses.successResponse(request.getKeyword());
}

然后是请求体格式:

package com.avaloninc.springvalidationexample.request;

import com.avaloninc.webapi.common.request.base.BaseRequest;
import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;

@Data
public class GetRequest extends BaseRequest {
  @NotBlank
  private String keyword;
}

简单来说就是一个只包含一个参数的 GET 请求。请求体的 keyword 字段上面通过 @NotBlank 注解修饰了参数。

Controller 上使用 @Valid 参数触发对参数的校验。

2.3 嵌套结构

通常的使用场景不会有这么简单,请求体往往是嵌套的结构。即请求体的实例成员变量往往是一个集合对象或者另一个类的实例:

package com.avaloninc.springvalidationexample.request;

import com.avaloninc.webapi.common.request.base.BaseRequest;
import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;
import org.hibernate.validator.constraints.NotEmpty;

import java.util.List;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;

@Data
public class CreateRequest extends BaseRequest {
  @NotBlank
  private String       name;
  @NotEmpty
  private List<String> alarmReceivers;
  @NotNull
  @Valid
  private Resource     resource;
  @NotEmpty
  @Valid
  private List<File>   files;

  @Data
  public static class Resource {
    @NotEmpty
    private String account;
    @NotEmpty
    private String queue;
  }

  @Data
  public static class File {
    @NotEmpty
    private String fileName;
    @NotEmpty
    private String filePath;
  }
}

这种情况下记得在这些成员变量之上同样加上 @Valid 注解即可!

2.3 全局异常处理

对于参数违反的约束都会以异常的形式抛出来,我们可以在一个类中进行全局的异常处理:

package com.avaloninc.springvalidationexample.advice;

import com.avaloninc.webapi.common.response.Response;
import com.avaloninc.webapi.common.response.Responses;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.validation.BindException;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;

import java.util.stream.Collectors;

@ControllerAdvice
@Slf4j
public class GlobalControllerAdvice {

  @ExceptionHandler(BindException.class)
  @ResponseBody
  @ResponseStatus(HttpStatus.BAD_REQUEST)
  public Response handle(final BindException ex) {
    String errorMsg = ex.getFieldErrorCount() > 0
        ? ex.getFieldErrors().stream()
        .map(this::getFieldErrorMessage).collect(Collectors.joining(" "))
        : ex.getMessage();
    return Responses.errorResponse(HttpStatus.BAD_REQUEST.value(), errorMsg);
  }

  @ExceptionHandler(MethodArgumentNotValidException.class)
  @ResponseBody
  @ResponseStatus(HttpStatus.BAD_REQUEST)
  public Response handle(final MethodArgumentNotValidException ex) {
    String errorMsg = ex.getBindingResult().getFieldErrorCount() > 0
        ? ex.getBindingResult().getFieldErrors().stream()
        .map(this::getFieldErrorMessage).collect(Collectors.joining(" "))
        : ex.getMessage();
    return Responses.errorResponse(HttpStatus.BAD_REQUEST.value(), errorMsg);
  }

  private String getFieldErrorMessage(FieldError err) {
    return err.getField() + " " + err.getDefaultMessage() + "!";
  }

  @ExceptionHandler(IllegalArgumentException.class)
  @ResponseBody
  @ResponseStatus(HttpStatus.BAD_REQUEST)
  public Response handle(final IllegalArgumentException ex) {
    return Responses.errorResponse(HttpStatus.BAD_REQUEST.value(), ex.getMessage());
  }

  @ExceptionHandler(Exception.class)
  @ResponseBody
  @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
  public Response handle(Exception ex) {
    Response response = Responses.errorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage());
    log.error("Internal Server Error with trace id {}.", response.getMeta().getTraceId(), ex);
    return response;
  }
}

如上面的代码所示,参数校验不通过抛出的异常基本可以总结为:BindExceptionMethodArgumentNotValidException 两种。我们可以从 FieldError 中取出违反约束的参数名以及对应的错误提示!

对于业务相关的参数我们可以用 IllegalArgumentException 来进行异常的抛出和捕获。

最后使用了一个 Exception 来捕获所有没有处理的异常,进行统一的错误日志记录和错误信息的返回。

3. 总结

其实这里只是做了一个最简单的罗列,具体的注解的功能以及使用的类型大家可以从参考资料进一步学习。

借助于 Bean Validation 的确可以大大简化参数的校验,但是还没有想好怎么在工程实践中大规模地使用。

毕竟参数校验可能包含了一定的业务逻辑,全部放在注解中是否合适还有待商榷。

4. 参考资料

GitLab API 踩坑小结

1. 前言

今年上半年一直在做一个离线作业的调度系统。这个季度为了更好的用户体验,避免用户手动上传和管理文件,做了与公司内部 GitLab 打通的功能。

一方面通过 GitLab 提供的 API,可以很方便地选定某个脚本文件作为数据加工的执行脚本。每次升级的时候选定不同 commit 的版本即可快速地发布任务。

另一种场景下,通过打通 GitLab 的 WebHook 可以在用户向 master 分支推送代码的时候自动触发构建和上传构建好的 artifact,避免用户手动上传几百兆的 JAR 包的等待时间。

2. 实现方案

2.1 文件获取与存储

先说一下文件存储,由于作业调度在分布式环境下执行,所以文件的存储也必须是一个高可用的分布式的文件存储。

作为一个离线作业的调度系统,第一个想到的自然是 HDFS。毕竟如果 HDFS 挂了,基本离线计算任务你也别想跑了。

文件的获取方式分为两种:

第一种是直接从 GitLab 的 Repository 里面拉取文件,这种只适合 SQL、Shell、Python 纯文本的脚本文件(相关接口见:Get file from repository)。

第二种则是面向诸如 Spark 程序的 JAR 包,动辄上百兆。具体做法是与 Jenkins 这种打包构建的工具相结合,master 分支的代码更新后触发 WebHook,由构建工具将源代码打包并通过接口回传。接口可以根据项目的 Project id 和 commit id 组合成一个确定的路径写入 HDFS。

2.2 账号与权限

说到 GitLab API 的调用就不得不提 GitLab 的认证的过程。GitLab 提供了三种验证方式

本着猛糙快的互联网精神,果断选择了第二种方式:使用账户生成一个 TOKEN,通过 HTTP 请求的 Header 参数或者 URL 中的查询参数传递给服务器。简单粗暴!

那么第一个问题来了,这个账号用谁的账号呢?

从代码安全性的角度考虑,个人账号不合适。原因有两点:首先私有项目如果要使用该功能必须把这个账号加入到项目的 Members 中,意味着个人账号可以看到别人(或者别的组)私有项目的内容。本着不粘不背锅的精神,能不碰的就不碰。第二点,GitLab 的账号与企业 LDAP 账号是互通的,一旦离职很可能直接导致 TOKEN 失效,API 无法调用(负责 GitLab 的小姐姐一直强调交接的问题,一种明天就要离职的感觉)。

解决方案是申请一个应用账号,这个账号默认不带有任何项目的权限,用户需要使用这个功能的时候将应用账号加入到 Members 中,赋予 Reporter 角色(至少是 Reporter 角色,否则无法获取文件内容)。

第二个问题:如果共享一个应用账号如防止用户窥探无权限的项目呢?

答案是每次涉及项目信息时通过 GitLab 的 Get project users 接口获取有权限用户的用户名列表,与请求用户的用户名对照。

2.3 交互过程

无论是通过 GitLab 获取文件还是构建好的 artifact,第一步都是先确定一个项目以及其版本信息。搜索 GitLab 项目、选取 Master 分支的特定 commit 流程可以用下图来简单描述一下:

接下来首先说一下 GitLab 文件的上传过程。要唯一的确定 GitLab 中的一个文件需要三个要素:GitLab Project id、Commit id 以及文件在项目中相对路径。因此,后续的交互过程可以用下图来描述:

然后说一下 Git artifact 的上传。使用 WebHook 之后 Jenkins 会把所有 artifact 通过接口回传,我们要做的只是权限和文件的验证即可:

3. 踩过的坑

3.1 分页参数

先从 GitLab 的 API 返回格式说起吧。在过往的工作经历中,后端返回 JSON 一半分为三个部分:

  • Response 元信息,比如返回的状态码、错误提示、请求的唯一标识等等。

  • Response 数据题,真正承载数据的部分。

  • Response 分页信息,主要针对列表查询。

但是 GitLab API 的风格完全不一样,没有响应的元信息和分页信息,直接使用 HTTP 的 Status Code 描述请求的异常。

这倒也罢了,但是类似于列表的接口完全没有分页信息,请求的参数里也没有提到分页参数的设置。一开始还自作聪明地以为 GitLab 的 API 返回了全量的数据,结果在通过关键字搜索 Git 仓库的时候竟然搜不到自己的项目!之后查阅文档才发现 GitLab 的分页信息是写在 Header 里面的(详情请见 Pagination)!

3.2 URL 参数转义

Get file from repository 这个接口的参数定义中,file_path 的说明是 Url encoded full path to new file. Ex. lib%2Fclass%2Erb。意思是说 lib/class.rub 这种文件的相对路径要进行转义,在使用 RestTemplate 的时候猜到了 URL 参数转义的坑。

首先看如下代码:

  @Test
  public void testForUriTemplateWithRawPathParam() {
    String url = "https://gitlab.example.com/api/v4/projects/{project_id}/repository/files/{file_path}"
        + "?private_token={token}";
    UriTemplate uriTemplate = new UriTemplate(url);
    URI expand = uriTemplate.expand(ImmutableMap.of("project_id", 1,
                                                    "file_path", "lib/class.rb",
                                                    "token", "abc"));
    System.out.println("expand = " + expand.toString());
  }

标准输出流的结果是:

expand = https://gitlab.example.com/api/v4/projects/1/repository/files/lib/class.rb?private_token=abc

看来 UriTemplate(org.springframework.web.util.UriTemplate) 并不会主动为参数进行转义,那么我们手动为参数进行转义试试:

  @Test
  public void testForUriTemplateWithEncodedPathParam() throws UnsupportedEncodingException {
    String url = "https://gitlab.example.com/api/v4/projects/{project_id}/repository/files/{file_path}"
        + "?private_token={token}";
    UriTemplate uriTemplate = new UriTemplate(url);

    String encode = UriUtils.encode("lib/class.rb", "UTF-8");
    System.out.println("encode = " + encode);
    URI expand = uriTemplate.expand(ImmutableMap.of("project_id", 1, "file_path", encode, "token", "abc"));
    System.out.println("expand = " + expand.toString());
  }

输出的结果为:

encode = lib%2Fclass.rb
expand = https://gitlab.example.com/api/v4/projects/1/repository/files/lib%252Fclass.rb?private_token=abc

看来 UriTemplate 把我们手动转义的参数中的 % 又进行了一次转义变成了 %25

看来 UriTemplate 要么不转义,要么把结果再给转一次,反正是没法用了。

那么到底如何才能得到正确的参数呢?

我们需要使用 UriComponentsBuilder(org.springframework.web.util.UriComponentsBuilder)

虽然 UriTemplate 底层也是使用的 UriComponentsBuilder,但是我们需要更加精细的控制:

 @Test
  public void testForUriComponentsBuilder() throws UnsupportedEncodingException {

    URI    uri;
    String filePath = "lib/class.rb";

    // Illegal for path, it should use pathSegment
    //uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
    //    .path(String.valueOf(1)).path("repository").path("files").path("lib/class.rb")
    //    .queryParam("private_token", "abc").build(false).toUri();
    //System.out.println("uri = " + uri.toString());
    // result: uri = https://gitlab.example.com/api/v4/projects/1repositoryfileslib/class.rb?private_token=abc
    //
    //uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
    //    .path(String.valueOf(1)).path("repository").path("files").path("lib/class.rb")
    //    .queryParam("private_token", "abc").build(true).toUri();
    //System.out.println("uri = " + uri.toString());
    // result: uri = https://gitlab.example.com/api/v4/projects/1repositoryfileslib/class.rb?private_token=abc

    // exception for build true parameter because 'lib/class.rb' contains '/'
    //uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
    //    .pathSegment(String.valueOf(1), "repository", "files", "lib/class.rb")
    //    .queryParam("private_token", "abc").build(true).toUri();
    //System.out.println("uri = " + uri.toString());

    String encode = UriUtils.encode(filePath, "UTF-8");
    System.out.println("encode = " + encode);
    String encodePath = UriUtils.encodePath(filePath, "UTF-8");
    System.out.println("encodePath = " + encodePath);
    String encodePathSegment = UriUtils.encodePathSegment(filePath, "UTF-8");
    System.out.println("encodePathSegment = " + encodePathSegment);

    uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
        .pathSegment(String.valueOf(1), "repository", "files", encodePathSegment)
        .queryParam("private_token", "abc").build(true).toUri();
    System.out.println("uri = " + uri.toString());
  }

使用上述单元测试里的代码即可构造出访问 GitLab 所需要的 URI 了!

4. 小结

本文所有的参考资料全部来源于 GitLab 的 API 文档,涉及到的 API 有:

以上。

Spring与Cache

1. 前言

在做项目中往往遇到这样一些场景:从外部系统获取一些配置信息,它们被频繁读取但是却不经常修改,甚至是只读的数据。它们不要求非常强的实时性却有着非常高的访问频率。

如果每次都重新发起一次 HTTP 请求无疑是对系统资源的一种浪费。常见的做法是将这些外部数据以缓存的形式存储下来。

下面介绍一下 Spring 自带的 Cache 功能。

2. 几种基本注解

  • @Cacheable:开启缓存,指定命名空间参数(可以指定多个)。

  • @CacheEvict:指定缓存的清除,如果使用 allEntries = true 则会删除所有缓存。

  • @CachePut:首先运行目标方法然后将返回结果加入缓存。

  • @CacheConfig:在类的级别指定缓存的配置信息,比如缓存的名字或者 CacheManager

  • @Caching:由于一个方法上可以指定多个缓存,而 Java 不允许同一个注解在同一个地方出现多次,因此 @Caching 注解来组织。

3. 常见的缓存存储

Spring 虽然提供了缓存的框架,但是缓存的具体实现还分为很多种,常见的有 ConcurrentMapCacheGuavaCache 以及基于 Redis 的实现。

3.1 ConcurrentMapCache

ConcurrentMapCache 是一种简单的缓存实现,不能进行配置。那么 ConcurrentMapCache 缓存的生命周期是永远,除非使用 @CacheEvict 或者 @CachePut 否则缓存不会失效或者更新。

3.2 GuavaCache

这是基于 Guava 缓存的一种实现,我们可以像使用 Guava 中的 Cache 一样进行配置,比如缓存的失效时间、缓存的最大容量等等。

3.3 基于 Redis 的缓存

上文无论 ConcurrentMapCache 也好、GuavaCache 也好,都是基于本机的内存,无法在多个节点共享。基于 Redis 的缓存才能实现分布式缓存。

4. 代码示例

首先给出代码的依赖:

  <dependencies>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
          <groupId>com.avalon-inc</groupId>
          <artifactId>web-api-common</artifactId>
          <version>1.0-SNAPSHOT</version>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-cache</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>
  </dependencies>

然后是 Redis 的配置,见 application.properties

  spring.redis.host=localhost
  spring.redis.port=6379

接着进入代码配置的正文,使用 Cache 的第一件事是开启对缓存的支持:

  package com.avaloninc.springcacheexample ;
  
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.springframework.cache.annotation.EnableCaching;
  
  /**
   * @Author: wuzhiyu.
   * @Date: 2018-05-18 02:46:01.
   * @Description:
   */
  @SpringBootApplication
  @EnableCaching
  public class Main {
    public static void main(String[] args) {
      SpringApplication.run(Main.class);
    }
  }

然后我们执行最小化的设置,注册三个 CacheManager

  package com.avaloninc.springcacheexample;
  
  import com.google.common.cache.CacheBuilder;
  
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.cache.CacheManager;
  import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
  import org.springframework.cache.guava.GuavaCacheManager;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.context.annotation.Primary;
  import org.springframework.data.redis.cache.RedisCacheManager;
  import org.springframework.data.redis.connection.RedisConnectionFactory;
  import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
  import org.springframework.data.redis.core.RedisTemplate;
  import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
  import org.springframework.data.redis.serializer.StringRedisSerializer;
  
  import java.util.concurrent.TimeUnit;
  
  /**
   * @Author: wuzhiyu.
   * @Date: 2018-06-12 01:44:20.
   * @Description:
   */
  @Configuration
  public class Config {
    @Bean("concurrentMapCacheManager")
    @Primary
    public CacheManager getDefaultCacheManager() {
      return new ConcurrentMapCacheManager();
    }
  
    @Bean("guavaCacheManager")
    public CacheManager getGuavaCacheManager() {
      GuavaCacheManager guavaCacheManager = new GuavaCacheManager();
      guavaCacheManager.setCacheBuilder(
          CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(5, TimeUnit.SECONDS));
      return guavaCacheManager;
    }
  
    @Bean
    public JedisConnectionFactory getRedisConnectionFactory(@Value("${spring.redis.host}") String host,
                                                            @Value("${spring.redis.port}") int port) {
      JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
      jedisConnectionFactory.setHostName(host);
      jedisConnectionFactory.setPort(port);
      return jedisConnectionFactory;
    }
  
    @Bean
    public RedisTemplate<String, Object> getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
      RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
      redisTemplate.setConnectionFactory(redisConnectionFactory);
      redisTemplate.setKeySerializer(new StringRedisSerializer());
      redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
      return redisTemplate;
    }
  
    @Bean("redisCacheManager")
    public CacheManager getRedisCacheManager(RedisTemplate<String, Object> redisTemplate) {
      RedisCacheManager redisCacheManager = new RedisCacheManager(redisTemplate);
      redisCacheManager.setDefaultExpiration(3);
      return redisCacheManager;
    }
  }

注意上面的 new ConcurrentMapCacheManager(),实例的构造函数有两种签名版本:

  • public ConcurrentMapCacheManager() {}

  • public ConcurrentMapCacheManager(String... cacheNames) { setCacheNames(Arrays.asList(cacheNames));}

如果使用无参构造,那么允许动态构造缓存,缓存数量可变。如果使用有参版本那么只能以固定个数的命名空间来创建固定个数的缓存。GuavaCacheRedisCache 同上。

接着我们编写使用缓存的具体代码:

  package com.avaloninc.springcacheexample ;
  
  import com.avaloninc.webapi.common.response.Response;
  import com.avaloninc.webapi.common.response.Responses;
  import lombok.extern.slf4j.Slf4j;
  import org.joda.time.DateTime;
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.springframework.cache.annotation.CacheConfig;
  import org.springframework.cache.annotation.CacheEvict;
  import org.springframework.cache.annotation.CachePut;
  import org.springframework.cache.annotation.Cacheable;
  import org.springframework.cache.annotation.Caching;
  import org.springframework.cache.annotation.EnableCaching;
  import org.springframework.web.bind.annotation.DeleteMapping;
  import org.springframework.web.bind.annotation.GetMapping;
  import org.springframework.web.bind.annotation.PathVariable;
  import org.springframework.web.bind.annotation.PutMapping;
  import org.springframework.web.bind.annotation.RestController;
  
  /**
   * @Author: wuzhiyu.
   * @Date: 2018-05-18 02:46:01.
   * @Description:
   */
  @SpringBootApplication
  @EnableCaching
  @RestController
  @Slf4j
  @CacheConfig(cacheNames = "caches")
  public class Main {
    public static void main(String[] args) {
      SpringApplication.run(Main.class);
    }
  
    @GetMapping("/obj/{key}")
    @Caching(cacheable = {
        @Cacheable(cacheManager = "concurrentMapCacheManager"),
        @Cacheable(cacheManager = "guavaCacheManager"),
        @Cacheable(cacheManager = "redisCacheManager")
    })
    public Response<String> get(@PathVariable("key") String key) {
      String value = key.concat(" ").concat(DateTime.now().toString());
      log.info("Get the real value of " + key);
      return Responses.sucessResponse(value);
    }
  
    @DeleteMapping("/obj/{key}")
    @Caching(evict = {
        @CacheEvict(cacheManager = "concurrentMapCacheManager"),
        @CacheEvict(cacheManager = "guavaCacheManager"),
        @CacheEvict(cacheManager = "redisCacheManager")
    })
    public Response<String> del(@PathVariable("key") String key) {
      log.info("Evict the key " + key);
      return Responses.sucessResponse("success");
    }
  
    @PutMapping("/obj/{key}")
    @Caching(put = {
        @CachePut(cacheManager = "concurrentMapCacheManager"),
        @CachePut(cacheManager = "guavaCacheManager"),
        @CachePut(cacheManager = "redisCacheManager")
    })
    public Response<String> put(@PathVariable("key") String key) {
      String value = key.concat(" ").concat(DateTime.now().toString());
      log.info("Put the real value of " + key);
      return Responses.sucessResponse(value);
    }
  }

注意上文是为了演示 @Caching 才同时使用了三种缓存。实验显示多个缓存下 @Cacheable 只要有一个缓存没有失效那么就不会执行具体的方法体获取的新的值!如果其他缓存和 ConcurrentMapCache 一起使用而不调用 @CacheEvict 或者 @CachePut,那么其他缓存可能永远不会更新!

5. 总结

本文只是简单介绍了一下 SpringBoot 集成缓存以及几种常见的存储方式,具体的选择按需求场景确定即可。但是正确的缓存使用姿势或者更新策略可以结合现有的设计模式,灵活运用这几种注解实现选定的模式。

6. 参考资料

Spring 与配置获取方式

1. 前言

Spring 在配置文件方面的支持非常强大,本文不再赘述,有需求可以查看 Spring Boot 的官方文档。本文的叙述内容是如何在程序中使用 Spring Boot 配置文件中的参数值。常见的手法有三种:

  • @Value 注解

  • org.springframework.core.env.Environment 对象

  • @ConfigurationProperties 注解

2. @Value

先从 @Value 说起,通过配置完整名称的形式即可获取需要的值,如:

@Value("${myConfig.name}")
private String name;

但是需要注意的一点是 @Value 注入的时间一般在 Bean 构造完成之后。如果构造 Bean 的方法需要使用到配置文件里的参数,那么可以把这些参数作为构造函数或者 @Bean 注解修饰的方法的传入参数,并以 @Value 注解来指定注入的参数。如:

@Bean("client1")
public Client getClient(@Value("${endPoint.beijing}") String endPoint) {
    Client client = new Client();
    client.setEndPoint(endPoint);
    return client;
}

3. org.springframework.core.env.Environment

@Value 注解在获取少量配置的时候还是相当方便的,但是如果我们需要从配置文件中获取大量配置的时候往往需要定义大量的实例变量,就不如直接从 Environment 获取来的方便。Environment 对象可以直接通过 @Autowired 注解注入得到。从 Environment 获取配置的方式也相当简单,如:

String myConfigNameOfEnv = environment.getProperty("myConfig.name");

getProperty 方法还有可以指定默认值和参数类型的重载方法,此处不展开。

4. @ConfigurationProperties

通过 @ConfigurationProperties 注解获取配置有两种形式:

下面给出一个示例,对于如下配置:

myConfig:
  name: myConfig
  list:
    - a
    - b
yourConfig:
  name: yourConfig
  list:
    - c
    - d

如果我们想使用 myConfig 前缀下的所有配置,那么两种方案分别可以按照如下形式获取。

方案一:

@Configuration
@ConfigurationProperties(prefix = "myConfig")
@Data
public class MyConfig {
  private String       name;
  private List<String> list;
}

这里通过前缀和字段的名字来映射配置。

方案二:

@Configuration
@ConfigurationProperties(prefix = "")
@Data
public class ConfigAsMap {
  private Map<String, String> myConfig;
  private Map<String, String> yourConfig;
}

这里我们将所有所有配置映射为一个对象,每一个 namespace 下的配置以 Map 的形式来存储。使用的时候可以按照如下方式获取特定配置:

Map<String, String> myConfigMap     = configAsMap.getMyConfig();
String              nameOfConfigMap = myConfigMap.get("name");

这里还要记述一下 Environment 对象和 @ConfigurationProperties 映射为 Map 后两者在获取列表时的区别。

对于 Environemnt 对象,我们要获取 myConfig.list 对象时的方式如下:

List<String> myConfigListOfEnv = Lists.newArrayList(environment.getProperty("myConfig.list[0]"),
                                            environment.getProperty("myConfig.list[1]"));

而通过 @ConfigurationProperties 映射为 Map 后获取的方式则是:

List<String> listOfConfigMap = Lists.newArrayList(myConfigMap.get("list.0"),
                                                  myConfigMap.get("list.1"));

两者在处理列表的索引时不一致,使用时需要我们注意。

5. 跨 Module 引用配置

最近开始接手改造一个项目,改造过程中有这样一个问题:该项目原来所有的配置都采用集中管理的方式,每一个项目的配置都从配置服务器的接口获取然后初始化 Spring 的容器。该项目将从配置服务器获取配置的部分抽出为一个单独的 Maven Module。

但是这个配置集中管理的项目即将下线,于是配置本地化的改造就迎面而来了。

我的改造方案是以不同的 profile 的形式将配置固化到配置 Module 的配置文件中。在线上环境和测试环境通过指定不同的 profile 达到配置的切换。

对于运行时指定的 profile 能否作用到依赖的 Maven 模块并没有绝对把握,而改造的范围比较广,所以还是捏了一把汗的。于是写了一个小的 Demo 作为原型验证。

首先给出配置文件:

application.yaml

myConfig:
  name: myConfig
  list:
    - a
    - b
yourConfig:
  name: yourConfig
  list:
    - c
    - d

application-test.yaml

hisConfig:
  name: hisConfig-test
  list:
    - g
    - h

application-prod.yaml

hisConfig:
  name: hisConfig-prod
  list:
    - e
    - f

两种通过 @ConfigurationProperties 获取配置的方式:

package com.avaloninc.springconfigexample.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "myConfig")
@Data
public class MyConfig {
  private String       name;
  private List<String> list;
}

以及:

package com.avaloninc.springconfigexample.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
@ConfigurationProperties(prefix = "")
@Data
public class ConfigAsMap {
  private Map<String, String> myConfig;
  private Map<String, String> yourConfig;
  private Map<String, String> hisConfig;
}

先在当前 Module 测试一下:

package com.avaloninc.springconfigexample;

import com.avaloninc.springconfigexample.config.ConfigAsMap;
import com.avaloninc.springconfigexample.config.MyConfig;
import junit.framework.TestCase;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.env.Environment;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Map;

@SpringBootTest
@RunWith(SpringRunner.class)
@ActiveProfiles(value = "test")
public class MainTest extends TestCase {

  @Value("${myConfig.name}")
  private String name;
  @Autowired
  private Environment environment;
  @Autowired
  private ConfigAsMap configAsMap;
  @Autowired
  private MyConfig    myConfig;

  @Test
  public void test() {
    System.out.println("this.name = " + this.name);

    String myConfigNameOfEnv = environment.getProperty("myConfig.name");
    List<String> myConfigListOfEnv = Lists.newArrayList(environment.getProperty("myConfig.list[0]"),
                                                        environment.getProperty("myConfig.list[1]"));
    System.out.println("myConfigNameOfEnv = " + myConfigNameOfEnv);
    System.out.println("myConfigListOfEnv = " + myConfigListOfEnv);

    String       myConfigName = myConfig.getName();
    List<String> myConfigList = myConfig.getList();

    Map<String, String> myConfigMap     = configAsMap.getMyConfig();
    String              nameOfConfigMap = myConfigMap.get("name");
    List<String> listOfConfigMap = Lists.newArrayList(myConfigMap.get("list.0"),
                                                      myConfigMap.get("list.1"));

    System.out.println("myConfig = " + myConfig);
    System.out.println("configAsMap = " + configAsMap);

    assertEquals(this.name, myConfigNameOfEnv);
    assertEquals(myConfigNameOfEnv, myConfigName);
    assertEquals(myConfigName, nameOfConfigMap);
    assertEquals(myConfigListOfEnv, myConfigList);
    assertEquals(myConfigList, listOfConfigMap);
  }
}

然后我们在另一个 Module 中引用这里的配置,下面是单元测试:

package com.avaloninc.springtestexample;

import com.avaloninc.springconfigexample.config.ConfigAsMap;
import com.avaloninc.springconfigexample.config.MyConfig;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@Import({ConfigAsMap.class, MyConfig.class})
public class MainTest extends TestCase {

  @Autowired
  private ConfigAsMap configAsMap;
  @Autowired
  private MyConfig  myConfig;

  @Test
  public void test() {
    System.out.println("configAsMap = " + configAsMap);
    assertEquals(configAsMap.getHisConfig().get("name"), "hisConfig-test");
    System.out.println("myConfig = " + myConfig);
  }
}

注意:跨模块引用配置的时候记得使用 @Import 注解,否则启动的时候会报错:org.springframework.beans.factory.UnsatisfiedDependencyException。

HDFS API 后记关于 FileSystem 缓存

HDFS API 这篇文章中,简单介绍了一下通过 Hadoop client 的 API 读写 HDFS 的方法。但是在实际使用过程中也发现了一个 FileSystem 的问题。

1. 问题描述

在程序运行中时不时地会抛出 IOException,内容提示 filesystem closed。一开始也很疑惑,因为不确定 org.apache.hadoop.fs.FileSystem 实例的线程安全性,所以每次调用的时候都通过 FileSystem.get(conf) 方法来获取一个新的对象。然后查阅了资料发现 FileSystem.get 方法会得到的并不是一个全新的对象,而是一个缓存过的对象。

源码如下:

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

可以看到 FileSyste.get 方法的确是取缓存的。

2. 解决方案

为了保障线程安全性,我们可以每次调用的时候都创建一个新的实例,FileSystem.newInstance 方法可以满足我们的需求。

package com.avaloninc.hdfsapi.service.impl;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

import com.avaloninc.hdfsapi.service.HdfsService;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @Author: wuzhiyu.
 * @Date: 2018-03-19 14:39.
 * @Description:
 */
@Service
public class HdfsServiceImpl implements HdfsService {
    @Autowired
    private Configuration config;

    @Override
    public String read(String path) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path hdfsPath = new Path(path);
            List<String> lines = IOUtils.readLines(fileSystem.open(hdfsPath));
            return Joiner.on("\n").join(lines);
        }
    }

    @Override
    public void write(String path, InputStream inputStream) throws IOException {
        FileSystem         fileSystem   = null;
        FSDataOutputStream outputStream = null;
        try {
            Path hdfsPath = new Path(path);
            fileSystem = FileSystem.newInstance(config);
            outputStream = fileSystem.create(hdfsPath, true);

            byte[] bytes    = new byte[1024];
            int    numBytes = 0;
            while ((numBytes = inputStream.read(bytes)) > 0) {
                outputStream.write(bytes, 0, numBytes);
            }
        } finally {
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(outputStream);
            IOUtils.closeQuietly(fileSystem);
        }
    }

    @Override
    public boolean rename(String src, String dest) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path srcPath = new Path(src);
            Path destPath = new Path(dest);

            if (!fileSystem.exists(srcPath)) {
                throw new IOException("Path " + src + " do not exists.");
            }

            if (!fileSystem.exists(destPath.getParent())) {
                fileSystem.mkdirs(destPath.getParent());
            }

            return fileSystem.rename(srcPath, destPath);
        }
    }

    @Override
    public boolean delete(String path) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            return fileSystem.delete(new Path(path), true);
        }
    }

    @Override
    public List<String> ls(String path) throws IOException {
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path hdfsPath = new Path(path);
            if (!fileSystem.exists(hdfsPath)) {
                throw new IllegalArgumentException(
                    "Path " + path + " do not exist or is not a dir.");
            }

            if (fileSystem.isDirectory(hdfsPath)) {
                return Arrays.stream(fileSystem.listStatus(hdfsPath))
                    .map(FileStatus::getPath)
                    .map(Path::getName)
                    .collect(Collectors.toList());
            } else {
                FileStatus status = fileSystem.getFileStatus(hdfsPath);
                return ImmutableList.of(status.getPath().getName());
            }
        }
    }
}

3. 参考资料

Spring多数据源使用

1. 前言

平时开发的时候偶尔会遇到多数据库读写的情况(非分库分表),本文会给出一个简单的配置和使用两个数据库的示例。

2. 依赖与属性配置

首先给出 pom.xml 中引入的依赖:

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>LATEST</version>
</dependency>

然后我们手动创建两个数据库。在尝试使用多数据源的时候发现的一个问题是 spring.datasource.schema 不生效了,意味着不能使用 Spring 启动时自动创建库表的特性,所以只能手动创建数据库:

mysql root@localhost:(none)> DROP DATABASE IF EXISTS `prime`;
                          -> CREATE DATABASE `prime`;
                          -> use prime;
                          ->
                          -> DROP TABLE IF EXISTS person;
                          -> CREATE TABLE `person` (
                          ->   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
                          ->   `name` varchar(50) NOT NULL DEFAULT '' COMMENT '
                          -> 名字',
                          ->   `age` int(11) NOT NULL COMMENT '年龄',
                          ->   `gender` int(11) NOT NULL COMMENT '性别',
                          ->   PRIMARY KEY (`id`)
                          -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                          ->
You're about to run a destructive command.
Do you want to proceed? (y/n): y
Your call!
Query OK, 0 rows affected
Time: 0.006s

Query OK, 1 row affected
Time: 0.012s

You are now connected to database "prime" as user "root"
Time: 0.002s

Query OK, 0 rows affected
Time: 0.007s

Query OK, 0 rows affected
Time: 0.021s
mysql root@localhost:(none)> DROP DATABASE IF EXISTS `secondary`;
                          -> CREATE DATABASE `secondary`;
                          -> use secondary;
                          ->
                          -> DROP TABLE IF EXISTS person;
                          -> CREATE TABLE `person` (
                          ->   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
                          ->   `name` varchar(50) NOT NULL DEFAULT '' COMMENT '
                          -> 名字',
                          ->   `age` int(11) NOT NULL COMMENT '年龄',
                          ->   `gender` int(11) NOT NULL COMMENT '性别',
                          ->   PRIMARY KEY (`id`)
                          -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
You're about to run a destructive command.
Do you want to proceed? (y/n): y
Your call!
Query OK, 0 rows affected
Time: 0.004s

Query OK, 1 row affected
Time: 0.001s

You are now connected to database "secondary" as user "root"
Time: 0.002s

Query OK, 0 rows affected
Time: 0.001s

Query OK, 0 rows affected
Time: 0.028s

下面是 application.properties,注明了两个数据库的基本信息:

spring.datasource.primary.url=jdbc:mysql://127.0.0.1:3306/prime?charset=utf8
spring.datasource.primary.username=root
spring.datasource.primary.password=root
spring.datasource.primary.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.secondary.url=jdbc:mysql://127.0.0.1:3306/secondary?charset=utf8
spring.datasource.secondary.username=root
spring.datasource.secondary.password=root
spring.datasource.secondary.driver-class-name=com.mysql.jdbc.Driver

3. 配置数据源

针对两个数据库我们要分别创建对应的 DataSourcePlatformTransactionManagerSqlSessionFactory

第一个 DataSource 我们使用 Spring 中的 DataSourceBuilder 来构建,并用 @Primary 注解来标记:

package com.avaloninc.springmultidatasourceexample.conf;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

@Configuration
@EnableTransactionManagement
@MapperScan(basePackages = "com.avaloninc.springmultidatasourceexample.mapper.prime", sqlSessionFactoryRef = "primeSsf")
public class DataSourceConfigOne {

  @Bean(name = "primaryDs")
  @ConfigurationProperties(prefix = "spring.datasource.primary")
  @Primary
  public DataSource primaryDs() {
    return DataSourceBuilder.create().build();
  }


  @Bean(name = "primaryTxm")
  @Primary
  public PlatformTransactionManager primaryTxm(@Qualifier("primaryDs") DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
  }

  @Bean(name = "primeSsf")
  @Primary
  public SqlSessionFactory primeSsf(@Qualifier("primaryDs") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSource);
    return sqlSessionFactoryBean.getObject();
  }
}

第二个数据源我们使用 druid 作为连接池,除了基本参数外其他都采用默认配置:

package com.avaloninc.springmultidatasourceexample.conf;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

@Configuration
@EnableTransactionManagement
@MapperScan(basePackages = "com.avaloninc.springmultidatasourceexample.mapper.secondary", sqlSessionFactoryRef = "secondarySsf")
public class DataSourceConfigTwo {

  @Bean(name = "secondaryDs")
  @ConfigurationProperties(prefix = "spring.datasource.secondary")
  public DataSource secondaryDs() {
    return DruidDataSourceBuilder.create().build();
  }

  @Bean(name = "secondaryTxm")
  public PlatformTransactionManager secondaryTxm(@Qualifier("secondaryDs") DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
  }


  @Bean(name = "secondarySsf")
  public SqlSessionFactory secondarySsf(@Qualifier("secondaryDs") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSource);
    return sqlSessionFactoryBean.getObject();
  }
}

注意,在配置两个不同的数据源时都各自加了 @MapperSacn 注解,并各自给定了 basePackagessqlSessionFactoryRef。不同数据源的 Mapper 在不同的 package 里面定义,并使用不同的 SqlSessionFactory 来创建。

下面是 Mapper 接口的定义:

package com.avaloninc.springmultidatasourceexample.mapper.prime;

import com.avaloninc.springmultidatasourceexample.domain.Person;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;

@Mapper
public interface PersonMapper {
  /**
   * Insert int.
   *
   * @param person the person
   * @return the int
   */
  @Insert("insert into person (name, age, gender) values (#{p.name}, #{p.age}, #{p.gender, typeHandler=org.apache.ibatis.type.EnumOrdinalTypeHandler, javaType=com.avaloninc.springmultidatasourceexample.domain.Person$Gender})")
  @Options(useGeneratedKeys = true, keyProperty = "p.id")
  int insert(@Param("p") Person person);

  /**
   * Gets person by id.
   *
   * @param id the id
   * @return the person by id
   */
  @Select("select id, name, age, gender from person where id = #{id}")
  @Results(id = "person", value = {
      @Result(column = "gender", property = "gender", typeHandler = EnumOrdinalTypeHandler.class)
  })
  Person getPersonById(@Param("id") int id);
}

以及:

package com.avaloninc.springmultidatasourceexample.mapper.secondary;

import com.avaloninc.springmultidatasourceexample.domain.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;

@Mapper
public interface UserMapper {
  /**
   * Insert int.
   *
   * @param user the user
   * @return the int
   */
  @Insert("insert into person (name, age, gender) values (#{p.name}, #{p.age}, #{p.gender, typeHandler=org.apache.ibatis.type.EnumOrdinalTypeHandler, javaType=com.avaloninc.springmultidatasourceexample.domain.User$Gender})")
  @Options(useGeneratedKeys = true, keyProperty = "p.id")
  int insert(@Param("p") User user);

  /**
   * Gets person by id.
   *
   * @param id the id
   * @return the person by id
   */
  @Select("select id, name, age, gender from person where id = #{id}")
  @Results(id = "person", value = {
      @Result(column = "gender", property = "gender", typeHandler = EnumOrdinalTypeHandler.class)
  })
  User getUserById(@Param("id") int id);
}

4. 简单的 Service 调用

下面是针对这两个 Mapper 的写的简单的读写接口:

package com.avaloninc.springmultidatasourceexample.service;


import com.avaloninc.springmultidatasourceexample.domain.Person;

public interface PersonService {
  /**
   * Insert int.
   *
   * @param person the person
   * @return the int
   */
  int insert(Person person);

  /**
   * Gets person by id.
   *
   * @param id the id
   * @return the person by id
   */
  Person getPersonById(int id);
}
package com.avaloninc.springmultidatasourceexample.service;

import com.avaloninc.springmultidatasourceexample.domain.Person;
import com.avaloninc.springmultidatasourceexample.mapper.prime.PersonMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PersonServiceImpl implements PersonService {

  @Autowired
  private PersonMapper personMapper;

  @Override
  public int insert(Person person) {
    return this.personMapper.insert(person);
  }

  @Override
  public Person getPersonById(int id) {
    return this.personMapper.getPersonById(id);
  }
}

以及:

package com.avaloninc.springmultidatasourceexample.service;

import com.avaloninc.springmultidatasourceexample.domain.User;

public interface UserService {
  /**
   * Insert int.
   *
   * @param user the user
   * @return the int
   */
  int insert(User user);

  /**
   * Gets user by id.
   *
   * @param id the id
   * @return the user by id
   */
  User getUserById(int id);
}
package com.avaloninc.springmultidatasourceexample.service;

import com.avaloninc.springmultidatasourceexample.domain.User;
import com.avaloninc.springmultidatasourceexample.mapper.secondary.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {

  @Autowired
  private UserMapper userMapper;

  @Override
  public int insert(User user) {
    return this.userMapper.insert(user);
  }

  @Override
  public User getUserById(int id) {
    return this.userMapper.getUserById(id);
  }
}

5. 单元测试

单元测试一发入魂:

package com.avaloninc.springmultidatasourceexample;

import static org.junit.Assert.*;

import com.avaloninc.springmultidatasourceexample.domain.Person;
import com.avaloninc.springmultidatasourceexample.domain.User;
import com.avaloninc.springmultidatasourceexample.domain.User.Gender;
import com.avaloninc.springmultidatasourceexample.service.PersonService;
import com.avaloninc.springmultidatasourceexample.service.UserService;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MainTest extends TestCase {

  @Autowired
  private UserService userService;
  @Autowired
  private PersonService personService;

  @Test
  public void test() {
    User user = new User();
    user.setAge(28);
    user.setGender(Gender.MALE);
    user.setName("John");

    int count = this.userService.insert(user);
    assertEquals(1, count);

    User userById = this.userService.getUserById(user.getId());
    assertEquals(user, userById);

    Person person = new Person();
    person.setAge(27);
    person.setName("Doe");
    person.setGender(Person.Gender.MALE);

    int insertCount = this.personService.insert(person);
    assertEquals(1, insertCount);

    Person personById = this.personService.getPersonById(person.getId());
    assertEquals(person, personById);
  }
}

查看数据库确认一下:

mysql root@localhost:(none)> select * from prime.person;
+----+------+-----+--------+
| id | name | age | gender |
+----+------+-----+--------+
| 1  | Doe  | 27  | 0      |
+----+------+-----+--------+
1 row in set
Time: 0.006s
mysql root@localhost:(none)> select * from secondary.person;
+----+------+-----+--------+
| id | name | age | gender |
+----+------+-----+--------+
| 1  | John | 28  | 0      |
+----+------+-----+--------+
1 row in set
Time: 0.006s
mysql root@localhost:(none)>

以上!