一般来说,Hive 里的表经过初步的处理之后就可以向 ElasticSearch 中导入数据了。
ElasticSearch 提供了和 Hive 的整合方案 elasticsearch-hadoop.jar,简单来说我们可以按照如下步骤进行操作。
- 如果已存在同名 Index 则先删除旧的 Index。
- 创建 Index,设置好 mappings。
- 创建导入数据用的映射表。
- 导入数据
- 删除临时表。
1. 删除已存在的 Index
因为我在使用的时候将每天的分区数据映射到一个 Index 中,所以为了回溯数据,每次导数开始之前先删除旧有的 Index。这个操作使用 ElasticSearch 提供的 RESTful API 就行:
curl -XDELETE "http://${IP_ADDRESS}:9200/${PATITION_NAME}"
2. 创建 Index,设置 mappings
其实这一步可以省略,在导入数据的时候 ElasticSearch 会自动创建对应的 mappings,但是提前创建 mappings 的好处是可以做自定义的设置。
比如下面的 "index": "not_analyzed"
属性禁止 ElasticSearch 对字符串的分析。如果不加上这个设置那么在匹配字符串的时候只能用 matchPhrase 而不能用 term,而且在 group by 某个字段的时候这个字段可能只返回字符串值的一部分,比如“北京”只返回一个“北”等等。
总之,如果不需要对文本做检索,所有字符串类型的属性最好都加上 "index": "not_analyzed"
。
curl -XPUT "http://${IP_ADDRESS}:9200/${PATITION_NAME}" -d '
{
"mappings": {
"data":{
"properties": {
"id": {
"type": "long"
},
"stat_date": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"some_str_field": {
"type": "string",
"index": "not_analyzed"
},
"some_num_field": {
"type": "long"
}
},
"_all": {
"enabled": false
}
}
}
}'
3. 创建导入数据用的映射表
创建一个外部表,并用 es.nodes
指定几个连接用的节点,es.resource
指定映射的 Index 和 Type。给个例子:
add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
id bigint comment 'id',
stat_date string comment '日期',
some_str_field string comment '字符串类型',
some_num_field bigint comment '数值类型'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}'
);
4. 导入数据
SET hive.mapred.reduce.tasks.speculative.execution = false;
SET mapreduce.map.speculative = false;
SET mapreduce.reduce.speculative = false;
insert overwrite table ${TEMP_TABLE}
select ${COLUMNS}
from ${DBNAME}.${FILENAME}
where concat(year,month,day)='$V_TS';
5. 删除临时表
导数用的临时表在导入数据结束后就可以删除了。
drop table ${TEMP_TABLE};
6. 此处有坑
在使用 Hive 向 ElasticSearch 中导出数据的时候,有一个大坑非常关键:并发!
这种导入数据的方法会启动 mapper 来执行,但是如果 mapper 数过高,ElasticSearch 过载(Overload)或者多出冗余的数据。据同事分析,原因是 mapper 会批量地将数据插入 ElasticSearch 中,而当 mapper 失败的时候会被重启,已经导入的数据又会重新插入 ElasticSearch 导致数据重复。
其中的一个手段就是适当减少并发的 mapper 的数量。通过合并文件的手段可以降低 mapper 的数量。
SET mapred.max.split.size=1000000000;
SET mapred.min.split.size=1000000000;
set mapred.min.split.size.per.node=1000000000;
set mapred.min.split.size.per.rack=1000000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
另外,还可以通过设置 ElasticSearch 的 _id 字段避免重复。比如为每一行生成一个 uuid,把 uuid 作为 mappings 中的 _id。ElasticSearch 对于 uuid 相同的记录会覆盖旧的记录,这样相同 uuid 的记录就不会重复了。
add jar elasticsearch-hadoop-hive-2.2.0.jar;
drop table if exists ${TEMP_TABLE};
CREATE EXTERNAL TABLE ${TEMP_TABLE}(
id bigint comment 'id',
stat_date string comment '日期',
some_str_field string comment '字符串类型',
some_num_field bigint comment '数值类型',
uuid string COMMENT '为了导入ES而准备的id'
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '${NODE_IP_ADDR_1}:9200,${NODE_IP_ADDR_2}:9200,${NODE_IP_ADDR_3}:9200',
'es.resource' = '${PATITION_NAME}/${TYPE}',
'es.mapping.id' = 'uuid'
);
最近更新:最近公司架构部门搞了一套通过 Kafka 导入数据的方案,先把数据写入到 Kafka,然后 Kafka 写入到 ElasticSearch。但是 Kafka 保证不会丢数,还是可能有重复数据项,所以也加了 uuid 作为 _id 来去重。这个套方案运行的很好,可惜同时导入的时候并发导入任务数也并不能太高,有些回溯数据的任务也只能一个个跑。