在StarRocks2.5版本中使用Flink同步MySQL数据时,同步库与表结构后,在同步表数据时报错

【详述】我根据官方文档中使用Flink实时同步MySQL数据,将MySQL中的库和表的结构成功同步到Starrocks中后,进行数据同步时,执行./bin/sql-client.sh -f flink-create.all.sql 该命令后,报错显示:[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data
【背景】根据网上的资料,我对flink-conf.yaml文件中的classloader.resolve-order参数进行了更改,更改为:parent-first,但是也毫无作用。
【StarRocks版本】2.5
【MySQL版本】5.7.43
【集群规模】3fe+1be
【附件】
修改的flink-conf.yaml配置文件


这是Flink的lib目录下的 flink-connector-starrocks、Flink CDC connector版本

报错显示如下:
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data

详细报错如下:
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: TableSourceScan(table=[[default_catalog, flink_test, student_src]], fields=[id, name, age]) -> NotNullEnforcer(fields=[id, name]) -> Sink: Sink(table=[default_catalog.flink_test.student_sink], fields=[id, name, age])
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
… 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: TableSourceScan(table=[[default_catalog, flink_test, student_src]], fields=[id, name, age]) -> NotNullEnforcer(fields=[id, name]) -> Sink: Sink(table=[default_catalog.flink_test.student_sink], fields=[id, name, age])
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
… 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: TableSourceScan(table=[[default_catalog, flink_test, student_src]], fields=[id, name, age]) -> NotNullEnforcer(fields=[id, name]) -> Sink: Sink(table=[default_catalog.flink_test.student_sink], fields=[id, name, age])
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:217)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:791)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:196)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:335)
at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:191)
at org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:140)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:134)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:346)
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:323)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
… 4 more
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1587)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1950)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2167)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:431)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:211)

MySQL中表的信息:


StarRocks同步的表的信息:

您这个问题看着怀疑是跟配置有问题 麻烦参考下这个实操手册再走一遍呢 使用Flink-cdc 实时同步mysql数据

好的,我再去试试

您好,这一次我在MySQL中重新创建了名为t_user的表,重新走一遍后,还是不行。报错还是和上面一样的。以下是我的各项配置文件的信息,麻烦您看一下是否有错
这是我的SMT的config_prod.conf文件的配置内容



这是flink-create.1.sql文件的内容

这是starrocks-create.1.sql文件的内容

这是MySQL的my.cnf的配置信息