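To help us locate your issue faster, please provide the following information. Thank you.
【Details】Using Flink to process MySQL data and write it to StarRocks
【Business impact】Occurs occasionally (once or twice) while the job is running; the Flink job returns to normal after a restart
【Shared-data (storage-compute separation)】No
【StarRocks version】3.1.0
【Connector version】flink-connector 1.2.8
【Cluster size】3 FE (1 follower + 2 observers) + 4 BE (FE and BE co-located)
【Contact】zhangweiooy@foxmail.com
【Attachments】
- Load error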
java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: flink_db, table: order_product_instance_new, label: flink-25f84b44-7575-4697-819a-3ebc4ba741c3,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "flink-25f84b44-7575-4697-819a-3ebc4ba741c3",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:427)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.write(StreamLoadManagerV2.java:252)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:199)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.retractRow(SinkUpsertMaterializer.java:189)
at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.processElement(SinkUpsertMaterializer.java:152)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: flink_db, table: order_product_instance_new, label: flink-25f84b44-7575-4697-819a-3ebc4ba741c3,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "flink-25f84b44-7575-4697-819a-3ebc4ba741c3",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:221)
at com.starrocks.data.load.stream.v2.TransactionTableRegion.commit(TransactionTableRegion.java:247)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:210)
... 1 more