Flink 类型踩坑

1. 本地环境搭建和启动

首先在本地启动 SQL 客户端:

# 安装 flink 1.11.1
brew install apache-flink

# 补充 jar 包
wget -O flink-libs/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.11.0/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar

wget -O flink-libs/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

wget -O flink-libs/flink-connector-jdbc_2.11-1.11.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.1/flink-connector-jdbc_2.11-1.11.1.jar

wget -O flink-libs/mysql-connector-java-8.0.21.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar

# 本地启动集群
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/start-cluster.sh

# 启动 SQL 客户端
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/sql-client.sh embedded -l flink-libs

2. 准备本地的测试数据

然后我们在本地 MySQL 中创建测试表:

create table `flink`.`test_type` (
  bigint_unsigned_id bigint unsigned,
  bigint_id bigint,
  int_unsigned_id int unsigned,
  int_id int
);

insert into `flink`.`test_type` values
(9223372036854775807, 9223372036854775807, 2147483647, 2147483647),
(9223372036854775808, 9223372036854775807, 2147483648, 2147483647),
(18446744073709551615, 9223372036854775807, 4294967295, 2147483647);

3. 创建映射表并查询数据

接着在 Flink 中创建映射表:

CREATE TABLE `test_type` (
  bigint_unsigned_id bigint,
  bigint_id bigint,
  int_unsigned_id int,
  int_id int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
   'table-name' = 'test_type',
   'username' = 'root',
   'password' = 'root'
);

然后我们分别选按列查询数据:

image-20200907153649784

可以看到查询的时候分别提示里两个异常:1)查询 bigint_unsigned_id 时提示 BitInteger 无法转换为 Long;2)查询 int_unsigned_id 时提示 Long 无法转换为 Integer

原因在于:

MySQL 的 JDBC Driver 在获取数据的时候对于 INT UNSIGNED 类型会使用 Long 类型来承接数据,对于 BIGINT UNSIGNED 类型会使用 BigInteger 来承接数据。

具体文档参见:《 Java, JDBC, and MySQL Types》

4. 变更映射表的类型定义

bigint_unsigned_id 定义为 decimal(38, 0) 类型,而 int_unsigned_id 定义为 bigint 类型:

CREATE TABLE `test_type` (
  bigint_unsigned_id decimal(38, 0),
  bigint_id bigint,
  int_unsigned_id bigint,
  int_id int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
   'table-name' = 'test_type',
   'username' = 'root',
   'password' = 'root'
);

然后我们再次查询刚才无法查询的字段:

select  bigint_unsigned_id, int_unsigned_id
from    test_type;

得到结果:

image-20200907163008625

如上图所示,类型的问题得到了解决。

5. 使用边界条件测试类型转换

进一步延伸,如果我们可以把超过 bigint 表示范围的值进行强制类型转换会不会溢出呢?

我们不妨来测试一下:

select  cast(bigint_unsigned_id as bigint) as bigint_unsigned_id,
        bigint_id,
        cast(int_unsigned_id as int) as int_unsigned_id,
        int_id
from    test_type;

得到结果:

image-20200907162834778

结果显而易见,溢出是必然的。如果我们要做一种安全的类型转换,还是要借助带数值检查的 UDF 来完成才行。

Flink JDBCUpsertTableSink 踩坑

1. 前言

最近尝试使用 Flink 来做多数据源之间的数据同步,在做到 Hive 到 MySQL 使用 org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink (版本 1.10.1)的时候遇到了一个问题:

明明设置了 keyFieldsisAppendOnly(为 false) ,但是在运行时却进入了 append-only 的分支。

2. 诊断过程

通过本地打断点可以看到这个过程,首先是我们代码对其进行了设定:

image-20200615144904879

然后我们继续运行,可以再一次在断点停下:

image-20200615151214033

这一次,可以清楚的发现 setKeyFields 再次被调用,调用者是 org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink 中的 96 行的代码。

点进去看看:

case upsertSink: UpsertStreamTableSink[T] =>
  // check for append only table
  val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this)
  upsertSink.setIsAppendOnly(isAppendOnlyTable)

关键是 UpdatingPlanChecker.isAppendOnly 方法到底干了些什么:

  def isAppendOnly(plan: RelNode): Boolean = {
    val appendOnlyValidator = new AppendOnlyValidator
    appendOnlyValidator.go(plan)

    appendOnlyValidator.isAppendOnly
  }

  private class AppendOnlyValidator extends RelVisitor {

    var isAppendOnly = true

    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
      node match {
        case s: DataStreamRel if s.producesUpdates || s.producesRetractions =>
          isAppendOnly = false
        case _ =>
          super.visit(node, ordinal, parent)
      }
    }
  }

从代码来看,要返回 false 的条件是:node 的类型为 DataStreamRel 并且 s.producesUpdates || s.producesRetractions 返回结果为 true

我们继续设置新的断点:

image-20200615154832415

从断点来看,Flink 认为 StreamExecSink 类型既不产生 update 也不产生 retraction,因此即使手动设置了属性也会被物理执行计划改写。

3. 后续

请教了一下 godfreyhe@163.com 同学,对于 JDBCStreamTableSink 有了一些新的了解,概括来说:

JDBCStreamTableSink 支持的 append-only 和 upsert 两种模式,upsert 模式是 planner 根据查询推断出来的,由框架来设置 keyFields 以及 isAppendOnly 属性。

代码中手动设置主键约束想表达的是物理存储的约束,而非查询结果的特性。这一点上 Flink 并没有很好的支持,这一讨论也在继续进行中 FLIP 87: Primary key constraints in Table API

4. 相关资料