ElasticSearch 多值类型的数据存储

1. 使用场景

一般来说,我们的离线计算和统计都是在 Hive 里面进行的,但是也有离线计算不适应的场景。

比如在做移动行为分析的时候我们想要知道一连串事件的转化率,这就需要建立一个漏斗。如果使用 Hive 来计算漏斗,那么有几个问题是需要我们考虑的:

  1. 漏斗的建立需要一个尝试和调整的过程,有的时候业务比较复杂,那么漏斗的调整次数也会比较多。每次用 Hive 来计算的话显然是比较慢的。

  2. 同一个漏斗多人查看的话每次都使用 Hive 计算吗?显然不,数据应该使用 MySQL 来缓存已经查询到的结果,那么查询就变成了二级查询:先查 MySQL,如果 MySQL 中没有数据再调用 Hive 计算,然后把计算结果写到 MySQL 中,这就变成了一个比较复杂的异步的过程。我一开始的方案是:首次建立漏斗的话立刻异步地调用 Hive 去计算漏斗的历史数据,然后每天用调度系统将漏斗的配置 Sqoop 导入到 Hive 中进行计算,然后 Sqoop 导出到 MySQL 中用来展示。

  3. 同时也会遇到某些天的数据突然出现问题,修复后需要重启计算任务,那么如何保持 Hive、MySQL 数据的一致性也是需要我们考虑的。

总之:使用 Hive + MySQL 的方式来做漏斗计算不仅慢,而且复杂!两周的冲刺应该是上不了线的!

2. 解决方案

然后老大就让我使用 Hive + ElasticSearch 来做。简单来说,先使用 Hive 把源数据进行处理,得到初步聚合的结果,然后将数据导入到 ElasticSearch 中。使用的时候 Java 客户端程序通过参数构成查询条件向 ElasticSearch 查询数据。

当然坑也踩了不少,首先在使用 ElasticSearch 之前没有做过预先的调研,一直以关系型数据库的思维在想着如何来计算数据和查询数据,然后就发现了 ElasticSearch:

  • 不支持子查询
  • 不支持 COUNT(DISTINCT)
  • 不支持 Join

按照官方文档的说法,他们暂时不支持子查询和 COUNT(DISTINCT),短期内也不想增加这个功能。至于 Join,官方文档的建议是在 Java 程序中进行 Join,也就是说分为两步操作,Java 客户端先获取 Join 的字段,再以查到的 Join 字段作为查询条件进行查询。如果数据量很大的话,两次查询光传输数据就要数十秒了!

当时最大的困惑是我要计算用户事件行为的漏斗转化率,包括 pv 和 uv。问题来了,一开始的计算方案是把每一个用户的每一次事件信息都存储下来,计算的时候就用事件名去过滤然后 COUNT 就能算 pv 了。但是无法对用户去重,uv 计算不了。

好在老大提醒我把每个用户发生的事件及其点击次数放在一个 Map 里面,事件名作为 key,点击次数作为 value,一个用户一行就行。计算 pv 的时候根据对应事件以及它们的点击次数来求 sum,比如 map_type.event_id 就是从一个名叫 map_type 的字段中以 event_id 作为事件名查询点击次数。而求 uv 也很简单,因为每一行就对应一个用户,所以 COUNT 行数就行了。

在做漏斗计算的时候,只需要用 MySQL 来保存漏斗的配置信息,每次在 Java 后端代码中根据漏斗各个步骤的事件名构建查询语句,得到 ElasticSearch 返回的结果简单处理一下即可返回给前端。本来也担心过查询的速度是不是够快,需不需要用 MySQL 缓存漏斗的查询结果。但是真实使用的时候 ElasticSearch 查询的速度完全打消了我的顾虑。

3. 几个值得记录的问题

3.1 Map<String, bigint>

在解决问题的过程中还是有几个点值得提一下,首先 Hive 没有提供原生的将多行记录转成 Map<String, bigint> 类型的方法,str_to_map 函数只能返回 Map<String, String> 类型。当然这里其实可以写一个 UDAF 来解决,但是之前没有写过 UDF 和 UDAF,所以为了节省时间我用了 Streaming 中的 TRANSFORM 来解决这个问题。代码如下:

#!/bin/env python

import sys

