Hive 向 ElasticSearch 导出数据

一般来说,Hive 里的表经过初步的处理之后就可以向 ElasticSearch 中导入数据了。

ElasticSearch 提供了和 Hive 的整合方案 elasticsearch-hadoop.jar,简单来说我们可以按照如下步骤进行操作。

  1. 如果已存在同名 Index 则先删除旧的 Index。
  2. 创建 Index,设置好 mappings。
  3. 创建导入数据用的映射表。
  4. 导入数据
  5. 删除临时表。

1. 删除已存在的 Index

因为我在使用的时候将每天的分区数据映射到一个 Index 中,所以为了回溯数据,每次导数开始之前先删除旧有的 Index。这个操作使用 ElasticSearch 提供的 RESTful API 就行:

curl -XDELETE "http://${IP_ADDRESS}:9200/${PATITION_NAME}"

2. 创建 Index,设置 mappings

其实这一步可以省略,在导入数据的时候 ElasticSearch 会自动创建对应的 mappings,但是提前创建 mappings 的好处是可以做自定义的设置。

比如下面的 "index": "not_analyzed" 属性禁止 ElasticSearch 对字符串的分析。如果不加上这个设置那么在匹配字符串的时候只能用 matchPhrase 而不能用 term,而且在 group by 某个字段的时候这个字段可能只返回字符串值的一部分,比如“北京”只返回一个“北”等等。

总之,如果不需要对文本做检索,所有字符串类型的属性最好都加上 "index": "not_analyzed"

curl -XPUT "http://${IP_ADDRESS}:9200/${PATITION_NAME}" -d '
{
    "mappings": {
        "data":{
            "properties": {
                "id": {
                    "type": "long"
                },
                "stat_date": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "some_str_field": {
                    "type": "string",
                    "index":  "not_analyzed"
                },
                "some_num_field": {
                    "type": "long"
                }
            },
            "_all": {
              "enabled": false
            }
        }
    }
}'

3. 创建导入数据用的映射表

创建一个外部表,并用 es.nodes 指定几个连接用的节点,es.resource指定映射的 Index 和 Type。给个例子:

add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
    id bigint comment 'id',
    stat_date string comment '日期',
    some_str_field string comment '字符串类型',
    some_num_field bigint comment '数值类型'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}'
);

4. 导入数据

SET hive.mapred.reduce.tasks.speculative.execution = false;
SET mapreduce.map.speculative = false;
SET mapreduce.reduce.speculative = false;
insert overwrite table ${TEMP_TABLE}
select  ${COLUMNS}
from    ${DBNAME}.${FILENAME}
where   concat(year,month,day)='$V_TS';

5. 删除临时表

导数用的临时表在导入数据结束后就可以删除了。

drop table ${TEMP_TABLE};

6. 此处有坑

在使用 Hive 向 ElasticSearch 中导出数据的时候,有一个大坑非常关键:并发!

这种导入数据的方法会启动 mapper 来执行,但是如果 mapper 数过高,ElasticSearch 过载(Overload)或者多出冗余的数据。据同事分析,原因是 mapper 会批量地将数据插入 ElasticSearch 中,而当 mapper 失败的时候会被重启,已经导入的数据又会重新插入 ElasticSearch 导致数据重复。

其中的一个手段就是适当减少并发的 mapper 的数量。通过合并文件的手段可以降低 mapper 的数量。

SET mapred.max.split.size=1000000000;
SET mapred.min.split.size=1000000000;
set mapred.min.split.size.per.node=1000000000;
set mapred.min.split.size.per.rack=1000000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

另外,还可以通过设置 ElasticSearch 的 _id 字段避免重复。比如为每一行生成一个 uuid,把 uuid 作为 mappings 中的 _id。ElasticSearch 对于 uuid 相同的记录会覆盖旧的记录,这样相同 uuid 的记录就不会重复了。

add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
    id bigint comment 'id',
    stat_date string comment '日期',
    some_str_field string comment '字符串类型',
    some_num_field bigint comment '数值类型',
    uuid string COMMENT '为了导入ES而准备的id'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}',
'es.mapping.id' = 'uuid'
);

最近更新:最近公司架构部门搞了一套通过 Kafka 导入数据的方案,先把数据写入到 Kafka,然后 Kafka 写入到 ElasticSearch。但是 Kafka 保证不会丢数,还是可能有重复数据项,所以也加了 uuid 作为 _id 来去重。这个套方案运行的很好,可惜同时导入的时候并发导入任务数也并不能太高,有些回溯数据的任务也只能一个个跑。

参考资料

