ElasticSearch 多值类型的数据存储

1. 使用场景

一般来说,我们的离线计算和统计都是在 Hive 里面进行的,但是也有离线计算不适应的场景。

比如在做移动行为分析的时候我们想要知道一连串事件的转化率,这就需要建立一个漏斗。如果使用 Hive 来计算漏斗,那么有几个问题是需要我们考虑的:

  1. 漏斗的建立需要一个尝试和调整的过程,有的时候业务比较复杂,那么漏斗的调整次数也会比较多。每次用 Hive 来计算的话显然是比较慢的。

  2. 同一个漏斗多人查看的话每次都使用 Hive 计算吗?显然不,数据应该使用 MySQL 来缓存已经查询到的结果,那么查询就变成了二级查询:先查 MySQL,如果 MySQL 中没有数据再调用 Hive 计算,然后把计算结果写到 MySQL 中,这就变成了一个比较复杂的异步的过程。我一开始的方案是:首次建立漏斗的话立刻异步地调用 Hive 去计算漏斗的历史数据,然后每天用调度系统将漏斗的配置 Sqoop 导入到 Hive 中进行计算,然后 Sqoop 导出到 MySQL 中用来展示。

  3. 同时也会遇到某些天的数据突然出现问题,修复后需要重启计算任务,那么如何保持 Hive、MySQL 数据的一致性也是需要我们考虑的。

总之:使用 Hive + MySQL 的方式来做漏斗计算不仅慢,而且复杂!两周的冲刺应该是上不了线的!

2. 解决方案

然后老大就让我使用 Hive + ElasticSearch 来做。简单来说,先使用 Hive 把源数据进行处理,得到初步聚合的结果,然后将数据导入到 ElasticSearch 中。使用的时候 Java 客户端程序通过参数构成查询条件向 ElasticSearch 查询数据。

当然坑也踩了不少,首先在使用 ElasticSearch 之前没有做过预先的调研,一直以关系型数据库的思维在想着如何来计算数据和查询数据,然后就发现了 ElasticSearch:

  • 不支持子查询
  • 不支持 COUNT(DISTINCT)
  • 不支持 Join

按照官方文档的说法,他们暂时不支持子查询和 COUNT(DISTINCT),短期内也不想增加这个功能。至于 Join,官方文档的建议是在 Java 程序中进行 Join,也就是说分为两步操作,Java 客户端先获取 Join 的字段,再以查到的 Join 字段作为查询条件进行查询。如果数据量很大的话,两次查询光传输数据就要数十秒了!

当时最大的困惑是我要计算用户事件行为的漏斗转化率,包括 pv 和 uv。问题来了,一开始的计算方案是把每一个用户的每一次事件信息都存储下来,计算的时候就用事件名去过滤然后 COUNT 就能算 pv 了。但是无法对用户去重,uv 计算不了。

好在老大提醒我把每个用户发生的事件及其点击次数放在一个 Map 里面,事件名作为 key,点击次数作为 value,一个用户一行就行。计算 pv 的时候根据对应事件以及它们的点击次数来求 sum,比如 map_type.event_id 就是从一个名叫 map_type 的字段中以 event_id 作为事件名查询点击次数。而求 uv 也很简单,因为每一行就对应一个用户,所以 COUNT 行数就行了。

在做漏斗计算的时候,只需要用 MySQL 来保存漏斗的配置信息,每次在 Java 后端代码中根据漏斗各个步骤的事件名构建查询语句,得到 ElasticSearch 返回的结果简单处理一下即可返回给前端。本来也担心过查询的速度是不是够快,需不需要用 MySQL 缓存漏斗的查询结果。但是真实使用的时候 ElasticSearch 查询的速度完全打消了我的顾虑。

3. 几个值得记录的问题

3.1 Map<String, bigint>

在解决问题的过程中还是有几个点值得提一下,首先 Hive 没有提供原生的将多行记录转成 Map<String, bigint> 类型的方法,str_to_map 函数只能返回 Map<String, String> 类型。当然这里其实可以写一个 UDAF 来解决,但是之前没有写过 UDF 和 UDAF,所以为了节省时间我用了 Streaming 中的 TRANSFORM 来解决这个问题。代码如下:

#!/bin/env python

import sys

def str_to_map(map_str):
    pairs = map_str.split(',')
    ret   = []
    for pair in pairs:
        key, value = pair.split('=')
        ret.append("%s\003%d" % (key, int(value)))
    return '\002'.join(ret)

for line in sys.stdin:
    line     = line.strip()
    fields   = line.split()
    map_str3 = fields.pop()
    map_str2 = fields.pop()
    map_str1 = fields.pop()
    fields.append(str_to_map(map_str1))
    fields.append(str_to_map(map_str2))
    fields.append(str_to_map(map_str3))
    print "\001".join(fields)

在这里返回 Map 类型数据的时候,我采用了默认的分割字符来标记 Map。

这里更新一下,为了节省磁盘空间,公司要求尽量以 ORC 格式来存储数据,导致上述的 Python 脚本不能使用。所以修改了一下 Hive 中 UDF str_to_map,使得其能够返回 Map<String, bigint>。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;

import java.util.LinkedHashMap;

/**
 * GenericUDFStringToMap.
 *
 */