def str_to_map(map_str):
    pairs = map_str.split(',')
    ret   = []
    for pair in pairs:
        key, value = pair.split('=')
        ret.append("%s\003%d" % (key, int(value)))
    return '\002'.join(ret)

for line in sys.stdin:
    line     = line.strip()
    fields   = line.split()
    map_str3 = fields.pop()
    map_str2 = fields.pop()
    map_str1 = fields.pop()
    fields.append(str_to_map(map_str1))
    fields.append(str_to_map(map_str2))
    fields.append(str_to_map(map_str3))
    print "\001".join(fields)

在这里返回 Map 类型数据的时候,我采用了默认的分割字符来标记 Map。

这里更新一下,为了节省磁盘空间,公司要求尽量以 ORC 格式来存储数据,导致上述的 Python 脚本不能使用。所以修改了一下 Hive 中 UDF str_to_map,使得其能够返回 Map<String, bigint>。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;

import java.util.LinkedHashMap;

/**
 * GenericUDFStringToMap.
 *
 */
@Description(name = "str_to_map", value = "_FUNC_(text, delimiter1, delimiter2) - "
        + "Creates a map by parsing text ", extended = "Split text into key-value pairs"
        + " using two delimiters. The first delimiter seperates pairs, and the"
        + " second delimiter sperates key and value. If only one parameter is given, default"
        + " delimiters are used: ',' as delimiter1 and '=' as delimiter2.")
public class GenericUDFStrToMap extends GenericUDF {
    // Must be deterministic order map for consistent q-test output across Java versions - see HIVE-9161
    private final LinkedHashMap<Object, Long> ret = new LinkedHashMap<Object, Long>();
    private transient Converter soi_text, soi_de1 = null, soi_de2 = null;
    final static String default_de1 = ",";
    final static String default_de2 = ":";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        for (int idx = 0; idx < Math.min(arguments.length, 3); ++idx) {
            if (arguments[idx].getCategory() != Category.PRIMITIVE
                    || PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
                    ((PrimitiveObjectInspector) arguments[idx]).getPrimitiveCategory())
                    != PrimitiveGrouping.STRING_GROUP) {
                throw new UDFArgumentException("All argument should be string/character type");
            }
        }
        soi_text = ObjectInspectorConverters.getConverter(arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        if (arguments.length > 1) {
            soi_de1 = ObjectInspectorConverters.getConverter(arguments[1],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        if (arguments.length > 2) {
            soi_de2 = ObjectInspectorConverters.getConverter(arguments[2],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        ret.clear();
        String text = (String) soi_text.convert(arguments[0].get());
        String delimiter1 = (soi_de1 == null) ?
                default_de1 : (String) soi_de1.convert(arguments[1].get());
        String delimiter2 = (soi_de2 == null) ?
                default_de2 : (String) soi_de2.convert(arguments[2].get());

        String[] keyValuePairs = text.split(delimiter1);

        for (String keyValuePair : keyValuePairs) {
            String[] keyValue = keyValuePair.split(delimiter2, 2);
            if (keyValue.length < 2) {
                ret.put(keyValuePair, null);
            } else {
                ret.put(keyValue[0], Long.valueOf(keyValue[1]));
            }
        }

        return ret;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "usage not know!";
    }
}

3.2 Index OR Type

在存储的时候还有一个问题需要解决:对于每一天的分区数据,是用 Index 还是 Type 来存储?这个问题的由来和 ElasticSearch 的存储方式以及数据的回溯有关。

ElasticSearch 在设计的时候一开始是对照关系型数据库的。一个 Index 对应一个数据库,Index 下的 Type 对应 Table。但这并不是强制的要求的,数据到底怎么存应该根据自己的使用情景来定,具体可以参照这个文章 index vs type

另外一方面,难免有些数据有错误或者指标口径被修改了,那么要重新计算和导入数据。那么在重新导入数据的时候就要先删除所有的分区数据,恰巧 ElasticSearch 是没有分区的概念的。如果把 Hive 一个表中所有分区数据存在一张表中,用 stat_date 字段来标记日期,那么在删除数据的时候 ElasticSearch 会先查出所有这一天的数据然后再删除,无疑是很慢的解决方法。

最终我采用的是一个分区一个 Index,每次重新导入数据前先把对应的 Index 删除。虽然 Index 更耗资源,但是这样使用更符合一般的工作流程。

发表评论

电子邮件地址不会被公开。 必填项已用*标注