Hive 向 ElasticSearch 导出数据

一般来说,Hive 里的表经过初步的处理之后就可以向 ElasticSearch 中导入数据了。

ElasticSearch 提供了和 Hive 的整合方案 elasticsearch-hadoop.jar,简单来说我们可以按照如下步骤进行操作。

  1. 如果已存在同名 Index 则先删除旧的 Index。
  2. 创建 Index,设置好 mappings。
  3. 创建导入数据用的映射表。
  4. 导入数据
  5. 删除临时表。

1. 删除已存在的 Index

因为我在使用的时候将每天的分区数据映射到一个 Index 中,所以为了回溯数据,每次导数开始之前先删除旧有的 Index。这个操作使用 ElasticSearch 提供的 RESTful API 就行:

curl -XDELETE "http://${IP_ADDRESS}:9200/${PATITION_NAME}"

2. 创建 Index,设置 mappings

其实这一步可以省略,在导入数据的时候 ElasticSearch 会自动创建对应的 mappings,但是提前创建 mappings 的好处是可以做自定义的设置。

比如下面的 "index": "not_analyzed" 属性禁止 ElasticSearch 对字符串的分析。如果不加上这个设置那么在匹配字符串的时候只能用 matchPhrase 而不能用 term,而且在 group by 某个字段的时候这个字段可能只返回字符串值的一部分,比如“北京”只返回一个“北”等等。

总之,如果不需要对文本做检索,所有字符串类型的属性最好都加上 "index": "not_analyzed"

curl -XPUT "http://${IP_ADDRESS}:9200/${PATITION_NAME}" -d '
{
    "mappings": {
        "data":{
            "properties": {
                "id": {
                    "type": "long"
                },
                "stat_date": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "some_str_field": {
                    "type": "string",
                    "index":  "not_analyzed"
                },
                "some_num_field": {
                    "type": "long"
                }
            },
            "_all": {
              "enabled": false
            }
        }
    }
}'

3. 创建导入数据用的映射表

创建一个外部表,并用 es.nodes 指定几个连接用的节点,es.resource指定映射的 Index 和 Type。给个例子:

add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
    id bigint comment 'id',
    stat_date string comment '日期',
    some_str_field string comment '字符串类型',
    some_num_field bigint comment '数值类型'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}'
);

4. 导入数据

SET hive.mapred.reduce.tasks.speculative.execution = false;
SET mapreduce.map.speculative = false;
SET mapreduce.reduce.speculative = false;
insert overwrite table ${TEMP_TABLE}
select  ${COLUMNS}
from    ${DBNAME}.${FILENAME}
where   concat(year,month,day)='$V_TS';

5. 删除临时表

导数用的临时表在导入数据结束后就可以删除了。

drop table ${TEMP_TABLE};

6. 此处有坑

在使用 Hive 向 ElasticSearch 中导出数据的时候,有一个大坑非常关键:并发!

这种导入数据的方法会启动 mapper 来执行,但是如果 mapper 数过高,ElasticSearch 过载(Overload)或者多出冗余的数据。据同事分析,原因是 mapper 会批量地将数据插入 ElasticSearch 中,而当 mapper 失败的时候会被重启,已经导入的数据又会重新插入 ElasticSearch 导致数据重复。

其中的一个手段就是适当减少并发的 mapper 的数量。通过合并文件的手段可以降低 mapper 的数量。

SET mapred.max.split.size=1000000000;
SET mapred.min.split.size=1000000000;
set mapred.min.split.size.per.node=1000000000;
set mapred.min.split.size.per.rack=1000000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

另外,还可以通过设置 ElasticSearch 的 _id 字段避免重复。比如为每一行生成一个 uuid,把 uuid 作为 mappings 中的 _id。ElasticSearch 对于 uuid 相同的记录会覆盖旧的记录,这样相同 uuid 的记录就不会重复了。

add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
    id bigint comment 'id',
    stat_date string comment '日期',
    some_str_field string comment '字符串类型',
    some_num_field bigint comment '数值类型',
    uuid string COMMENT '为了导入ES而准备的id'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}',
'es.mapping.id' = 'uuid'
);

最近更新:最近公司架构部门搞了一套通过 Kafka 导入数据的方案,先把数据写入到 Kafka,然后 Kafka 写入到 ElasticSearch。但是 Kafka 保证不会丢数,还是可能有重复数据项,所以也加了 uuid 作为 _id 来去重。这个套方案运行的很好,可惜同时导入的时候并发导入任务数也并不能太高,有些回溯数据的任务也只能一个个跑。

参考资料

ElasticSearch 与 Hive 的整合

一种 Hive 表存储格式的转换的方式

1. 序

实习的公司考虑到 ORC 格式拥有更高的压缩比和传输效率,所以要求我们建表的时候都采用 ORC 的格式。

好死不死,有个项目要把 Hive 里算出来的数通过 Sqoop 导出到 MySQL 里去。而 Sqoop 对于 ORC 格式的数据导出是有问题的,所以我面临的问题就是把 ORC 分区的数据转换成 TEXT 格式的。

