Flink Type Pitfalls

1. Setting up and starting the local environment

First, start the SQL client locally:

# Install Flink (version 1.11.1 is used throughout)
brew install apache-flink

# Download the additional JARs into flink-libs
mkdir -p flink-libs
wget -O flink-libs/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.11.0/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar

wget -O flink-libs/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

wget -O flink-libs/flink-connector-jdbc_2.11-1.11.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.1/flink-connector-jdbc_2.11-1.11.1.jar

wget -O flink-libs/mysql-connector-java-8.0.21.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar

# Start a local cluster
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/start-cluster.sh

# Start the SQL client
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/sql-client.sh embedded -l flink-libs

2. Preparing local test data

Next, create a test table in the local MySQL instance:

create table `flink`.`test_type` (
  bigint_unsigned_id bigint unsigned,
  bigint_id bigint,
  int_unsigned_id int unsigned,
  int_id int
);

insert into `flink`.`test_type` values
(9223372036854775807, 9223372036854775807, 2147483647, 2147483647),
(9223372036854775808, 9223372036854775807, 2147483648, 2147483647),
(18446744073709551615, 9223372036854775807, 4294967295, 2147483647);

3. Creating the mapping table and querying data

Then create the mapping table in Flink:

CREATE TABLE `test_type` (
  bigint_unsigned_id bigint,
  bigint_id bigint,
  int_unsigned_id int,
  int_id int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
   'table-name' = 'test_type',
   'username' = 'root',
   'password' = 'root'
);

Now query the data one column at a time:

image-20200907153649784

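The queries raise two exceptions: 1) selecting bigint_unsigned_id reports that BigInteger cannot be cast to Long; 2) selecting int_unsigned_id reports that Long cannot be cast to Integer.

The reason is:

When fetching data, the MySQL JDBC driver returns INT UNSIGNED values as Long and BIGINT UNSIGNED values as BigInteger, because neither unsigned range fits into the signed Java type of the same width.

For details, see "Java, JDBC, and MySQL Types" in the MySQL Connector/J documentation.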
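
The same failure can be reproduced outside Flink. The following is a minimal sketch in plain Java, assuming the driver hands back BigInteger and Long objects as described above. The class DriverTypeDemo and its helper methods are hypothetical, and the values are hard-coded rather than read through JDBC.

```java
import java.math.BigInteger;

// Hypothetical demo class: values are hard-coded instead of being read via JDBC.
final class DriverTypeDemo {

    // Mimics code that expects a BIGINT column to arrive as java.lang.Long.
    static long readAsLong(Object fromDriver) {
        return (Long) fromDriver; // ClassCastException if given a BigInteger
    }

    // Mimics code that expects an INT column to arrive as java.lang.Integer.
    static int readAsInt(Object fromDriver) {
        return (Integer) fromDriver; // ClassCastException if given a Long
    }

    // Returns true when the given read fails with a ClassCastException.
    static boolean failsWithClassCast(Runnable read) {
        try {
            read.run();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object bigintUnsigned = new BigInteger("18446744073709551615"); // BIGINT UNSIGNED value
        Object intUnsigned = Long.valueOf(4294967295L);                 // INT UNSIGNED value

        System.out.println(failsWithClassCast(() -> readAsLong(bigintUnsigned)));
        System.out.println(failsWithClassCast(() -> readAsInt(intUnsigned)));
    }
}
```

The object type is fixed by the column type, not by the individual value, so even rows whose values would fit the narrower type fail the cast.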

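4. Changing the type definitions of the mapping table

Define bigint_unsigned_id as decimal(38, 0) and int_unsigned_id as bigint, so that each Flink type is wide enough for the Java type the driver returns: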

CREATE TABLE `test_type` (
  bigint_unsigned_id decimal(38, 0),
  bigint_id bigint,
  int_unsigned_id bigint,
  int_id int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
   'table-name' = 'test_type',
   'username' = 'root',
   'password' = 'root'
);
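
As a sanity check on why these two types are wide enough, here is a small sketch in plain Java. The class UnsignedRangeCheck and its helpers are hypothetical names used only for illustration. It computes the largest unsigned values and compares them with the target ranges.

```java
import java.math.BigInteger;

// Hypothetical helper: checks that the unsigned maxima fit the wider types.
final class UnsignedRangeCheck {

    // Largest BIGINT UNSIGNED value: 2^64 - 1.
    static final BigInteger MAX_BIGINT_UNSIGNED =
            BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE);

    // Largest INT UNSIGNED value: 2^32 - 1.
    static final long MAX_INT_UNSIGNED = (1L << 32) - 1;

    // Number of decimal digits in the absolute value.
    static int decimalDigits(BigInteger value) {
        return value.abs().toString().length();
    }

    // True when the value fits a DECIMAL(precision, 0) column.
    static boolean fitsDecimal(BigInteger value, int precision) {
        return decimalDigits(value) <= precision;
    }

    public static void main(String[] args) {
        System.out.println(MAX_BIGINT_UNSIGNED);                  // largest BIGINT UNSIGNED
        System.out.println(decimalDigits(MAX_BIGINT_UNSIGNED));   // digits it needs
        System.out.println(fitsDecimal(MAX_BIGINT_UNSIGNED, 38)); // fits decimal(38, 0)?
        System.out.println(MAX_INT_UNSIGNED <= Long.MAX_VALUE);   // fits bigint?
    }
}
```

The largest BIGINT UNSIGNED value has 20 digits, so decimal(20, 0) would be the tightest fit. decimal(38, 0), as used above, simply takes the maximum precision Flink allows.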

Then query the columns that failed earlier:

select  bigint_unsigned_id, int_unsigned_id
from    test_type;

The result:

image-20200907163008625

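As the screenshot above shows, the type problem is resolved.

5. Testing type conversion with boundary values

Going one step further: if we force-cast a value that exceeds the range of bigint, will it overflow?

Let's test it: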

select  cast(bigint_unsigned_id as bigint) as bigint_unsigned_id,
        bigint_id,
        cast(int_unsigned_id as int) as int_unsigned_id,
        int_id
from    test_type;

The result:

image-20200907162834778

The result is clear: the out-of-range values overflow. For a safe type conversion, we need a UDF that checks the value range before converting.
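
The core of such a range-checked conversion might look like the following sketch in plain Java. The class SafeCasts and its method names are hypothetical. In Flink this logic would typically sit in the eval method of a ScalarFunction subclass, which is left out here so the example stays self-contained. The wrap* methods show ordinary Java narrowing for comparison. Whether Flink's CAST yields exactly the same wrapped values is an assumption this sketch does not verify.

```java
import java.math.BigInteger;

// Hypothetical helper class sketching range-checked narrowing conversions.
final class SafeCasts {

    // Unchecked narrowing: keeps only the low 64 bits, so large values wrap.
    static long wrapToLong(BigInteger value) {
        return value.longValue();
    }

    // Unchecked narrowing: keeps only the low 32 bits, so large values wrap.
    static int wrapToInt(long value) {
        return (int) value;
    }

    // Checked narrowing: throws ArithmeticException when out of range.
    static long toLongExact(BigInteger value) {
        return value.longValueExact();
    }

    // Checked narrowing: throws ArithmeticException when out of range.
    static int toIntExact(long value) {
        return Math.toIntExact(value);
    }

    // SQL-friendly variant: returns null instead of throwing.
    static Long toLongOrNull(BigInteger value) {
        // bitLength() excludes the sign bit, so at most 63 bits fit in a long.
        return value.bitLength() <= 63 ? Long.valueOf(value.longValue()) : null;
    }

    // SQL-friendly variant: returns null instead of throwing.
    static Integer toIntOrNull(long value) {
        boolean inRange = value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE;
        return inRange ? Integer.valueOf((int) value) : null;
    }

    public static void main(String[] args) {
        BigInteger tooBig = new BigInteger("9223372036854775808"); // Long.MAX_VALUE + 1
        System.out.println(wrapToLong(tooBig));     // wrapped, no error raised
        System.out.println(toLongOrNull(tooBig));   // null signals out of range
        System.out.println(wrapToInt(2147483648L)); // wrapped, no error raised
        System.out.println(toIntOrNull(2147483648L));
    }
}
```

Choosing between the throwing variants and the null-returning ones is a design decision: throwing fails the job on bad data, while returning null lets the query continue and leaves the handling to downstream filters.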