Flink 类型踩坑

1. 本地环境搭建和启动

首先在本地启动 SQL 客户端:

# 安装 flink 1.11.1
brew install apache-flink

# 补充 jar 包
wget -O flink-libs/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.11.0/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar

wget -O flink-libs/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

wget -O flink-libs/flink-connector-jdbc_2.11-1.11.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.1/flink-connector-jdbc_2.11-1.11.1.jar

wget -O flink-libs/mysql-connector-java-8.0.21.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar

# 本地启动集群
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/start-cluster.sh

# 启动 SQL 客户端
/usr/local/Cellar/apache-flink/1.11.1/libexec/bin/sql-client.sh embedded -l flink-libs

2. 准备本地的测试数据

然后我们在本地 MySQL 中创建测试表:

create table `flink`.`test_type` (
  bigint_unsigned_id bigint unsigned,
  bigint_id bigint,
  int_unsigned_id int unsigned,
  int_id int
);

insert into `flink`.`test_type` values
(9223372036854775807, 9223372036854775807, 2147483647, 2147483647),
(9223372036854775808, 9223372036854775807, 2147483648, 2147483647),
(18446744073709551615, 9223372036854775807, 4294967295, 2147483647);

3. 创建映射表并查询数据

接着在 Flink 中创建映射表:

CREATE TABLE `test_type` (
  bigint_unsigned_id bigint,
  bigint_id bigint,
  int_unsigned_id int,
  int_id int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
   'table-name' = 'test_type',
   'username' = 'root',
   'password' = 'root'
);

然后我们分别选按列查询数据:

image-20200907153649784

可以看到查询的时候分别提示里两个异常:1)查询 bigint_unsigned_id 时提示 BitInteger 无法转换为 Long;2)查询 int_unsigned_id 时提示 Long 无法转换为 Integer

原因在于:

MySQL 的 JDBC Driver 在获取数据的时候对于 INT UNSIGNED 类型会使用 Long 类型来承接数据,对于 BIGINT UNSIGNED 类型会使用 BigInteger 来承接数据。

具体文档参见:《 Java, JDBC, and MySQL Types》

4. 变更映射表的类型定义

bigint_unsigned_id 定义为 decimal(38, 0) 类型,而 int_unsigned_id 定义为 bigint 类型:

CREATE TABLE `test_type` (
  bigint_unsigned_id decimal(38, 0),
  bigint_id bigint,
  int_unsigned_id bigint,
  int_id int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
   'table-name' = 'test_type',
   'username' = 'root',
   'password' = 'root'
);

然后我们再次查询刚才无法查询的字段:

select  bigint_unsigned_id, int_unsigned_id
from    test_type;

得到结果:

image-20200907163008625

如上图所示,类型的问题得到了解决。

5. 使用边界条件测试类型转换

进一步延伸,如果我们可以把超过 bigint 表示范围的值进行强制类型转换会不会溢出呢?

我们不妨来测试一下:

select  cast(bigint_unsigned_id as bigint) as bigint_unsigned_id,
        bigint_id,
        cast(int_unsigned_id as int) as int_unsigned_id,
        int_id
from    test_type;

得到结果:

image-20200907162834778

结果显而易见,溢出是必然的。如果我们要做一种安全的类型转换,还是要借助带数值检查的 UDF 来完成才行。

一次 TiDB 迁移引发的“血案”

1. 背景

我需要一个能力更强的关系型数据库!

本人之前设计和实现了公司目前使用的离线作业调度系统,选用了 MySQL 作为主要存储。这个系统虽然不算什么黑科技产品,但是也承担了每天 16000~20000 个作业实例(instance)的创建和运行。

显然的,这张记录着运行实例(instance)的数据库表的空间复杂度为 O(m * t)(其中 m 代表每天运行作业实例的个数,而 t 则表示系统运行的天数)。可以想见,随着时间的推移,这张表的数据量基本是线性的增长。事实是,两年多的时间,系统运行的实例数已破千万 。

带来的副作用自然是查询性能的降低、用户响应时长的增长。而这个性能影响主要体现在列表页相关的查询上。考虑下面这个查询场景:通过业务线、实例名匹配、实例类型、实例的状态、负责人信息过滤实例,并且按照创建时间的倒序进行分页返回。

这个查询非常耗费性能,主要体现在如下几个方面:

  1. 业务线是任务(job)的属性,因此通过业务线进行过滤时需要进行多表的 join 操作。
  2. 实例名和负责人的模糊匹配:索引失效,没得洗。
  3. 大量查询结果的全局排序并分页。

在单机 MySQL 的支持下,我的经常性的运维操作就是把三个月之前运行的实例进行删除,确保 instance 表的行数维持在百万行的规模。

由于这个系统的开发和运维基本就是我的个人 Solo,所以当公司的 TiDB 向我递出橄榄枝时,我立刻回以拥抱。

TiDB 带给我欣喜的同时,也引发了新的问题。

2. 用户反馈出问题了

为什么我之前能正常运行的任务现在报错了?

迁移的过程此处按下不表,完全委托给 DBA。但是在接下来的两三天内我陆续接到了有同事的反馈:在没有变更表 schema 和计算逻辑的情况下,相邻两天的任务一个成功,一个失败。

依照用户的描述,我查了一下运行日志,HiveServer 抛出的异常表明计算脚本最后结果的 Schema 与 Hive 表不一致。因此我初步的判断还是计算脚本的问题。

我首先要排查成功和失败的实例是否使用了相同的计算脚本。我从两个实例的启动命令入手,发现两者所使用的文件果然不一样,失败的实例使用的文件 id 为 219021,而成功实例使用的文件 id 为 308972。

从数据库里查处这两个文件的具体信息,如下图所示:

截屏2020-08-28 上午12.19.35

上图出现了一个没有预料到的情况:主键大的那一条记录(id 308972)的创建时间竟然早于于主键小的那一条记录(id 219021),并且早了三天之久!

两条记录有点背离经验啊!

3. 两条不合常理的数据

为什么自增主键大的反而创建时间早呢?而且主键值的 gap 这么大!

从单机 MySQL 的最佳实践来看:

  1. MySQL 主键应该使用一个业务无关的自增的无符号整型来存储比较妥当。
  2. 创建时间和更新时间分别通过默认值 CURRENT_TIMESTAMPON UPDATE CURRENT_TIMESTAMP 来确保时间戳的自动更新。

