Flink JDBCUpsertTableSink 踩坑

1. 前言

最近尝试使用 Flink 来做多数据源之间的数据同步,在做到 Hive 到 MySQL 使用 org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink (版本 1.10.1)的时候遇到了一个问题:

明明设置了 keyFieldsisAppendOnly(为 false) ,但是在运行时却进入了 append-only 的分支。

2. 诊断过程

通过本地打断点可以看到这个过程,首先是我们代码对其进行了设定:

image-20200615144904879

然后我们继续运行,可以再一次在断点停下:

image-20200615151214033

这一次,可以清楚的发现 setKeyFields 再次被调用,调用者是 org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink 中的 96 行的代码。

点进去看看:

case upsertSink: UpsertStreamTableSink[T] =>
  // check for append only table
  val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this)
  upsertSink.setIsAppendOnly(isAppendOnlyTable)

关键是 UpdatingPlanChecker.isAppendOnly 方法到底干了些什么:

  def isAppendOnly(plan: RelNode): Boolean = {
    val appendOnlyValidator = new AppendOnlyValidator
    appendOnlyValidator.go(plan)

    appendOnlyValidator.isAppendOnly
  }

  private class AppendOnlyValidator extends RelVisitor {

    var isAppendOnly = true

    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
      node match {
        case s: DataStreamRel if s.producesUpdates || s.producesRetractions =>
          isAppendOnly = false
        case _ =>
          super.visit(node, ordinal, parent)
      }
    }
  }

从代码来看,要返回 false 的条件是:node 的类型为 DataStreamRel 并且 s.producesUpdates || s.producesRetractions 返回结果为 true

我们继续设置新的断点:

image-20200615154832415

从断点来看,Flink 认为 StreamExecSink 类型既不产生 update 也不产生 retraction,因此即使手动设置了属性也会被物理执行计划改写。

3. 后续

请教了一下 godfreyhe@163.com 同学,对于 JDBCStreamTableSink 有了一些新的了解,概括来说:

JDBCStreamTableSink 支持的 append-only 和 upsert 两种模式,upsert 模式是 planner 根据查询推断出来的,由框架来设置 keyFields 以及 isAppendOnly 属性。

代码中手动设置主键约束想表达的是物理存储的约束,而非查询结果的特性。这一点上 Flink 并没有很好的支持,这一讨论也在继续进行中 FLIP 87: Primary key constraints in Table API

4. 相关资料

发表评论