即席查询平台的设计与实现

1. 即席查询平台简介

  • 即席查询平台是数据平台针对即席查询(Ad-Hoc)场景推出的一个解决方案。

  • 用户侧提供 SQL 的查询、结果数据的生命周期管理。

  • 运维测统一表权限、Hadoop 组账号、Yarn 队列等。

2. ZUE 的定位与同类产品的比较

离线数据开发平台的技术演进:

  • 石器时代:Hive 客户端直连

  • 青铜时代:beeline 客户端 + Hive Server

  • 英雄时代:HUE + Hive Server

  • 军团时代:ZUE + Moses + Hive Server

开发工具几个阶段的比较:

Hive Cli Beeline HUE ZUE
交互方式 终端 终端 WEB WEB
SQL 支持 全类型 全类型 全类型 部分语句
表权限控制 table.in 元数据与权限
组账号管理 支持 支持
异步查询 不支持 不支持 不支持 支持
查询结果缓存 不支持 不支持 弱支持 支持
资源隔离与限制 无限制 无限制 无限制 有限制

2.1 SQL 的支持

  • Hive Cli、Beeline、HUE 是全功能的客户端工具,提供了完整的 Hive Query 语法的支持以及 HDFS 的管理命令

  • ZUE 限制了可以执行的 SQL 子句类型,把 ZUE 定位为 DML 的平台,将 DDL 划分给元数据管理系统负责。

  • ZUE 仅支持 SELECTCREATE TABLE ASEXPLAIN 等少数语句,并且禁止通过 ZUE 向线上生产表写入数据。

  • 元数据针对 DDL 提供了更加丰富的业务规范与控制,如业务划分、层级划分、表权限控制、变更历史、Location 限制等等。

2.2 表权限控制

  • Hive Cli 和 Beeline 都不具备表权限的功能。

  • HUE 本身不带读写权限的控制,table.in 作为一种 Hack 的方案先天不足(黑名单)。

  • ZUE 用词法解析获取读写的表,结合元数据做权限的控制。

  • 元数据提供了完整的权限申请工作流。

2.3 组账号管理

  • Hive Cli 和 Beeline 没有统一的组账号管理方式,默认以个人账号提交。

  • HUE 的组账号与个人账号等价,无法灵活的切换组账号。

  • Hive Cli、Beeline 和 HUE 大量个人账号的存在导致许多数据出现 owner 的权限问题。

  • ZUE 收敛组账号,从属于业务的数据由统一的组账号进行读写,避免权限问题的发生。

2.4 异步查询

  • Hive Cli、Beeline 和 HUE 的查询操作都是同步,一个客户端会话同一时间只能进行一个查询。关闭客户端或者会话都会导致查询被取消。

  • ZUE 采取异步查询的方式,在同一个窗口可以提交多个查询,多个查询可以并发运行。

2.5 查询结果缓存

  • Hive Cli 和 Beeline 只能手动将查询结果导出保存,否则同一条 SQL 必须重跑才能看到结果。

  • HUE 的查询结果只能在会话过程中查看,一旦页面关闭结果也就丢失了。

  • ZUE 的查询结果缓存 24 小时,重复查看和下载不需要耗费新的计算资源。

2.6 资源隔离与限制

  • Hive Cli、Beeline 和 HUE 无法对用户的查询进行队列的限制,默认提交至 default 队列。

  • Hive Cli、Beeline 和 HUE 运行时可以任意指定队列,无法针对业务线进行资源隔离和计费。

  • ZUE 针对业务线进行队列的限定,保证业务线之间资源的隔离与安全。

3. 功能与架构设计

3.1 HUE 的功能架构缺陷

  • HUE 虽然支持了很多功能,但是本质和 Beeline 并无不同,都是 Thrfit 接口中会话的一种可视化呈现。

  • Thrfit 的会话存在于 HiveServer 中,因此用户的请求必须通过一致性哈希路由到同一台服务器。缺陷在于:1)如果前端请求的路由策略不正确,请求到了其他服务器则该会话的上下文丢失(SET xxx)并且该会话下进行中的查询终止;2)如果某台服务器上 HUE 重启,意味着该服务器上的 Thrfit 会话终结,该服务器上的查询全部自动终止。

  • HUE 的水平扩展实质只能是多个单机节点对总体查询和负载进行分片,单机长时间保持状态。

  • HUE 查询请求量提升时更容易出现线程问题,导致整体不可使用。

  • HUE 虽然界面干净、交互友好,但是本质是适合小团队使用的单机系统。