基于上述理由,我们下意识都认为创建时间的递增顺序和主键的递增顺序是一致的,说人话就是:创建时间早的记录一般自增主键比较小。

当然也有反例,假如两条 SQL 分别通过两个事务并发插入批量数据,MySQL 会为两条 SQL 各自预留一个段主键值。如果主键值段比较大的 SQL 率先写入完成,那么就会造成类似的效果:1)先写入的数据主键值反而大;2)主键值之间留出了比较大的 gap。但是即使是这种场景下,写入数据的创建时间差也不会有 3 天这么大。

开发者的直觉告诉我,这个情况一定与 TiDB 迁移有关。向 DBA 咨询过后,我的猜想果然得到了验证。

TiDB 的自增列仅保证唯一,也能保证在单个 TiDB server 中自增,但不保证多个 TiDB server 中自增,不保证自动分配的值的连续性。

—— 《自增 ID》

TiDB 实现 AUTO_INCREMENT 隐式分配的原理是,对于每一个自增列,都使用一个全局可见的键值对用于记录当前已分配的最大 ID。由于分布式环境下的节点通信存在一定开销,为了避免写请求放大的问题,每个 TiDB 节点在分配 ID 时,都申请一段 ID 作为缓存,用完之后再去取下一段,而不是每次分配都向存储节点申请。

—— 《实现原理》

4. 是改 SQL 呢?还是清理数据呢?

光明白没用,问题还等着解决呢……

明白了技术层面出问题的原因,下面聊聊出问题的这个场景以及解决方案。

自然,从实体关系模型说起。数据库里记录了每个任务多版本所使用的构件(artifact)与文件(custom_file)之间的映射关系,其表结构如下所示:

