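1. Set up and start the local environment
First, start the SQL client locally:
# Install Flink (this walkthrough uses 1.11.1; adjust the paths below if Homebrew installs another version)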
brew install apache-flink
# Download the additional JARs into flink-libs (the directory must exist for wget -O)
mkdir -p flink-libs
wget -O flink-libs/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.11.0/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar
wget -O flink-libs/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
wget -O flink-libs/flink-connector-jdbc_2.11-1.11.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.1/flink-connector-jdbc_2.11-1.11.1.jar
wget -O flink-libs/mysql-connector-java-8.0.21.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar
# Start a local cluster
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/start-cluster.sh
# Start the SQL client
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/sql-client.sh embedded -l flink-libs
2. Prepare local test data
Next, create a test table in the local MySQL instance:
create table `flink`.`test_type` (
bigint_unsigned_id bigint unsigned,
bigint_id bigint,
int_unsigned_id int unsigned,
int_id int
);
insert into `flink`.`test_type` values
(9223372036854775807, 9223372036854775807, 2147483647, 2147483647),
(9223372036854775808, 9223372036854775807, 2147483648, 2147483647),
(18446744073709551615, 9223372036854775807, 4294967295, 2147483647);
3. Create the mapping table and query the data
Then create the mapping table in Flink:
CREATE TABLE `test_type` (
bigint_unsigned_id bigint,
bigint_id bigint,
int_unsigned_id int,
int_id int
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
'table-name' = 'test_type',
'username' = 'root',
'password' = 'root'
);
Then query the data one column at a time:

The queries fail with two exceptions: (1) selecting bigint_unsigned_id reports that BigInteger cannot be cast to Long; (2) selecting int_unsigned_id reports that Long cannot be cast to Integer.
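The cause: when fetching data, the MySQL JDBC driver returns INT UNSIGNED values as Long and BIGINT UNSIGNED values as BigInteger, because the unsigned ranges exceed those of Java's int and long. The Flink columns declared as int and bigint expect Integer and Long, so the casts fail.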
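To see why the driver has to use wider Java types, the following minimal sketch compares the largest unsigned values inserted in step 2 with the ranges of Java's int and long. The class name UnsignedRanges and its helper methods are hypothetical and exist only for illustration; the sketch uses nothing beyond java.math.BigInteger.

```java
import java.math.BigInteger;

// Hypothetical helper class for illustration; not part of Flink or the MySQL driver.
final class UnsignedRanges {
    // Largest values of MySQL INT UNSIGNED and BIGINT UNSIGNED (the values inserted into test_type).
    static final BigInteger INT_UNSIGNED_MAX = new BigInteger("4294967295");
    static final BigInteger BIGINT_UNSIGNED_MAX = new BigInteger("18446744073709551615");

    // True if the value fits into a signed 32-bit Java int.
    static boolean fitsInInt(BigInteger v) {
        return v.bitLength() <= 31;
    }

    // True if the value fits into a signed 64-bit Java long.
    static boolean fitsInLong(BigInteger v) {
        return v.bitLength() <= 63;
    }

    public static void main(String[] args) {
        System.out.println(fitsInInt(INT_UNSIGNED_MAX));     // false: needs Long
        System.out.println(fitsInLong(INT_UNSIGNED_MAX));    // true
        System.out.println(fitsInLong(BIGINT_UNSIGNED_MAX)); // false: needs BigInteger
    }
}
```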
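4. Change the type definitions of the mapping table
Define bigint_unsigned_id as decimal(38, 0) and int_unsigned_id as bigint (if the table from step 3 still exists in the session, drop it first):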
CREATE TABLE `test_type` (
bigint_unsigned_id decimal(38, 0),
bigint_id bigint,
int_unsigned_id bigint,
int_id int
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
'table-name' = 'test_type',
'username' = 'root',
'password' = 'root'
);
Then query the two columns that failed earlier:
select bigint_unsigned_id, int_unsigned_id
from test_type;
The result:

As the figure above shows, the type problem is resolved.
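A little arithmetic shows why these two types are wide enough. The sketch below checks that the largest BIGINT UNSIGNED value needs only 20 decimal digits, well within decimal(38, 0), and that the largest INT UNSIGNED value parses as a Java long. The class DecimalFit and its method are hypothetical names used only for this illustration.

```java
import java.math.BigDecimal;

// Hypothetical helper class for illustration; not part of Flink.
final class DecimalFit {
    // True if the integer literal needs at most `precision` decimal digits,
    // i.e. it would fit a DECIMAL(precision, 0) column.
    static boolean fitsDecimal(String integerValue, int precision) {
        return new BigDecimal(integerValue).precision() <= precision;
    }

    public static void main(String[] args) {
        String bigintUnsignedMax = "18446744073709551615";
        // Number of decimal digits in the largest BIGINT UNSIGNED value.
        System.out.println(new BigDecimal(bigintUnsignedMax).precision()); // 20
        System.out.println(fitsDecimal(bigintUnsignedMax, 38));            // true
        // The largest INT UNSIGNED value fits a signed 64-bit long (Flink BIGINT).
        System.out.println(Long.parseLong("4294967295"));                  // 4294967295
    }
}
```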
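5. Test type conversion with boundary values
Going one step further: if we forcibly cast values that exceed the range of bigint (or int), will they overflow?
Let's test it: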
select cast(bigint_unsigned_id as bigint) as bigint_unsigned_id,
bigint_id,
cast(int_unsigned_id as int) as int_unsigned_id,
int_id
from test_type;
The result:

The result is clear: the out-of-range values overflow. A safe conversion still requires a UDF that checks the value range before converting.
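The core of such a range-checked conversion could look like the sketch below. It is plain Java, not a complete Flink UDF: the class SafeCast and its methods are hypothetical, and wrapping them in a Flink ScalarFunction is left out. The first two lines of main show how unchecked narrowing wraps around in Java; Flink's own CAST may behave differently in detail, so this only illustrates the general failure mode. The checked variants rely on BigDecimal.longValueExact and Math.toIntExact, which throw ArithmeticException instead of overflowing.

```java
import java.math.BigDecimal;

// Hypothetical helper for illustration; in Flink these methods could back a scalar UDF.
final class SafeCast {
    // DECIMAL -> BIGINT: throws ArithmeticException if the value does not fit a long.
    static long toLong(BigDecimal v) {
        return v.longValueExact();
    }

    // BIGINT -> INT: throws ArithmeticException if the value does not fit an int.
    static int toInt(long v) {
        return Math.toIntExact(v);
    }

    public static void main(String[] args) {
        BigDecimal tooBig = new BigDecimal("9223372036854775808"); // Long.MAX_VALUE + 1

        // Unchecked narrowing keeps only the low-order bits and wraps around.
        System.out.println(tooBig.longValue() == Long.MIN_VALUE);       // true
        System.out.println((int) 2147483648L == Integer.MIN_VALUE);     // true

        // Checked conversion rejects the value instead.
        try {
            toLong(tooBig);
        } catch (ArithmeticException e) {
            System.out.println("rejected: " + tooBig);
        }

        // In-range values pass through unchanged.
        System.out.println(toInt(2147483647L)); // 2147483647
    }
}
```

Whether an out-of-range value should raise an error or map to NULL is a design choice for the UDF; the sketch simply lets the exception propagate.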