3.2 ZUE 的整体设计架构

  • 将有状态与无状态的部分分离。无状态接口以容器方式部署,可伸缩性好,迭代升级易维护。长时间保持状态与 HiveServer 保持连接的部分抽象出查询中心模块,部署在物理机上。

  • Moses 查询中心通过暴露 RESTFul 的接口提供异步查询的能力,查询完成之后回调请求方的接口。

  • Moses 查询中心每个结点对等,通过持久化与 HiveServer 的会话信息实现服务重启过程中会话不丢失,不影响用户查询。

  • 使用 HDFS 和 Redis 来缓存查询的结果,缓存生命周期结束后自动回收存储空间。

4. 实现方案一些细节

4.1 语法限制与表权限

  • 对 SQL 进行词法解析,得到抽象语法树。

  • 对抽象语法树进行后续遍历获得语句的类型以及读写的表。

4.1.1 词法分析实例

举例:

 insert overwrite table tb.tb_2 select * from tb.tb_1;

解析成抽象语法树,采用后续遍历进行打印可以得到结果:

 nil
    TOK_QUERY
       TOK_FROM
          TOK_TABREF
             TOK_TABNAME
                tb
                tb_1
       TOK_INSERT
          TOK_DESTINATION
             TOK_TAB
                TOK_TABNAME
                   tb
                   tb_2
          TOK_SELECT
             TOK_SELEXPR
                TOK_ALLCOLREF
    <EOF>

几种 token 代表的含义:

  • TOK_TAB:写入目标表 tb.tb_2

  • TOK_TABREF:查询数据来源表 tb.tb_1

  • TOK_INSERT:语句的类型。

4.1.2 其他工具与缺陷

Hive 自己提供了一个血缘解析的工具:org.apache.hadoop.hive.ql.tools.LineageInfo

缺陷:

  1. 缺少上下文的支持,比如上文使用 USE <db> 语句,那么解析出来的输入表只包含表名,缺少库名。

  2. 反引号转义符需要另行清理。

  3. 对于 CTE 语句中的别名无法处理。

4.2 异步查询

  • 与 HUE 的区别:将用户会话与 Thrift 会话解耦,WEB 交互无状态,封装 Thrfit 会话。

  • 从 JDBC 获得的启示:底层原理同 Beeline 一致,对 Thrift 进行封装,通过覆盖特定的会话配置支持会话恢复重连,查询中心作为一个查询中间件可重启升级不影响用户查询。

     config.put("hive.server2.session.check.interval", "1h");
     config.put("hive.server2.close.session.on.disconnect", "false");
     config.put("hive.server2.idle.session.timeout", "24h");
     config.put("hive.server2.idle.operation.timeout", "24h");
     private synchronized TOperationHandle submitQuery(String sql) throws TException {
         log.info("Start to submit sql of task {} with content:\n{}", this.taskMeta.getTaskId(), sql);
         TExecuteStatementReq execReq = new TExecuteStatementReq(this.sessionHandle, sql);
         execReq.setRunAsync(true);
 
         TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
         log.info("execResp = " + execResp);
         this.checkStatus(execResp.getStatus());
 
         return execResp.getOperationHandle();
     }

4.3 结果缓存

结果缓存分为三种:

  1. 较复杂 SQL 或者大数据量的查询结果通过改写 SQL 将结果以 Avro 的形式存储在 HDFS 上,定时回收存储空间。

  2. 简单 SQL 如单表查询直接将结果缓存于 Redis 中,设置缓存的过期时间。

  3. 使用 CTAS 创建的临时表,定时从临时库中清除。

5. 本季度的迭代方向

  • 更友好的故障排查(Yarn Application 日志)与异常诊断。

  • 更多元化的大数据计算引擎的集成(如 Presto)。

