flink-starrocks-connect: creating a StarRocks sink fails with Caused by: java.lang.IllegalArgumentException: Flink and StarRocks types are not matched for column xxx, flink type is ARRAY<BIGINT>, starrocks type is unknown

To help us locate your problem faster, please provide the following information. Thank you.
【Details】A StarRocks sink created with flink-starrocks-connect contains ARRAY columns. At runtime it fails with an error saying the Flink type and the StarRocks type do not match: Caused by: java.lang.IllegalArgumentException: Flink and StarRocks types are not matched for column xxx, flink type is ARRAY, starrocks type is unknown

【Background】CREATE TABLE dws_stream_report_quality (
timestamp STRING,
app_id BIGINT,
report_src BIGINT,
country STRING,
region STRING,
peer_to_peer_delay_arr ARRAY < BIGINT >,
peer_to_peer_plr_arr ARRAY < BIGINT >,
rtt_arr ARRAY < BIGINT >,
plr_arr ARRAY < BIGINT > – mos_arr ARRAY < INT >,
video_break_arr ARRAY < INT >,
audio_break_arr ARRAY < INT >
) WITH (
'connector' = 'starrocks',
'load-url' = 'localhost:8030',
'jdbc-url' = 'jdbc:mysql://localhost:9030',
'username' = 'root',
'password' = '',
'database-name' = 'ocean',
'table-name' = 'dws_stream_report_quality',
'sink.properties.strip_outer_array' = 'true',
'sink.buffer-flush.max-rows' = '100000',
'sink.buffer-flush.interval-ms' = '10000',
'sink.properties.format' = 'JSON'
);
【Business impact】
【StarRocks version】e.g. 2.5.1
【Cluster size】
【Machine info】
【Table type】Primary Key table
【Import or export method】Flink
【Contact】jweicai@163.com
【Attachments】
2023-08-30 14:48:50,126 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_372]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_05f9117d-335b-42ba-abe9-73aa320d74ec.jar:1.17.1]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_05f9117d-335b-42ba-abe9-73aa320d74ec.jar:1.17.1]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_05f9117d-335b-42ba-abe9-73aa320d74ec.jar:1.17.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_05f9117d-335b-42ba-abe9-73aa320d74ec.jar:1.17.1]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_05f9117d-335b-42ba-abe9-73aa320d74ec.jar:1.17.1]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Flink and StarRocks types are not matched for column peer_to_peer_delay_arr, flink type is ARRAY, starrocks type is unknown
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
... 12 more
Caused by: java.lang.IllegalArgumentException: Flink and StarRocks types are not matched for column peer_to_peer_delay_arr, flink type is ARRAY, starrocks type is unknown
at com.starrocks.connector.flink.manager.StarRocksSinkTable.validateTableStructure(StarRocksSinkTable.java:148) ~[flink-connector-starrocks-1.2.7_flink-1.17.jar:?]
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.<init>(StarRocksDynamicSinkFunctionV2.java:99) ~[flink-connector-starrocks-1.2.7_flink-1.17.jar:?]
at com.starrocks.connector.flink.table.sink.SinkFunctionFactory.createSinkFunction(SinkFunctionFactory.java:99) ~[flink-connector-starrocks-1.2.7_flink-1.17.jar:?]
at com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSink.getSinkRuntimeProvider(StarRocksDynamicTableSink.java:44) ~[flink-connector-starrocks-1.2.7_flink-1.17.jar:?]
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:158) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:176) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) ~[?:?]
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[?:?]
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-scala_2.12-1.17.1.jar:1.17.1]

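plr_arr ARRAY < BIGINT > – mos_arr ARRAY < INT >, This line of the table definition looks wrong. Please run show create table on the table you are loading into and post that CREATE TABLE statement again. Thanks.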
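
For reference, the check requested above might look like the following when run against StarRocks through a MySQL client. This is only a sketch: it assumes the client connects to the FE query port from the jdbc-url in the post (9030) and that the target table is the one named in the connector options (`ocean`.`dws_stream_report_quality`). The connector reported the StarRocks type as unknown, so comparing the column types StarRocks returns with the Flink DDL seems a reasonable first step.

```sql
-- Assumption: connected to the StarRocks FE with a MySQL client,
-- e.g. on the host/port used in 'jdbc-url' (localhost:9030 in the post).

-- Print the full DDL of the target table as StarRocks stores it.
SHOW CREATE TABLE ocean.dws_stream_report_quality;

-- List each column with the type StarRocks reports for it,
-- to compare against the ARRAY<BIGINT> / ARRAY<INT> columns in the Flink DDL.
DESC ocean.dws_stream_report_quality;
```

If the array columns are missing or have a different type on the StarRocks side, that would be consistent with the error, though the output is needed to confirm it.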