[Primary Key table] Transaction timeout when loading data with Flink

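To help us locate your problem faster, please provide the following information. Thank you.
[Details] Loading data into StarRocks with the Flink connector (version 1.2.9) fails with a transaction timeout.
[Background] Multiple tables share the same sink manager.
[Business impact]
[Shared-data (storage-compute separation) deployment?]
[StarRocks version] 3.3.2
[Cluster size] e.g. 3 FE + 6 BE
[Machine info] vCPUs/memory/NIC/disk, e.g. 48C/64G/10GbE/NVMe
[Table type] Primary Key table
[Load or export method] Flink connector 1.2.9
[Contact] Community group 15, redscarf
[Attachments]

  • fe.log/be.INFO/relevant screenshots
  • Full exception stack trace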

2024-09-26 02:46:37

java.lang.Exception: Could not perform checkpoint 38528 for operator Source: operator (1/3)#1.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1166)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1113)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:936)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:915)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:729)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
    "Status": "TXN_IN_PROCESSING",
    "Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
    "Message": "Transaction in processing, please retry later"
}
errorLog: null
        at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
        at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
        at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:251)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025)
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:918)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:936)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:918)
        ... 3 more
    Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
    "Status": "TXN_IN_PROCESSING",
    "Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
    "Message": "Transaction in processing, please retry later"
}
errorLog: null
        at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:232)
        at com.starrocks.data.load.stream.v2.TransactionTableRegion.doCommit(TransactionTableRegion.java:303)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 38528 for operator  (1/3)#1. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1154)
    ... 14 more
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: 6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
    "Status": "TXN_IN_PROCESSING",
    "Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
    "Message": "Transaction in processing, please retry later"
}
errorLog: null
    at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
    at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
    at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.snapshotState(StarRocksDynamicSinkFunctionV2.java:264)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
    ... 25 more
Caused by: [CIRCULAR REFERENCE: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: database-name, table: table-name, label: 6cce80f5-c259-4b2b-a22b-ecdc43e05840,
responseBody: {
    "Status": "TXN_IN_PROCESSING",
    "Label": "flink-6cce80f5-c259-4b2b-a22b-ecdc43e05840",
    "Message": "Transaction in processing, please retry later"
}
errorLog: null]

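The BE node log shows that FILE_SCAN_NODE was active for about 10 minutes (Active: 10m30s), which caused the transaction to time out. The BE log is as follows: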

I20240926 02:57:07.476316 140236708935232 plan_fragment_executor.cpp:496] Fragment 324aba29-eeb5-95b5-08bc-c0e011086095:(Active: 10m30s, non-child: 0.00%)
   - InstanceAllocatedMemoryUsage: 10.05 MB
   - InstanceDeallocatedMemoryUsage: 1.79 MB
   - InstancePeakMemoryUsage: 12.26 MB
   - MemoryLimit: -1.00 B
   - RowsProduced: 408
  OlapTableSink:(Active: 29.116ms, non-child: 0.00%)
     - TxnID: 26910271
     - IndexNum: 1
     - ReplicatedStorage: true
     - AutomaticPartition: true
     - AutomaticBucketSize: 0
     - AllocAutoIncrementTime: 248.000ns
     - CloseWaitTime: 0.000ns
     - OpenTime: 19.338ms
     - PrepareDataTime: 556.603us
       - ConvertChunkTime: 39.915us
       - ValidateDataTime: 362.306us
     - RowsFiltered: 0
     - RowsRead: 408
     - RowsReturned: 408
     - RpcClientSideTime: 0.000ns
     - RpcServerSideTime: 0.000ns
     - RpcServerWaitFlushTime: 0.000ns
     - SendDataTime: 5.494ms
       - PackChunkTime: 2.120ms
       - SendRpcTime: 0.000ns
         - CompressTime: 0.000ns
         - SerializeChunkTime: 0.000ns
       - WaitResponseTime: 0.000ns
  FILE_SCAN_NODE (id=0):(Active: 10m30s, non-child: 100.00%)
     - BytesRead: 0
     - IOTaskExecTime: 0.000ns
     - IOTaskWaitTime: 0.000ns
     - NumDiskAccess: 0
     - PeakMemoryUsage: 0
     - RowsRead: 0
     - RowsReturned: 408
     - RowsReturnedRate: 0
     - ScanTime: 10m30s
     - ScannerQueueCounter: 1
     - ScannerQueueTime: 4.420us
     - ScannerThreadsInvoluntaryContextSwitches: 0
     - ScannerThreadsTotalWallClockTime: 0.000ns
       - MaterializeTupleTime(*): 0.000ns
       - ScannerThreadsSysTime: 0.000ns
       - ScannerThreadsUserTime: 0.000ns
     - ScannerThreadsVoluntaryContextSwitches: 0
     - TotalRawReadTime(*): 0.000ns
     - TotalReadThroughput: 0.00 /sec
    DataSource:
       - DataSourceType: FileDataSource
       - FileScanner: 0
         - CastChunkTime: 0.000ns
         - CreateChunkTime: 0.000ns
         - FileReadCount: 0
         - FileReadTime: 0.000ns
         - FillTime: 0.000ns
         - MaterializeTime: 0.000ns
         - ReadTime: 0.000ns
       - ScannerTotalTime: 0.000ns
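
To see at a glance which operator dominates a fragment profile like the one above, one option is to parse the `Active` and `non-child` values from each header line and rank operators by their own (non-child) time. The following is a minimal sketch that assumes the header format shown in this log; `to_seconds` and `rank_by_self_time` are hypothetical helper names, not part of StarRocks, and the sample is a shortened copy of the log above.

```python
import re

# A profile header line looks like:
#   FILE_SCAN_NODE (id=0):(Active: 10m30s, non-child: 100.00%)
HEADER_RE = re.compile(
    r"^(?P<name>.*?):\(Active:\s*(?P<active>[^,]+),\s*non-child:\s*(?P<pct>[\d.]+)%\)"
)
TOKEN_RE = re.compile(r"(\d+(?:\.\d+)?)(ms|us|ns|h|m|s)")
UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9}


def to_seconds(duration):
    """Convert a profile duration such as '10m30s' or '29.116ms' to seconds."""
    return sum(float(v) * UNIT_SECONDS[u] for v, u in TOKEN_RE.findall(duration))


def rank_by_self_time(profile_text):
    """Return (name, active_seconds, self_seconds) tuples, largest self time first."""
    rows = []
    for line in profile_text.splitlines():
        match = HEADER_RE.match(line)
        if match is None:
            continue  # counter lines such as "- RowsRead: 408" are skipped
        # Drop the log prefix ("I2024... file.cpp:496] ") when present.
        name = match.group("name").rsplit("] ", 1)[-1].strip()
        active = to_seconds(match.group("active"))
        self_time = active * float(match.group("pct")) / 100.0
        rows.append((name, active, self_time))
    return sorted(rows, key=lambda row: row[2], reverse=True)


SAMPLE = """\
I20240926 02:57:07.476316 140236708935232 plan_fragment_executor.cpp:496] Fragment 324aba29-eeb5-95b5-08bc-c0e011086095:(Active: 10m30s, non-child: 0.00%)
  OlapTableSink:(Active: 29.116ms, non-child: 0.00%)
  FILE_SCAN_NODE (id=0):(Active: 10m30s, non-child: 100.00%)
     - ScanTime: 10m30s
"""

for name, active, self_time in rank_by_self_time(SAMPLE):
    print(f"{name}: active={active:.3f}s self={self_time:.3f}s")
```

On this sample the ranking puts FILE_SCAN_NODE first, matching the reading of the log above. The counters listed under that node (for example BytesRead and FileReadTime) are worth checking alongside the ranking.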

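As I understand it, a write to a Primary Key table works as DELETE + INSERT, and the DELETE step has to look up whether the row already exists, which is why FILE_SCAN_NODE appears. However, disk I/O was not saturated at the time, so why did the scan take 10 minutes?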

I/O monitoring