6. FAQ

  1. code 2 如何排查

  2. Method Not Found

  3. job counter 不准

  4. read timeout

Hive 向 ElasticSearch 导出数据

一般来说,Hive 里的表经过初步的处理之后就可以向 ElasticSearch 中导入数据了。

ElasticSearch 提供了和 Hive 的整合方案 elasticsearch-hadoop.jar,简单来说我们可以按照如下步骤进行操作。

  1. 如果已存在同名 Index 则先删除旧的 Index。
  2. 创建 Index,设置好 mappings。
  3. 创建导入数据用的映射表。
  4. 导入数据
  5. 删除临时表。

1. 删除已存在的 Index

因为我在使用的时候将每天的分区数据映射到一个 Index 中,所以为了回溯数据,每次导数开始之前先删除旧有的 Index。这个操作使用 ElasticSearch 提供的 RESTful API 就行:

curl -XDELETE "http://${IP_ADDRESS}:9200/${PATITION_NAME}"

2. 创建 Index,设置 mappings

其实这一步可以省略,在导入数据的时候 ElasticSearch 会自动创建对应的 mappings,但是提前创建 mappings 的好处是可以做自定义的设置。

比如下面的 "index": "not_analyzed" 属性禁止 ElasticSearch 对字符串的分析。如果不加上这个设置那么在匹配字符串的时候只能用 matchPhrase 而不能用 term,而且在 group by 某个字段的时候这个字段可能只返回字符串值的一部分,比如“北京”只返回一个“北”等等。

总之,如果不需要对文本做检索,所有字符串类型的属性最好都加上 "index": "not_analyzed"

curl -XPUT "http://${IP_ADDRESS}:9200/${PATITION_NAME}" -d '
{
    "mappings": {
        "data":{
            "properties": {
                "id": {
                    "type": "long"
                },
                "stat_date": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "some_str_field": {
                    "type": "string",
                    "index":  "not_analyzed"
                },
                "some_num_field": {
                    "type": "long"
                }
            },
            "_all": {
              "enabled": false
            }
        }
    }
}'

3. 创建导入数据用的映射表

创建一个外部表,并用 es.nodes 指定几个连接用的节点,es.resource指定映射的 Index 和 Type。给个例子:

add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
    id bigint comment 'id',
    stat_date string comment '日期',
    some_str_field string comment '字符串类型',
    some_num_field bigint comment '数值类型'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}'
);

4. 导入数据

SET hive.mapred.reduce.tasks.speculative.execution = false;
SET mapreduce.map.speculative = false;
SET mapreduce.reduce.speculative = false;
insert overwrite table ${TEMP_TABLE}
select  ${COLUMNS}
from    ${DBNAME}.${FILENAME}
where   concat(year,month,day)='$V_TS';

5. 删除临时表

导数用的临时表在导入数据结束后就可以删除了。

drop table ${TEMP_TABLE};

6. 此处有坑

在使用 Hive 向 ElasticSearch 中导出数据的时候,有一个大坑非常关键:并发!

这种导入数据的方法会启动 mapper 来执行,但是如果 mapper 数过高,ElasticSearch 过载(Overload)或者多出冗余的数据。据同事分析,原因是 mapper 会批量地将数据插入 ElasticSearch 中,而当 mapper 失败的时候会被重启,已经导入的数据又会重新插入 ElasticSearch 导致数据重复。

其中的一个手段就是适当减少并发的 mapper 的数量。通过合并文件的手段可以降低 mapper 的数量。

SET mapred.max.split.size=1000000000;
SET mapred.min.split.size=1000000000;
set mapred.min.split.size.per.node=1000000000;
set mapred.min.split.size.per.rack=1000000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

另外,还可以通过设置 ElasticSearch 的 _id 字段避免重复。比如为每一行生成一个 uuid,把 uuid 作为 mappings 中的 _id。ElasticSearch 对于 uuid 相同的记录会覆盖旧的记录,这样相同 uuid 的记录就不会重复了。

add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
    id bigint comment 'id',
    stat_date string comment '日期',
    some_str_field string comment '字符串类型',
    some_num_field bigint comment '数值类型',
    uuid string COMMENT '为了导入ES而准备的id'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}',
