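【Details】When Flink connector 1.2.9 writes data into StarRocks over a long period, the error "Transaction in processing, please retry later" appears intermittently.
Judging from the BE code, this error can only be raised at
or
However, the logs show that neither of these two code paths was reached.
When this happens, a rollback is triggered automatically, but the rollback fails with the same error. The rollback only succeeds once some later point in time has been reached.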
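The behaviour described above (the same status being returned for a while, then the request eventually succeeding) is the kind of case a client-side retry with backoff is meant to cover. The following is only a minimal sketch of that idea, not the connector's actual logic: `call_with_retry`, its parameters, and the `send` callable are hypothetical names, and the only detail taken from this report is the `TXN_IN_PROCESSING` status string.

```python
import time
from typing import Callable, Dict

# Status string taken from the responseBody in this report.
RETRYABLE_STATUS = "TXN_IN_PROCESSING"


def call_with_retry(
    send: Callable[[], Dict[str, str]],
    max_attempts: int = 5,
    base_delay_s: float = 0.2,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Call `send` until it stops answering TXN_IN_PROCESSING.

    `send` is a hypothetical callable that issues one transaction request
    (for example a rollback) and returns the parsed response body.
    The delay doubles after every retryable answer.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = base_delay_s
    response: Dict[str, str] = {}
    for attempt in range(1, max_attempts + 1):
        response = send()
        if response.get("Status") != RETRYABLE_STATUS:
            return response  # success or a non-retryable failure
        if attempt < max_attempts:
            sleep(delay)
            delay *= 2
    return response  # still TXN_IN_PROCESSING after the last attempt
```

The caller still has to decide what to do when the last attempt returns `TXN_IN_PROCESSING`; the sketch simply hands that response back.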
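【Business impact】The Flink job restarts
【Shared-data (storage-compute separation)】No
【StarRocks version】e.g. 3.3.4
【Cluster size】3 FE + 6 BE
【Machine info】BE: 8-core CPU, 32 GB memory
【Attachments】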
Flink error, 2024-12-24 06:02:05
java.lang.Exception: Could not perform checkpoint 55776 for operator Source: common:hybrid -> Aviator script process -> (Split Business Model -> Tag Privilege Flag -> Aviator script filter -> (ToStarRocksRowData -> Sink: TEST:StarRocksSink, Process), TABLE:TEST:FORWARD) (2/3)#14.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1166)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1113)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:936)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:915)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:729)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: test, table: TEST, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "storage-d06d48f1-2643-45a4-9e06-579b34f08dfc",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:251)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:918)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:936)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:918)
... 3 more
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: test, table: Test, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "storage-d06d48f1-2643-45a4-9e06-579b34f08dfc",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.DefaultStreamLoader.sendToSR(DefaultStreamLoader.java:349)
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$3(DefaultStreamLoader.java:176)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 55776 for operator Source: common:hybrid -> Aviator script process -> (Split Business Model -> Tag Privilege Flag -> Aviator script filter -> (ToStarRocksRowData -> Sink: TEST:StarRocksSink, Process), TABLE:TEST:FORWARD) (2/3)#14. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1154)
... 14 more
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: test, table: Test, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "storage-d06d48f1-2643-45a4-9e06-579b34f08dfc",
"Message": "Transaction in processing, please retry later"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.snapshotState(StarRocksDynamicSinkFunctionV2.java:264)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
... 25 more
Caused by: [CIRCULAR REFERENCE: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: test, table: Test, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc,
responseBody: {
"Status": "TXN_IN_PROCESSING",
"Label": "storage-d06d48f1-2643-45a4-9e06-579b34f08dfc",
"Message": "Transaction in processing, please retry later"
}
errorLog: null]
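When many such stack traces have to be triaged, it can help to pull the `Status` field out of the exception message programmatically. The sketch below assumes the message layout shown in the trace above (`responseBody: {...}` followed by `errorLog:`); `extract_status` is a hypothetical helper and not part of the connector.

```python
import json
import re
from typing import Optional

# Captures the JSON object between "responseBody:" and "errorLog:".
_BODY_RE = re.compile(r"responseBody:\s*(\{.*?\})\s*errorLog:", re.DOTALL)


def extract_status(message: str) -> Optional[str]:
    """Return the "Status" value of the embedded response body, if any."""
    match = _BODY_RE.search(message)
    if match is None:
        return None
    try:
        body = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None  # body was truncated or is not valid JSON
    return body.get("Status")
```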
Related FE logs
fe.log:2024-12-24 06:02:05.442+08:00 INFO (nioEventLoopGroup-6-1|125) [TransactionWithoutChannelHandler.handle():65] Handle transaction without channel info, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc
fe.log:2024-12-24 06:02:05.442+08:00 INFO (nioEventLoopGroup-6-1|125) [TransactionLoadAction.executeTransaction():244] Redirect transaction action to destination=TNetworkAddress(hostname:starrocks-be-2.starrocks-be-search.incubator.svc.cluster.local, port:8040), db: test, table: Test, op: TXN_BEGIN, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc
fe.log:2024-12-24 06:02:05.443+08:00 INFO (thrift-server-pool-3228705|6416260) [FrontendServiceImpl.loadTxnBegin():1147] receive txn begin request, db: test, tbl: Test, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, backend: 10.244.116.130
fe.log:2024-12-24 06:02:05.443+08:00 INFO (thrift-server-pool-3228705|6416260) [DatabaseTransactionMgr.beginTransaction():186] begin transaction: txn_id: 39482937 with label storage-d06d48f1-2643-45a4-9e06-579b34f08dfc from coordinator BE: 10.244.116.130, listner id: -1
fe.log:2024-12-24 06:02:05.470+08:00 INFO (nioEventLoopGroup-6-5|179) [TransactionWithoutChannelHandler.handle():65] Handle transaction without channel info, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc
fe.log:2024-12-24 06:02:05.470+08:00 INFO (nioEventLoopGroup-6-5|179) [TransactionLoadAction.executeTransaction():244] Redirect transaction action to destination=TNetworkAddress(hostname:starrocks-be-2.starrocks-be-search.incubator.svc.cluster.local, port:8040), db: test, table: Test, op: TXN_LOAD, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc
fe.log:2024-12-24 06:02:05.837+08:00 INFO (nioEventLoopGroup-6-1|125) [TransactionWithoutChannelHandler.handle():65] Handle transaction without channel info, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc
fe.log:2024-12-24 06:02:05.837+08:00 INFO (nioEventLoopGroup-6-1|125) [TransactionLoadAction.executeTransaction():244] Redirect transaction action to destination=TNetworkAddress(hostname:starrocks-be-2.starrocks-be-search.incubator.svc.cluster.local, port:8040), db: test, table: Test, op: TXN_ROLLBACK, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc
fe.log:2024-12-24 06:02:05.854+08:00 INFO (thrift-server-pool-3228788|6416361) [DatabaseTransactionMgr.abortTransaction():552] transaction:[TransactionState. txn_id: 39482937, label: storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, db id: 49098906, table id list: 54200359, callback id: -1, coordinator: BE: 10.244.116.130, transaction status: ABORTED, error replicas num: 0, replica ids: , prepare time: 1734991325443, write end time: -1, allow commit time: -1, commit time: -1, finish time: 1734991325853, total cost: 410ms, reason: transaction is aborted by user. attachment: com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment@4730172d] successfully rollback
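The `prepare time` and `finish time` fields in the last FE line are epoch milliseconds. As a quick cross-check against the log timestamps, they can be converted as sketched below. The UTC+8 offset is taken from the `+08:00` suffix in the FE log; the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Offset taken from the "+08:00" suffix of the FE log timestamps.
LOG_TZ = timezone(timedelta(hours=8))


def epoch_ms_to_local(ms: int) -> str:
    """Format an epoch-millisecond value in the FE log's time zone."""
    seconds, millis = divmod(ms, 1000)
    moment = datetime.fromtimestamp(seconds, tz=LOG_TZ)
    moment = moment.replace(microsecond=millis * 1000)
    return moment.isoformat(timespec="milliseconds")


def elapsed_ms(start_ms: int, end_ms: int) -> int:
    """Difference between two epoch-millisecond values."""
    return end_ms - start_ms


prepare_time = 1734991325443  # "prepare time" from the FE log line
finish_time = 1734991325853   # "finish time" from the FE log line
print(epoch_ms_to_local(prepare_time))        # 2024-12-24T06:02:05.443+08:00
print(epoch_ms_to_local(finish_time))         # 2024-12-24T06:02:05.853+08:00
print(elapsed_ms(prepare_time, finish_time))  # 410
```

The 410 ms difference matches the `total cost: 410ms` reported in the same line, and the prepare time matches the `beginTransaction()` entry at 06:02:05.443.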
Related BE logs
I20241224 06:02:05.443726 140713024546368 transaction_mgr.cpp:190] new transaction manage request. id=704144e686e27e6e-6339a9b3a2bb88b2, job_id=-1, txn_id: 39482937, label=storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, db=test, tbl=Test op=begin
I20241224 06:02:05.471338 140713007760960 transaction_stream_load.cpp:237] new transaction load request.id=704144e686e27e6e-6339a9b3a2bb88b2, job_id=-1, txn_id: 39482937, label=storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, db=test, tbl=Test
I20241224 06:02:05.484226 140713007760960 stream_load_executor.cpp:77] begin to execute job. label=storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, txn_id: 39482937, query_id=704144e6-86e2-7e6e-6339-a9b3a2bb88b2
I20241224 06:02:05.495729 140713511310912 local_tablets_channel.cpp:715] LocalTabletsChannel txn_id: 39482937 load_id: 704144e6-86e2-7e6e-6339-a9b3a2bb88b2 open 49 delta writers, 0 failed_tablets: _num_remaining_senders: 1
I20241224 06:02:05.838148 140713007760960 transaction_mgr.cpp:213] new transaction manage request. id=704144e686e27e6e-6339a9b3a2bb88b2, job_id=-1, txn_id: 39482937, label=storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, db=test, tbl=Test op=rollback
I20241224 06:02:05.838190 140713007760960 transaction_mgr.cpp:369] Rollback transaction id=704144e686e27e6e-6339a9b3a2bb88b2, job_id=-1, txn_id: 39482937, label=storage-d06d48f1-2643-45a4-9e06-579b34f08dfc, db=test
I20241224 06:02:05.839736 140713528096320 local_tablets_channel.cpp:583] LocalTabletsChannel txn_id: 39482937 load_id: 704144e6-86e2-7e6e-6339-a9b3a2bb88b2 commit 0 tablets:
I20241224 06:02:05.840155 140713662379584 local_tablets_channel.cpp:756] cancel LocalTabletsChannel txn_id: 39482937 load_id: 704144e686e27e6e-6339a9b3a2bb88b2 index_id: 54200360 tablet_ids:54919213
I20241224 06:02:05.841484 140713536489024 local_tablets_channel.cpp:756] cancel LocalTabletsChannel txn_id: 39482937 load_id: 704144e686e27e6e-6339a9b3a2bb88b2 index_id: 54200360 tablet_ids:109993244,108994617,109993220,108360749,106541815,106541791,108994641,106541775,54919209,107118574,108360725,107118598,110593260,107767817,110593284,107767841
I20241224 06:02:05.842079 140714130269760 local_tablets_channel.cpp:756] cancel LocalTabletsChannel txn_id: 39482937 load_id: 704144e686e27e6e-6339a9b3a2bb88b2 index_id: 54200360 tablet_ids:110593244,109993228,108994625,107118558,54919221,108994601,54919197,106541799,107767801,110593268,107767825,107118582,108360709,54919181,109993204,108360733
I20241224 06:02:05.874101 140660587886144 agent_server.cpp:460] Submit task success. type=CLEAR_TRANSACTION_TASK, signature=39482937, task_count_in_queue=3
I20241224 06:02:05.875324 140594168522304 agent_task.cpp:299] get clear transaction task task, signature:39482937, txn_id: 39482937, partition id size: 0
I20241224 06:02:05.875333 140594168522304 storage_engine.cpp:705] Clearing transaction task txn_id: 39482937
I20241224 06:02:05.875340 140594168522304 storage_engine.cpp:726] Cleared transaction task txn_id: 39482937
I20241224 06:02:05.875396 140594168522304 agent_task.cpp:321] finish to clear transaction task. signature:39482937, txn_id: 39482937
I20241224 06:02:05.875608 140594168522304 agent_task.cpp:157] Remove task success. type=CLEAR_TRANSACTION_TASK, signature=39482937, task_count_in_queue=0
I20241224 06:02:08.019524 140595896632896 tablet_sink_sender.cpp:334] Olap table sink statistics. load_id: 704144e6-86e2-7e6e-6339-a9b3a2bb88b2, txn_id: 39482937, add chunk time(ms)/wait lock time(ms)/num: {5574729:(11)(0)(1)} {11150:(11)(0)(1)} {11159:(2173)(0)(1)} {11107:(10)(0)(1)} {11001:(12)(0)(1)} {5201573:(11)(0)(1)}
- TxnID: 39482937
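To read the BE timeline more easily, the lines can be reduced to offsets from the first event. This is a rough sketch that assumes the line layout seen above (severity letter, `yyyymmdd hh:mm:ss.uuuuuu`, thread id, `file:line]`, message); `parse_be_line` and `offsets_us` are hypothetical helpers, and lines that do not match the layout are skipped.

```python
import re
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

# Severity letter, date and time, thread id, "file:line]", then the message.
_BE_LINE_RE = re.compile(
    r"^[IWEF](\d{8} \d{2}:\d{2}:\d{2}\.\d{6}) \d+ (\S+?:\d+)\] (.*)$"
)


def parse_be_line(line: str) -> Optional[Tuple[datetime, str, str]]:
    """Split one BE log line into (timestamp, source location, message)."""
    match = _BE_LINE_RE.match(line.strip())
    if match is None:
        return None
    stamp = datetime.strptime(match.group(1), "%Y%m%d %H:%M:%S.%f")
    return stamp, match.group(2), match.group(3)


def offsets_us(lines: List[str]) -> List[Tuple[int, str]]:
    """Return (microseconds since the first parsed line, source location)."""
    events = [e for e in map(parse_be_line, lines) if e is not None]
    if not events:
        return []
    start = events[0][0]
    one_us = timedelta(microseconds=1)
    return [((stamp - start) // one_us, where) for stamp, where, _ in events]
```

Applied to the lines above, the `op=rollback` request (06:02:05.838148) comes 394422 µs after the `op=begin` request (06:02:05.443726), and the `Olap table sink statistics` line (06:02:08.019524) is logged about 2.18 s after the rollback request.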