GitLab API 踩坑小结

1. 前言

今年上半年一直在做一个离线作业的调度系统。这个季度为了更好的用户体验,避免用户手动上传和管理文件,做了与公司内部 GitLab 打通的功能。

一方面通过 GitLab 提供的 API,可以很方便地选定某个脚本文件作为数据加工的执行脚本。每次升级的时候选定不同 commit 的版本即可快速地发布任务。

另一种场景下,通过打通 GitLab 的 WebHook 可以在用户向 master 分支推送代码的时候自动触发构建和上传构建好的 artifact,避免用户手动上传几百兆的 JAR 包的等待时间。

2. 实现方案

2.1 文件获取与存储

先说一下文件存储,由于作业调度在分布式环境下执行,所以文件的存储也必须是一个高可用的分布式的文件存储。

作为一个离线作业的调度系统,第一个想到的自然是 HDFS。毕竟如果 HDFS 挂了,基本离线计算任务你也别想跑了。

文件的获取方式分为两种:

第一种是直接从 GitLab 的 Repository 里面拉取文件,这种只适合 SQL、Shell、Python 纯文本的脚本文件(相关接口见:Get file from repository)。

第二种则是面向诸如 Spark 程序的 JAR 包,动辄上百兆。具体做法是与 Jenkins 这种打包构建的工具相结合,master 分支的代码更新后触发 WebHook,由构建工具将源代码打包并通过接口回传。接口可以根据项目的 Project id 和 commit id 组合成一个确定的路径写入 HDFS。

2.2 账号与权限

说到 GitLab API 的调用就不得不提 GitLab 的认证的过程。GitLab 提供了三种验证方式

本着猛糙快的互联网精神,果断选择了第二种方式:使用账户生成一个 TOKEN,通过 HTTP 请求的 Header 参数或者 URL 中的查询参数传递给服务器。简单粗暴!

那么第一个问题来了,这个账号用谁的账号呢?

从代码安全性的角度考虑,个人账号不合适。原因有两点:首先私有项目如果要使用该功能必须把这个账号加入到项目的 Members 中,意味着个人账号可以看到别人(或者别的组)私有项目的内容。本着不粘不背锅的精神,能不碰的就不碰。第二点,GitLab 的账号与企业 LDAP 账号是互通的,一旦离职很可能直接导致 TOKEN 失效,API 无法调用(负责 GitLab 的小姐姐一直强调交接的问题,一种明天就要离职的感觉)。

解决方案是申请一个应用账号,这个账号默认不带有任何项目的权限,用户需要使用这个功能的时候将应用账号加入到 Members 中,赋予 Reporter 角色(至少是 Reporter 角色,否则无法获取文件内容)。

第二个问题:如果共享一个应用账号如防止用户窥探无权限的项目呢?

答案是每次涉及项目信息时通过 GitLab 的 Get project users 接口获取有权限用户的用户名列表,与请求用户的用户名对照。

2.3 交互过程

无论是通过 GitLab 获取文件还是构建好的 artifact,第一步都是先确定一个项目以及其版本信息。搜索 GitLab 项目、选取 Master 分支的特定 commit 流程可以用下图来简单描述一下:

接下来首先说一下 GitLab 文件的上传过程。要唯一的确定 GitLab 中的一个文件需要三个要素:GitLab Project id、Commit id 以及文件在项目中相对路径。因此,后续的交互过程可以用下图来描述:

然后说一下 Git artifact 的上传。使用 WebHook 之后 Jenkins 会把所有 artifact 通过接口回传,我们要做的只是权限和文件的验证即可:

3. 踩过的坑

3.1 分页参数

先从 GitLab 的 API 返回格式说起吧。在过往的工作经历中,后端返回 JSON 一半分为三个部分:

  • Response 元信息,比如返回的状态码、错误提示、请求的唯一标识等等。

  • Response 数据题,真正承载数据的部分。

  • Response 分页信息,主要针对列表查询。

但是 GitLab API 的风格完全不一样,没有响应的元信息和分页信息,直接使用 HTTP 的 Status Code 描述请求的异常。

