Problems with the smt migration tool

When using the smt tool (StarRocks Migrate Tools) to automatically generate StarRocks CREATE TABLE statements and Flink CDC SQL scripts from MySQL table schemas, there are a few mismatch errors:

  1. enum type

    The MySQL source table contains enum-typed columns:

    #MySQL
    CREATE TABLE `test1` (
      `id` int NOT NULL AUTO_INCREMENT,
      `os_type` enum('IOS','ANDROID','ALL')  DEFAULT NULL,
      `ad_channel` varchar(255)  DEFAULT NULL,
      `click_source` enum('FEEDBACK','LANDING','ALL')  DEFAULT NULL,
      `source_field` varchar(255)  DEFAULT NULL,
      `method` enum('PRIORITY-KEY','UPPER','CONCAT','ASSIGN')  DEFAULT NULL,
      `dist_field` varchar(255)  DEFAULT NULL,
      `level` int DEFAULT NULL,
      `last_update` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`) USING BTREE
    )
    

    The generated StarRocks statement is:

    #StarRocks
    CREATE  TABLE IF NOT EXISTS `test1` (
      `id` INT NOT NULL  COMMENT "",
      `os_type`  NULL  COMMENT "",
      `ad_channel` STRING NULL  COMMENT "",
      `click_source`  NULL  COMMENT "",
      `source_field` STRING NULL  COMMENT "",
      `method`  NULL  COMMENT "",
      `dist_field` STRING NULL  COMMENT "",
      `level` INT NULL  COMMENT "",
      `last_update` DATETIME NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "3"
    );
    

    The types for these columns are missing entirely and have to be filled in by hand, e.g. as STRING.
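
    For reference, a minimal sketch of the hand-corrected DDL, assuming STRING is an acceptable mapping for all three enum columns:

    #StarRocks, hand-corrected
    CREATE TABLE IF NOT EXISTS `test1` (
      `id` INT NOT NULL COMMENT "",
      `os_type` STRING NULL COMMENT "",
      `ad_channel` STRING NULL COMMENT "",
      `click_source` STRING NULL COMMENT "",
      `source_field` STRING NULL COMMENT "",
      `method` STRING NULL COMMENT "",
      `dist_field` STRING NULL COMMENT "",
      `level` INT NULL COMMENT "",
      `last_update` DATETIME NULL COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "3"
    );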

  2. bit type

    The MySQL source table contains a bit-typed column:

    #MySQL
    CREATE TABLE `test2` (
      `id` int NOT NULL AUTO_INCREMENT,
      `name` varchar(255)  DEFAULT NULL,
      `enable` bit(1) NOT NULL DEFAULT b'1',
      PRIMARY KEY (`id`) USING BTREE
    )
    

    The CREATE TABLE statement generated by smt is:

    #StarRocks
    CREATE  TABLE IF NOT EXISTS `test2` (
      `id` INT NOT NULL  COMMENT "",
      `name` STRING NULL  COMMENT "",
      `enable` TINYINT NOT NULL DEFAULT "1" COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "3"
    );
    

    The enable column is defined as TINYINT here, which seems correct.

    The Flink CDC SQL script is:

    #Flink
    #src table
    CREATE TABLE IF NOT EXISTS `test2_src` (
      `id` INT NOT NULL,
      `name` STRING NULL,
      `enable` BINARY NOT NULL,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) with (
      'table-name' = 'test2',
      'connector' = 'mysql-cdc',
      'hostname' = '......',
      'port' = '3306',
      'username' = '......',
      'password' = '......',
      'database-name' = '......'
    );
    
    #sink table
    CREATE TABLE IF NOT EXISTS `test2_sink` (
      `id` INT NOT NULL,
      `name` STRING NULL,
      `enable` BINARY NOT NULL,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) with (
      'username' = 'xxxx',
      'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
      'load-url' = '127.0.0.1:8030',
      'sink.buffer-flush.interval-ms' = '15000',
      'connector' = 'starrocks',
      'database-name' = 'xxxx',
      'table-name' = 'test2',
      'password' = '',
      'sink.properties.column_separator' = '\x01',
      'sink.properties.row_delimiter' = '\x02'
    );
    

    The enable column is defined as BINARY here, which also seems reasonable.

    But executing this Flink SQL produces the following error:

    (1/1)#0 (d62fcda645af7ef16087081c116a01c0) switched from RUNNING to FAILED with failure cause: java.lang.UnsupportedOperationException:
     Unsupported BYTES value type: Boolean
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$14.convert(RowDataDebeziumDeserializeSchema.java:537)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$17.convert(RowDataDebeziumDeserializeSchema.java:641)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.convertField(RowDataDebeziumDeserializeSchema.java:626)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.access$000(RowDataDebeziumDeserializeSchema.java:63)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$16.convert(RowDataDebeziumDeserializeSchema.java:611)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema$17.convert(RowDataDebeziumDeserializeSchema.java:641)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:146)
            at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:121)
            at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
            at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
            at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)
            at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
            at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
            at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
            at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
            at java.lang.Thread.run(Thread.java:748)
    

    Only after changing the enable column to STRING does the job run without errors; the data stored then looks like this:

    mysql> select enable from at_app_channel limit 2;
    +--------+
    | enable |
    +--------+
    | true   |
    | false  |
    +--------+
    2 rows in set (0.01 sec)
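
    A sketch of the workaround applied to the source-side DDL; the sink table's enable column is changed to STRING in the same way. (Declaring it BOOLEAN might also match how the connector deserializes bit(1), but that is an untested assumption.)

    #Flink, src table with the enable column changed to STRING
    CREATE TABLE IF NOT EXISTS `test2_src` (
      `id` INT NOT NULL,
      `name` STRING NULL,
      `enable` STRING NOT NULL,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) with (
      'table-name' = 'test2',
      'connector' = 'mysql-cdc',
      'hostname' = '......',
      'port' = '3306',
      'username' = '......',
      'password' = '......',
      'database-name' = '......'
    );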
    
  3. Comment content:

    The MySQL source table's column comments contain double quotes:

    #MySQL
    CREATE TABLE `test3` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT '自增主键',
      `name` varchar(255)  DEFAULT '' COMMENT '名称',
      `isdel` tinyint(1) DEFAULT '0' COMMENT '是否删除,枚举值:"1":已删除,"0":未删除',
      `is_local` tinyint(1) DEFAULT '0' COMMENT '是否上传 1-是 0-否',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uk_adv_id_custom_audience_id` (`adv_id`,`custom_audience_id`),
      KEY `idx_ds_id` (`data_source_id`)
    )
    

    The CREATE TABLE statement generated by smt is:

    #StarRocks
    CREATE  TABLE IF NOT EXISTS `test3` (
      `id` LARGEINT NOT NULL  COMMENT "自增主键",
      `name` STRING NULL  COMMENT "名称",
      `isdel` tinyint(1) NULL DEFAULT "0" COMMENT "人群包是否删除,枚举值:"1":已删除,"0":未删除",
      `is_local` tinyint(1) NULL DEFAULT "0" COMMENT "是否上传 1-是 0-否"
    ) ENGINE=olap
    PRIMARY KEY(`id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "3"
    );
    

    The quotes end up nested, so the statement fails to execute, and the extra quotes have to be removed by hand. We have many tables like this. Is there a configuration option that controls whether comments are exported at all?
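
    Until the tool handles this, a sketch of the hand-fix: escape the inner double quotes MySQL-style (replacing them with single quotes inside the comment should work as well):

    #StarRocks, hand-fixed: inner double quotes escaped
    CREATE  TABLE IF NOT EXISTS `test3` (
      `id` LARGEINT NOT NULL  COMMENT "自增主键",
      `name` STRING NULL  COMMENT "名称",
      `isdel` tinyint(1) NULL DEFAULT "0" COMMENT "人群包是否删除,枚举值:\"1\":已删除,\"0\":未删除",
      `is_local` tinyint(1) NULL DEFAULT "0" COMMENT "是否上传 1-是 0-否"
    ) ENGINE=olap
    PRIMARY KEY(`id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "3"
    );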


Thanks for the feedback; we'll fix this on our side.

https://cdn-thirdparty.starrocks.com/smt.tar.gz?r=1642133403

Fixed; please download from the link above and try again.

It errors out when run, and no scripts are generated.

Check your database and table configuration.
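
For reference, the database/table matching lives in smt's config file (config_prod.conf); a sketch of the relevant sections based on the documented format, with placeholder values:

[db]
host = 192.168.1.1
port = 3306
user = root
password = xxxx

[table-rule.1]
# pattern to match databases for setting properties
database = ^my_database$
# pattern to match tables for setting properties
table = ^.*$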

Ignore that, I found the problem.

The problems above are all fixed now, but new ones have come up.

  1. Column length overflow
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: 
{"Status":"Fail","BeginTxnTimeMs":3,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"571e2e18-23b8-49fd-b0a9-1de6337376e6","LoadBytes":94475367,"StreamLoadPutTimeMs":4,"NumberTotalRows":101795,"WriteDataTimeMs":5140,"TxnId":12644,"LoadTimeMs":5147,"ErrorURL":"http://10.21.2.38:8040/api/_load_error_log?file=__shard_1/error_log_insert_stmt_9b4371ed-a911-6549-23b3-1501cbd35c87_9b4371eda9116549_23b31501cbd35c87","ReadDataTimeMs":193,"NumberLoadedRows":101791,"NumberFilteredRows":4}
{"streamLoadErrorLog":"Reason: the length of input is too long than schema. column_name: content_log; input_str: [[\"修改词:小红书下载安装 的审核状态: 新建审核中 -> 审核通过\",\"修改词:微信扫一扫健康码 的审核状态: 新建审核中 -> 审核通过\",\"修改词:三十岁的女人感慨说说 的审核状态: 新建审核中 -> 审核通过\",\"修改词:生病了一个人扛的高情商说说 的审核状态: 新建审核中 -> 审核通过\",\"修改词:领钱软件提现微信 的审核状态: 新建审核中 -> 审核通过\",\"

Looking at the MySQL source table schema:

CREATE TABLE `opt_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `log_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '日志唯一id',
  `operator_id` bigint DEFAULT NULL COMMENT '操作人用户ID',
  `operator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作人名称',
  `adv_id` bigint DEFAULT NULL COMMENT 'log_source为1时为巨量账号ID',
  `object_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作模块: 员工管理、团队管理',
  `content_title` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作动作 - 朝霞后台: 新增、编辑、删除',
  `object_id` bigint NOT NULL COMMENT '操作对象ID',
  `object_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作对象名称',
  `create_time` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '操作时间: yyyy-MM-dd HH:mm:ss',
  `content_log` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '变化内容',
  ......
  PRIMARY KEY (`id`) USING BTREE,
  ......
)

Checking the maximum column length:

mysql> select max(length(content_log)) from opt_record;
+--------------------------+
| max(length(content_log)) |
+--------------------------+
|                    65535 |
+--------------------------+
1 row in set (2.14 sec)

while the StarRocks table definition is:

CREATE TABLE `opt_record` (
  `id` largeint(40) NOT NULL COMMENT "主键ID",
  `log_id` varchar(65533) NOT NULL COMMENT "日志唯一id",
  `operator_id` largeint(40) NULL COMMENT "操作人用户ID",
  `operator` varchar(65533) NOT NULL COMMENT "操作人名称",
  `adv_id` largeint(40) NULL COMMENT "log_source为1时为巨量账号ID",
  `object_type` varchar(65533) NOT NULL COMMENT "操作模块: 员工管理、团队管理",
  `content_title` varchar(65533) NOT NULL COMMENT "操作动作 - 朝霞后台: 新增、编辑、删除",
  `object_id` largeint(40) NOT NULL COMMENT "操作对象ID",
  `object_name` varchar(65533) NOT NULL COMMENT "操作对象名称",
  `create_time` varchar(65533) NOT NULL COMMENT "操作时间: yyyy-MM-dd HH:mm:ss",
  `content_log` varchar(65533) NULL COMMENT "变化内容",
  ......
) ENGINE=OLAP 
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 8 
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

So it does overflow: the MySQL text column holds up to 65535 bytes, at least one row actually reaches that length, and the generated StarRocks column is capped at varchar(65533).
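
A quick way to confirm which rows overflow, run on the MySQL side (65533 being the byte cap of the generated varchar column):

select id, length(content_log) from opt_record where length(content_log) > 65533;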

  2. Some tasks fail during execution with the following error:
Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.sendOperatorEventToTask(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,org.apache.flink.runtime.jobgraph.OperatorID,org.apache.flink.util.SerializedValue) timed out.
	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.sendOperatorEventToTask(RpcTaskManagerGateway.java:114)
	at org.apache.flink.runtime.executiongraph.Execution.sendOperatorEvent(Execution.java:888)
	at org.apache.flink.runtime.operators.coordination.ExecutionSubtaskAccess.lambda$createEventSendAction$1(ExecutionSubtaskAccess.java:67)
	at org.apache.flink.runtime.operators.coordination.OperatorEventValve.callSendAction(OperatorEventValve.java:180)
	at org.apache.flink.runtime.operators.coordination.OperatorEventValve.sendEvent(OperatorEventValve.java:94)
	at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$2(SubtaskGatewayImpl.java:98)
	... 24 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.21.2.37:46714/user/rpc/taskmanager_0#954516654]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
	at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
	at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:748)

The TaskManager doesn't appear to have gone down, so I'm not sure what's causing this.
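
One thing that may be worth trying (an assumption on my part, not a confirmed fix): the trace shows the default 10 s RPC ask timeout expiring, and that timeout can be raised in flink-conf.yaml. This only helps if the TaskManager is merely slow to reply under load rather than actually stuck.

# flink-conf.yaml: raise the RPC ask timeout from the default 10 s
akka.ask.timeout: 60 s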

This can't be solved for now; the maximum string length StarRocks currently supports is 65533.
However, support for extra-long strings is coming soon.
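
Until then, a possible stopgap, assuming it is acceptable to drop the oversized rows: the StarRocks sink passes sink.properties.* options through to Stream Load, so raising max_filter_ratio lets a load succeed with a bounded fraction of filtered rows.

#Flink sink, extra option in the with(...) clause (sketch)
'sink.properties.max_filter_ratio' = '0.01'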

May I ask: I'm running into the same error and have been checking for a long time without finding the cause. How did you solve it?

There's no way around it at the moment; wait for official support.