'es.mapping.id' = 'uuid'
);

最近更新:最近公司架构部门搞了一套通过 Kafka 导入数据的方案,先把数据写入到 Kafka,然后 Kafka 写入到 ElasticSearch。但是 Kafka 保证不会丢数,还是可能有重复数据项,所以也加了 uuid 作为 _id 来去重。这个套方案运行的很好,可惜同时导入的时候并发导入任务数也并不能太高,有些回溯数据的任务也只能一个个跑。

参考资料

ElasticSearch 与 Hive 的整合

Elasticsearch Query DSL

1. 序

Hive 无疑是数据相关工作最常用的的工具,利用 SQL 就能够完成许多统计和分析的任务,学习成本低,开发效率高。

但是 Hive 的缺点也很明显:运行慢!所以 Hive 一般用来做离线计算,每天由调度系统调用脚本完成一系列的计算任务,产出数据然后交给其他应用展现。但总有一些情景是 Hive 满足不了的,比如对数据的实时查询,比如对数据的快速分析。

这个时候就可以使用我们今天要介绍的工具:Elasticsearch。

2. 简介

ElasticSearch 是一种基于 Apache Lucene 的开源项目,主要面向的是数据的实时检索、分析以及全文搜索等。

Elasticsearch 的几个特性非常棒:
1. RESTful API,使用 Json 交换数据
2. 跨 index 的查询
3. 速度,尤其是大数据量下聚合函数的速度非常快!

3. Query DSL

我们可以使用两种结构化语句: 结构化查询(Query DSL)和结构化过滤(Filter DSL)。 查询与过滤语句非常相似,但是它们由于使用目的不同而稍有差异。
一条过滤语句会询问每个文档的字段值是否包含着特定值,一条查询语句会计算每个文档与查询语句的相关性,会给出一个相关性评分 _score,并且 按照相关性对匹配到的文档进行排序。

3.1 term 过滤

精确匹配数字、日期、布尔值或 not_analyzed 的字符串。相当于等号 =

{ "term": { "age":    26           }}
{ "term": { "date":   "2014-09-01" }}
{ "term": { "public": true         }}
{ "term": { "tag":    "full_text"  }}

3.2 terms 过滤

精确匹配多个值。相当于 in

{
    "terms": {
        "tag": [ "search", "full_text", "nosql" ]
    }
}

3.3 range 过滤

按照指定范围查找一批数据,相当于 between

{
    "range": {
        "age": {
            "gte":  20,
            "lt":   30
        }
    }
}
字段 含义
gt >
gte >=
lt <
lte <=

3.4 existsmissing 过滤

针对已经查出一批数据,但是想区分是否存在某个字段。类似于 IS_NULL 条件。

{
    "exists": {
        "field": "title"
    }
}

3.5 bool 过滤

bool 过滤可以用来合并多个过滤条件查询结果的布尔逻辑:

过滤子句 含义
must 多个查询条件完全匹配,相当于 and
must_not 多个查询条件的相反匹配,相当于 not
should 至少有一个查询条件匹配,相当于 or

复合实例:

{
    "bool": {
        "must": { "term": { "folder": "inbox" }},
        "must_not": { "term": { "tag": "spam" }},
        "should": [
            { "term": { "starred": true }},
            { "term": { "unread":  true }}
        ]
    }
}

3.6 match_all 查询

使用 match_all 可以查询到所有文档,事没有查询条件下的默认语句。

{
    "query": {
        "match_all": {}
    }
}

此查询常用于合并过滤条件。比如说你需要检索所有的邮箱,所有的文档相关性都是相同的,所以得到的 _score 为 1。

3.7 match 查询

某个字段中包含特定内容

{
    "query": {
        "match": {
            "字段名": "特定内容"
        }
    }
}

如果用 match 指定了一个确切值,在遇到数字,日期,布尔值或者 not_analyzed 的字符串时,它将为你搜索你给定的值:

{ "match": { "age":    26           }}
{ "match": { "date":   "2014-09-01" }}
{ "match": { "public": true         }}
{ "match": { "tag":    "full_text"  }}