@Description(name = "str_to_map", value = "_FUNC_(text, delimiter1, delimiter2) - "
        + "Creates a map by parsing text ", extended = "Split text into key-value pairs"
        + " using two delimiters. The first delimiter seperates pairs, and the"
        + " second delimiter sperates key and value. If only one parameter is given, default"
        + " delimiters are used: ',' as delimiter1 and '=' as delimiter2.")
public class GenericUDFStrToMap extends GenericUDF {
    // Must be deterministic order map for consistent q-test output across Java versions - see HIVE-9161
    private final LinkedHashMap<Object, Long> ret = new LinkedHashMap<Object, Long>();
    private transient Converter soi_text, soi_de1 = null, soi_de2 = null;
    final static String default_de1 = ",";
    final static String default_de2 = ":";

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        for (int idx = 0; idx < Math.min(arguments.length, 3); ++idx) {
            if (arguments[idx].getCategory() != Category.PRIMITIVE
                    || PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
                    ((PrimitiveObjectInspector) arguments[idx]).getPrimitiveCategory())
                    != PrimitiveGrouping.STRING_GROUP) {
                throw new UDFArgumentException("All argument should be string/character type");
            }
        }
        soi_text = ObjectInspectorConverters.getConverter(arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        if (arguments.length > 1) {
            soi_de1 = ObjectInspectorConverters.getConverter(arguments[1],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        if (arguments.length > 2) {
            soi_de2 = ObjectInspectorConverters.getConverter(arguments[2],
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        return ObjectInspectorFactory.getStandardMapObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        ret.clear();
        String text = (String) soi_text.convert(arguments[0].get());
        String delimiter1 = (soi_de1 == null) ?
                default_de1 : (String) soi_de1.convert(arguments[1].get());
        String delimiter2 = (soi_de2 == null) ?
                default_de2 : (String) soi_de2.convert(arguments[2].get());

        String[] keyValuePairs = text.split(delimiter1);

        for (String keyValuePair : keyValuePairs) {
            String[] keyValue = keyValuePair.split(delimiter2, 2);
            if (keyValue.length < 2) {
                ret.put(keyValuePair, null);
            } else {
                ret.put(keyValue[0], Long.valueOf(keyValue[1]));
            }
        }

        return ret;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "usage not know!";
    }
}

3.2 Index OR Type

在存储的时候还有一个问题需要解决:对于每一天的分区数据,是用 Index 还是 Type 来存储?这个问题的由来和 ElasticSearch 的存储方式以及数据的回溯有关。

ElasticSearch 在设计的时候一开始是对照关系型数据库的。一个 Index 对应一个数据库,Index 下的 Type 对应 Table。但这并不是强制的要求的,数据到底怎么存应该根据自己的使用情景来定,具体可以参照这个文章 index vs type

另外一方面,难免有些数据有错误或者指标口径被修改了,那么要重新计算和导入数据。那么在重新导入数据的时候就要先删除所有的分区数据,恰巧 ElasticSearch 是没有分区的概念的。如果把 Hive 一个表中所有分区数据存在一张表中,用 stat_date 字段来标记日期,那么在删除数据的时候 ElasticSearch 会先查出所有这一天的数据然后再删除,无疑是很慢的解决方法。

最终我采用的是一个分区一个 Index,每次重新导入数据前先把对应的 Index 删除。虽然 Index 更耗资源,但是这样使用更符合一般的工作流程。

一种 Hive 表存储格式的转换的方式

1. 序

实习的公司考虑到 ORC 格式拥有更高的压缩比和传输效率,所以要求我们建表的时候都采用 ORC 的格式。

好死不死,有个项目要把 Hive 里算出来的数通过 Sqoop 导出到 MySQL 里去。而 Sqoop 对于 ORC 格式的数据导出是有问题的,所以我面临的问题就是把 ORC 分区的数据转换成 TEXT 格式的。

2. 面临的实际状况

  1. 都是外部表
  2. 分区的字段是 yearmonthday
  3. 挂分区时指定 location '2016/03/03'(否则分区目录就会是 /year=2016/month=03/day=03 这样的形式)
  4. 已经有若干天的分区数据

3. 解决方案

最直观的的解决方案就是重新建表,指定 TEXT 作为存储格式,然后将原来的数据直接 insert 进去。

这种方法的缺陷是:虽然可以使用相同的表结构,但是由于名字不能重复,那么之前的计算脚本和调度任务都要进行相应的修改。考虑到一共有四张表,工作量还是挺大的。如果不想改变脚本和任务,那么就要在导出数据之后更改原表的存储方式并重新将数据导回去,还要花费一定的时间。

综合来说,最好要达到两个目标:
1. 不改变表的名字,避免大面积的修改
2. 尽量少地导数据,节省时间

4. 小伎俩

通过了一定时间的思考,我用了一点小伎俩来达成这个任务。

  1. 首先创建一张临时表,建为 TEXT 存储方式的外部表,然后指向原表相同的 HDFS 文件目录。
  2. 为临时表挂上分区,指定 location 确保分区的路径和原表一一对应。
  3. 使用 insert overwrite <TABLE> select ... 以及动态分区的技术,将 ORC 格式的原表读出来,以 TEXT 的格式存回原路径。
  4. drop 原表,按照 TEXT 方式建立新表。
  5. 为新表挂上分区,删掉临时表。

5. 动态分区

a. 开启设置

--允许使用动态分区可通过set hive.exec.dynamic.partition;查看
set hive.exec.dynamic.partition=true;
--当需要设置所有列为dynamic时需要这样设置
set hive.exec.dynamic.partition.mode=nonstrict;
--如果分区总数超过这个数量会报错
set hive.exec.max.dynamic.partitions=1000;
--单个MR Job允许创建分区的最大数量
set hive.exec.max.dynamic.partitions.pernode=1000;

b. 使用方式

假设表的结构是:

CREATE EXTERNAL TABLE temp_table(
    field1    string  comment 'field1',
    field2    string  comment 'field2'
)
COMMENT '临时表'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';

那么调用的方式如下:

insert overwrite table temp_table_new partition(year,month,day)
select field1, field2, year, month, day
from temp_table;

注意:如果父分区是动态分区,那么子分区不能是静态分区

c. 挂分区的脚本

由于动态分区在创建分区的时候产生的路径形如: year=2016/month=03/day=05,和内部常用的 2016/03/03 的形式不一样,所以我写了一个 Python 脚本先挂分区指定 location

简单来说还是构建 SQL 字符串,然后使用 subprocess 模块来调用 hive -e 'cmd' 来执行。

#!/bin/env python
# -*- coding: utf-8 -*-

from datetime import date, timedelta


def date_from_str(date_str, splitor='-'):
    year, month, day = map(int, date_str.split(splitor))
    return date(year=year, month=month, day=day)

def date_interval(start_date, end_date):
    days = (end_date - start_date).days + 1
    return  [(start_date + timedelta(var_date)) for var_date in range(0, days)]

def add_partitions_cmd(datebase, table, start_date, end_date):
    CMD_STR = "use {};\n".format(datebase)
    for var_date in date_interval(start_date, end_date):
        year, month, day = var_date.isoformat().split('-')
        CMD_STR += "alter table {} add if not exists partition(year='{}', month='{}', day='{}') location '{}/{}/{}';\n" \
                    .format(table, year, month, day, year, month, day)
    return CMD_STR

if __name__ == '__main__':
    import sys

    if len(sys.argv) != 5:
        print 'Usage: python add_partitions.py <DATABASE> <TABLE> <START_DATE> <END_DATE>'
        sys.exit(255)

    datebase   = sys.argv[1]
    table      = sys.argv[2]
    start_date = date_from_str(sys.argv[3])
    end_date   = date_from_str(sys.argv[4])

    ADD_PARTITIONS = add_partitions_cmd(datebase, table, start_date, end_date)
    CMD = 'hive -e "{}"'.format(ADD_PARTITIONS)
    print CMD

    from subprocess import Popen, PIPE
    output, param = Popen(['hive', '-e', ADD_PARTITIONS], stdout=PIPE).communicate()
    print output
    print param

吐槽一下 subprocess

以前使用 Python 执行 Shell 的命令的时候常用 subprocess.getstatusoutput(cmd)。因为同时可以获得输出和返回值。这次一开始也是这么用的,结果老是报错。一查才知道尼玛这个函数没有了!

Python 使用 MySQL 数据库

#!/bin/env python
# -*- coding: utf-8 -*-

import MySQLdb

def printRow(res):
    for prop in res:
        print prop,
    print

conn = MySQLdb.connect(host="localhost",user="root",passwd="root",db="employees",port=3306)
cur = conn.cursor()

# fetchone()
# 获取一条结果
print '='*5 + 'fetchone' + '='*5
rows = cur.execute("select count(*) from employees")
result = cur.fetchone()
print "rows:%d, result:%d" % (rows,result[0])

# fetchmany()
# 获取多条结果
print '='*5 + 'fetchmany' + '='*5
rows = cur.execute("select * from employees limit 10")
print "rows:%d" % rows
result = cur.fetchmany(5)
print "first 5:"
for res in result:
    printRow(res)
print "last 5:"
result = cur.fetchmany(5)
for res in result:
    for prop in res:
        print prop,
    print

# fetchall()
# 获取所有结果
print '='*5 + 'fetchall' + '='*5
rows = cur.execute("select * from employees limit 10")
print "rows:%d" % rows
result = cur.fetchall()
print "all 10:"
for res in result:
    printRow(res)

# scroll()
# 控制游标的移动
print '='*5 + 'scroll' + '='*5
rows = cur.execute("select * from employees limit 10")
print "rows:%d" % rows
print "first row:"
first_row = cur.fetchone()
printRow(first_row)
# absolute 从第 0 条位置向下移动 2 条
print "third row:"
cur.scroll(2,mode='absolute')
third_row = cur.fetchone()
printRow(third_row)
# relative 从当前移动 3 条
print "seventh row:"
cur.scroll(3,mode='relative')
seventh_row = cur.fetchone()
printRow(seventh_row)

# executemany()
# 用多个参数执行同一条语句,参数是所有支持迭代的对象
print '='*5 + 'executemany' + '='*5
genders = ['F','M']
rows = cur.executemany('select * from employees where gender=%s limit 1', genders)
result = cur.fetchall()
print rows
print result
# 执行结果表明每一条语句执行过后游标 cursor 都会改变。
# 因此不适合多参数的查询,而适合多参数的插入

# conn.commit()
# 用来执行事务
print '='*5 + 'commit' + '='*5
try:
    cur.execute("insert into employees values(0,date(now()),'Jack','Sparrow','M',date(now()))")
    cur.execute("insert into employees values(1,date(now()),'Jack','Sparrow','M',date(now()))")
    raise MySQLdb.Error
    conn.commit()
except MySQLdb.Error,e:
    # cur.execute("delete from employees where emp_no in (0,1)")
    print "Error Message: %s" % str(e.args)

cur.close()
conn.close()

SVN迁移到Git:保持提交记录

1.前言

最近组里要把之前在 SVN 上管理的项目迁移到 SVN 上去,所以花了一个晚上研究了一下 SVN 向 Git 迁移的方法。这里首先给出一种能保存提交记录的方案。

2.实施准备

再实施迁移之前,有几件事情是必须做的:

  1. 获取 SVN 路径权限
  2. 将本机的公钥提交到 Git 服务器上
  3. 在 Git 服务器上建立仓库

3.实施过程

实施的过程分为以下几个步骤:

  1. 将 SVN checkout 到本地
  2. 获取用户名列表
  3. 用 git svn clone 创建 Git 仓库
  4. 导出 .gitignore 文件
  5. 调整分支和标签
  6. 推送到远程

a. 将 SVN checkout 到本地

svn checkout --username USER_NAME --password PASSWORD https://svn.xxx/dir_name  dir_name

b. 获取用户名列表

通过如下命令获得用户列表:

svn log --xml | grep -P "^<author" | sort -u | perl -pe 's/<author>(.*?)<\/author>/$1 = /'

然后按照如下格式放入一个文件中(比如 user.txt):

schacon = Scott Chacon <schacon@geemail.com>
selse = Someo Nelse <selse@geemail.com>

c. 用 git svn clone 创建 Git 仓库

git svn clone --no-metadata -A users.txt https://svn.xxx/dir_name  dir_name

d. 导出 .gitignore 文件

git svn show-ignore > .gitignore
git add .gitignore
git commit -m 'Convert svn:ignore properties to .gitignore.'

e. 调整分支和标签

首先要移动标签,把它们从奇怪的远程分支变成实际的标签,然后把剩下的分支移动到本地。

cp -Rf .git/refs/remotes/tags/* .git/refs/tags/
rm -Rf .git/refs/remotes/tags

接下来,把 refs/remotes 下面剩下的索引变成本地分支:

cp -Rf .git/refs/remotes/* .git/refs/heads/
rm -Rf .git/refs/remotes

f. 推送到远程

git remote add origin git@my-git-server:myrepository.git
git push origin --all

4. Python 脚本

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import commands

if len(sys.argv) < 3:
    print 'Usage: python %s USERNAME PASSWORD' % sys.argv[0]
    exit(255)

user_name = sys.argv[1]
user_pwd  = sys.argv[2]

svn_repos = [
        svn_path,
        ......
        ]

git_repos = [
        git_path,
        ......
        ]

users = {}

def init():
    run_cmd('.', 'mkdir git')
    run_cmd('.', 'mkdir svn')

def run_cmd(dir_name, cmd):
    cmd_str = 'cd %s && %s' % (dir_name, cmd)
    print cmd_str
    status,output = commands.getstatusoutput(cmd_str)
    print 'status: %d' % status
    print 'output: \n%s' % output
    print '=========================================================\n'

    return status,output

def clone_svn_to_local(svn_addr):
    repo_dir_name = get_repo_dir_name(svn_addr)
    run_cmd('svn', "svn checkout --username %s --password %s %s %s" % (user_name, user_pwd, svn_addr, repo_dir_name))
    _,output = run_cmd('svn/' + repo_dir_name, "svn log --xml | grep -P '^<author' | sort -u | perl -pe 's/<author>(.*?)<\/author>/$1 = /'")
    lines = output.split("\n")
    for line in lines:
        name = line.split('=')[0].strip()
        users[name] = ('%s = %s <%s@didichuxing.com>' % (name, name, name))

def write_users():
    with open('git/users.txt', 'w+') as f:
        for name in users.values():
            f.write('%s\n' % name)

def git_svn_clone(svn_addr):
    repo_dir_name = get_repo_dir_name(svn_addr)
    run_cmd('git', 'git svn clone --no-metadata -A users.txt %s %s' % (svn_addr, repo_dir_name))
    run_cmd('git/' + repo_dir_name, 'git svn show-ignore > .gitignore')
    run_cmd('git/' + repo_dir_name, 'git add .gitignore')
    run_cmd('git/' + repo_dir_name, "git commit -m 'Convert svn:ignore properties to .gitignore.'")
    run_cmd('git/' + repo_dir_name, 'cp -Rf .git/refs/remotes/tags/* .git/refs/tags/')
    run_cmd('git/' + repo_dir_name, 'rm -Rf .git/refs/remotes/tags')
    run_cmd('git/' + repo_dir_name, 'cp -Rf .git/refs/remotes/* .git/refs/heads/')
    run_cmd('git/' + repo_dir_name, 'rm -Rf .git/refs/remotes')

def diff_git_and_svn_repos(svn_addr):
    repo_dir_name = get_repo_dir_name(svn_addr)
    git_repo_path = 'git/' + repo_dir_name
    svn_repo_path = 'svn/' + repo_dir_name
    _,output = run_cmd('.', 'diff -r %s %s' % (git_repo_path, svn_repo_path))
    expected = 'Only in git/%s: .git\nOnly in git/%s: .gitignore\nOnly in svn/%s: .svn' % (repo_dir_name, repo_dir_name, repo_dir_name)
    if expected != output:
        raise Exception('迁移后内容不一致:\n%s' % output)

def push_to_remote(svn_addr, git_addr):
    repo_dir_name = get_repo_dir_name(svn_addr)
    run_cmd('git/' + repo_dir_name, 'git checkout -b svn_migrate_branch')
    run_cmd('git/' + repo_dir_name, 'git remote add origin %s' % git_addr)
    run_cmd('git/' + repo_dir_name, 'git push origin --all')

def get_repo_dir_name(svn_addr):
    return svn_addr.split('/')[-1]

def main():
    init()

    for svn_addr in svn_repos:
        clone_svn_to_local(svn_addr)

    write_users()

    repo_nums = len(git_repos)
    for i in range(0,repo_nums):
        git_svn_clone(svn_repos[i])
        diff_git_and_svn_repos(svn_repos[i])
        push_to_remote(svn_repos[i], git_repos[i])

if __name__ == '__main__':
    main()

4. 参考文献

5. 一点想法

其实大家都能看出来这个脚本大部分都是对 shell 命令的调用。为什么舍近求远呢?
因为本人一直觉得 shell 的内部命令,关于流程控制以及字符串处理等实在不好用。我更倾向于 Python 和 Ruby 这样强大的脚本语言。
其实我对 Python 用的也不是很多,在写脚本的过程中查阅了一些 Python 调用 shell 的资料,也和 Ruby 中类似的代码做个比较。
在上面的脚本中我使用了 commands 包的 getstatusoutput('cmd') 方法 。该函数返回了一个元组,通过序列解包可以同时获得返回值和标准输出流的输出。一般调用方法为:

(status, output) = commands.getstatusoutput('shell 命令')
print status, output

而 Ruby 中也有类似的方法,即 Kernel 模块中的 `cmd` 方法。但是该方法直接返回的只有标准输出,返回值还要靠 $? 来获取。

irb(main):014:0> `echo hello && exit 99`
=> "hello\n"
irb(main):015:0> $?.exitstatus
=> 99

在便利性上,Python 的序列解包使得多个函数返回值变成可能,减少了封装也提高了使用的便利性。

Fuzzy C Means 算法及其 Python 实现

1. K-Means 算法向 FCM 算法的扩展

K-Means 算法中,如果要将数据集合 X=\{ X_1,X_2,X_3,\dots,X_n \} 划分为 k\,(1 \le k \le n) 个类,使得任意数据对象 X_i 必须属于并且仅属于一个类,同时每一个类至少包含一个数据对象,那么可以用一个 k\times n 的矩阵 U 来表示,矩阵中的任意一个元素 u_{ij} 可以表示为:

    \[{u_{ij}} = \left{ \begin{cases} 1& {X_i \in G_j} \\ 0& {X_i \notin G_j} \end{cases} \right.\]

其中 {G_j}\left( {j = 1,2, \ldots ,k} \right) 表示第 j 个类。并且 U 需要满足如下条件 (1 \le i \le k,\,1 \le j \le n)

    \[\left{ \begin{cases} {u_{ij}} \in \{0,1\} \\ \sum\limits_{i = 1}^k {{u_{ij}}} = 1 \\ \sum\limits_{j = 1}^n {{u_{ij}}} > 0 \end{cases} \right.\]

如果上述矩阵 U 中的元素 u_{ij} 的取值范围不仅仅是 0 或者 1,那么就可以推广到模糊集合上的划分,U 就变成了模糊判定矩阵。此时 {u_{ij}} 需满足:

(1)   \begin{equation*}  \left{ \begin{cases} {u_{ij}} \in [ 0,1 ]} \\ \sum\limits_{i = 1}^k {{u_{ij}}} = 1 \\ \sum\limits_{j = 1}^n {{u_{ij}}} > 0 \end{cases} \right. \end{equation*}

2. 目标函数与聚类中心

K-Means 算法在度量数据对象的非相似性(或者说距离)时一般使用欧几里得距离,要求每个类的聚类中心与数据对象的距离平方之和最小,目标函数可以表示为:

    \[J = \sum\limits_{i = 1}^k {\sum\limits_{j = 1}^n {s_{ij}^2} }\]

    \[{s_{ij}} = Eculid({C_i},{X_j})\]

其中 C_i 表示任意聚类中心,而聚类中心一般取类内所有对象在各属性上的平均值,因此可以表示为:

    \[{C_i} = \frac{{\sum\limits_{j,{X_j} \in {G_i}} {{X_j}} }}{{\sum\limits_{j = 1}^n {{u_{ij}}} }}\]

{G_i}{\kern 1pt} \left( {1 \le i \le k} \right) 表示任意一个类。

将算法推广到模糊集后,Dunn 对样本与类中心之间的距离采用隶属度的平方来加权,Bezdek 则进一步引入了隶属度的加权指数 m 从而得到了新的目标函数:

(2)   \begin{equation*}  J = \sum\limits_{i = 1}^k {\sum\limits_{j = 1}^n {{{\left( {{u_{ij}}} \right)}^m}s_{ij}^2} } \end{equation*}

要使得 (2) 式达到最小值则要求聚类中心 C_i 和隶属度 u_{ij} 满足如下条件:

(3)   \begin{equation*}  {C_i} = \frac{{\sum\limits_{j = 1}^n {u_{ij}^m{X_j}} }}{{\sum\limits_{j = 1}^n {u_{ij}^m} }} \end{equation*}

(4)   \begin{equation*}  {u_{ij}} = \frac{1}{{\sum\limits_{l = 1}^k {{{\left( {\frac{{{u_{ij}}}}{{{u_{lj}}}}} \right)}^{2/\left( {m - 1} \right)}}} }} \end{equation*}

3. FCM 算法计算过程

由于聚类中心和隶属度两个必要条件的存在,FCM 算法的可以表示为一个简单的迭代过程。将 n 个拥有 m 个属性的对象构建成一个 n\times m 的矩阵记为 DFCM 的算法可以描述为:

输入:隶属度加权指数 m,数据矩阵 D,聚类的个数 k,最大迭代次数 iterMax,阈值 threshold
输出:隶属度矩阵 U
算法过程:
Step1: 用区间 [0,1] 内的满足 (1) 式的随机数初始化隶属度矩阵,记为 D^{(0)}。初始化迭代计数器 iterCounter
Step2: 根据 (3) 式和 U^{(iterCounter)} 计算 k 个聚类中心,记做 C^{(iterCounter)}
Step3: 根据 (2) 式计算目标函数的值。如果本次迭代的目标函数值与上一次计算的目标函数值之差小于 threshold 或者迭代计数器 iterCounter 大于等于 iterMax 那么迭代结束,输出结果,算法停止。
Step4: 依据 C^{(iterCounter)} 和 (4) 式计算 U^{(iterCounter+1)}iterCounter++ ,返回Step2。

4. Python 实现代码

import numpy as np

def loadDataFromTxt(fileName):
    dataSet=np.mat(np.loadtxt(fileName,delimiter='\t'))
    return dataSet

def normalization(dataSet):
    colNum=np.shape(dataSet)[1]
    for index in range(colNum):
        colMax=np.max(dataSet[:,index])
        dataSet[:,index]=dataSet[:,index]/colMax
    return dataSet

def initWithFuzzyMat(n,k):
    fuzzyMat=np.mat(np.zeros((k,n)))
    for colIndex in range(n):
        memDegreeSum=0
        randoms=np.random.rand(k-1,1)
        for rowIndex in range(k-1):
            fuzzyMat[rowIndex,colIndex]=randoms[rowIndex,0]*(1-memDegreeSum)
            memDegreeSum+=fuzzyMat[rowIndex,colIndex]
        fuzzyMat[-1,colIndex]=1-memDegreeSum
    return fuzzyMat

def eculidDistance(vectA,vectB):
    return np.sqrt(np.sum(np.power(vectA-vectB,2)))

def calCentWithFuzzyMat(dataSet,fuzzyMat,p):
    n,m=dataSet.shape
    k=fuzzyMat.shape[0]
    centroids=np.mat(np.zeros((k,m)))
    for rowIndex in range(k):
        degExpArray=np.power(fuzzyMat[rowIndex,:],p)
        denominator=np.sum(degExpArray)
        numerator=np.array(np.zeros((1,m)))
        for colIndex in range(n):
            numerator+=dataSet[colIndex]*degExpArray[0,colIndex]
        centroids[rowIndex,:]=numerator/denominator
    return centroids

def calFuzzyMatWithCent(dataSet,centroids,p):
    n,m=dataSet.shape
    c=centroids.shape[0]
    fuzzyMat=np.mat(np.zeros((c,n)))
    for rowIndex in range(c):
        for colIndex in range(n):
            d_ij=eculidDistance(centroids[rowIndex,:],dataSet[colIndex,:])
            fuzzyMat[rowIndex,colIndex]=1/np.sum([np.power(d_ij/eculidDistance(centroid,dataSet[colIndex,:]),2/(p-1)) for centroid in centroids])
    return fuzzyMat

def calTargetFunc(dataSet,fuzzyMat,centroids,k,p):
    n,m=dataSet.shape
    c=fuzzyMat.shape[0]
    targetFunc=0
    for rowIndex in range(c):
        for colIndex in range(n):
            targetFunc+=eculidDistance(centroids[rowIndex,:],dataSet[colIndex,:])**2*np.power(fuzzyMat[rowIndex,colIndex],p)
    return targetFunc

def fuzzyCMean(dataSet,k,p,initMethod=initWithFuzzyMat):
    n,m=dataSet.shape
    fuzzyMat=initWithFuzzyMat(n,k)
    centroids=calCentWithFuzzyMat(dataSet,fuzzyMat,p)
    lastTargetFunc=calTargetFunc(dataSet,fuzzyMat,centroids,k,p)

    fuzzyMat=calFuzzyMatWithCent(dataSet,centroids,p)
    centroids=calCentWithFuzzyMat(dataSet,fuzzyMat,p)
    targetFunc=calTargetFunc(dataSet,fuzzyMat,centroids,k,p)
    while lastTargetFunc*0.99>targetFunc:
        lastTargetFunc=targetFunc
        fuzzyMat=calFuzzyMatWithCent(dataSet,centroids,p)
        centroids=calCentWithFuzzyMat(dataSet,fuzzyMat,p)
        targetFunc=calTargetFunc(dataSet,fuzzyMat,centroids,k,p)
    return fuzzyMat,centroids


if __name__=='__main__':
    dataSet=loadDataFromTxt('data.txt')
    dataSet=normalization(dataSet)
    fuzzyMat,centroids=fuzzyCMean(dataSet,3,2)
    print 'fuzzyMat=\n',fuzzyMat
    print np.sum(fuzzyMat,axis=0)
    print 'centroids=\n',centroids

使用Matplotlib绘制散点图

由于要做算法结果的可视化输出,需要绘制大量的散点图,所以打算使用 matplotlib,以下给出一个绘制散点图例子。

效果图

scatter

源代码

import matplotlib.pyplot as plt
import numpy as np

n = 100

for color in ['red','blue','green']:
    x,y=np.random.rand(2,n)
    scale=100*np.random.rand(n)
    plt.scatter(x,y,c=color,s=scale,label=color,alpha=0.6,edgecolors='white')

plt.title('Scatter')
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.grid(True)
plt.show()

函数说明

numpy.random.rand(d0,d1,d2…)

返回维度为 (d0,d1,d2…) 的位于区间 [0,1) 满足均匀分布的随机数,返回值结果类型为 ndarray。

matplotlib.pyplot.scatter(x,y,c=color,s=scale,label=color,alpha=0.6,edgecolors=’white’)

scatter() 函数用来绘制散点图, x 和 y 为输入数据,形如 shape (n, )。c 表示散点的颜色,指定一个颜色或者色序。s 表示散点的大小,形如 shape (n, )。label 表示显示在图例中的标注。alpha 是 RGBA 颜色的透明分量。edgecolors 指定三点圆周的颜色。

其他函数

函数名 作用
title 图标的标题
xlabel x轴的名称
ylabel y轴的名称
legend 显示右上角的图例
grid 显示网格
show 显示图像

使用Python对图像进行尺寸和格式的转换

说起来这还是在万网虚拟主机备案的时候遇到的问题。备案的时候要求上传 800×600 的带背景的照片,因为指定的照相馆离我住的地方有一定的距离,所以就申请了幕布自己拍,拍完以后需要当然要修改照片的分辨率。按照以往就是安装一个格式工厂转一下,但是既然学了 Python 当然要使用 PIL 来试试手了!
代码比较简单:

from PIL import Image

img=Image.open('./IMG_0154.jpg')
img.show()
print img.size

resize_img=img.resize((800,600))
resize_img.show()
resize_img.save('resize.jpg')

使用 Image 还能做图像的旋转、缩略图等效果我就不介绍了,真心感慨一下 Python 真是一个方便的好工具!

使用Numpy实现K-Means算法

1. K-Means算法简介

1967年,MacQueen 首次提出了 K 均值聚类算法( K-Means算法 )。迄今为止,很多聚类任务都选择该经典算法。K-Means算法的优点是快速高效,其算法复杂度为 O(tKmn),其中 t 表示算法迭代的次数,K 表示聚类的数目,m 表示每个对象拥有的属性个数,n 表示待聚类的对象的个数。

该算法的核心思想是找出 k 个聚类中心,使得每一个数据点 X_i 和与其最近的聚类中心 C_l 的平方距离和被最小化( 该平方距离和被称为代价函数值 E )。

    \[D(X_i,C_l) = \sum\limits_{j = 1}^{ m } {Euclid^2(X_{ij},C_{lj})}\]

    \[E = \sum\limits_{l = 1}^k {\sum\limits_{i = 1}^{{n}} {D({X_i},{C_l})} }\]

K-Means算法的 mean 为类簇的中心,是一个类簇中所有对象在所有属性上的平均。在聚类的初始阶段,我们一般随机指定待聚类对象中的 K 个对象作为 mean

但是该方法也存在着一些缺陷:(1)K-Means 算法往往只能收敛到一个局部最优值。(2)聚类结果对聚类数目 K 和聚类开始时选取的 K 个初始对象非常敏感。

2. K-Means算法描述

输入:n 个带聚类对象,聚类中心个数 K
输出:K 个类簇(包括类簇中心和类簇中的对象)
算法过程:

  1. n 个对象的数据进行标准化。
  2. 随机选取 K 个对象作为初始的聚类中心。
  3. 计算每个对象到 K 个聚类中心的距离,将对象加入距离最近的类簇中。
  4. 根据类簇内的对象更新类簇的中心。
  5. 计算代价函数值 E
  6. 迭代第3步到第5步直至 E收敛。

3. Python代码

下文的代码实现使用了Python的Numpy库,关于Numpy的安装和使用请参照这里

import numpy as np

def loadDataFromTxt(fileName):
    dataMat=[]
    fr=open(fileName)
    for line in fr.readlines():
        curLine=line.strip().split('\t')
        fltLine=map(float,curLine)
        dataMat.append(fltLine)
    dataMat=np.mat(dataMat)
    return dataMat

def autoNormal(dataSet):
    dataShape=np.shape(dataSet)
    n=dataShape[0]
    m=dataShape[1]
    for j in range(m):
        colMax=np.max(dataSet[:,j])
        colMin=np.min(dataSet[:,j])
        colRange=colMax-colMin
        dataSet[:,j]=(dataSet[:,j]-colMin)/colRange
    return dataSet

def randomInitCentroids(dataSet,k):
    m=np.shape(dataSet)[1]
    centroids=np.mat(np.zeros((k,m)))
    for j in range(m):
        colMin=np.min(dataSet[:,j])
        colMax=np.max(dataSet[:,j])
        rangeJ=float(colMax-colMin)
        centroids[:,j]=colMin+np.random.rand(k,1)*rangeJ
    return centroids

def euclidDistance(vectA,vectB):
    return np.sqrt(np.sum(np.power(vectA-vectB,2)))

def KMean(dataSet,k,normalMethod=autoNormal,intialMethod=randomInitCentroids,distMethod=euclidDistance):
    n=dataSet.shape[0]
    dataSet=normalMethod(dataSet)
    centroids=intialMethod(dataSet,k)
    clusterAssignment=np.mat(np.zeros((n,2)))
    clusterChanged=True
    while clusterChanged:
        clusterChanged=False
        for i in range(n):
            minIndex=-1;minDist=np.inf
            for j in range(k):
                distance=distMethod(dataSet[i,:],centroids[j,:])
                if distance<minDist:
                    minIndex=j
                    minDist=distance
            if clusterAssignment[i,0] != minIndex :
                clusterChanged=True
                clusterAssignment[i,:]=minIndex,minDist        
        for cent in range(k):
            ptsInCluster=dataSet[np.where(clusterAssignment[:,0].A==cent)[0]]
            centroids[cent,:]=np.mean(ptsInCluster,axis=0)
    return centroids,clusterAssignment

dataSet=loadDataFromTxt('data.txt')
centroids,clusterAssignment=KMean(dataSet,2)
print clusterAssignment
print centroids

Numpy和matplotlib的安装和使用

最近要写几个数据挖掘的算法,可是由于Java和C#没有比较成熟和知名的可视化类库,所以想到了用Python的Numpy计算,用matplotlib来做结果的可视化。

Numpy与matplotlib

Numpy
NumPy系统是Python的一种开源的数字扩展。这种工具可用来存储和处理大型矩阵,比Python自身的嵌套列表(nested list structure)结构要高效的多(该结构也可以用来表示矩阵matrix)。我用它来进行矩阵的计算,非常便捷。
matplotlib
Matplotlib 可能是 Python 2D- 绘图领域使用最广泛的套件。它能让使用者很轻松地将数据图形化,并且提供多样化的输出格式。

在Windows上安装软件包

在Windows上安装Numpy和matplotlib是一个和痛苦的过程。总体来说,安装这两个包的官方安装程序是远远不够的。总会提示你缺这个缺那个。我的解决办法接单粗暴,提示缺少什么了我就谷歌然后安装这个包。所有的包都可以在UCI的网站上找到。

最后为了安装Nump和matplotlib,我一共安装了6个包:

  • matplotlib-1.4.2.win-amd64-py2.7
  • numpy-MKL-1.9.1.win-amd64-py2.7
  • pyparsing-2.0.3.win-amd64-py2.7
  • python-dateutil-2.2.win-amd64-py2.7
  • scipy-0.15.0b1.win-amd64-py2.7
  • six-1.8.0.win-amd64-py2.7

在Ubuntu Linux上安装

在Ubuntu上安装可就简单多了,软件源里就包括这两个包,一句话搞定:

sudo apt-get install python-numpy python-matplotlib

所有依赖的包都自动安装上了,给个赞!
真心说一句,Ubuntu除了网络问题更新软件不给力之外,装软件真心是很方便、很赞的!

通用方法

其实只要用Python都可以用Python包管理工具pip,两句命令即可搞定:

pip install numpy
pip install matplotlib

而安装pip依赖于setuptools,可以点击setuptoolspip来进行下载安装。

测试代码

安装完成后,可以使用引自此页面的代码进行测试。

import numpy as np
import matplotlib.pyplot as plt

N = 5
menMeans = (20, 35, 30, 35, 27)
menStd =   (2, 3, 4, 1, 2)

ind = np.arange(N)  # the x locations for the groups
width = 0.35       # the width of the bars

fig, ax = plt.subplots()
rects1 = ax.bar(ind, menMeans, width, color='r', yerr=menStd)

womenMeans = (25, 32, 34, 20, 25)
womenStd =   (3, 5, 2, 3, 3)
rects2 = ax.bar(ind+width, womenMeans, width, color='y', yerr=womenStd)

# add some
ax.set_ylabel('Scores')
ax.set_title('Scores by group and gender')
ax.set_xticks(ind+width)
ax.set_xticklabels( ('G1', 'G2', 'G3', 'G4', 'G5') )

ax.legend( (rects1[0], rects2[0]), ('Men', 'Women') )

def autolabel(rects):
    # attach some text labels
    for rect in rects:
        height = rect.get_height()
        ax.text(rect.get_x()+rect.get_width()/2., 1.05*height, '%d'%int(height),
                ha='center', va='bottom')

autolabel(rects1)
autolabel(rects2)

plt.show()

参考文献

查阅到了几篇不错的博客和网站可以用大家参考一下: