flink cdc 同步报错

【详述】
用的是这些版本
flink-1.13.6-bin-scala_2.11.tgz
flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar
flink-sql-connector-mysql-cdc-1.3.0.jar
smt.tar.gz
【导入/导出方式】
【背景】做过哪些操作?
执行这个报错,
./bin/sql-client.sh -f flink-create.1.sql

[INFO] Execute statement succeed.
Flink SQL>
CREATE TABLE IF NOT EXISTS `default_catalog`.`finenter_expense`.`bill_batch_period_sink` (
  `batch` STRING NOT NULL,
  `period` TIMESTAMP NOT NULL,
  PRIMARY KEY(`batch`, `period`)
 NOT ENFORCED
) with (
  'jdbc-url' = 'jdbc:mysql://192.168.235.101:9030',
  'load-url' = '192.168.235.101:8030',
  'password' = 'xxxx',
  'database-name' = 'finenter_expense',
  'table-name' = 'bill_batch_period',
  'username' = 'xxxx',
  'sink.max-retries' = '10',
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true',
  'sink.buffer-flush.interval-ms' = '15000',
  'connector' = 'starrocks'
);
[INFO] Execute statement succeed.
Flink SQL>
INSERT INTO `default_catalog`.`finenter_expense`.`bill_batch_period_sink` SELECT * FROM `default_catalog`.`finenter_expense`.`bill_batch_period_src`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;

Shutting down the session...
done.

【业务影响】无,还在搭建
【StarRocks版本】2.3