ElasticSearch 与 Hive 的整合

ElasticSearch 多值类型的数据存储

1. 使用场景

一般来说,我们的离线计算和统计都是在 Hive 里面进行的,但是也有离线计算不适应的场景。

比如在做移动行为分析的时候我们想要知道一连串事件的转化率,这就需要建立一个漏斗。如果使用 Hive 来计算漏斗,那么有几个问题是需要我们考虑的:

  1. 漏斗的建立需要一个尝试和调整的过程,有的时候业务比较复杂,那么漏斗的调整次数也会比较多。每次用 Hive 来计算的话显然是比较慢的。

  2. 同一个漏斗多人查看的话每次都使用 Hive 计算吗?显然不,数据应该使用 MySQL 来缓存已经查询到的结果,那么查询就变成了二级查询:先查 MySQL,如果 MySQL 中没有数据再调用 Hive 计算,然后把计算结果写到 MySQL 中,这就变成了一个比较复杂的异步的过程。我一开始的方案是:首次建立漏斗的话立刻异步地调用 Hive 去计算漏斗的历史数据,然后每天用调度系统将漏斗的配置 Sqoop 导入到 Hive 中进行计算,然后 Sqoop 导出到 MySQL 中用来展示。

  3. 同时也会遇到某些天的数据突然出现问题,修复后需要重启计算任务,那么如何保持 Hive、MySQL 数据的一致性也是需要我们考虑的。

总之:使用 Hive + MySQL 的方式来做漏斗计算不仅慢,而且复杂!两周的冲刺应该是上不了线的!

2. 解决方案

然后老大就让我使用 Hive + ElasticSearch 来做。简单来说,先使用 Hive 把源数据进行处理,得到初步聚合的结果,然后将数据导入到 ElasticSearch 中。使用的时候 Java 客户端程序通过参数构成查询条件向 ElasticSearch 查询数据。

当然坑也踩了不少,首先在使用 ElasticSearch 之前没有做过预先的调研,一直以关系型数据库的思维在想着如何来计算数据和查询数据,然后就发现了 ElasticSearch:

  • 不支持子查询
  • 不支持 COUNT(DISTINCT)
  • 不支持 Join

按照官方文档的说法,他们暂时不支持子查询和 COUNT(DISTINCT),短期内也不想增加这个功能。至于 Join,官方文档的建议是在 Java 程序中进行 Join,也就是说分为两步操作,Java 客户端先获取 Join 的字段,再以查到的 Join 字段作为查询条件进行查询。如果数据量很大的话,两次查询光传输数据就要数十秒了!

当时最大的困惑是我要计算用户事件行为的漏斗转化率,包括 pv 和 uv。问题来了,一开始的计算方案是把每一个用户的每一次事件信息都存储下来,计算的时候就用事件名去过滤然后 COUNT 就能算 pv 了。但是无法对用户去重,uv 计算不了。

好在老大提醒我把每个用户发生的事件及其点击次数放在一个 Map 里面,事件名作为 key,点击次数作为 value,一个用户一行就行。计算 pv 的时候根据对应事件以及它们的点击次数来求 sum,比如 map_type.event_id 就是从一个名叫 map_type 的字段中以 event_id 作为事件名查询点击次数。而求 uv 也很简单,因为每一行就对应一个用户,所以 COUNT 行数就行了。

在做漏斗计算的时候,只需要用 MySQL 来保存漏斗的配置信息,每次在 Java 后端代码中根据漏斗各个步骤的事件名构建查询语句,得到 ElasticSearch 返回的结果简单处理一下即可返回给前端。本来也担心过查询的速度是不是够快,需不需要用 MySQL 缓存漏斗的查询结果。但是真实使用的时候 ElasticSearch 查询的速度完全打消了我的顾虑。

3. 几个值得记录的问题

3.1 Map<String, bigint>

在解决问题的过程中还是有几个点值得提一下,首先 Hive 没有提供原生的将多行记录转成 Map<String, bigint> 类型的方法,str_to_map 函数只能返回 Map<String, String> 类型。当然这里其实可以写一个 UDAF 来解决,但是之前没有写过 UDF 和 UDAF,所以为了节省时间我用了 Streaming 中的 TRANSFORM 来解决这个问题。代码如下:

#!/bin/env python

import sys

def str_to_map(map_str):
    pairs = map_str.split(',')
    ret   = []
    for pair in pairs:
        key, value = pair.split('=')
        ret.append("%s\003%d" % (key, int(value)))
    return '\002'.join(ret)

