关于 Kafka 的一点思考

1. Kafka 消费者可能遇到的问题

Kafka 消息队列在消费时可能遇到两个问题:

  1. 消息重复(Producer 设置 acksall 可能会有重复数据)
  2. 消息乱序(网络延迟)

但是这两者在处理的时候却必须综合考虑,如果消息的生产者按顺序生产消息,消费者在消费的时候也会期望消息的有序性,但是如果消息发生了重复或者乱序,消费者是无法区分的,也就无法针对两种情况分别进行处理。

因此我们必须通过某些手段来区分和解决这些问题。

2. 解决思路

针对的业务场景不同,我们能够使用的手段也不尽相同,总体来说:

  1. 唯一标示
  2. 状态机机制
  3. 幂等的方法

2.1 唯一标识

生产者在生产消息的时候为每一条消息增加一个递增的、唯一的标识。消费者在消费之前首先查询数据库该消息是否已经消费,如果已经消费了则直接忽略。该手段用来处理重复的数据简单直接。

在唯一标识生成的时候还可以附加生成规则,比如 唯一标识 = 批次号 + 版本序号。对消息顺序有严格要求的场景,可以根据同一批次对消息进行缓存,等同一批次内的版本序号都齐了再进行处理。

2.2 状态机机制1

状态机机制针对于有严格状态流转的业务场景,比如工作流程的处理、资源状态的转换等等。业务流程本身可以表示成一个状态机。那么在生产数据的时候可以把目标实体的当前状态和目标状态写入消息中,消费的过程中通过消息的当前状态和实体实际的当前状态进行对比,如果一致再进行处理。

如果不一致可以通知上游重发,也可以针对业务场景设置不同策略。

2.3 幂等的方法2

幂等性:

HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用。同一个请求,发送一次和发送N次效果是一样的。

幂等性所表达的概念关注的是数学层面的运算和数值,并没有提及到数值的安全性问题。所以 RESTful 设计中将幂等性和安全性是作为两个不同的指标来衡量,如 POSTPUTGETDELETE 操作:

重要方法 安全 幂等
GET
DELETE
PUT
POST

因此,幂等性是系统的接口对外一种承诺(而不是实现),承诺只要调用接口成功,外部多次调用对系统的影响是一致的。声明为幂等的接口会认为外部调用失败是常态,并且失败之后必然会有重试。

对于消息队列的消费者也是类似的。HTTP 方法中相对于 POST 方法,PUT 方法是幂等的。从资源状态转换的角度来考虑或者从容错的角度出发,在某些应用场景下将处理流程进行改造。

既然 POST 方法意味着创建资源的实例,那么我们可以将 POST 方法尽量改造成 PUT 方法。比如:如果消费者从消息队列中读取消息来进行资源的创建,我们可以给每条消息增加唯一标识,创建资源的时候如果该唯一标识不存在则直接创建资源,否则进行全量的更新(类似 MySQLon duplicate key update 子句或者 Replace 语法)。ElasticSearch 可以通过 _index_type_id 组合确定唯一一条文档,如果在写入数据的时候三者重复了,那么ElasticSearch 会用新的文档完全覆盖老的文档。这也是通过 KafkaElasticSearch 中导入数据时避免重复数据的一个技巧。

其实不仅幂等的方法满足幂等性,唯一标识的方案、状态机机制同样满足了幂等的要求。这三种方案可以进行任意地组合以应对复杂的业务场景。

参考文章

  1. 消息队列设计精要
  2. 分布式系统接口幂等性
  3. 幂等性 个人理解及应用

Elasticsearch Query DSL

1. 序

Hive 无疑是数据相关工作最常用的的工具,利用 SQL 就能够完成许多统计和分析的任务,学习成本低,开发效率高。

但是 Hive 的缺点也很明显:运行慢!所以 Hive 一般用来做离线计算,每天由调度系统调用脚本完成一系列的计算任务,产出数据然后交给其他应用展现。但总有一些情景是 Hive 满足不了的,比如对数据的实时查询,比如对数据的快速分析。

这个时候就可以使用我们今天要介绍的工具:Elasticsearch。

2. 简介

ElasticSearch 是一种基于 Apache Lucene 的开源项目,主要面向的是数据的实时检索、分析以及全文搜索等。

Elasticsearch 的几个特性非常棒:
1. RESTful API,使用 Json 交换数据
2. 跨 index 的查询
3. 速度,尤其是大数据量下聚合函数的速度非常快!

3. Query DSL