CREATE TABLE `artifact` (
  `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(100) NOT NULL DEFAULT '' COMMENT '构件名称',
  `desc` varchar(2048) NOT NULL DEFAULT '' COMMENT '说明',
  `job_id` bigint(11) unsigned NOT NULL DEFAULT 0 COMMENT '任务',
  `owner` varchar(100) NOT NULL COMMENT 'owner',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin

CREATE TABLE `artifact_file` (
  `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `artifact_id` bigint(11) unsigned NOT NULL DEFAULT 0 COMMENT '构件id',
  `file_id` bigint(11) unsigned NOT NULL DEFAULT 0 COMMENT '文件id',
  ......
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

CREATE TABLE `custom_file` (
  `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(100) NOT NULL DEFAULT '' COMMENT '文件名',
  ......
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

在服务启动的时候,首先要从数据库中把每个任务最新的构件包含的文件列表读取出来进行初始化。获取每个任务最新构件的 SQL 写作:

SELECT    MAX(id) as id
FROM      artifact
GROUP BY  job_id;

这是一个分组取最大值的操作,简单明了。取出最新的 artifact.id (亦即 Max(id))之后进行 join 操作即可。

那么既然 TiDB 之上无法保证主键全局的自增,最新 这一语义便只能通过创建时间来表现了:

SELECT    job_id, MAX(create_time) as create_time
FROM      artifact
GROUP BY  job_id;

选取每个任务最新的时间戳,通过 job_idcreate_time 两个条件来 join 来筛选数据。看起来可行,但现实给予了无情的打脸。通过这个方式,每个任务返回了多条记录!

问题出在哪里呢?

  1. MAX(id) 同时满足了 最新 以及 selectOne 两大语义。
  2. job_idcreate_time 仅能满足 最新 这一语义,却无法保证 selectOne 的结果!换句话说,在时间戳所能描述的时间精度内,写入了两条甚至以上的数据(比如用户的多次点击导致数据重复写入)。

最直接的解决方案是:我通过 job_idcreate_time 两个列建立联合唯一索引,代价是我首先要清理已有的冗余数据,否则索引建不起来。

It will work, but it’s ugly.

另一个解决方案是:在 job_idcreate_time 查询结果的基础上再加一个 selectOne 的上层查询。类似:

SELECT  MAX(b.id)
FROM
(
    SELECT    job_id, MAX(create_time) as create_time
    FROM      artifact
    GROUP BY  job_id;
) a
INNER JOIN artifact b
ON a.job_id = b.job_id AND a.create_time = b.create_time
GROUP BY  a.job_id

应该可以运行,但是没有自然的解决方法了吗?

答案是有的!

在数据仓库的场景下,分组排序取第一这个语义一般可以通过窗口函数 ROW_NUMBER() OVER() 来实现。

庆幸的是,TiDB 也提供了窗口函数的支持!因此 SQL 可以写作:

SELECT id
FROM
(
  SELECT  id,
          ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY create_time DESC) as rn
  FROM    artifact
) t
WHERE rn = 1

看着似乎没那么违和。

提交代码、测试、上线!

5. 复盘与思考

延伸思考一下……

问题虽然解决了,但是复盘整个过程也给我带来了两点启发。

首先,技术还是没有银弹,工程师还是要保持对技术的敬畏之心。

技术方案的调研和选择要慎重。如果当初迁移之前起码读过文档,应当不至于犯这种低级错误。经验在帮助我提高效率的同时,也给我制造了盲点和轻慢之心。

其次,像我这种规模的内部系统用起分布式数据库也是大姑娘坐花轿——头一遭。

分布式数据库的最佳实践在社区、在互联网上谈论的都还不算多。

SQL 查询的兼容性?分布式数据库如何设计索引?分布式查询计划如何做查询优化……

这一系列命题也有待更多的学习和研究。

有时候我也设想,如果社区也能够推出一个 SQL Advisor ,可以针对业务已有 MySQL 的查询进行分析、诊断和建议,那么能造福的应该不止我一个人。

举个例子:

对于我上文的 MAX(id) 的 SQL,通过语法分析完全可以发现出这个查询对自增主键的全局自增的依赖。从而发出警告,提醒我在迁移之前提前做好 SQL 的改造。

一般来说,数据库的迁移会由技术团队主动发起。怎么降低迁移的成本,减少业务团队的投入是一件非常有价值的事情。

如果我们在向分布式数据库迁移之前先对用户所有的查询进行解析,就可以对 SQL 的兼容性、改造方向以及优化方案提出定向、定量的精准建议。也许能够更好地促进 TiDB 的落地和发展吧。

以上~

Spring多数据源使用

1. 前言

平时开发的时候偶尔会遇到多数据库读写的情况(非分库分表),本文会给出一个简单的配置和使用两个数据库的示例。

2. 依赖与属性配置

首先给出 pom.xml 中引入的依赖:

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>LATEST</version>
</dependency>

然后我们手动创建两个数据库。在尝试使用多数据源的时候发现的一个问题是 spring.datasource.schema 不生效了,意味着不能使用 Spring 启动时自动创建库表的特性,所以只能手动创建数据库:

mysql root@localhost:(none)> DROP DATABASE IF EXISTS `prime`;
                          -> CREATE DATABASE `prime`;
                          -> use prime;
                          ->
                          -> DROP TABLE IF EXISTS person;
                          -> CREATE TABLE `person` (
                          ->   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
                          ->   `name` varchar(50) NOT NULL DEFAULT '' COMMENT '
                          -> 名字',
                          ->   `age` int(11) NOT NULL COMMENT '年龄',
                          ->   `gender` int(11) NOT NULL COMMENT '性别',
                          ->   PRIMARY KEY (`id`)
                          -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                          ->
You're about to run a destructive command.
Do you want to proceed? (y/n): y
Your call!
Query OK, 0 rows affected
Time: 0.006s

Query OK, 1 row affected
Time: 0.012s

You are now connected to database "prime" as user "root"
Time: 0.002s

Query OK, 0 rows affected
Time: 0.007s

Query OK, 0 rows affected
Time: 0.021s
mysql root@localhost:(none)> DROP DATABASE IF EXISTS `secondary`;
                          -> CREATE DATABASE `secondary`;
                          -> use secondary;
                          ->
                          -> DROP TABLE IF EXISTS person;
                          -> CREATE TABLE `person` (
                          ->   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
                          ->   `name` varchar(50) NOT NULL DEFAULT '' COMMENT '
                          -> 名字',
                          ->   `age` int(11) NOT NULL COMMENT '年龄',
                          ->   `gender` int(11) NOT NULL COMMENT '性别',
                          ->   PRIMARY KEY (`id`)
                          -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
You're about to run a destructive command.
Do you want to proceed? (y/n): y
Your call!
Query OK, 0 rows affected
Time: 0.004s

Query OK, 1 row affected
Time: 0.001s

You are now connected to database "secondary" as user "root"
Time: 0.002s

Query OK, 0 rows affected
Time: 0.001s

Query OK, 0 rows affected
Time: 0.028s

下面是 application.properties,注明了两个数据库的基本信息:

spring.datasource.primary.url=jdbc:mysql://127.0.0.1:3306/prime?charset=utf8
spring.datasource.primary.username=root
spring.datasource.primary.password=root
spring.datasource.primary.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.secondary.url=jdbc:mysql://127.0.0.1:3306/secondary?charset=utf8
spring.datasource.secondary.username=root
spring.datasource.secondary.password=root
spring.datasource.secondary.driver-class-name=com.mysql.jdbc.Driver

3. 配置数据源

针对两个数据库我们要分别创建对应的 DataSourcePlatformTransactionManagerSqlSessionFactory

第一个 DataSource 我们使用 Spring 中的 DataSourceBuilder 来构建,并用 @Primary 注解来标记:

package com.avaloninc.springmultidatasourceexample.conf;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

@Configuration
@EnableTransactionManagement
@MapperScan(basePackages = "com.avaloninc.springmultidatasourceexample.mapper.prime", sqlSessionFactoryRef = "primeSsf")
public class DataSourceConfigOne {

  @Bean(name = "primaryDs")
  @ConfigurationProperties(prefix = "spring.datasource.primary")
  @Primary
  public DataSource primaryDs() {
    return DataSourceBuilder.create().build();
  }


  @Bean(name = "primaryTxm")
  @Primary
  public PlatformTransactionManager primaryTxm(@Qualifier("primaryDs") DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
  }

  @Bean(name = "primeSsf")
  @Primary
  public SqlSessionFactory primeSsf(@Qualifier("primaryDs") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSource);
    return sqlSessionFactoryBean.getObject();
  }
}

第二个数据源我们使用 druid 作为连接池,除了基本参数外其他都采用默认配置:

package com.avaloninc.springmultidatasourceexample.conf;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

@Configuration
@EnableTransactionManagement
@MapperScan(basePackages = "com.avaloninc.springmultidatasourceexample.mapper.secondary", sqlSessionFactoryRef = "secondarySsf")
public class DataSourceConfigTwo {

  @Bean(name = "secondaryDs")
  @ConfigurationProperties(prefix = "spring.datasource.secondary")
  public DataSource secondaryDs() {
    return DruidDataSourceBuilder.create().build();
  }

  @Bean(name = "secondaryTxm")
  public PlatformTransactionManager secondaryTxm(@Qualifier("secondaryDs") DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
  }


  @Bean(name = "secondarySsf")
  public SqlSessionFactory secondarySsf(@Qualifier("secondaryDs") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSource);
    return sqlSessionFactoryBean.getObject();
  }
}

注意,在配置两个不同的数据源时都各自加了 @MapperSacn 注解,并各自给定了 basePackagessqlSessionFactoryRef。不同数据源的 Mapper 在不同的 package 里面定义,并使用不同的 SqlSessionFactory 来创建。

下面是 Mapper 接口的定义:

package com.avaloninc.springmultidatasourceexample.mapper.prime;

import com.avaloninc.springmultidatasourceexample.domain.Person;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;

@Mapper
public interface PersonMapper {
  /**
   * Insert int.
   *
   * @param person the person
   * @return the int
   */
  @Insert("insert into person (name, age, gender) values (#{p.name}, #{p.age}, #{p.gender, typeHandler=org.apache.ibatis.type.EnumOrdinalTypeHandler, javaType=com.avaloninc.springmultidatasourceexample.domain.Person$Gender})")
  @Options(useGeneratedKeys = true, keyProperty = "p.id")
  int insert(@Param("p") Person person);

  /**
   * Gets person by id.
   *
   * @param id the id
   * @return the person by id
   */
  @Select("select id, name, age, gender from person where id = #{id}")
  @Results(id = "person", value = {
      @Result(column = "gender", property = "gender", typeHandler = EnumOrdinalTypeHandler.class)
  })
  Person getPersonById(@Param("id") int id);
}

以及:

package com.avaloninc.springmultidatasourceexample.mapper.secondary;

import com.avaloninc.springmultidatasourceexample.domain.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;

@Mapper
public interface UserMapper {
  /**
   * Insert int.
   *
   * @param user the user
   * @return the int
   */
  @Insert("insert into person (name, age, gender) values (#{p.name}, #{p.age}, #{p.gender, typeHandler=org.apache.ibatis.type.EnumOrdinalTypeHandler, javaType=com.avaloninc.springmultidatasourceexample.domain.User$Gender})")
  @Options(useGeneratedKeys = true, keyProperty = "p.id")
  int insert(@Param("p") User user);

  /**
   * Gets person by id.
   *
   * @param id the id
   * @return the person by id
   */
  @Select("select id, name, age, gender from person where id = #{id}")
  @Results(id = "person", value = {
      @Result(column = "gender", property = "gender", typeHandler = EnumOrdinalTypeHandler.class)
  })
  User getUserById(@Param("id") int id);
}

4. 简单的 Service 调用

下面是针对这两个 Mapper 的写的简单的读写接口:

package com.avaloninc.springmultidatasourceexample.service;


import com.avaloninc.springmultidatasourceexample.domain.Person;

public interface PersonService {
  /**
   * Insert int.
   *
   * @param person the person
   * @return the int
   */
  int insert(Person person);

  /**
   * Gets person by id.
   *
   * @param id the id
   * @return the person by id
   */
  Person getPersonById(int id);
}
package com.avaloninc.springmultidatasourceexample.service;

import com.avaloninc.springmultidatasourceexample.domain.Person;
import com.avaloninc.springmultidatasourceexample.mapper.prime.PersonMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PersonServiceImpl implements PersonService {

  @Autowired
  private PersonMapper personMapper;

  @Override
  public int insert(Person person) {
    return this.personMapper.insert(person);
  }

  @Override
  public Person getPersonById(int id) {
    return this.personMapper.getPersonById(id);
  }
}

以及:

package com.avaloninc.springmultidatasourceexample.service;

import com.avaloninc.springmultidatasourceexample.domain.User;

public interface UserService {
  /**
   * Insert int.
   *
   * @param user the user
   * @return the int
   */
  int insert(User user);

  /**
   * Gets user by id.
   *
   * @param id the id
   * @return the user by id
   */
  User getUserById(int id);
}
package com.avaloninc.springmultidatasourceexample.service;

import com.avaloninc.springmultidatasourceexample.domain.User;
import com.avaloninc.springmultidatasourceexample.mapper.secondary.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {

  @Autowired
  private UserMapper userMapper;

  @Override
  public int insert(User user) {
    return this.userMapper.insert(user);
  }

  @Override
  public User getUserById(int id) {
    return this.userMapper.getUserById(id);
  }
}

5. 单元测试

单元测试一发入魂:

package com.avaloninc.springmultidatasourceexample;

import static org.junit.Assert.*;

import com.avaloninc.springmultidatasourceexample.domain.Person;
import com.avaloninc.springmultidatasourceexample.domain.User;
import com.avaloninc.springmultidatasourceexample.domain.User.Gender;
import com.avaloninc.springmultidatasourceexample.service.PersonService;
import com.avaloninc.springmultidatasourceexample.service.UserService;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MainTest extends TestCase {

  @Autowired
  private UserService userService;
  @Autowired
  private PersonService personService;

  @Test
  public void test() {
    User user = new User();
    user.setAge(28);
    user.setGender(Gender.MALE);
    user.setName("John");

    int count = this.userService.insert(user);
    assertEquals(1, count);

    User userById = this.userService.getUserById(user.getId());
    assertEquals(user, userById);

    Person person = new Person();
    person.setAge(27);
    person.setName("Doe");
    person.setGender(Person.Gender.MALE);

    int insertCount = this.personService.insert(person);
    assertEquals(1, insertCount);

    Person personById = this.personService.getPersonById(person.getId());
    assertEquals(person, personById);
  }
}

查看数据库确认一下:

mysql root@localhost:(none)> select * from prime.person;
+----+------+-----+--------+
| id | name | age | gender |
+----+------+-----+--------+
| 1  | Doe  | 27  | 0      |
+----+------+-----+--------+
1 row in set
Time: 0.006s
mysql root@localhost:(none)> select * from secondary.person;
+----+------+-----+--------+
| id | name | age | gender |
+----+------+-----+--------+
| 1  | John | 28  | 0      |
+----+------+-----+--------+
1 row in set
Time: 0.006s
mysql root@localhost:(none)>

以上!

Spring 与 JdbcTemplate

1. 前言

Spring 除了 Mybatis 外同样支持 JPA 和 JdbcTemplate 等的数据映射框架。这里简单给一个关于 JdbcTemplate 的示例。

JdbcTemplate 的功能当然不如 MyBatis 来的强大,但是如果偏爱 Java 代码手动做对象映射的可以试一试,下面不多废话,直接上代码。

2. 实例配置

首先给出需要引入的依赖:

  <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>

然后自定义数据源 dataSourcejdbcTemplate

  package com.avaloninc.springjdbctemplateexample.conf;
  
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.context.annotation.PropertySource;
  import org.springframework.core.env.Environment;
  import org.springframework.jdbc.core.JdbcTemplate;
  import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  import org.springframework.jdbc.datasource.DriverManagerDataSource;
  import org.springframework.transaction.PlatformTransactionManager;
  import org.springframework.transaction.annotation.EnableTransactionManagement;
  
  import javax.sql.DataSource;
  
  @Configuration
  @EnableTransactionManagement
  @PropertySource(value = "classpath:application.properties")
  public class AppConfig {
  
    @Autowired
    private Environment env;
  
    @Bean(name = "dataSource")
    public DataSource dataSource() {
      DriverManagerDataSource dataSource = new DriverManagerDataSource();
      String                  url        = env.getProperty("jdbc.url");
      String                  userName   = env.getProperty("jdbc.username");
      String                  password   = env.getProperty("jdbc.password");
      dataSource.setUrl(url);
      dataSource.setUsername(userName);
      dataSource.setPassword(password);
      return dataSource;
    }
  
    @Bean
    public PlatformTransactionManager dataSourceTransactionManager() {
      return new DataSourceTransactionManager(dataSource());
    }
  
    @Bean
    public JdbcTemplate jdbcTemplate() {
      JdbcTemplate jdbcTemplate = new JdbcTemplate();
      jdbcTemplate.setDataSource(dataSource());
      return jdbcTemplate;
    }
  }

这段代码中,我们通过注入 Environment 对象来实现从配置文件中获取数据库的连接信息。

3. 实例代码

先给出模型类:

  package com.avaloninc.springjdbctemplateexample.domain;
  
  import lombok.Data;
  
  @Data
  public class Person {
    private int    id;
    private String name;
    private int    age;
    private Gender gender;
  
    public enum Gender {
      MALE,
      FEMALE;
    }
  }
  

然后给定两个简单的读写接口的实现:

  package com.avaloninc.springjdbctemplateexample.service;
  
  import com.avaloninc.springjdbctemplateexample.domain.Person;
  
  public interface PersonService {
    /**
     * Insert int.
     *
     * @param person the person
     * @return the int
     */
    int insert(Person person);
  
    /**
     * Gets person by id.
     *
     * @param id the id
     * @return the person by id
     */
    Person getPersonById(int id);
  }
  
  package com.avaloninc.springjdbctemplateexample.service;
  
  import com.avaloninc.springjdbctemplateexample.domain.Person;
  import com.avaloninc.springjdbctemplateexample.domain.Person.Gender;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.jdbc.core.JdbcTemplate;
  import org.springframework.jdbc.core.RowMapper;
  import org.springframework.jdbc.support.GeneratedKeyHolder;
  import org.springframework.stereotype.Service;
  
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;
  
  @Service
  public class PersonServiceImpl implements PersonService {
  
    @Autowired
    private JdbcTemplate jdbcTemplate;
  
    @Override
    public int insert(Person person) {
      GeneratedKeyHolder holder = new GeneratedKeyHolder();
      int rowCount = this.jdbcTemplate.update(
          connection -> {
            PreparedStatement statement = connection.prepareStatement(
                "insert into person (name, age, gender) values (?, ?,?)",
                Statement.RETURN_GENERATED_KEYS);
            statement.setString(1, person.getName());
            statement.setInt(2, person.getAge());
            statement.setInt(3, person.getGender().ordinal());
            return statement;
          },
          holder);
      person.setId(holder.getKey().intValue());
      return rowCount;
    }
  
    @Override
    public Person getPersonById(int id) {
      return this.jdbcTemplate.queryForObject("select id, name, age, gender from person where id = ?",
                                              new Object[] {id}, new RowMapper<Person>() {
            @Override
            public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
              Person person = new Person();
              person.setId(rs.getInt("id"));
              person.setName(rs.getString("name"));
              person.setAge(rs.getInt("age"));
              person.setGender(Gender.values()[rs.getInt("gender")]);
              return person;
            }
          });
    }
  }
  

完全手动实现了对象的映射,对于复杂类型的数据反序列化,不必借助 MyBatis 的 TypeHandler,直接代码实现即可。

4. 单元测试

单元测试走一发,确认有效:

  package com.avaloninc.springjdbctemplateexample.service;
  
  import com.avaloninc.springjdbctemplateexample.domain.Person;
  import com.avaloninc.springjdbctemplateexample.domain.Person.Gender;
  import junit.framework.TestCase;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;
  
  @SpringBootTest
  @RunWith(SpringRunner.class)
  public class PersonServiceImplTest extends TestCase {
  
    @Autowired
    private PersonService personService;
  
    @Test
    public void test() {
      Person person = new Person();
      person.setName("John");
      person.setAge(28);
      person.setGender(Gender.MALE);
  
      int count = personService.insert(person);
      assertTrue(count > 0);
      assertTrue(person.getId() > 0);
      Person personById = personService.getPersonById(person.getId());
      assertNotNull(personById);
      assertEquals(person, personById);
    }
  }

Insert Or Update 后续

1. 问题描述

使用 insert … on duplicate key update 语法实现 insertOrUpdate 之后出现了几个新问题,首先给出测试代码:

DROP database if EXISTS test;
CREATE database test;

DROP TABLE if EXISTS test.person;
CREATE table test.person (
    id int not NULL PRIMARY KEY auto_increment,
    name VARCHAR(100) not NULL DEFAULT '' UNIQUE COMMENT '名字',
    age int not NULL DEFAULT 0 COMMENT '年龄',
    gender VARCHAR(20) NOT NULL DEFAULT '' COMMENT '性别',
    addresses text NOT NULL COMMENT '地址'
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='person';

MyBatis 语句:

<insert id="insertOrUpdate" useGeneratedKeys="true" keyProperty="id">
    insert into test.person
    (id, name, age, gender, addresses)
    VALUES
    <foreach collection="list" item="person" separator=",">
        (id, #{person.name}, #{person.age}, #{person.gender},
        #{person.addresses, typeHandler=com.note4code.test.persistence.typeHandler.GenericMapHandler})
    </foreach>
    on duplicate key update
    age = VALUES(age),
    gender = VALUES(gender),
    addresses = VALUES(addresses)
</insert>

Java 代码:

package com.note4code.test.service;

import com.note4code.test.domain.Address;
import com.note4code.test.domain.Gender;
import com.note4code.test.domain.Person;
import com.note4code.test.domain.Province;
import junit.framework.TestCase;
import org.assertj.core.util.Lists;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PersonServiceTest extends TestCase{

  @Autowired
  private PersonService personService;

  private Person me;
  private Person you;
  private Person him;

  @Before
  public void initData() {
    Address address = new Address(Province.BEIJING, "北京", "学院路");
    Map<Province, Address> map = Maps.newHashMap(Province.BEIJING, address);

    this.me = new Person();
    this.me.setName("me");
    this.me.setAge(27);
    this.me.setGender(Gender.MALE);
    this.me.setAddresses(map);

    this.you = new Person();
    this.you.setName("you");
    this.you.setAge(25);
    this.you.setGender(Gender.FEMALE);
    this.you.setAddresses(map);

    this.him = new Person();
    this.him.setName("him");
    this.him.setAge(25);
    this.him.setGender(Gender.MALE);
    this.him.setAddresses(map);
  }

  @Test
  public void testForOnDuplicateKey() {
    personService.addPerson(me);
    int id = me.getId();

    me.setAge(28);
    List<Person> people = Lists.newArrayList(me, you, him);
    personService.addOrUpdate(people);
    assertTrue(id != me.getId());
  }
}

运行测试用例,得到的输出结果是:

people = [Person{id=2, name=’me’, age=28, gender=MALE, addresses={BEIJING=Address{province=BEIJING, city=’北京’, street=’学院路’}}}
, Person{id=0, name=’you’, age=25, gender=FEMALE, addresses={BEIJING=Address{province=BEIJING, city=’北京’, street=’学院路’}}}
, Person{id=0, name=’him’, age=25, gender=MALE, addresses={BEIJING=Address{province=BEIJING, city=’北京’, street=’学院路’}}}
]

另外,查询数据库可以得到:

mysql root@localhost:test> SELECT * from person;
+------+--------+-------+----------+--------------------------------------------------------------------+
|   id | name   |   age | gender   | addresses                                                          |
|------+--------+-------+----------+--------------------------------------------------------------------|
|    1 | me     |    28 | MALE     | {"BEIJING":{"province":"BEIJING","city":"北京","street":"学院路"}} |
|    2 | you    |    25 | FEMALE   | {"BEIJING":{"province":"BEIJING","city":"北京","street":"学院路"}} |
|    3 | him    |    25 | MALE     | {"BEIJING":{"province":"BEIJING","city":"北京","street":"学院路"}} |
+------+--------+-------+----------+--------------------------------------------------------------------+
3 rows in set
Time: 0.002s
mysql root@localhost:test> SELECT LAST_INSERT_ID();
+--------------------+
|   LAST_INSERT_ID() |
|--------------------|
|                 17 |
+--------------------+
1 row in set
Time: 0.001s

从上面的示例可以看出 3 个问题:

  1. 即使使用了 userGeneratedKeys = true 并指定了 keyProperty,只回写了第一行的主键。
  2. 回写的主键与数据库不一致。
  3. LAST_INSERT_ID() 的值发生了跳跃,按理来说应该是 3,但是变成了 17。

2. 疑问

看到这里其实很让人费解:

  1. 为什么只返回了一个主键?
  2. useGeneratedKeys 返回的主键不对那么到底是什么?
  3. 为什么 LAST_INSERT_ID() 发生了跳变?

首先从 userGeneratedKeys 说起:

useGeneratedKeys(仅对 insert 和 update 有用)这会令 MyBatis 使用 JDBC 的 getGeneratedKeys 方法来取出由数据库内部生成的主键(比如:像 MySQL 和 SQL Server 这样的关系数据库管理系统的自动递增字段),默认值:false。

引自 insert, update 和 delete

With older JDBC drivers for MySQL, you could always use a MySQL-specific method on theStatement interface, or issue the query SELECT LAST_INSERT_ID() after issuing an INSERT to a table that had an AUTO_INCREMENT key.

First, we demonstrate the use of the new JDBC 3.0 method getGeneratedKeys() which is now the preferred method to use if you need to retrieve AUTO_INCREMENT keys and have access to JDBC 3.0. The second example shows how you can retrieve the same value using a standard SELECT LAST_INSERT_ID() query. 

引自 Retrieving AUTO_INCREMENT Column Values through JDBC

也就是说 Mybatis 通过 useGeneratedKeys 返回的是 LAST_INSERT_ID()

接着说,那么为什么只回写了一个主键,并且还是错的呢?

If you insert multiple rows using a single INSERT statement, LAST_INSERT_ID() returns the value generated for the first inserted row only.

引自 LAST_INSERT_ID()LAST_INSERT_ID(expr)

按照上文的说法,批量插入只会返回插入的第一条数据的主键。第一次插入 me 这个对象之后 LAST_INSERT_ID() 返回 1。接着在插入 people 时首先是更新了 me 这行记录,而 LAST_INSERT_ID() 没有变。直到插入第二行 you 这个对象,此时 LAST_INSERT_ID() 返回 2,也就是批量插入后回写的主键值。这同时解释了为什么只回写了一个主键并且回写的主键与数据库没有对应上。

最后,关于 LAST_INSERT_ID() 的跳变,我也找到了一些参考资料:

  1. 官方文档:InnoDB AUTO_INCREMENT Lock Modes
  2. 其实这个说的更加简洁清楚:AUTO_INCREMENT 字段的GAP

Insert Or Update

1. MySQL 批量插入

有的时候会有这样的需求:

  1. 批量写入数据,但是数据可能存在主键或者唯一性索引的冲突写到一半就失败了。
  2. 每天批量更新表,新数据插入,旧数据更新。

于是我们不禁想到是否能实现一个 insertOrUpdate 方法呢?事实上,MySQL 给出了标准 SQL 的两种扩展来帮我们实现这种需求。

2. INSERT ... ON DUPLICATE KEY UPDATE

标准 SQL 的扩展,如果插入时遇到主键冲突或者唯一性约束不满足则会执行 update 操作,例如:

INSERT INTO table (a,b,c) VALUES (1,2,3)
  ON DUPLICATE KEY UPDATE c=c+1;

UPDATE table SET c=c+1 WHERE a=1;

支持多行插入:

INSERT INTO table (a,b,c) VALUES (1,2,3),(4,5,6)
  ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);

ON DUPLICATE KEY UPDATE 子句中的 VALUES(a) 表示的是如果没有冲突将会插入 a 列的值,即 1 或者 4。

ON DUPLICATE KEY UPDATE 子句不适宜在有多个唯一性索引的表上使用。

参考:14.2.5.3 INSERT … ON DUPLICATE KEY UPDATE Syntax

3. Replace

Replace 是标准 SQL 的扩展,等价于 inserts/deletes and inserts

REPLACE [LOW_PRIORITY | DELAYED]
    [INTO] tbl_name
    [PARTITION (partition_name,...)]
    [(col_name,...)]
    {VALUES | VALUE} ({expr | DEFAULT},...),(...),...

REPLACE [LOW_PRIORITY | DELAYED]
    [INTO] tbl_name
    [PARTITION (partition_name,...)]
    SET col_name={expr | DEFAULT}, ...

REPLACE [LOW_PRIORITY | DELAYED]
    [INTO] tbl_name
    [PARTITION (partition_name,...)]
    [(col_name,...)]
    SELECT ...

所有列的值都从 Replace 语句中获取,无法从现有的行中获取值,如果有的列没有指定值那么会给定默认值。如果在指定值的时候使用了 SET col_name = col_name + 1 语句,等价于 SET col_name = DEFAULT(col_name) + 1

Replace 语句的返回值表示影响的行数,影响行数 = 删除的行数 + 插入的行数。对于单行的 Replace 操作,如果返回 1 表示仅仅插入了新的一行,如果大于 1 则表示删除了至少一行并插入了一行。

参考:14.2.8 REPLACE Syntax

3. 异同分析

从文档的说明来看,至少有两点差异:

  1. INSERT … ON DUPLICATE KEY UPDATE 在插入时可以使用现有行的值, Replace 不支持。
    mysql root@localhost:test> create table tb_table (a int not NULL PRIMARY key auto_increment, b int NOT NULL DEFAULT 0, c int NOT NULL DEFAULT 0)
    Query OK, 0 rows affected
    Time: 0.042s
    mysql root@localhost:test> insert into tb_table (a, b, c) VALUES (1,2,3);
    Query OK, 1 row affected
    Time: 0.024s
    mysql root@localhost:test> INSERT INTO tb_table (a, b, c) values (1,5,6) on  DUPLICATE KEY UPDATE b = values(b) + b;
    Query OK, 2 rows affected
    Time: 0.025s
    mysql root@localhost:test> SELECT * from tb_table;
    +-----+-----+-----+
    |   a |   b |   c |
    |-----+-----+-----|
    |   1 |   7 |   3 |
    +-----+-----+-----+
    1 row in set
    Time: 0.001s
    
  2. INSERT … ON DUPLICATE KEY UPDATE 在不满足主键约束或者唯一性约束的时候是更新操作,所以主键不会变。而 Replacedeletes inserts 也就是说老的记录会被删除,如果不指定主键值那么可能会破坏已有的外键关联(比如主键是自增产生,插入的时候不指定主键值)。
    mysql root@localhost:test> create table tb_table (a int not NULL PRIMARY key auto_increment, b int NOT NULL DEFAULT 0 unique, c int NOT NULL DEFAULT 0)
    Query OK, 0 rows affected
    Time: 0.027s
    mysql root@localhost:test> show create table tb_table;
    +----------+----------------+
    | Table    | Create Table   |
    |----------+----------------|
    | tb_table | CREATE TABLE `tb_table` (
     `a` int(11) NOT NULL AUTO_INCREMENT,
     `b` int(11) NOT NULL DEFAULT '0',
     `c` int(11) NOT NULL DEFAULT '0',
     PRIMARY KEY (`a`),
     UNIQUE KEY `b` (`b`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8                |
    +----------+----------------+
    1 row in set
    Time: 0.020s
    mysql root@localhost:test> insert into tb_table (b,c) VALUES (2,3);
    Query OK, 1 row affected
    Time: 0.013s
    mysql root@localhost:test> SELECT * from tb_table;
    +-----+-----+-----+
    |   a |   b |   c |
    |-----+-----+-----|
    |   1 |   2 |   3 |
    +-----+-----+-----+
    1 row in set
    Time: 0.001s
    mysql root@localhost:test> replace into tb_table (b,c) values (2,4);
    Query OK, 2 rows affected
    Time: 0.023s
    mysql root@localhost:test> SELECT * from tb_table;
    +-----+-----+-----+
    |   a |   b |   c |
    |-----+-----+-----|
    |   2 |   2 |   4 |
    +-----+-----+-----+
    1 row in set
    Time: 0.001s
    mysql root@localhost:test> replace into tb_table (b,c) values (2,4),(3,5);
    Query OK, 3 rows affected
    Time: 0.007s
    mysql root@localhost:test> SELECT * from tb_table;
    +-----+-----+-----+
    |   a |   b |   c |
    |-----+-----+-----|
    |   3 |   2 |   4 |
    |   4 |   3 |   5 |
    +-----+-----+-----+
    2 rows in set
    Time: 0.001s
    mysql root@localhost:test> insert into tb_table (b,c) values (2,8) on  DUPLICATE KEY UPDATE c = VALUES(c);
    Query OK, 2 rows affected
    Time: 0.013s
    mysql root@localhost:test> SELECT * from tb_table;
    +-----+-----+-----+
    |   a |   b |   c |
    |-----+-----+-----|
    |   3 |   2 |   8 |
    |   4 |   3 |   5 |
    +-----+-----+-----+
    2 rows in set
    Time: 0.001s
    

MySQL 与 Hive 中正则表达式字符转义的一个坑

提到正则表达式的转义,第一个念头就是加一个反斜杠 \

比如 . 在正则表达式中的含义就是匹配除 \n 之外的任何单个字符。

如果让你匹配在 Hive 或者 MySQL 的 SQL 语句中匹配像 4.2.5 这样的字符串,你会怎么做?

我的第一反应是:

[1-9]+\.[0-9]+\.[1-9]+

结果是错的!==!

事实上 . 的转义是 \\. 而不是 \.

有一个说法是 SQL 语句本身也是个字符串,交给程序处理的时候首先使用 \\\ 进行转义,然后用转义过的 \. 进行转义。

真是够绕的!

为 MySQL 表添加 ctime 和 mtime

1. 序

以前在使用 Ruby on Rails 的时候,框架自动会为每张表增加 mtimectime 两个字段,最近在手动建 MySQL 表的时候对这两个字段做了一点研究。

2. 字段的初始化和更新方式

  1. TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP: 在创建新记录和修改现有记录的时候都对这个数据列刷新。
  2. TIMESTAMP DEFAULT CURRENT_TIMESTAMP 在创建新记录的时候把这个字段设置为当前时间,但以后修改时,不再刷新它。
  3. TIMESTAMP ON UPDATE CURRENT_TIMESTAMP 在创建新记录的时候把这个字段设置为 0,修改字段时更新。

当你给一个 timestamp 设置为 on update current_timestamp 的时候,其他的 timestamp 字段需要显式设定 default 值

3. 实例

CREATE TABLE if not exists mybatis.person(
  id BIGINT not null auto_increment PRIMARY KEY,
  name VARCHAR(20) NOT NULL DEFAULT '',
  phone VARCHAR(15) NOT NULL DEFAULT '',
  email VARCHAR(50) NOT NULL DEFAULT '',
  ctime TIMESTAMP DEFAULT '0000-00-00 00:00:00',
  mtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)ENGINE=innodb DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Python 使用 MySQL 数据库

#!/bin/env python
# -*- coding: utf-8 -*-

import MySQLdb

def printRow(res):
    for prop in res:
        print prop,
    print

conn = MySQLdb.connect(host="localhost",user="root",passwd="root",db="employees",port=3306)
cur = conn.cursor()

# fetchone()
# 获取一条结果
print '='*5 + 'fetchone' + '='*5
rows = cur.execute("select count(*) from employees")
result = cur.fetchone()
print "rows:%d, result:%d" % (rows,result[0])

# fetchmany()
# 获取多条结果
print '='*5 + 'fetchmany' + '='*5
rows = cur.execute("select * from employees limit 10")
print "rows:%d" % rows
result = cur.fetchmany(5)
print "first 5:"
for res in result:
    printRow(res)
print "last 5:"
result = cur.fetchmany(5)
for res in result:
    for prop in res:
        print prop,
    print

# fetchall()
# 获取所有结果
print '='*5 + 'fetchall' + '='*5
rows = cur.execute("select * from employees limit 10")
print "rows:%d" % rows
result = cur.fetchall()
print "all 10:"
for res in result:
    printRow(res)

# scroll()
# 控制游标的移动
print '='*5 + 'scroll' + '='*5
rows = cur.execute("select * from employees limit 10")
print "rows:%d" % rows
print "first row:"
first_row = cur.fetchone()
printRow(first_row)
# absolute 从第 0 条位置向下移动 2 条
print "third row:"
cur.scroll(2,mode='absolute')
third_row = cur.fetchone()
printRow(third_row)
# relative 从当前移动 3 条
print "seventh row:"
cur.scroll(3,mode='relative')
seventh_row = cur.fetchone()
printRow(seventh_row)

# executemany()
# 用多个参数执行同一条语句,参数是所有支持迭代的对象
print '='*5 + 'executemany' + '='*5
genders = ['F','M']
rows = cur.executemany('select * from employees where gender=%s limit 1', genders)
result = cur.fetchall()
print rows
print result
# 执行结果表明每一条语句执行过后游标 cursor 都会改变。
# 因此不适合多参数的查询,而适合多参数的插入

# conn.commit()
# 用来执行事务
print '='*5 + 'commit' + '='*5
try:
    cur.execute("insert into employees values(0,date(now()),'Jack','Sparrow','M',date(now()))")
    cur.execute("insert into employees values(1,date(now()),'Jack','Sparrow','M',date(now()))")
    raise MySQLdb.Error
    conn.commit()
except MySQLdb.Error,e:
    # cur.execute("delete from employees where emp_no in (0,1)")
    print "Error Message: %s" % str(e.args)

cur.close()
conn.close()

将WordPress安装或者迁移到万网虚拟机上

万网虚拟机

前一阵子在远景论坛上看到万网的虚拟主机免费,果断申请了一个来使用。使用期间也第一次领教了备案带来的麻烦,好在万网的备案服务还是很好很迅速的,幕布的快递也很给力,就是工信部的审核花了十几天时间。从后台的显示来看,万网的虚拟机可以免费使用两年。这个域名note4code.com也是在万网上购买的,49元/年的价格还算公道。

WordPress的安装

万网的虚拟机使用的操作系统是 CentOS,支持 MySQL和 PHP5.3。WordPress 在该虚拟机上的安装比在 Ubuntu 上安装简单多了(参见上一篇博客在 Ubuntu 上安装 WordPress),不需要安装和设置 Apache2、MySQL 和 PHP,不需要为了修改 URL 而进行的大量操作。安装步骤简单概括就是:

  • 下载 WordPress 源代码。
  • 解压缩文件,创建uploads(wp-content/uploads)文件夹。
  • 复制 wp-config-sample.php,重命名为 wp-config.php 并修改其内容。
/** The name of the database for WordPress */
define('DB_NAME', 'database_name_here');

/** MySQL database username */
define('DB_USER', 'username_here');

/** MySQL database password */
define('DB_PASSWORD', 'password_here');

/** MySQL hostname */
define('DB_HOST', 'localhost');

修改为:

/** The name of the database for WordPress */
define('DB_NAME', 'qdmxxxxxxxxx_db');

/** MySQL database username */
define('DB_USER', 'qdmxxxxxxxxx');

/** MySQL database password */
define('DB_PASSWORD', 'xxxxxxxx');

/** MySQL hostname */
define('DB_HOST', 'qdmxxxxxxxxx.my3w.com');
  • 将所有文件复制到 FTP 服务器的 htdocs 目录下。
  • 登录你的域名(审核已通过的情况下),跟随界面操作。

关于WordPress的迁移

可能不少 WordPress 的用户和我一样,在审核期间就现在本地的 WordPress 上写起了博客,打算在审核通过后再将数据迁移到网上,由于网上相关的经验比较少,我也是花了好几天才摸索出了一套方法。

1.导出上传的图片

在写博客的时候不可避免会用到一些图片,这些图片应该都存在我们建立的 uploads 文件夹里,在向 FTP 服务器复制文件的时候记得将原来的 uploads 文件夹合并到 FTP 上去。

2.导出数据库

我们的文章都存在数据库中,所以 WordPress 的迁移本质上是数据库的迁移,我们首先导出数据库:

mysqldump -u wpuser -p wordpress > wordpress.sql

输入该用户的密码后我们得到一个名为 wordpress.sql 的文件,打开文件删除所有类似 /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; 的语句和注释,仅保留创建表和插入数据的语句。然后使用 Vim(没错,我支持 Vim)的全局替换字符串功能,将 localhost/wordpress 替换为你的域名。

3.导入数据库

进入虚拟主机数据库的管理界面,导入我们修改好的脚本即可。至此大功告成!

注意:在导入数据前最好将本地WordPress使用的插件在虚拟主机上提前装好,避免数据出现错误或者不一致的情况!