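To help us locate your problem faster, please provide the following information. Thank you.
[Details] Importing data into StarRocks with the Flink connector (version 1.2.9) fails with a transaction timeout.
[Background] Multiple tables share the same sink manager.
[Business impact]
[Shared-data (storage-compute separated) deployment?]
[StarRocks version] 3.3.2
[Cluster size] e.g. 3 FE + 6 BE
[Machine info] CPU vCores / memory / NIC / disk, e.g. 48C/64G/10GbE/NVMe
[Table model] Primary Key table
[Import or export method] Flink connector, version 1.2.9
[Contact] Community group 15, redscarf
[Attachments]
- fe.log / be.INFO / relevant screenshots
- Full exception stack trace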
2024-09-26 02:46:37
java.lang.Exception: Could not perform checkpoint 38528 for operator Source: operator (1/3)#1.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1166)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1113)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:936)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:915)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:729)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:251)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:918)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:936)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:918)
... 3 more
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:232)
at com.starrocks.data.load.stream.v2.TransactionTableRegion.doCommit(TransactionTableRegion.java:303)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 38528 for operator (1/3)#1. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1154)
... 14 more
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: 6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.snapshotState(StarRocksDynamicSinkFunctionV2.java:264)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
... 25 more
Caused by: [CIRCULAR REFERENCE: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: 6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
"Message": "Transaction in processing, please retry later"
}
errorLog: null]
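The response body above asks the client to "retry later" when the status is `TXN_IN_PROCESSING`. A minimal sketch of what such a retry could look like is shown here; it is not the connector's actual code. `prepare` is a hypothetical callable that sends the prepare request and returns the parsed response body, and `prepare_with_retry`, `max_attempts` and `base_delay_s` are illustrative names.

```python
import time
from typing import Callable, Dict


def prepare_with_retry(
    prepare: Callable[[], Dict[str, str]],  # hypothetical: sends the prepare request
    max_attempts: int = 5,
    base_delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Call prepare() until the status is no longer TXN_IN_PROCESSING.

    Waits base_delay_s, 2 * base_delay_s, 4 * base_delay_s, ... between
    attempts and returns the last response body once attempts run out.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    body: Dict[str, str] = {}
    for attempt in range(max_attempts):
        body = prepare()
        if body.get("Status") != "TXN_IN_PROCESSING":
            return body
        if attempt < max_attempts - 1:
            # Exponential backoff before the next attempt.
            sleep(base_delay_s * 2 ** attempt)
    return body
```

The caller still has to decide what to do when the last response is `TXN_IN_PROCESSING`; in the trace above the failure is raised from `flush()` during `snapshotState`, so the checkpoint is declined.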
Looking at the BE node log, I found that FILE_SCAN_NODE took 10 minutes, which caused the transaction to time out. The BE log is shown below:
I20240926 02:57:07.476316 140236708935232 plan_fragment_executor.cpp:496] Fragment 324aba29-eeb5-95b5-08bc-c0e011086095:(Active: 10m30s, non-child: 0.00%)
- InstanceAllocatedMemoryUsage: 10.05 MB
- InstanceDeallocatedMemoryUsage: 1.79 MB
- InstancePeakMemoryUsage: 12.26 MB
- MemoryLimit: -1.00 B
- RowsProduced: 408
OlapTableSink:(Active: 29.116ms, non-child: 0.00%)
- TxnID: 26910271
- IndexNum: 1
- ReplicatedStorage: true
- AutomaticPartition: true
- AutomaticBucketSize: 0
- AllocAutoIncrementTime: 248.000ns
- CloseWaitTime: 0.000ns
- OpenTime: 19.338ms
- PrepareDataTime: 556.603us
- ConvertChunkTime: 39.915us
- ValidateDataTime: 362.306us
- RowsFiltered: 0
- RowsRead: 408
- RowsReturned: 408
- RpcClientSideTime: 0.000ns
- RpcServerSideTime: 0.000ns
- RpcServerWaitFlushTime: 0.000ns
- SendDataTime: 5.494ms
- PackChunkTime: 2.120ms
- SendRpcTime: 0.000ns
- CompressTime: 0.000ns
- SerializeChunkTime: 0.000ns
- WaitResponseTime: 0.000ns
FILE_SCAN_NODE (id=0):(Active: 10m30s, non-child: 100.00%)
- BytesRead: 0
- IOTaskExecTime: 0.000ns
- IOTaskWaitTime: 0.000ns
- NumDiskAccess: 0
- PeakMemoryUsage: 0
- RowsRead: 0
- RowsReturned: 408
- RowsReturnedRate: 0
- ScanTime: 10m30s
- ScannerQueueCounter: 1
- ScannerQueueTime: 4.420us
- ScannerThreadsInvoluntaryContextSwitches: 0
- ScannerThreadsTotalWallClockTime: 0.000ns
- MaterializeTupleTime(*): 0.000ns
- ScannerThreadsSysTime: 0.000ns
- ScannerThreadsUserTime: 0.000ns
- ScannerThreadsVoluntaryContextSwitches: 0
- TotalRawReadTime(*): 0.000ns
- TotalReadThroughput: 0.00 /sec
DataSource:
- DataSourceType: FileDataSource
- FileScanner: 0
- CastChunkTime: 0.000ns
- CreateChunkTime: 0.000ns
- FileReadCount: 0
- FileReadTime: 0.000ns
- FillTime: 0.000ns
- MaterializeTime: 0.000ns
- ReadTime: 0.000ns
- ScannerTotalTime: 0.000ns
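To compare operators in a profile like the one above without reading it by eye, the `Active:` times can be extracted with a short script. This is only a sketch that assumes the line layout shown in this log (`Name:(Active: <duration>, ...)`); `parse_duration`, `active_times` and `slowest_operator` are illustrative helpers, not StarRocks tooling.

```python
import re
from typing import Dict

# Seconds per unit for the duration suffixes that appear in the profile.
_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9}


def parse_duration(text: str) -> float:
    """Convert a profile duration such as '10m30s' or '29.116ms' to seconds."""
    total = 0.0
    # Two-letter units come first so that 'ms' is not read as 'm' followed by 's'.
    for value, unit in re.findall(r"(\d+(?:\.\d+)?)(ms|us|ns|h|m|s)", text):
        total += float(value) * _UNIT_SECONDS[unit]
    return total


def active_times(profile: str) -> Dict[str, float]:
    """Map each 'Name:(Active: ...)' entry in the profile to its active time in seconds."""
    times: Dict[str, float] = {}
    for line in profile.splitlines():
        # Excluding ']' from the name skips the glog prefix on the Fragment line.
        match = re.search(r"([^\]]+?):\(Active: ([^,)]+)", line)
        if match:
            times[match.group(1).strip()] = parse_duration(match.group(2))
    return times


def slowest_operator(times: Dict[str, float]) -> str:
    """Return the entry with the largest active time, ignoring the Fragment total."""
    operators = {name: t for name, t in times.items() if not name.startswith("Fragment")}
    return max(operators, key=operators.get)
```

Applied to the profile above, `active_times` yields 630 seconds for `FILE_SCAN_NODE (id=0)` and about 0.029 seconds for `OlapTableSink`, so `slowest_operator` picks the scan node.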
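Writes to a Primary Key table work as DELETE + INSERT. The DELETE step has to look up whether the data already exists, so there is a FILE_SCAN_NODE. But disk I/O was not saturated at that time, so why did the scan take 10 minutes?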
I/O monitoring
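One check that may help when lining up the two logs: subtracting the fragment's `Active: 10m30s` from the BE log timestamp gives an approximate fragment start time, which can then be compared with the timestamp of the Flink exception. The sketch below does only that arithmetic. It assumes both logs use the same time zone and roughly synchronized clocks, and that the rounded `10m30s` is accurate to the second, so the result is a hint rather than proof of any causal link.

```python
from datetime import datetime, timedelta

# Values copied from the logs in this post.
be_log_time = datetime(2024, 9, 26, 2, 57, 7, 476316)  # BE profile line: I20240926 02:57:07.476316
active = timedelta(minutes=10, seconds=30)              # Fragment "Active: 10m30s"
flink_error_time = datetime(2024, 9, 26, 2, 46, 37)     # Flink exception: 2024-09-26 02:46:37

# Approximate time at which the fragment became active.
fragment_start = be_log_time - active

# Distance between the fragment start and the Flink exception.
gap = fragment_start - flink_error_time

print("fragment start:", fragment_start)
print("gap to Flink exception:", gap)
```

Under those assumptions the fragment started within about a second of the checkpoint failure, which may be worth checking against the FE log for transaction 26910271.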