2. 面临的实际状况

  1. 都是外部表
  2. 分区的字段是 yearmonthday
  3. 挂分区时指定 location '2016/03/03'(否则分区目录就会是 /year=2016/month=03/day=03 这样的形式)
  4. 已经有若干天的分区数据

3. 解决方案

最直观的的解决方案就是重新建表,指定 TEXT 作为存储格式,然后将原来的数据直接 insert 进去。

这种方法的缺陷是:虽然可以使用相同的表结构,但是由于名字不能重复,那么之前的计算脚本和调度任务都要进行相应的修改。考虑到一共有四张表,工作量还是挺大的。如果不想改变脚本和任务,那么就要在导出数据之后更改原表的存储方式并重新将数据导回去,还要花费一定的时间。

综合来说,最好要达到两个目标:
1. 不改变表的名字,避免大面积的修改
2. 尽量少地导数据,节省时间

4. 小伎俩

通过了一定时间的思考,我用了一点小伎俩来达成这个任务。

  1. 首先创建一张临时表,建为 TEXT 存储方式的外部表,然后指向原表相同的 HDFS 文件目录。
  2. 为临时表挂上分区,指定 location 确保分区的路径和原表一一对应。
  3. 使用 insert overwrite <TABLE> select ... 以及动态分区的技术,将 ORC 格式的原表读出来,以 TEXT 的格式存回原路径。
  4. drop 原表,按照 TEXT 方式建立新表。
  5. 为新表挂上分区,删掉临时表。

5. 动态分区

a. 开启设置

--允许使用动态分区可通过set hive.exec.dynamic.partition;查看
set hive.exec.dynamic.partition=true;
--当需要设置所有列为dynamic时需要这样设置
set hive.exec.dynamic.partition.mode=nonstrict;
--如果分区总数超过这个数量会报错
set hive.exec.max.dynamic.partitions=1000;
--单个MR Job允许创建分区的最大数量
set hive.exec.max.dynamic.partitions.pernode=1000;

b. 使用方式

假设表的结构是:

CREATE EXTERNAL TABLE temp_table(
    field1    string  comment 'field1',
    field2    string  comment 'field2'
)
COMMENT '临时表'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';

那么调用的方式如下:

insert overwrite table temp_table_new partition(year,month,day)
select field1, field2, year, month, day
from temp_table;

注意:如果父分区是动态分区,那么子分区不能是静态分区

c. 挂分区的脚本

由于动态分区在创建分区的时候产生的路径形如: year=2016/month=03/day=05,和内部常用的 2016/03/03 的形式不一样,所以我写了一个 Python 脚本先挂分区指定 location

简单来说还是构建 SQL 字符串,然后使用 subprocess 模块来调用 hive -e 'cmd' 来执行。

#!/bin/env python
# -*- coding: utf-8 -*-

from datetime import date, timedelta


def date_from_str(date_str, splitor='-'):
    year, month, day = map(int, date_str.split(splitor))
    return date(year=year, month=month, day=day)

def date_interval(start_date, end_date):
    days = (end_date - start_date).days + 1
    return  [(start_date + timedelta(var_date)) for var_date in range(0, days)]

def add_partitions_cmd(datebase, table, start_date, end_date):
    CMD_STR = "use {};\n".format(datebase)
    for var_date in date_interval(start_date, end_date):
        year, month, day = var_date.isoformat().split('-')
        CMD_STR += "alter table {} add if not exists partition(year='{}', month='{}', day='{}') location '{}/{}/{}';\n" \
                    .format(table, year, month, day, year, month, day)
    return CMD_STR

if __name__ == '__main__':
    import sys

    if len(sys.argv) != 5:
        print 'Usage: python add_partitions.py <DATABASE> <TABLE> <START_DATE> <END_DATE>'
        sys.exit(255)

    datebase   = sys.argv[1]
    table      = sys.argv[2]
    start_date = date_from_str(sys.argv[3])
    end_date   = date_from_str(sys.argv[4])

    ADD_PARTITIONS = add_partitions_cmd(datebase, table, start_date, end_date)
    CMD = 'hive -e "{}"'.format(ADD_PARTITIONS)
    print CMD

    from subprocess import Popen, PIPE
    output, param = Popen(['hive', '-e', ADD_PARTITIONS], stdout=PIPE).communicate()
    print output
    print param

吐槽一下 subprocess

以前使用 Python 执行 Shell 的命令的时候常用 subprocess.getstatusoutput(cmd)。因为同时可以获得输出和返回值。这次一开始也是这么用的,结果老是报错。一查才知道尼玛这个函数没有了!

MySQL 与 Hive 中正则表达式字符转义的一个坑

提到正则表达式的转义,第一个念头就是加一个反斜杠 \

比如 . 在正则表达式中的含义就是匹配除 \n 之外的任何单个字符。

如果让你匹配在 Hive 或者 MySQL 的 SQL 语句中匹配像 4.2.5 这样的字符串,你会怎么做?

我的第一反应是:

[1-9]+\.[0-9]+\.[1-9]+

结果是错的!==!

事实上 . 的转义是 \\. 而不是 \.

有一个说法是 SQL 语句本身也是个字符串,交给程序处理的时候首先使用 \\\ 进行转义,然后用转义过的 \. 进行转义。

真是够绕的!