这倒也罢了,但是类似于列表的接口完全没有分页信息,请求的参数里也没有提到分页参数的设置。一开始还自作聪明地以为 GitLab 的 API 返回了全量的数据,结果在通过关键字搜索 Git 仓库的时候竟然搜不到自己的项目!之后查阅文档才发现 GitLab 的分页信息是写在 Header 里面的(详情请见 Pagination)!

3.2 URL 参数转义

Get file from repository 这个接口的参数定义中,file_path 的说明是 Url encoded full path to new file. Ex. lib%2Fclass%2Erb。意思是说 lib/class.rub 这种文件的相对路径要进行转义,在使用 RestTemplate 的时候猜到了 URL 参数转义的坑。

首先看如下代码:

  @Test
  public void testForUriTemplateWithRawPathParam() {
    String url = "https://gitlab.example.com/api/v4/projects/{project_id}/repository/files/{file_path}"
        + "?private_token={token}";
    UriTemplate uriTemplate = new UriTemplate(url);
    URI expand = uriTemplate.expand(ImmutableMap.of("project_id", 1,
                                                    "file_path", "lib/class.rb",
                                                    "token", "abc"));
    System.out.println("expand = " + expand.toString());
  }

标准输出流的结果是:

expand = https://gitlab.example.com/api/v4/projects/1/repository/files/lib/class.rb?private_token=abc

看来 UriTemplate(org.springframework.web.util.UriTemplate) 并不会主动为参数进行转义,那么我们手动为参数进行转义试试:

  @Test
  public void testForUriTemplateWithEncodedPathParam() throws UnsupportedEncodingException {
    String url = "https://gitlab.example.com/api/v4/projects/{project_id}/repository/files/{file_path}"
        + "?private_token={token}";
    UriTemplate uriTemplate = new UriTemplate(url);

    String encode = UriUtils.encode("lib/class.rb", "UTF-8");
    System.out.println("encode = " + encode);
    URI expand = uriTemplate.expand(ImmutableMap.of("project_id", 1, "file_path", encode, "token", "abc"));
    System.out.println("expand = " + expand.toString());
  }

输出的结果为:

encode = lib%2Fclass.rb
expand = https://gitlab.example.com/api/v4/projects/1/repository/files/lib%252Fclass.rb?private_token=abc

看来 UriTemplate 把我们手动转义的参数中的 % 又进行了一次转义变成了 %25

看来 UriTemplate 要么不转义,要么把结果再给转一次,反正是没法用了。

那么到底如何才能得到正确的参数呢?

我们需要使用 UriComponentsBuilder(org.springframework.web.util.UriComponentsBuilder)

虽然 UriTemplate 底层也是使用的 UriComponentsBuilder,但是我们需要更加精细的控制:

 @Test
  public void testForUriComponentsBuilder() throws UnsupportedEncodingException {

    URI    uri;
    String filePath = "lib/class.rb";

    // Illegal for path, it should use pathSegment
    //uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
    //    .path(String.valueOf(1)).path("repository").path("files").path("lib/class.rb")
    //    .queryParam("private_token", "abc").build(false).toUri();
    //System.out.println("uri = " + uri.toString());
    // result: uri = https://gitlab.example.com/api/v4/projects/1repositoryfileslib/class.rb?private_token=abc
    //
    //uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
    //    .path(String.valueOf(1)).path("repository").path("files").path("lib/class.rb")
    //    .queryParam("private_token", "abc").build(true).toUri();
    //System.out.println("uri = " + uri.toString());
    // result: uri = https://gitlab.example.com/api/v4/projects/1repositoryfileslib/class.rb?private_token=abc

    // exception for build true parameter because 'lib/class.rb' contains '/'
    //uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
    //    .pathSegment(String.valueOf(1), "repository", "files", "lib/class.rb")
    //    .queryParam("private_token", "abc").build(true).toUri();
    //System.out.println("uri = " + uri.toString());

    String encode = UriUtils.encode(filePath, "UTF-8");
    System.out.println("encode = " + encode);
    String encodePath = UriUtils.encodePath(filePath, "UTF-8");
    System.out.println("encodePath = " + encodePath);
    String encodePathSegment = UriUtils.encodePathSegment(filePath, "UTF-8");
    System.out.println("encodePathSegment = " + encodePathSegment);

    uri = UriComponentsBuilder.fromUriString("https://gitlab.example.com/api/v4/projects/")
        .pathSegment(String.valueOf(1), "repository", "files", encodePathSegment)
        .queryParam("private_token", "abc").build(true).toUri();
    System.out.println("uri = " + uri.toString());
  }