3.8 multi_match 查询

multi_match 查询允许你在 match 查询的基础上同时搜索多个字段:

{
    "multi_match": {
        "query":    "full text search",
        "fields":   [ "title", "body" ]
    }
}

3.9 bool 查询

bool 查询与 bool 过滤相似,用于合并多个查询子句。不同的是,bool 过滤可以直接给出是否匹配成功,而 bool 查询要计算每一个查询子句的 _score (相关性分值)。

查询子句 含义
must 查询指定文档一定要被包含。
must_not 查询指定文档一定不要被包含。
should 查询指定文档,有则可以为文档相关性加分。

如果 bool 查询下没有 must 子句,那至少应该有一个 should 子句。但是 如果有 must 子句,那么没有 should 子句也可以进行查询。

4. 查询与过滤条件的合并

search API 中只能包含 query 语句,所以我们需要用 filtered 来同时包含 queryfilter 子句,外面再包含一层 query

{
    "query": {
        "filtered": {
            "query":  { "match": { "email": "business opportunity" }},
            "filter": { "term":  { "folder": "inbox" }}
        }
    }
}

5. 验证查询

5.1 验证

validate API 可以验证查询语句是否合法:

curl -XGET 'http://dp.diditaxi.com.cn:80/es/<INDEX>/<TYPE>/_validate/query?pretty=true' -d '{
    QUERY
}'

返回结果:

#合法
{
  "valid" : true,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  }
}

#非法
{ 
    "valid" :         false, 
    "_shards" : {    
        "total" :       1,    
        "successful" :  1,    
        "failed" :      0  
    }
 }

5.2 理解错误信息

想知道语句非法的具体错误信息,需要加上 explain 参数:

curl -XGET 'http://dp.diditaxi.com.cn:80/es/<INDEX>/<TYPE>/_validate/query?explain&pretty=true' -d '{
    QUERY
}'

ElasticSearch 多值类型的数据存储

1. 使用场景

一般来说,我们的离线计算和统计都是在 Hive 里面进行的,但是也有离线计算不适应的场景。

比如在做移动行为分析的时候我们想要知道一连串事件的转化率,这就需要建立一个漏斗。如果使用 Hive 来计算漏斗,那么有几个问题是需要我们考虑的:

  1. 漏斗的建立需要一个尝试和调整的过程,有的时候业务比较复杂,那么漏斗的调整次数也会比较多。每次用 Hive 来计算的话显然是比较慢的。

  2. 同一个漏斗多人查看的话每次都使用 Hive 计算吗?显然不,数据应该使用 MySQL 来缓存已经查询到的结果,那么查询就变成了二级查询:先查 MySQL,如果 MySQL 中没有数据再调用 Hive 计算,然后把计算结果写到 MySQL 中,这就变成了一个比较复杂的异步的过程。我一开始的方案是:首次建立漏斗的话立刻异步地调用 Hive 去计算漏斗的历史数据,然后每天用调度系统将漏斗的配置 Sqoop 导入到 Hive 中进行计算,然后 Sqoop 导出到 MySQL 中用来展示。

  3. 同时也会遇到某些天的数据突然出现问题,修复后需要重启计算任务,那么如何保持 Hive、MySQL 数据的一致性也是需要我们考虑的。

总之:使用 Hive + MySQL 的方式来做漏斗计算不仅慢,而且复杂!两周的冲刺应该是上不了线的!

2. 解决方案

然后老大就让我使用 Hive + ElasticSearch 来做。简单来说,先使用 Hive 把源数据进行处理,得到初步聚合的结果,然后将数据导入到 ElasticSearch 中。使用的时候 Java 客户端程序通过参数构成查询条件向 ElasticSearch 查询数据。

当然坑也踩了不少,首先在使用 ElasticSearch 之前没有做过预先的调研,一直以关系型数据库的思维在想着如何来计算数据和查询数据,然后就发现了 ElasticSearch:

  • 不支持子查询
  • 不支持 COUNT(DISTINCT)
  • 不支持 Join