我们可以使用两种结构化语句: 结构化查询(Query DSL)和结构化过滤(Filter DSL)。 查询与过滤语句非常相似,但是它们由于使用目的不同而稍有差异。
一条过滤语句会询问每个文档的字段值是否包含着特定值,一条查询语句会计算每个文档与查询语句的相关性,会给出一个相关性评分 _score,并且 按照相关性对匹配到的文档进行排序。

3.1 term 过滤

精确匹配数字、日期、布尔值或 not_analyzed 的字符串。相当于等号 =

{ "term": { "age":    26           }}
{ "term": { "date":   "2014-09-01" }}
{ "term": { "public": true         }}
{ "term": { "tag":    "full_text"  }}

3.2 terms 过滤

精确匹配多个值。相当于 in

{
    "terms": {
        "tag": [ "search", "full_text", "nosql" ]
    }
}

3.3 range 过滤

按照指定范围查找一批数据,相当于 between

{
    "range": {
        "age": {
            "gte":  20,
            "lt":   30
        }
    }
}
字段 含义
gt >
gte >=
lt <
lte <=

3.4 existsmissing 过滤

针对已经查出一批数据,但是想区分是否存在某个字段。类似于 IS_NULL 条件。

{
    "exists": {
        "field": "title"
    }
}

3.5 bool 过滤

bool 过滤可以用来合并多个过滤条件查询结果的布尔逻辑:

过滤子句 含义
must 多个查询条件完全匹配,相当于 and
must_not 多个查询条件的相反匹配,相当于 not
should 至少有一个查询条件匹配,相当于 or

复合实例:

{
    "bool": {
        "must": { "term": { "folder": "inbox" }},
        "must_not": { "term": { "tag": "spam" }},
        "should": [
            { "term": { "starred": true }},
            { "term": { "unread":  true }}
        ]
    }
}

3.6 match_all 查询

使用 match_all 可以查询到所有文档,事没有查询条件下的默认语句。

{
    "query": {
        "match_all": {}
    }
}

此查询常用于合并过滤条件。比如说你需要检索所有的邮箱,所有的文档相关性都是相同的,所以得到的 _score 为 1。

3.7 match 查询

某个字段中包含特定内容

{
    "query": {
        "match": {
            "字段名": "特定内容"
        }
    }
}

如果用 match 指定了一个确切值,在遇到数字,日期,布尔值或者 not_analyzed 的字符串时,它将为你搜索你给定的值:

{ "match": { "age":    26           }}
{ "match": { "date":   "2014-09-01" }}
{ "match": { "public": true         }}
{ "match": { "tag":    "full_text"  }}

3.8 multi_match 查询

multi_match 查询允许你在 match 查询的基础上同时搜索多个字段:

{
    "multi_match": {
        "query":    "full text search",
        "fields":   [ "title", "body" ]
    }
}

3.9 bool 查询

bool 查询与 bool 过滤相似,用于合并多个查询子句。不同的是,bool 过滤可以直接给出是否匹配成功,而 bool 查询要计算每一个查询子句的 _score (相关性分值)。

查询子句 含义
must 查询指定文档一定要被包含。
must_not 查询指定文档一定不要被包含。
should 查询指定文档,有则可以为文档相关性加分。

如果 bool 查询下没有 must 子句,那至少应该有一个 should 子句。但是 如果有 must 子句,那么没有 should 子句也可以进行查询。

4. 查询与过滤条件的合并

search API 中只能包含 query 语句,所以我们需要用 filtered 来同时包含 queryfilter 子句,外面再包含一层 query

{
    "query": {
        "filtered": {
            "query":  { "match": { "email": "business opportunity" }},
            "filter": { "term":  { "folder": "inbox" }}
        }
    }
}

5. 验证查询

5.1 验证

validate API 可以验证查询语句是否合法:

curl -XGET 'http://dp.diditaxi.com.cn:80/es/<INDEX>/<TYPE>/_validate/query?pretty=true' -d '{
    QUERY
}'

返回结果:

#合法
{
  "valid" : true,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  }
}

#非法
{ 
    "valid" :         false, 
    "_shards" : {    
        "total" :       1,    
        "successful" :  1,    
        "failed" :      0  
    }
 }

5.2 理解错误信息

想知道语句非法的具体错误信息,需要加上 explain 参数:

curl -XGET 'http://dp.diditaxi.com.cn:80/es/<INDEX>/<TYPE>/_validate/query?explain&pretty=true' -d '{
    QUERY
}'