使用上述单元测试里的代码即可构造出访问 GitLab 所需要的 URI 了!

4. 小结

本文所有的参考资料全部来源于 GitLab 的 API 文档,涉及到的 API 有:

以上。

Insert Or Update 后续

1. 问题描述

使用 insert … on duplicate key update 语法实现 insertOrUpdate 之后出现了几个新问题,首先给出测试代码:

DROP database if EXISTS test;
CREATE database test;

DROP TABLE if EXISTS test.person;
CREATE table test.person (
    id int not NULL PRIMARY KEY auto_increment,
    name VARCHAR(100) not NULL DEFAULT '' UNIQUE COMMENT '名字',
    age int not NULL DEFAULT 0 COMMENT '年龄',
    gender VARCHAR(20) NOT NULL DEFAULT '' COMMENT '性别',
    addresses text NOT NULL COMMENT '地址'
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='person';

MyBatis 语句:

<insert id="insertOrUpdate" useGeneratedKeys="true" keyProperty="id">
    insert into test.person
    (id, name, age, gender, addresses)
    VALUES
    <foreach collection="list" item="person" separator=",">
        (id, #{person.name}, #{person.age}, #{person.gender},
        #{person.addresses, typeHandler=com.note4code.test.persistence.typeHandler.GenericMapHandler})
    </foreach>
    on duplicate key update
    age = VALUES(age),
    gender = VALUES(gender),
    addresses = VALUES(addresses)
</insert>

Java 代码:

package com.note4code.test.service;

import com.note4code.test.domain.Address;
import com.note4code.test.domain.Gender;
import com.note4code.test.domain.Person;
import com.note4code.test.domain.Province;
import junit.framework.TestCase;
import org.assertj.core.util.Lists;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PersonServiceTest extends TestCase{

  @Autowired
  private PersonService personService;

  private Person me;
  private Person you;
  private Person him;

  @Before
  public void initData() {
    Address address = new Address(Province.BEIJING, "北京", "学院路");
    Map<Province, Address> map = Maps.newHashMap(Province.BEIJING, address);

    this.me = new Person();
    this.me.setName("me");
    this.me.setAge(27);
    this.me.setGender(Gender.MALE);
    this.me.setAddresses(map);

    this.you = new Person();
    this.you.setName("you");
    this.you.setAge(25);
    this.you.setGender(Gender.FEMALE);
    this.you.setAddresses(map);

    this.him = new Person();
    this.him.setName("him");
    this.him.setAge(25);
    this.him.setGender(Gender.MALE);
    this.him.setAddresses(map);
  }

  @Test
  public void testForOnDuplicateKey() {
    personService.addPerson(me);
    int id = me.getId();

    me.setAge(28);
    List<Person> people = Lists.newArrayList(me, you, him);
    personService.addOrUpdate(people);
    assertTrue(id != me.getId());
  }
}

运行测试用例,得到的输出结果是:

people = [Person{id=2, name=’me’, age=28, gender=MALE, addresses={BEIJING=Address{province=BEIJING, city=’北京’, street=’学院路’}}}
, Person{id=0, name=’you’, age=25, gender=FEMALE, addresses={BEIJING=Address{province=BEIJING, city=’北京’, street=’学院路’}}}
, Person{id=0, name=’him’, age=25, gender=MALE, addresses={BEIJING=Address{province=BEIJING, city=’北京’, street=’学院路’}}}
]

另外,查询数据库可以得到:

mysql root@localhost:test> SELECT * from person;
+------+--------+-------+----------+--------------------------------------------------------------------+
|   id | name   |   age | gender   | addresses                                                          |
|------+--------+-------+----------+--------------------------------------------------------------------|
|    1 | me     |    28 | MALE     | {"BEIJING":{"province":"BEIJING","city":"北京","street":"学院路"}} |
|    2 | you    |    25 | FEMALE   | {"BEIJING":{"province":"BEIJING","city":"北京","street":"学院路"}} |
|    3 | him    |    25 | MALE     | {"BEIJING":{"province":"BEIJING","city":"北京","street":"学院路"}} |
+------+--------+-------+----------+--------------------------------------------------------------------+
3 rows in set
Time: 0.002s
mysql root@localhost:test> SELECT LAST_INSERT_ID();
+--------------------+
|   LAST_INSERT_ID() |
|--------------------|
|                 17 |
+--------------------+
1 row in set
Time: 0.001s

从上面的示例可以看出 3 个问题:

  1. 即使使用了 userGeneratedKeys = true 并指定了 keyProperty,只回写了第一行的主键。
  2. 回写的主键与数据库不一致。
  3. LAST_INSERT_ID() 的值发生了跳跃,按理来说应该是 3,但是变成了 17。

2. 疑问

看到这里其实很让人费解:

  1. 为什么只返回了一个主键?
  2. useGeneratedKeys 返回的主键不对那么到底是什么?
  3. 为什么 LAST_INSERT_ID() 发生了跳变?

首先从 userGeneratedKeys 说起:

useGeneratedKeys(仅对 insert 和 update 有用)这会令 MyBatis 使用 JDBC 的 getGeneratedKeys 方法来取出由数据库内部生成的主键(比如:像 MySQL 和 SQL Server 这样的关系数据库管理系统的自动递增字段),默认值:false。

引自 insert, update 和 delete

With older JDBC drivers for MySQL, you could always use a MySQL-specific method on theStatement interface, or issue the query SELECT LAST_INSERT_ID() after issuing an INSERT to a table that had an AUTO_INCREMENT key.

First, we demonstrate the use of the new JDBC 3.0 method getGeneratedKeys() which is now the preferred method to use if you need to retrieve AUTO_INCREMENT keys and have access to JDBC 3.0. The second example shows how you can retrieve the same value using a standard SELECT LAST_INSERT_ID() query. 

引自 Retrieving AUTO_INCREMENT Column Values through JDBC

也就是说 Mybatis 通过 useGeneratedKeys 返回的是 LAST_INSERT_ID()

接着说,那么为什么只回写了一个主键,并且还是错的呢?

If you insert multiple rows using a single INSERT statement, LAST_INSERT_ID() returns the value generated for the first inserted row only.

引自 LAST_INSERT_ID()LAST_INSERT_ID(expr)

按照上文的说法,批量插入只会返回插入的第一条数据的主键。第一次插入 me 这个对象之后 LAST_INSERT_ID() 返回 1。接着在插入 people 时首先是更新了 me 这行记录,而 LAST_INSERT_ID() 没有变。直到插入第二行 you 这个对象,此时 LAST_INSERT_ID() 返回 2,也就是批量插入后回写的主键值。这同时解释了为什么只回写了一个主键并且回写的主键与数据库没有对应上。

最后,关于 LAST_INSERT_ID() 的跳变,我也找到了一些参考资料:

  1. 官方文档:InnoDB AUTO_INCREMENT Lock Modes
  2. 其实这个说的更加简洁清楚:AUTO_INCREMENT 字段的GAP

ElasticSearch 多值类型的数据存储

1. 使用场景

一般来说,我们的离线计算和统计都是在 Hive 里面进行的,但是也有离线计算不适应的场景。

比如在做移动行为分析的时候我们想要知道一连串事件的转化率,这就需要建立一个漏斗。如果使用 Hive 来计算漏斗,那么有几个问题是需要我们考虑的:

  1. 漏斗的建立需要一个尝试和调整的过程,有的时候业务比较复杂,那么漏斗的调整次数也会比较多。每次用 Hive 来计算的话显然是比较慢的。

  2. 同一个漏斗多人查看的话每次都使用 Hive 计算吗?显然不,数据应该使用 MySQL 来缓存已经查询到的结果,那么查询就变成了二级查询:先查 MySQL,如果 MySQL 中没有数据再调用 Hive 计算,然后把计算结果写到 MySQL 中,这就变成了一个比较复杂的异步的过程。我一开始的方案是:首次建立漏斗的话立刻异步地调用 Hive 去计算漏斗的历史数据,然后每天用调度系统将漏斗的配置 Sqoop 导入到 Hive 中进行计算,然后 Sqoop 导出到 MySQL 中用来展示。

  3. 同时也会遇到某些天的数据突然出现问题,修复后需要重启计算任务,那么如何保持 Hive、MySQL 数据的一致性也是需要我们考虑的。

总之:使用 Hive + MySQL 的方式来做漏斗计算不仅慢,而且复杂!两周的冲刺应该是上不了线的!

2. 解决方案

然后老大就让我使用 Hive + ElasticSearch 来做。简单来说,先使用 Hive 把源数据进行处理,得到初步聚合的结果,然后将数据导入到 ElasticSearch 中。使用的时候 Java 客户端程序通过参数构成查询条件向 ElasticSearch 查询数据。

当然坑也踩了不少,首先在使用 ElasticSearch 之前没有做过预先的调研,一直以关系型数据库的思维在想着如何来计算数据和查询数据,然后就发现了 ElasticSearch:

  • 不支持子查询
  • 不支持 COUNT(DISTINCT)
  • 不支持 Join

按照官方文档的说法,他们暂时不支持子查询和 COUNT(DISTINCT),短期内也不想增加这个功能。至于 Join,官方文档的建议是在 Java 程序中进行 Join,也就是说分为两步操作,Java 客户端先获取 Join 的字段,再以查到的 Join 字段作为查询条件进行查询。如果数据量很大的话,两次查询光传输数据就要数十秒了!

当时最大的困惑是我要计算用户事件行为的漏斗转化率,包括 pv 和 uv。问题来了,一开始的计算方案是把每一个用户的每一次事件信息都存储下来,计算的时候就用事件名去过滤然后 COUNT 就能算 pv 了。但是无法对用户去重,uv 计算不了。

好在老大提醒我把每个用户发生的事件及其点击次数放在一个 Map 里面,事件名作为 key,点击次数作为 value,一个用户一行就行。计算 pv 的时候根据对应事件以及它们的点击次数来求 sum,比如 map_type.event_id 就是从一个名叫 map_type 的字段中以 event_id 作为事件名查询点击次数。而求 uv 也很简单,因为每一行就对应一个用户,所以 COUNT 行数就行了。

在做漏斗计算的时候,只需要用 MySQL 来保存漏斗的配置信息,每次在 Java 后端代码中根据漏斗各个步骤的事件名构建查询语句,得到 ElasticSearch 返回的结果简单处理一下即可返回给前端。本来也担心过查询的速度是不是够快,需不需要用 MySQL 缓存漏斗的查询结果。但是真实使用的时候 ElasticSearch 查询的速度完全打消了我的顾虑。

3. 几个值得记录的问题

3.1 Map<String, bigint>

在解决问题的过程中还是有几个点值得提一下,首先 Hive 没有提供原生的将多行记录转成 Map<String, bigint> 类型的方法,str_to_map 函数只能返回 Map<String, String> 类型。当然这里其实可以写一个 UDAF 来解决,但是之前没有写过 UDF 和 UDAF,所以为了节省时间我用了 Streaming 中的 TRANSFORM 来解决这个问题。代码如下:

#!/bin/env python

import sys

def str_to_map(map_str):
    pairs = map_str.split(',')
    ret   = []
    for pair in pairs:
        key, value = pair.split('=')
        ret.append("%s\003%d" % (key, int(value)))
    return '\002'.join(ret)

for line in sys.stdin:
    line     = line.strip()
    fields   = line.split()
    map_str3 = fields.pop()
    map_str2 = fields.pop()
    map_str1 = fields.pop()
    fields.append(str_to_map(map_str1))
    fields.append(str_to_map(map_str2))
    fields.append(str_to_map(map_str3))
    print "\001".join(fields)

在这里返回 Map 类型数据的时候,我采用了默认的分割字符来标记 Map。

这里更新一下,为了节省磁盘空间,公司要求尽量以 ORC 格式来存储数据,导致上述的 Python 脚本不能使用。所以修改了一下 Hive 中 UDF str_to_map,使得其能够返回 Map<String, bigint>。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;

import java.util.LinkedHashMap;

/**
 * GenericUDFStringToMap.
 *
 */
@Description(name = "str_to_map", value = "_FUNC_(text, delimiter1, delimiter2) - "
        + "Creates a map by parsing text ", extended = "Split text into key-value pairs"
        + " using two delimiters. The first delimiter seperates pairs, and the"
        + " second delimiter sperates key and value. If only one parameter is given, default"
        + " delimiters are used: ',' as delimiter1 and '=' as delimiter2.")
public class GenericUDFStrToMap extends GenericUDF {
    // Must be deterministic order map for consistent q-test output across Java versions - see HIVE-9161
    private final LinkedHashMap<Object, Long> ret = new LinkedHashMap<Object, Long>();
    private transient Converter soi_text, soi_de1 = null, soi_de2 = null;
    final static String default_de1 = ",";
    final static String default_de2 = ":";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        for (int idx = 0; idx < Math.min(arguments.length, 3); ++idx) {
            if (arguments[idx].getCategory() != Category.PRIMITIVE
                    || PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
                    ((PrimitiveObjectInspector) arguments[idx]).getPrimitiveCategory())
                    != PrimitiveGrouping.STRING_GROUP) {
                throw new UDFArgumentException("All argument should be string/character type");
            }
        }
        soi_text = ObjectInspectorConverters.getConverter(arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        if (arguments.length > 1) {
            soi_de1 = ObjectInspectorConverters.getConverter(arguments[1],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        if (arguments.length > 2) {
            soi_de2 = ObjectInspectorConverters.getConverter(arguments[2],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        ret.clear();
        String text = (String) soi_text.convert(arguments[0].get());
        String delimiter1 = (soi_de1 == null) ?
                default_de1 : (String) soi_de1.convert(arguments[1].get());
        String delimiter2 = (soi_de2 == null) ?
                default_de2 : (String) soi_de2.convert(arguments[2].get());

        String[] keyValuePairs = text.split(delimiter1);

        for (String keyValuePair : keyValuePairs) {
            String[] keyValue = keyValuePair.split(delimiter2, 2);
            if (keyValue.length < 2) {
                ret.put(keyValuePair, null);
            } else {
                ret.put(keyValue[0], Long.valueOf(keyValue[1]));
            }
        }

        return ret;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "usage not know!";
    }
}

3.2 Index OR Type

在存储的时候还有一个问题需要解决:对于每一天的分区数据,是用 Index 还是 Type 来存储?这个问题的由来和 ElasticSearch 的存储方式以及数据的回溯有关。

ElasticSearch 在设计的时候一开始是对照关系型数据库的。一个 Index 对应一个数据库,Index 下的 Type 对应 Table。但这并不是强制的要求的,数据到底怎么存应该根据自己的使用情景来定,具体可以参照这个文章 index vs type

另外一方面,难免有些数据有错误或者指标口径被修改了,那么要重新计算和导入数据。那么在重新导入数据的时候就要先删除所有的分区数据,恰巧 ElasticSearch 是没有分区的概念的。如果把 Hive 一个表中所有分区数据存在一张表中,用 stat_date 字段来标记日期,那么在删除数据的时候 ElasticSearch 会先查出所有这一天的数据然后再删除,无疑是很慢的解决方法。

最终我采用的是一个分区一个 Index,每次重新导入数据前先把对应的 Index 删除。虽然 Index 更耗资源,但是这样使用更符合一般的工作流程。

ElasticSearch 使用小结

1. 序

最近一直在做的一个项目使用了 ElasticSearch 作为查询和计算的数据源,中间踩了不少坑,也有一些发现和心得体会,趁着假期好好总结一下。

2. 使用体会

ElasticSearch 也不介绍了,我这等菜鸟也讲不出什么所以然,反正老大怎么说就怎么来吧。这里直接谈谈心得体会,一个字:快!

这次用的 ElasticSearch 集群一共 15 台服务器,每台机器大概是 32G 内存吧。几千万、几个亿条的数据做查询和聚合杠杠的!(原谅我一个江南人在北京七年也变得放荡不羁)几乎可以做实时的离线分析了!

下面讲讲使用 ElasticSearch 遇到的坑,分为以下几个部分:

  • Query DSL
  • 多值类型的数据存储
  • Hive 向 ElasticSearch 导出数据
  • Java 客户端 API 调用

这几个部分会在接下来的几篇文章中详细讲一下。

Python Unicode 字节串转成中文问题

1. 序

我觉得 Python 2.7 最大的坑就是字符编码的问题,默认不支持中文。就算你加个中文呢注释都要在文件顶部指定:# -*- coding: utf-8 -*-

说真的,我从来搞不懂也记不住这些个编码解码巴拉巴拉。。。

但是前两天还真的遇到一个问题了。

2. 需求

有一份数据要入库,数据来源是从一个 API 获取 json 数据,解析成行之后写到文件里
到 HDFS 目录就好了。

Python 解析 json 非常方便。但是偏偏开发的这个接口的哥们儿返回的都是 \u7f16\u7801\u771f\u8ba8\u538c 这样的结果,查了一下原来是 unicode 中文编码以 ascii 码的方式来解析的结果。

一开始还以为是字符集的问题,查了半天才找到了两个解决方案:

3. 解决方案

a. str.decode

print '''\u7f16\u7801\u771f\u8ba8\u538c'''.decode('unicode_escape')
#输出:编码真讨厌

b. codecs.open

with codecs.open(file_name, 'w', 'utf-8') as f:
    f.write('\n'.join([ '\001'.join(row) for row in lines ]))

4.参考文章

一种 Hive 表存储格式的转换的方式

1. 序

实习的公司考虑到 ORC 格式拥有更高的压缩比和传输效率,所以要求我们建表的时候都采用 ORC 的格式。

好死不死,有个项目要把 Hive 里算出来的数通过 Sqoop 导出到 MySQL 里去。而 Sqoop 对于 ORC 格式的数据导出是有问题的,所以我面临的问题就是把 ORC 分区的数据转换成 TEXT 格式的。

2. 面临的实际状况

  1. 都是外部表
  2. 分区的字段是 yearmonthday
  3. 挂分区时指定 location '2016/03/03'(否则分区目录就会是 /year=2016/month=03/day=03 这样的形式)
  4. 已经有若干天的分区数据

3. 解决方案

最直观的的解决方案就是重新建表,指定 TEXT 作为存储格式,然后将原来的数据直接 insert 进去。

这种方法的缺陷是:虽然可以使用相同的表结构,但是由于名字不能重复,那么之前的计算脚本和调度任务都要进行相应的修改。考虑到一共有四张表,工作量还是挺大的。如果不想改变脚本和任务,那么就要在导出数据之后更改原表的存储方式并重新将数据导回去,还要花费一定的时间。

综合来说,最好要达到两个目标:
1. 不改变表的名字,避免大面积的修改
2. 尽量少地导数据,节省时间

4. 小伎俩

通过了一定时间的思考,我用了一点小伎俩来达成这个任务。

  1. 首先创建一张临时表,建为 TEXT 存储方式的外部表,然后指向原表相同的 HDFS 文件目录。
  2. 为临时表挂上分区,指定 location 确保分区的路径和原表一一对应。
  3. 使用 insert overwrite <TABLE> select ... 以及动态分区的技术,将 ORC 格式的原表读出来,以 TEXT 的格式存回原路径。
  4. drop 原表,按照 TEXT 方式建立新表。
  5. 为新表挂上分区,删掉临时表。

5. 动态分区

a. 开启设置

--允许使用动态分区可通过set hive.exec.dynamic.partition;查看
set hive.exec.dynamic.partition=true;
--当需要设置所有列为dynamic时需要这样设置
set hive.exec.dynamic.partition.mode=nonstrict;
--如果分区总数超过这个数量会报错
set hive.exec.max.dynamic.partitions=1000;
--单个MR Job允许创建分区的最大数量
set hive.exec.max.dynamic.partitions.pernode=1000;

b. 使用方式

假设表的结构是:

CREATE EXTERNAL TABLE temp_table(
    field1    string  comment 'field1',
    field2    string  comment 'field2'
)
COMMENT '临时表'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';

那么调用的方式如下:

insert overwrite table temp_table_new partition(year,month,day)
select field1, field2, year, month, day
from temp_table;

注意:如果父分区是动态分区,那么子分区不能是静态分区

c. 挂分区的脚本

由于动态分区在创建分区的时候产生的路径形如: year=2016/month=03/day=05,和内部常用的 2016/03/03 的形式不一样,所以我写了一个 Python 脚本先挂分区指定 location

简单来说还是构建 SQL 字符串,然后使用 subprocess 模块来调用 hive -e 'cmd' 来执行。

#!/bin/env python
# -*- coding: utf-8 -*-

from datetime import date, timedelta


def date_from_str(date_str, splitor='-'):
    year, month, day = map(int, date_str.split(splitor))
    return date(year=year, month=month, day=day)

def date_interval(start_date, end_date):
    days = (end_date - start_date).days + 1
    return  [(start_date + timedelta(var_date)) for var_date in range(0, days)]

def add_partitions_cmd(datebase, table, start_date, end_date):
    CMD_STR = "use {};\n".format(datebase)
    for var_date in date_interval(start_date, end_date):
        year, month, day = var_date.isoformat().split('-')
        CMD_STR += "alter table {} add if not exists partition(year='{}', month='{}', day='{}') location '{}/{}/{}';\n" \
                    .format(table, year, month, day, year, month, day)
    return CMD_STR

if __name__ == '__main__':
    import sys

    if len(sys.argv) != 5:
        print 'Usage: python add_partitions.py <DATABASE> <TABLE> <START_DATE> <END_DATE>'
        sys.exit(255)

    datebase   = sys.argv[1]
    table      = sys.argv[2]
    start_date = date_from_str(sys.argv[3])
    end_date   = date_from_str(sys.argv[4])

    ADD_PARTITIONS = add_partitions_cmd(datebase, table, start_date, end_date)
    CMD = 'hive -e "{}"'.format(ADD_PARTITIONS)
    print CMD

    from subprocess import Popen, PIPE
    output, param = Popen(['hive', '-e', ADD_PARTITIONS], stdout=PIPE).communicate()
    print output
    print param

吐槽一下 subprocess

以前使用 Python 执行 Shell 的命令的时候常用 subprocess.getstatusoutput(cmd)。因为同时可以获得输出和返回值。这次一开始也是这么用的,结果老是报错。一查才知道尼玛这个函数没有了!

MySQL 与 Hive 中正则表达式字符转义的一个坑

提到正则表达式的转义,第一个念头就是加一个反斜杠 \

比如 . 在正则表达式中的含义就是匹配除 \n 之外的任何单个字符。

如果让你匹配在 Hive 或者 MySQL 的 SQL 语句中匹配像 4.2.5 这样的字符串,你会怎么做?

我的第一反应是:

[1-9]+\.[0-9]+\.[1-9]+

结果是错的!==!

事实上 . 的转义是 \\. 而不是 \.

有一个说法是 SQL 语句本身也是个字符串,交给程序处理的时候首先使用 \\\ 进行转义,然后用转义过的 \. 进行转义。

真是够绕的!

使用 Shell 变量的正确正确姿势

今天发生了在写 Shell脚本的时候发现了一个问题:变量赋值了却取不出来!

当时的代码是这么写的:

V_TS=`date +%Y%m%d`
TABLE_NAME=dwm_xxx_xxx_xxx_metric_day
TEMP_TABLE="es_$TABLE_NAME_$V_TS"

本意是原来的表名加一个前缀 es,加一个后缀日期。结果 TEMP_TABLE 赋值得到的结果是 es_20160303 !!!

仔细一分析,原来在这里把变量名 TABLE_NAME 识别成了 TABLE_NAME_

解决方案也很简单,把

TEMP_TABLE="es_$TABLE_NAME_$V_TS"

改成

TEMP_TABLE="es_${TABLE_NAME}_${V_TS}"

即可,不仅醒目而且消除了歧义!

延伸

扩展一下,上面

V_TS=`date +%Y%m%d`

的写法其实也不太好,用

V_TS=$(date +%Y%m%d)

其实更好。

无独有偶,在计算算数表达式的时候,VAR=$(( 1 + 3 )) 或者 VAR=$[ 1 + 3 ] 就比

VAR=`expr 3 + 1`

来的好。