for line in sys.stdin:
    line     = line.strip()
    fields   = line.split()
    map_str3 = fields.pop()
    map_str2 = fields.pop()
    map_str1 = fields.pop()
    fields.append(str_to_map(map_str1))
    fields.append(str_to_map(map_str2))
    fields.append(str_to_map(map_str3))
    print "\001".join(fields)

在这里返回 Map 类型数据的时候,我采用了默认的分割字符来标记 Map。

这里更新一下,为了节省磁盘空间,公司要求尽量以 ORC 格式来存储数据,导致上述的 Python 脚本不能使用。所以修改了一下 Hive 中 UDF str_to_map,使得其能够返回 Map<String, bigint>。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;

import java.util.LinkedHashMap;

/**
 * GenericUDFStringToMap.
 *
 */
@Description(name = "str_to_map", value = "_FUNC_(text, delimiter1, delimiter2) - "
        + "Creates a map by parsing text ", extended = "Split text into key-value pairs"
        + " using two delimiters. The first delimiter seperates pairs, and the"
        + " second delimiter sperates key and value. If only one parameter is given, default"
        + " delimiters are used: ',' as delimiter1 and '=' as delimiter2.")
public class GenericUDFStrToMap extends GenericUDF {
    // Must be deterministic order map for consistent q-test output across Java versions - see HIVE-9161
    private final LinkedHashMap<Object, Long> ret = new LinkedHashMap<Object, Long>();
    private transient Converter soi_text, soi_de1 = null, soi_de2 = null;
    final static String default_de1 = ",";
    final static String default_de2 = ":";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        for (int idx = 0; idx < Math.min(arguments.length, 3); ++idx) {
            if (arguments[idx].getCategory() != Category.PRIMITIVE
                    || PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
                    ((PrimitiveObjectInspector) arguments[idx]).getPrimitiveCategory())
                    != PrimitiveGrouping.STRING_GROUP) {
                throw new UDFArgumentException("All argument should be string/character type");
            }
        }
        soi_text = ObjectInspectorConverters.getConverter(arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        if (arguments.length > 1) {
            soi_de1 = ObjectInspectorConverters.getConverter(arguments[1],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        if (arguments.length > 2) {
            soi_de2 = ObjectInspectorConverters.getConverter(arguments[2],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        ret.clear();
        String text = (String) soi_text.convert(arguments[0].get());
        String delimiter1 = (soi_de1 == null) ?
                default_de1 : (String) soi_de1.convert(arguments[1].get());
        String delimiter2 = (soi_de2 == null) ?
                default_de2 : (String) soi_de2.convert(arguments[2].get());

        String[] keyValuePairs = text.split(delimiter1);

        for (String keyValuePair : keyValuePairs) {
            String[] keyValue = keyValuePair.split(delimiter2, 2);
            if (keyValue.length < 2) {
                ret.put(keyValuePair, null);
            } else {
                ret.put(keyValue[0], Long.valueOf(keyValue[1]));
            }
        }

        return ret;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "usage not know!";
    }
}

3.2 Index OR Type

在存储的时候还有一个问题需要解决:对于每一天的分区数据,是用 Index 还是 Type 来存储?这个问题的由来和 ElasticSearch 的存储方式以及数据的回溯有关。

ElasticSearch 在设计的时候一开始是对照关系型数据库的。一个 Index 对应一个数据库,Index 下的 Type 对应 Table。但这并不是强制的要求的,数据到底怎么存应该根据自己的使用情景来定,具体可以参照这个文章 index vs type

另外一方面,难免有些数据有错误或者指标口径被修改了,那么要重新计算和导入数据。那么在重新导入数据的时候就要先删除所有的分区数据,恰巧 ElasticSearch 是没有分区的概念的。如果把 Hive 一个表中所有分区数据存在一张表中,用 stat_date 字段来标记日期,那么在删除数据的时候 ElasticSearch 会先查出所有这一天的数据然后再删除,无疑是很慢的解决方法。

最终我采用的是一个分区一个 Index,每次重新导入数据前先把对应的 Index 删除。虽然 Index 更耗资源,但是这样使用更符合一般的工作流程。

一种 Hive 表存储格式的转换的方式

1. 序

实习的公司考虑到 ORC 格式拥有更高的压缩比和传输效率,所以要求我们建表的时候都采用 ORC 的格式。

好死不死,有个项目要把 Hive 里算出来的数通过 Sqoop 导出到 MySQL 里去。而 Sqoop 对于 ORC 格式的数据导出是有问题的,所以我面临的问题就是把 ORC 分区的数据转换成 TEXT 格式的。

2. 面临的实际状况

  1. 都是外部表
  2. 分区的字段是 yearmonthday
  3. 挂分区时指定 location '2016/03/03'(否则分区目录就会是 /year=2016/month=03/day=03 这样的形式)
  4. 已经有若干天的分区数据

3. 解决方案

最直观的的解决方案就是重新建表,指定 TEXT 作为存储格式,然后将原来的数据直接 insert 进去。

这种方法的缺陷是:虽然可以使用相同的表结构,但是由于名字不能重复,那么之前的计算脚本和调度任务都要进行相应的修改。考虑到一共有四张表,工作量还是挺大的。如果不想改变脚本和任务,那么就要在导出数据之后更改原表的存储方式并重新将数据导回去,还要花费一定的时间。

综合来说,最好要达到两个目标:
1. 不改变表的名字,避免大面积的修改
2. 尽量少地导数据,节省时间

4. 小伎俩

通过了一定时间的思考,我用了一点小伎俩来达成这个任务。

  1. 首先创建一张临时表,建为 TEXT 存储方式的外部表,然后指向原表相同的 HDFS 文件目录。
  2. 为临时表挂上分区,指定 location 确保分区的路径和原表一一对应。
  3. 使用 insert overwrite <TABLE> select ... 以及动态分区的技术,将 ORC 格式的原表读出来,以 TEXT 的格式存回原路径。
  4. drop 原表,按照 TEXT 方式建立新表。
  5. 为新表挂上分区,删掉临时表。

5. 动态分区

a. 开启设置

--允许使用动态分区可通过set hive.exec.dynamic.partition;查看
set hive.exec.dynamic.partition=true;
--当需要设置所有列为dynamic时需要这样设置
set hive.exec.dynamic.partition.mode=nonstrict;
--如果分区总数超过这个数量会报错
set hive.exec.max.dynamic.partitions=1000;
--单个MR Job允许创建分区的最大数量
set hive.exec.max.dynamic.partitions.pernode=1000;

b. 使用方式

假设表的结构是:

CREATE EXTERNAL TABLE temp_table(
    field1    string  comment 'field1',
    field2    string  comment 'field2'
)
COMMENT '临时表'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';

那么调用的方式如下:

insert overwrite table temp_table_new partition(year,month,day)
select field1, field2, year, month, day
from temp_table;

注意:如果父分区是动态分区,那么子分区不能是静态分区

c. 挂分区的脚本

由于动态分区在创建分区的时候产生的路径形如: year=2016/month=03/day=05,和内部常用的 2016/03/03 的形式不一样,所以我写了一个 Python 脚本先挂分区指定 location

简单来说还是构建 SQL 字符串,然后使用 subprocess 模块来调用 hive -e 'cmd' 来执行。

#!/bin/env python
# -*- coding: utf-8 -*-

from datetime import date, timedelta


def date_from_str(date_str, splitor='-'):
    year, month, day = map(int, date_str.split(splitor))
    return date(year=year, month=month, day=day)

def date_interval(start_date, end_date):
    days = (end_date - start_date).days + 1
    return  [(start_date + timedelta(var_date)) for var_date in range(0, days)]

def add_partitions_cmd(datebase, table, start_date, end_date):
    CMD_STR = "use {};\n".format(datebase)
    for var_date in date_interval(start_date, end_date):
        year, month, day = var_date.isoformat().split('-')
        CMD_STR += "alter table {} add if not exists partition(year='{}', month='{}', day='{}') location '{}/{}/{}';\n" \
                    .format(table, year, month, day, year, month, day)
    return CMD_STR

if __name__ == '__main__':
    import sys

    if len(sys.argv) != 5:
        print 'Usage: python add_partitions.py <DATABASE> <TABLE> <START_DATE> <END_DATE>'
        sys.exit(255)

    datebase   = sys.argv[1]
    table      = sys.argv[2]
    start_date = date_from_str(sys.argv[3])
    end_date   = date_from_str(sys.argv[4])

    ADD_PARTITIONS = add_partitions_cmd(datebase, table, start_date, end_date)
    CMD = 'hive -e "{}"'.format(ADD_PARTITIONS)
    print CMD

    from subprocess import Popen, PIPE
    output, param = Popen(['hive', '-e', ADD_PARTITIONS], stdout=PIPE).communicate()
    print output
    print param

吐槽一下 subprocess

以前使用 Python 执行 Shell 的命令的时候常用 subprocess.getstatusoutput(cmd)。因为同时可以获得输出和返回值。这次一开始也是这么用的,结果老是报错。一查才知道尼玛这个函数没有了!