按照官方文档的说法,他们暂时不支持子查询和 COUNT(DISTINCT),短期内也不想增加这个功能。至于 Join,官方文档的建议是在 Java 程序中进行 Join,也就是说分为两步操作,Java 客户端先获取 Join 的字段,再以查到的 Join 字段作为查询条件进行查询。如果数据量很大的话,两次查询光传输数据就要数十秒了!

当时最大的困惑是我要计算用户事件行为的漏斗转化率,包括 pv 和 uv。问题来了,一开始的计算方案是把每一个用户的每一次事件信息都存储下来,计算的时候就用事件名去过滤然后 COUNT 就能算 pv 了。但是无法对用户去重,uv 计算不了。

好在老大提醒我把每个用户发生的事件及其点击次数放在一个 Map 里面,事件名作为 key,点击次数作为 value,一个用户一行就行。计算 pv 的时候根据对应事件以及它们的点击次数来求 sum,比如 map_type.event_id 就是从一个名叫 map_type 的字段中以 event_id 作为事件名查询点击次数。而求 uv 也很简单,因为每一行就对应一个用户,所以 COUNT 行数就行了。

在做漏斗计算的时候,只需要用 MySQL 来保存漏斗的配置信息,每次在 Java 后端代码中根据漏斗各个步骤的事件名构建查询语句,得到 ElasticSearch 返回的结果简单处理一下即可返回给前端。本来也担心过查询的速度是不是够快,需不需要用 MySQL 缓存漏斗的查询结果。但是真实使用的时候 ElasticSearch 查询的速度完全打消了我的顾虑。

3. 几个值得记录的问题

3.1 Map<String, bigint>

在解决问题的过程中还是有几个点值得提一下,首先 Hive 没有提供原生的将多行记录转成 Map<String, bigint> 类型的方法,str_to_map 函数只能返回 Map<String, String> 类型。当然这里其实可以写一个 UDAF 来解决,但是之前没有写过 UDF 和 UDAF,所以为了节省时间我用了 Streaming 中的 TRANSFORM 来解决这个问题。代码如下:

#!/bin/env python

import sys

def str_to_map(map_str):
    pairs = map_str.split(',')
    ret   = []
    for pair in pairs:
        key, value = pair.split('=')
        ret.append("%s\003%d" % (key, int(value)))
    return '\002'.join(ret)

for line in sys.stdin:
    line     = line.strip()
    fields   = line.split()
    map_str3 = fields.pop()
    map_str2 = fields.pop()
    map_str1 = fields.pop()
    fields.append(str_to_map(map_str1))
    fields.append(str_to_map(map_str2))
    fields.append(str_to_map(map_str3))
    print "\001".join(fields)

在这里返回 Map 类型数据的时候,我采用了默认的分割字符来标记 Map。

这里更新一下,为了节省磁盘空间,公司要求尽量以 ORC 格式来存储数据,导致上述的 Python 脚本不能使用。所以修改了一下 Hive 中 UDF str_to_map,使得其能够返回 Map<String, bigint>。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;

import java.util.LinkedHashMap;

/**
 * GenericUDFStringToMap.
 *
 */
@Description(name = "str_to_map", value = "_FUNC_(text, delimiter1, delimiter2) - "
        + "Creates a map by parsing text ", extended = "Split text into key-value pairs"
        + " using two delimiters. The first delimiter seperates pairs, and the"
        + " second delimiter sperates key and value. If only one parameter is given, default"
        + " delimiters are used: ',' as delimiter1 and '=' as delimiter2.")
