1. 序
实习的公司考虑到 ORC
格式拥有更高的压缩比和传输效率,所以要求我们建表的时候都采用 ORC
的格式。
好死不死,有个项目要把 Hive 里算出来的数通过 Sqoop 导出到 MySQL 里去。而 Sqoop 对于 ORC
格式的数据导出是有问题的,所以我面临的问题就是把 ORC
分区的数据转换成 TEXT
格式的。
2. 面临的实际状况
- 都是外部表
- 分区的字段是
year
、month
和day
- 挂分区时指定
location '2016/03/03'
(否则分区目录就会是/year=2016/month=03/day=03
这样的形式) - 已经有若干天的分区数据
3. 解决方案
最直观的的解决方案就是重新建表,指定 TEXT
作为存储格式,然后将原来的数据直接 insert
进去。
这种方法的缺陷是:虽然可以使用相同的表结构,但是由于名字不能重复,那么之前的计算脚本和调度任务都要进行相应的修改。考虑到一共有四张表,工作量还是挺大的。如果不想改变脚本和任务,那么就要在导出数据之后更改原表的存储方式并重新将数据导回去,还要花费一定的时间。
综合来说,最好要达到两个目标:
1. 不改变表的名字,避免大面积的修改
2. 尽量少地导数据,节省时间
4. 小伎俩
通过了一定时间的思考,我用了一点小伎俩来达成这个任务。
- 首先创建一张临时表,建为
TEXT
存储方式的外部表,然后指向原表相同的HDFS
文件目录。 - 为临时表挂上分区,指定
location
确保分区的路径和原表一一对应。 - 使用
insert overwrite <TABLE> select ...
以及动态分区的技术,将ORC
格式的原表读出来,以TEXT
的格式存回原路径。 drop
原表,按照TEXT
方式建立新表。- 为新表挂上分区,删掉临时表。
5. 动态分区
a. 开启设置
--允许使用动态分区可通过set hive.exec.dynamic.partition;查看
set hive.exec.dynamic.partition=true;
--当需要设置所有列为dynamic时需要这样设置
set hive.exec.dynamic.partition.mode=nonstrict;
--如果分区总数超过这个数量会报错
set hive.exec.max.dynamic.partitions=1000;
--单个MR Job允许创建分区的最大数量
set hive.exec.max.dynamic.partitions.pernode=1000;
b. 使用方式
假设表的结构是:
CREATE EXTERNAL TABLE temp_table(
field1 string comment 'field1',
field2 string comment 'field2'
)
COMMENT '临时表'
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';
那么调用的方式如下:
insert overwrite table temp_table_new partition(year,month,day)
select field1, field2, year, month, day
from temp_table;
注意:如果父分区是动态分区,那么子分区不能是静态分区
c. 挂分区的脚本
由于动态分区在创建分区的时候产生的路径形如: year=2016/month=03/day=05
,和内部常用的 2016/03/03
的形式不一样,所以我写了一个 Python 脚本先挂分区指定 location
。
简单来说还是构建 SQL 字符串,然后使用 subprocess
模块来调用 hive -e 'cmd'
来执行。
#!/bin/env python
# -*- coding: utf-8 -*-
from datetime import date, timedelta
def date_from_str(date_str, splitor='-'):
year, month, day = map(int, date_str.split(splitor))
return date(year=year, month=month, day=day)
def date_interval(start_date, end_date):
days = (end_date - start_date).days + 1
return [(start_date + timedelta(var_date)) for var_date in range(0, days)]
def add_partitions_cmd(datebase, table, start_date, end_date):
CMD_STR = "use {};\n".format(datebase)
for var_date in date_interval(start_date, end_date):
year, month, day = var_date.isoformat().split('-')
CMD_STR += "alter table {} add if not exists partition(year='{}', month='{}', day='{}') location '{}/{}/{}';\n" \
.format(table, year, month, day, year, month, day)
return CMD_STR
if __name__ == '__main__':
import sys
if len(sys.argv) != 5:
print 'Usage: python add_partitions.py <DATABASE> <TABLE> <START_DATE> <END_DATE>'
sys.exit(255)
datebase = sys.argv[1]
table = sys.argv[2]
start_date = date_from_str(sys.argv[3])
end_date = date_from_str(sys.argv[4])
ADD_PARTITIONS = add_partitions_cmd(datebase, table, start_date, end_date)
CMD = 'hive -e "{}"'.format(ADD_PARTITIONS)
print CMD
from subprocess import Popen, PIPE
output, param = Popen(['hive', '-e', ADD_PARTITIONS], stdout=PIPE).communicate()
print output
print param
吐槽一下 subprocess
以前使用 Python 执行 Shell 的命令的时候常用 subprocess.getstatusoutput(cmd)
。因为同时可以获得输出和返回值。这次一开始也是这么用的,结果老是报错。一查才知道尼玛这个函数没有了!