Flink CDC 同步任务突然报错,报错信息如下:
org.apache.flink.table.api.TableException: Failed to deserialize the input record: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1750756448, file=mysql-bin.057776, pos=85321588, gtids=02e551c4-53a8-11ef-b299-0c42a1e842c8:1-7439123037, row=1, server_id=31502042, event=56}} ConnectRecord{topic='mysql_binlog_source.senguopf.goods_storage_move', kafkaPartition=null, key=Struct{id=1275299423}, keySchema=Schema{mysql_binlog_source.senguopf.goods_storage_move.Key:STRUCT}, value=Struct{after=Struct{id=1275299423,shop_id=21754,goods_id=17939883,type=2,fk_id=1556137432,operater_id=470846,delta_storage=-800,delta_storage_num=-43700,create_time=1750785248000,record_time=1750785247000,account_period_date=20263,status=1,delta_storage_new=0.0000,delta_cost_price=0,update_time=2025-06-24T09:14:08Z,subwarehouse_id=0,delta_bottom_storage=0},source=Struct{version=1.9.7.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1750756448000,db=senguopf,table=goods_storage_move,server_id=31502042,gtid=02e551c4-53a8-11ef-b299-0c42a1e842c8:7439123038,file=mysql-bin.057776,pos=85332368,row=0,thread=690949961},op=c,ts_ms=1750756448460}, valueSchema=Schema{mysql_binlog_source.senguopf.goods_storage_move.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}.
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:150)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:129)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:109)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:82)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
at org.apache.flink.streaming.api.operators.SourceOperator.pollNext(SourceOperator.java:708)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:425)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:70)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:615)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1070)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1019)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
at java.lang.Thread.run(Thread.java:879)
Suppressed: java.lang.RuntimeException: Writing records to StarRocks failed.
at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:431)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.close(StarRocksSinkManager.java:302)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.close(StarRocksDynamicSinkFunction.java:244)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1274)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1182)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:941)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
... 3 more
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"STREAM_LOAD_TASK:99495159, label{8e63d838-4649-46e2-97cb-893096ceb07b}, error_msg{cancel stream task for exception: Label [8e63d838-4649-46e2-97cb-893096ceb07b] has already been used.}","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"8e63d838-4649-46e2-97cb-893096ceb07b","LoadBytes":0,"StreamLoadPlanTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":-1,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}
{}
at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:103)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:349)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:168)
... 1 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:149)
at com.ververica.cdc.debezium.table.AppendMetadataCollector.collect(AppendMetadataCollector.java:51)
at com.ververica.cdc.debezium.table.AppendSystemColumnAndMetadataCollector.collect(AppendSystemColumnAndMetadataCollector.java:43)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:169)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:130)
... 18 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at StreamExecCalc$1587.processElement_split924(Unknown Source)
at StreamExecCalc$1587.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
... 27 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:96)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:259)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
... 32 more
Caused by: java.lang.RuntimeException: Writing records to StarRocks failed.
at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:431)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.writeRecords(StarRocksSinkManager.java:225)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.invoke(StarRocksDynamicSinkFunction.java:199)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:79)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
... 37 more
Caused by: [CIRCULAR REFERENCE: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"STREAM_LOAD_TASK:99495159, label{8e63d838-4649-46e2-97cb-893096ceb07b}, error_msg{cancel stream task for exception: Label [8e63d838-4649-46e2-97cb-893096ceb07b] has already been used.}","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"8e63d838-4649-46e2-97cb-893096ceb07b","LoadBytes":0,"StreamLoadPlanTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":-1,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}
{}
]
辛苦哪位大佬帮忙看一下,谢谢!