public class GenericUDFStrToMap extends GenericUDF {
    // Must be deterministic order map for consistent q-test output across Java versions - see HIVE-9161
    private final LinkedHashMap<Object, Long> ret = new LinkedHashMap<Object, Long>();
    private transient Converter soi_text, soi_de1 = null, soi_de2 = null;
    final static String default_de1 = ",";
    final static String default_de2 = ":";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        for (int idx = 0; idx < Math.min(arguments.length, 3); ++idx) {
            if (arguments[idx].getCategory() != Category.PRIMITIVE
                    || PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
                    ((PrimitiveObjectInspector) arguments[idx]).getPrimitiveCategory())
                    != PrimitiveGrouping.STRING_GROUP) {
                throw new UDFArgumentException("All argument should be string/character type");
            }
        }
        soi_text = ObjectInspectorConverters.getConverter(arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        if (arguments.length > 1) {
            soi_de1 = ObjectInspectorConverters.getConverter(arguments[1],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        if (arguments.length > 2) {
            soi_de2 = ObjectInspectorConverters.getConverter(arguments[2],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        ret.clear();
        String text = (String) soi_text.convert(arguments[0].get());
        String delimiter1 = (soi_de1 == null) ?
                default_de1 : (String) soi_de1.convert(arguments[1].get());
        String delimiter2 = (soi_de2 == null) ?
                default_de2 : (String) soi_de2.convert(arguments[2].get());

        String[] keyValuePairs = text.split(delimiter1);

        for (String keyValuePair : keyValuePairs) {
            String[] keyValue = keyValuePair.split(delimiter2, 2);
            if (keyValue.length < 2) {
                ret.put(keyValuePair, null);
            } else {
                ret.put(keyValue[0], Long.valueOf(keyValue[1]));
            }
        }

        return ret;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "usage not know!";
    }
}

3.2 Index OR Type

在存储的时候还有一个问题需要解决:对于每一天的分区数据,是用 Index 还是 Type 来存储?这个问题的由来和 ElasticSearch 的存储方式以及数据的回溯有关。

ElasticSearch 在设计的时候一开始是对照关系型数据库的。一个 Index 对应一个数据库,Index 下的 Type 对应 Table。但这并不是强制的要求的,数据到底怎么存应该根据自己的使用情景来定,具体可以参照这个文章 index vs type

另外一方面,难免有些数据有错误或者指标口径被修改了,那么要重新计算和导入数据。那么在重新导入数据的时候就要先删除所有的分区数据,恰巧 ElasticSearch 是没有分区的概念的。如果把 Hive 一个表中所有分区数据存在一张表中,用 stat_date 字段来标记日期,那么在删除数据的时候 ElasticSearch 会先查出所有这一天的数据然后再删除,无疑是很慢的解决方法。

最终我采用的是一个分区一个 Index,每次重新导入数据前先把对应的 Index 删除。虽然 Index 更耗资源,但是这样使用更符合一般的工作流程。

ElasticSearch 使用小结

1. 序

最近一直在做的一个项目使用了 ElasticSearch 作为查询和计算的数据源,中间踩了不少坑,也有一些发现和心得体会,趁着假期好好总结一下。

2. 使用体会

ElasticSearch 也不介绍了,我这等菜鸟也讲不出什么所以然,反正老大怎么说就怎么来吧。这里直接谈谈心得体会,一个字:快!

这次用的 ElasticSearch 集群一共 15 台服务器,每台机器大概是 32G 内存吧。几千万、几个亿条的数据做查询和聚合杠杠的!(原谅我一个江南人在北京七年也变得放荡不羁)几乎可以做实时的离线分析了!

下面讲讲使用 ElasticSearch 遇到的坑,分为以下几个部分:

  • Query DSL
  • 多值类型的数据存储
  • Hive 向 ElasticSearch 导出数据
  • Java 客户端 API 调用

这几个部分会在接下来的几篇文章中详细讲一下。

一种 Hive 表存储格式的转换的方式

1. 序

实习的公司考虑到 ORC 格式拥有更高的压缩比和传输效率,所以要求我们建表的时候都采用 ORC 的格式。

好死不死,有个项目要把 Hive 里算出来的数通过 Sqoop 导出到 MySQL 里去。而 Sqoop 对于 ORC 格式的数据导出是有问题的,所以我面临的问题就是把 ORC 分区的数据转换成 TEXT 格式的。

2. 面临的实际状况

  1. 都是外部表
  2. 分区的字段是 yearmonthday
  3. 挂分区时指定 location '2016/03/03'(否则分区目录就会是 /year=2016/month=03/day=03 这样的形式)
  4. 已经有若干天的分区数据

3. 解决方案

最直观的的解决方案就是重新建表,指定 TEXT 作为存储格式,然后将原来的数据直接 insert 进去。

这种方法的缺陷是:虽然可以使用相同的表结构,但是由于名字不能重复,那么之前的计算脚本和调度任务都要进行相应的修改。考虑到一共有四张表,工作量还是挺大的。如果不想改变脚本和任务,那么就要在导出数据之后更改原表的存储方式并重新将数据导回去,还要花费一定的时间。

综合来说,最好要达到两个目标:
1. 不改变表的名字,避免大面积的修改
2. 尽量少地导数据,节省时间

4. 小伎俩

通过了一定时间的思考,我用了一点小伎俩来达成这个任务。

  1. 首先创建一张临时表,建为 TEXT 存储方式的外部表,然后指向原表相同的 HDFS 文件目录。
  2. 为临时表挂上分区,指定 location 确保分区的路径和原表一一对应。
  3. 使用 insert overwrite <TABLE> select ... 以及动态分区的技术,将 ORC 格式的原表读出来,以 TEXT 的格式存回原路径。
  4. drop 原表,按照 TEXT 方式建立新表。
  5. 为新表挂上分区,删掉临时表。

5. 动态分区

a. 开启设置

--允许使用动态分区可通过set hive.exec.dynamic.partition;查看
set hive.exec.dynamic.partition=true;
--当需要设置所有列为dynamic时需要这样设置
set hive.exec.dynamic.partition.mode=nonstrict;
--如果分区总数超过这个数量会报错
set hive.exec.max.dynamic.partitions=1000;
--单个MR Job允许创建分区的最大数量
set hive.exec.max.dynamic.partitions.pernode=1000;

b. 使用方式

假设表的结构是:

CREATE EXTERNAL TABLE temp_table(
    field1    string  comment 'field1',
    field2    string  comment 'field2'
)
COMMENT '临时表'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';

那么调用的方式如下:

insert overwrite table temp_table_new partition(year,month,day)
select field1, field2, year, month, day
from temp_table;

注意:如果父分区是动态分区,那么子分区不能是静态分区

c. 挂分区的脚本

由于动态分区在创建分区的时候产生的路径形如: year=2016/month=03/day=05,和内部常用的 2016/03/03 的形式不一样,所以我写了一个 Python 脚本先挂分区指定 location

简单来说还是构建 SQL 字符串,然后使用 subprocess 模块来调用 hive -e 'cmd' 来执行。

#!/bin/env python
# -*- coding: utf-8 -*-

from datetime import date, timedelta


def date_from_str(date_str, splitor='-'):
    year, month, day = map(int, date_str.split(splitor))
    return date(year=year, month=month, day=day)

def date_interval(start_date, end_date):
    days = (end_date - start_date).days + 1
    return  [(start_date + timedelta(var_date)) for var_date in range(0, days)]

def add_partitions_cmd(datebase, table, start_date, end_date):
    CMD_STR = "use {};\n".format(datebase)
    for var_date in date_interval(start_date, end_date):
        year, month, day = var_date.isoformat().split('-')
        CMD_STR += "alter table {} add if not exists partition(year='{}', month='{}', day='{}') location '{}/{}/{}';\n" \
                    .format(table, year, month, day, year, month, day)
    return CMD_STR

if __name__ == '__main__':
    import sys

    if len(sys.argv) != 5:
        print 'Usage: python add_partitions.py <DATABASE> <TABLE> <START_DATE> <END_DATE>'
        sys.exit(255)

    datebase   = sys.argv[1]
    table      = sys.argv[2]
    start_date = date_from_str(sys.argv[3])
    end_date   = date_from_str(sys.argv[4])

    ADD_PARTITIONS = add_partitions_cmd(datebase, table, start_date, end_date)
    CMD = 'hive -e "{}"'.format(ADD_PARTITIONS)
    print CMD

    from subprocess import Popen, PIPE
    output, param = Popen(['hive', '-e', ADD_PARTITIONS], stdout=PIPE).communicate()
    print output
    print param

吐槽一下 subprocess

以前使用 Python 执行 Shell 的命令的时候常用 subprocess.getstatusoutput(cmd)。因为同时可以获得输出和返回值。这次一开始也是这么用的,结果老是报错。一查才知道尼玛这个函数没有了!