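When using smt (StarRocks Migrate Tools) to generate StarRocks CREATE TABLE statements and Flink CDC SQL scripts automatically from MySQL table schemas, some of the generated output does not match the source schema: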
-
enum type
The MySQL source table has columns of type enum.
#MySQL
CREATE TABLE `test1` (
  `id` int NOT NULL AUTO_INCREMENT,
  `os_type` enum('IOS','ANDROID','ALL') DEFAULT NULL,
  `ad_channel` varchar(255) DEFAULT NULL,
  `click_source` enum('FEEDBACK','LANDING','ALL') DEFAULT NULL,
  `source_field` varchar(255) DEFAULT NULL,
  `method` enum('PRIORITY-KEY','UPPER','CONCAT','ASSIGN') DEFAULT NULL,
  `dist_field` varchar(255) DEFAULT NULL,
  `level` int DEFAULT NULL,
  `last_update` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`) USING BTREE
)
The StarRocks statement that smt generates for it is:
#StarRocks
CREATE TABLE IF NOT EXISTS `test1` (
  `id` INT NOT NULL COMMENT "",
  `os_type` NULL COMMENT "",
  `ad_channel` STRING NULL COMMENT "",
  `click_source` NULL COMMENT "",
  `source_field` STRING NULL COMMENT "",
  `method` NULL COMMENT "",
  `dist_field` STRING NULL COMMENT "",
  `level` INT NULL COMMENT "",
  `last_update` DATETIME NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`id`)
COMMENT ""
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3"
);
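The column type is missing for every enum column (`os_type`, `click_source`, `method`) and has to be set to STRING by hand.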
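A hand-corrected version of the statement might look like the sketch below. It only fills in STRING for the three columns that were enum in MySQL and otherwise leaves the smt output unchanged. Note that STRING does not enforce the enum's allowed values, so this sketch assumes any such validation happens upstream.

```sql
-- Sketch of the manual fix: the three former enum columns now carry STRING.
CREATE TABLE IF NOT EXISTS `test1` (
  `id` INT NOT NULL COMMENT "",
  `os_type` STRING NULL COMMENT "",       -- was enum('IOS','ANDROID','ALL')
  `ad_channel` STRING NULL COMMENT "",
  `click_source` STRING NULL COMMENT "",  -- was enum('FEEDBACK','LANDING','ALL')
  `source_field` STRING NULL COMMENT "",
  `method` STRING NULL COMMENT "",        -- was enum('PRIORITY-KEY','UPPER','CONCAT','ASSIGN')
  `dist_field` STRING NULL COMMENT "",
  `level` INT NULL COMMENT "",
  `last_update` DATETIME NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`id`)
COMMENT ""
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3"
);
```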
-
bit type
The MySQL source table has a column of type bit.
#MySQL
CREATE TABLE `test2` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `enable` bit(1) NOT NULL DEFAULT b'1',
  PRIMARY KEY (`id`) USING BTREE
)
The CREATE TABLE statement generated by smt is:
#StarRocks
CREATE TABLE IF NOT EXISTS `test2` (
  `id` INT NOT NULL COMMENT "",
  `name` STRING NULL COMMENT "",
  `enable` TINYINT NOT NULL DEFAULT "1" COMMENT ""
) ENGINE=olap
PRIMARY KEY(`id`)
COMMENT ""
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3"
);
The `enable` column is defined as TINYINT here, which looks correct.
The generated Flink CDC SQL script is:
#Flink
#src table
CREATE TABLE IF NOT EXISTS `test2_src` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `enable` BINARY NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'table-name' = 'test2',
  'connector' = 'mysql-cdc',
  'hostname' = '......',
  'port' = '3306',
  'username' = '......',
  'password' = '......',
  'database-name' = '......'
);
#sink table
CREATE TABLE IF NOT EXISTS `test2_sink` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `enable` BINARY NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'username' = 'xxxx',
  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  'load-url' = '127.0.0.1:8030',
  'sink.buffer-flush.interval-ms' = '15000',
  'connector' = 'starrocks',
  'database-name' = 'xxxx',
  'table-name' = 'test2',
  'password' = '',
  'sink.properties.column_separator' = '\x01',
  'sink.properties.row_delimiter' = '\x02'
);
The `enable` column is defined as BINARY here, which also looks correct.
However, running this Flink SQL fails with the following error:
(1/1)#0 (d62fcda645af7ef16087081c116a01c0) switched from RUNNING to FAILED with failure cause: java.lang.UnsupportedOperationException: Unsupported BYTES value type: Boolean
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$14.convert(RowDataDebeziumDeserializeSchema.java:537)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$17.convert(RowDataDebeziumDeserializeSchema.java:641)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.convertField(RowDataDebeziumDeserializeSchema.java:626)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.access$000(RowDataDebeziumDeserializeSchema.java:63)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$16.convert(RowDataDebeziumDeserializeSchema.java:611)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$17.convert(RowDataDebeziumDeserializeSchema.java:641)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:146)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:121)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
The error only goes away after the `enable` column is changed to STRING, and a query on the stored data then returns:
mysql> select enable from at_app_channel limit 2;
+--------+
| enable |
+--------+
| true   |
| false  |
+--------+
2 rows in set (0.01 sec)
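An alternative to STRING, sketched below, is to declare the source column as BOOLEAN. The stack trace shows the deserializer receiving a Boolean for the bit(1) column, and the Flink CDC MySQL connector's type mapping lists BIT(1) as BOOLEAN. Converting the value to 0/1 in the INSERT so that it fits the TINYINT column in StarRocks is an assumption of this sketch, not something smt generates. The connector options are the same placeholders as in the generated script.

```sql
-- Source table: bit(1) is read as BOOLEAN instead of BINARY.
CREATE TABLE IF NOT EXISTS `test2_src` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `enable` BOOLEAN NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'table-name' = 'test2',
  'connector' = 'mysql-cdc',
  'hostname' = '......',
  'port' = '3306',
  'username' = '......',
  'password' = '......',
  'database-name' = '......'
);

-- Sink table: TINYINT, matching the generated StarRocks column.
CREATE TABLE IF NOT EXISTS `test2_sink` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `enable` TINYINT NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'username' = 'xxxx',
  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  'load-url' = '127.0.0.1:8030',
  'sink.buffer-flush.interval-ms' = '15000',
  'connector' = 'starrocks',
  'database-name' = 'xxxx',
  'table-name' = 'test2',
  'password' = '',
  'sink.properties.column_separator' = '\x01',
  'sink.properties.row_delimiter' = '\x02'
);

-- Map true/false to 1/0 explicitly rather than relying on an implicit conversion.
INSERT INTO `test2_sink`
SELECT
  `id`,
  `name`,
  CAST(CASE WHEN `enable` THEN 1 ELSE 0 END AS TINYINT) AS `enable`
FROM `test2_src`;
```

With this mapping the StarRocks column would hold 1 and 0 rather than the strings true and false, which stays closer to the original bit(1) semantics.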
-
Column comments:
In the MySQL source table, the comment text contains double quotes.
#MySQL
CREATE TABLE `test3` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'auto-increment primary key',
  `name` varchar(255) DEFAULT '' COMMENT 'name',
  `isdel` tinyint(1) DEFAULT '0' COMMENT 'whether deleted, enum values: "1": deleted, "0": not deleted',
  `is_local` tinyint(1) DEFAULT '0' COMMENT 'whether uploaded 1-yes 0-no',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_adv_id_custom_audience_id` (`adv_id`,`custom_audience_id`),
  KEY `idx_ds_id` (`data_source_id`)
)
The CREATE TABLE statement generated by smt is:
#StarRocks
CREATE TABLE IF NOT EXISTS `test3` (
  `id` LARGEINT NOT NULL COMMENT "auto-increment primary key",
  `name` STRING NULL COMMENT "name",
  `isdel` tinyint(1) NULL DEFAULT "0" COMMENT "whether the audience package is deleted, enum values: "1": deleted, "0": not deleted",
  `is_local` tinyint(1) NULL DEFAULT "0" COMMENT "whether uploaded 1-yes 0-no"
) ENGINE=olap
PRIMARY KEY(`id`)
COMMENT ""
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3"
);
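The double quotes end up nested, so the statement fails when executed, and the extra quotes have to be removed from the CREATE TABLE statement by hand. Many tables have comments like this. Is there a parameter that controls whether Comment information is exported at all?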
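One way to fix the statement by hand without losing the inner quotes, sketched below, is to delimit the affected comment with single quotes. This assumes the StarRocks version in use accepts single-quoted string literals in COMMENT clauses; escaping the inner quotes as \" may also work, which is likewise an assumption here. It is a manual edit of the generated file, not an smt option.

```sql
-- Sketch: only the `isdel` comment changes; its delimiters become single quotes
-- so the inner double quotes no longer terminate the string.
CREATE TABLE IF NOT EXISTS `test3` (
  `id` LARGEINT NOT NULL COMMENT "auto-increment primary key",
  `name` STRING NULL COMMENT "name",
  `isdel` tinyint(1) NULL DEFAULT "0" COMMENT 'whether the audience package is deleted, enum values: "1": deleted, "0": not deleted',
  `is_local` tinyint(1) NULL DEFAULT "0" COMMENT "whether uploaded 1-yes 0-no"
) ENGINE=olap
PRIMARY KEY(`id`)
COMMENT ""
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3"
);
```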