使用 Flink 将 Kafka 数据同步到 StarRocks(SR)时,经常报事务提交失败,导致任务重启

【详述】 java.lang.RuntimeException: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction commit failed, db: pims, table: data_control, label: 1aa56b44-b511-4fba-b6eb-178becec4463, commit response status: FAILED, label state: PREPARED

at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.AssertNotException(StarRocksSinkManagerV2.java:367)

at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.write(StarRocksSinkManagerV2.java:198)

at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:158)

at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)

at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)

at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)

at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.addRow(SinkUpsertMaterializer.java:170)

at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.processElement(SinkUpsertMaterializer.java:147)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)

at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)

at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)

at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)

at java.lang.Thread.run(Thread.java:750)

Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction commit failed, db: pims, table: data_control, label: 1aa56b44-b511-4fba-b6eb-178becec4463, commit response status: FAILED, label state: PREPARED

at com.starrocks.data.load.stream.TransactionStreamLoader.commit(TransactionStreamLoader.java:272)

at com.starrocks.connector.flink.manager.TransactionTableRegion.commit(TransactionTableRegion.java:257)

at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.lambda$init$0(StarRocksSinkManagerV2.java:161)

... 1 more

Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction commit failed, db: pims, table: data_control, label: 1aa56b44-b511-4fba-b6eb-178becec4463, commit response status: FAILED, label state: PREPARED

at com.starrocks.data.load.stream.TransactionStreamLoader.commit(TransactionStreamLoader.java:270)

... 3 more