1. 前言
最近尝试使用 Flink 来做多数据源之间的数据同步,在做到 Hive 到 MySQL 使用 org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink
(版本 1.10.1
)的时候遇到了一个问题:
明明设置了 keyFields
和 isAppendOnly
(为 false
) ,但是在运行时却进入了 append-only
的分支。
2. 诊断过程
通过本地打断点可以看到这个过程,首先是我们代码对其进行了设定:

然后我们继续运行,可以再一次在断点停下:

这一次,可以清楚的发现 setKeyFields
再次被调用,调用者是 org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink
中的 96 行的代码。
点进去看看:
case upsertSink: UpsertStreamTableSink[T] =>
// check for append only table
val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this)
upsertSink.setIsAppendOnly(isAppendOnlyTable)
关键是 UpdatingPlanChecker.isAppendOnly
方法到底干了些什么:
def isAppendOnly(plan: RelNode): Boolean = {
val appendOnlyValidator = new AppendOnlyValidator
appendOnlyValidator.go(plan)
appendOnlyValidator.isAppendOnly
}
private class AppendOnlyValidator extends RelVisitor {
var isAppendOnly = true
override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
node match {
case s: DataStreamRel if s.producesUpdates || s.producesRetractions =>
isAppendOnly = false
case _ =>
super.visit(node, ordinal, parent)
}
}
}
从代码来看,要返回 false
的条件是:node 的类型为 DataStreamRel
并且 s.producesUpdates || s.producesRetractions
返回结果为 true
。
我们继续设置新的断点:

从断点来看,Flink 认为 StreamExecSink
类型既不产生 update
也不产生 retraction
,因此即使手动设置了属性也会被物理执行计划改写。
3. 后续
请教了一下 godfreyhe@163.com 同学,对于 JDBCStreamTableSink
有了一些新的了解,概括来说:
JDBCStreamTableSink
支持的 append-only 和 upsert 两种模式,upsert 模式是 planner 根据查询推断出来的,由框架来设置 keyFields
以及 isAppendOnly
属性。
代码中手动设置主键约束想表达的是物理存储的约束,而非查询结果的特性。这一点上 Flink 并没有很好的支持,这一讨论也在继续进行中 FLIP 87: Primary key constraints in Table API。