Flink connector load fails with: Couldn't get the sink table's column info.

【Details】FE, BE, and Broker deployed on a single Docker VM.
Data migration using Flink CDC + Flink + flink-connector-starrocks.
【Load/export method】Flink CDC source, flink-connector-starrocks sink.
【Background】Relevant code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);
String sourceDDL =
        "CREATE TABLE pg_user (\n" +
        "  pk string,\n" +
        "  id string,\n" +
        "  tx_count int,\n" +
        "  t DECIMAL(20,10),\n" +
        "  l string,\n" +
        "  `timestamp` int,\n" +      // reserved word in Flink SQL, back-quoted as in the sink DDL
        "  c string,\n" +
        "  `version` string\n" +
        ") WITH (\n" +
        "  'connector' = 'postgres-cdc',\n" +
        "  'hostname' = '127.0.0.1',\n" +
        "  'port' = '5432',\n" +
        "  'username' = 'postgres',\n" +
        "  'password' = '123456',\n" +
        "  'database-name' = 'data-store',\n" +
        "  'schema-name' = 'public',\n" +
        "  'table-name' = 'user',\n" +
        "  'slot.name' = 'flink_cdc_to_starrocks',\n" +
        "  'decoding.plugin.name' = 'pgoutput'\n" +
        ")";
// Table resultTable = streamTableEnv.sqlQuery("SELECT * FROM user");
streamTableEnv.executeSql(sourceDDL);

String sinkDDL =
        "CREATE TABLE sr_user (\n" +
        "  pk string,\n" +
        "  id string,\n" +
        "  tx_count int,\n" +
        "  t DECIMAL(20,10),\n" +
        "  l string,\n" +
        "  `timestamp` int,\n" +
        "  c string,\n" +
        "  `version` string\n" +
        ") WITH (\n" +
        "  'connector' = 'starrocks',\n" +
        "  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n" +
        "  'load-url' = '127.0.0.1:8030',\n" +
        "  'database-name' = 'data-store',\n" +
        "  'table-name' = 'user',\n" +
        "  'username' = 'root',\n" +
        "  'password' = '123456',\n" +
        "  'sink.buffer-flush.max-rows' = '1000000',\n" +
        "  'sink.buffer-flush.max-bytes' = '300000000',\n" +
        "  'sink.buffer-flush.interval-ms' = '5000',\n" +
        "  'sink.properties.column_separator' = '\\x01',\n" +
        "  'sink.properties.row_delimiter' = '\\x02',\n" +
        "  'sink.max-retries' = '3'\n" +
        // "'sink.properties.*' = 'xxx'," + // stream load properties, e.g. 'sink.properties.columns' = 'k1, v1'
        ")";
streamTableEnv.executeSql(sinkDDL);

streamTableEnv.executeSql("insert into sr_user select * from pg_user");

【Business impact】
【StarRocks version】2.1
【Cluster size】1 FE + 1 BE
【Machine info】12 vCPU / 16 GB RAM / 10GbE NIC
【Attachment】Error message:
Exception in thread "main" java.lang.IllegalArgumentException: Couldn't get the sink table's column info.
at com.starrocks.connector.flink.manager.StarRocksSinkManager.validateTableStructure(StarRocksSinkManager.java:411)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.<init>(StarRocksSinkManager.java:129)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.<init>(StarRocksDynamicSinkFunction.java:67)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSink.getSinkRuntimeProvider(StarRocksDynamicTableSink.java:45)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:121)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at flink.FlinkStarRocksCDC.main(FlinkStarRocksCDC.java:65)
Disconnected from the target VM, address: '127.0.0.1:59729', transport: 'socket'

Process finished with exit code 1

Check the sink-side connection settings; from this error it looks like the connector never managed to reach the database.
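For context: judging by the stack trace (StarRocksSinkManager.validateTableStructure), the connector looks up the sink table's columns through the jdbc-url before writing anything. A minimal way to reproduce that check by hand, assuming the connector reads information_schema.columns (the exact statement it issues may differ):

-- Connect with the same endpoint the sink uses:
--   mysql -h 127.0.0.1 -P 9030 -u root -p
SELECT COLUMN_NAME, DATA_TYPE
FROM information_schema.columns
WHERE table_schema = 'data-store'
  AND table_name = 'user';
-- An empty result set here matches the condition that raises
-- "Couldn't get the sink table's column info."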

I connected with the same credentials in Navicat, but Navicat likewise cannot show the table DDL. See my other question (https://forum.mirrorship.cn/t/topic/2074); I suspect the same underlying problem.

My Docker service only maps ports 8030 and 9030; could that be a factor? Inside the container, DESCRIBE in the mysql client shows the table structure fine.
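A note on the ports, as background: the connector uses the jdbc-url (9030, the FE query port) for metadata and the load-url (8030, the FE HTTP port) for Stream Load, and the FE redirects Stream Load requests to a BE HTTP port (8040 by default), so that port usually needs to be reachable too. The ports a cluster actually listens on can be checked from StarRocks itself; a sketch using standard SHOW commands:

-- The QueryPort and HttpPort columns show what the FE listens on.
SHOW FRONTENDS;
-- The BE HttpPort is the redirect target for Stream Load.
SHOW BACKENDS;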

Connect to StarRocks, run select * from information_schema.columns where table_schema='data-store' and table_name='user', and share the result.

That fails with: ERROR 1064 (HY000): Please check your sql, we meet an error when parsing.
The table was created with:
CREATE TABLE IF NOT EXISTS user (
    pk VARCHAR(255) NOT NULL,
    id VARCHAR(255) NOT NULL,
    tx_count INT NOT NULL,
    t DECIMAL(20, 10) NOT NULL,
    l STRING NOT NULL,
    TIMESTAMP INT NOT NULL,
    c VARCHAR(255) NOT NULL,
    VERSION VARCHAR(255) NOT NULL
) DUPLICATE KEY (pk) DISTRIBUTED BY HASH (id) BUCKETS 1 PROPERTIES ("replication_num" = "1");
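Two things stand out in this DDL: TIMESTAMP and VERSION are keywords (the Flink DDL above had to back-quote both), and the database name data-store contains a hyphen, so any tool that round-trips these identifiers has to quote them. A back-quoted variant of the same statement, as a sketch rather than a confirmed fix for the original error:

CREATE TABLE IF NOT EXISTS `user` (
    pk VARCHAR(255) NOT NULL,
    id VARCHAR(255) NOT NULL,
    tx_count INT NOT NULL,
    t DECIMAL(20, 10) NOT NULL,
    l STRING NOT NULL,
    `timestamp` INT NOT NULL,
    c VARCHAR(255) NOT NULL,
    `version` VARCHAR(255) NOT NULL
) DUPLICATE KEY (pk)
DISTRIBUTED BY HASH (id) BUCKETS 1
PROPERTIES ("replication_num" = "1");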

Was this CREATE TABLE statement generated by smt?

No, I wrote it by hand.

Is there a tutorial for smt? My source is PostgreSQL 14; is that supported?

You can download the smt tool from https://www.starrocks.com/zh-CN/download/community . It supports schema conversion for MySQL, PostgreSQL, Oracle, SQL Server, Hive, and TiDB, and auto-generates the Flink SQL.

Thanks, I'll give it a try.

Running smt fails (my pgsql version is 14). After fixing the offending SQL by hand, it ran successfully in the pgsql shell; the result was Empty set.
My corrected SQL: SELECT "table_name","table_schema","table_catalog",pg_table_size(quote_ident(table_name)) as data_length,pg_indexes_size(quote_ident(table_name)) as index_length FROM "information_schema"."tables" WHERE table_type='BASE TABLE' and table_schema not in ('information_schema', 'pg_catalog') ORDER BY TABLE_SCHEMA asc, TABLE_NAME asc;

2022/04/19 07:11:10 /home/disk4/zhaoyifei/workspace/StarRocksManager/tools/migrate-tool/source/pgsql.go:124 ERROR: function pg_table_size(information_schema.sql_identifier) does not exist (SQLSTATE 42883)
[2.306ms] [rows:0] SELECT "table_name","table_schema","table_catalog",pg_table_size(table_name) as data_length,pg_indexes_size(table_name) as index_length FROM "information_schema"."tables" WHERE table_type='BASE TABLE' and table_schema not in ('information_schema', 'pg_catalog') ORDER BY TABLE_SCHEMA asc, TABLE_NAME asc
panic: Failed to get rows from information_schema.tables.

goroutine 1 [running]:
main.main()
/home/disk4/zhaoyifei/workspace/StarRocksManager/tools/migrate-tool/main.go:36 +0x979
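The error is consistent with a type change in newer PostgreSQL: since version 12, the identifier columns in information_schema are of the domain type sql_identifier based on name, and PostgreSQL finds no cast from that type to the regclass argument of pg_table_size, hence SQLSTATE 42883. Wrapping the column in quote_ident() yields text, which PostgreSQL can coerce to regclass, which is why the corrected query above succeeds. A minimal illustration, assuming a PostgreSQL 12+ server:

-- Fails with SQLSTATE 42883 on PostgreSQL 12+:
SELECT pg_table_size(table_name)
FROM information_schema.tables WHERE table_schema = 'public';

-- Works: quote_ident() returns text, which is coercible to regclass.
SELECT pg_table_size(quote_ident(table_name))
FROM information_schema.tables WHERE table_schema = 'public';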

Please re-download the smt tool; this issue has already been fixed: https://www.starrocks.com/zh-CN/download/community