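To help locate your problem faster, please provide the following information. Thank you.
[Details] A Flink SQL job streaming data from Kafka into StarRocks failed with the error below after running for about 9 hours.
[Background] What operations were performed?
[Business impact]
[Shared-data (storage-compute separation)?] Yes, shared-data
[StarRocks version] e.g. 3.2.2
[Cluster size] e.g. 3 FE + 5 CN (FE and CN co-located)
Flink version: 1.14
Error log: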
2024-03-23 03:05:10,907 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource-default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_src -> NotNullEnforcer(fields=[ORDER_ID]) -> Sink: Sink(table=[default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_sink], fields=[ORDER_ID, MERCH_ORDER_ID, MERCH_ID, ORDER_AMT, GOODS_INFO, CREATE_TM, PAY_TM, ORDER_STATUS, CAN_REFUND, REFUNDED_AMT, RESERVER1, RESERVER2, RESERVER3, RESERVER4, TRANS_TM, MBR_ID, BUSS_CODE, MERCH_FEE, CARD_NO_SM4, CARD_NAME_SM4, CARD_INFO_SM3, ACC_SPLIT_TAG, ORGAN_ID, ORGAN_NAME, CALLBACK, TRAN_CODE, REQ_DATE, PAYWAY, CHK_FLAG, BILL_ID, ROUTE_MERCH_CODE, CHNL_PAYWAY, ORI_ORDER_ID, SCENE, REQ_CNT, RET_TYPE, RET_CODE, RET_MSG, EXTEND1, EXTEND2, EXTEND3, SETTLE_TYPE, REMOTE_SN, CHANNEL_SN, RMT_CHANNEL_CODE, LIMIT_PAY, SETTLE_DATE, PAY_TYPE, EXPIRE_TIME, CARD_TYPE, ID_CARD_NO, IS_DISCOUNT, CARD_NO, REFER_NO, CHANNEL_MERCH_NO, CHANNEL_TRADE_NO, BUYER_ID, BUYER_ACCOUNT, SUB_APP_ID, USER_ID, TRADE_NO, RMT_EXT1, RMT_EXT2, RMT_EXT3, RMT_EXT4, POS_SN, PRE_ORDER_ID, PREAUTH_FLAG, SETTLE_RATE, SETTLE_MAX_FEE, SETTLE_MIN_FEE, SETTLE_SINGLE_FEE, SETTLE_ALGORITHM, BIZ_CODE, NITI_COUNT, STORE_ID, NOTI_STATUS, LIMIT_COUNT, ORDER_SETTLE_AMT, ORI_CREATE_TM, ORI_SETTLE_DATE, ACCOUNT_NO, ACCOUNT_NOTI_CODE]) (2/5) (c74a92659e23ac53444fd56955b3b0bf) switched from RUNNING to FAILED on container_e14_1643018116527_1858_01_000010 @ bigdata02 (dataPort=7483).
java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: srods, table: onlpaydb_payment_netpay_order2, label: flink-c00096e4-dfcf-4e7a-8e3e-ddab686d8ff5,
responseBody: {
"Status": "INTERNAL_ERROR",
"Message": "10.3.40.83: starlet err Close hdfs file /user/starrocks/7dc34e9a-b5b1-4237-9451-85bb21e79db3/11995/11994/log/0000000000002EDD_0000000000014DAE.log error: Unknown error 255: Unknown error 255"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:427) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.write(StreamLoadManagerV2.java:252) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:198) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-connector-kafka_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-connector-kafka_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-table_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: srods, table: onlpaydb_payment_netpay_order2, label: flink-c00096e4-dfcf-4e7a-8e3e-ddab686d8ff5,
responseBody: {
"Status": "INTERNAL_ERROR",
"Message": "10.3.40.83: starlet err Close hdfs file /user/starrocks/7dc34e9a-b5b1-4237-9451-85bb21e79db3/11995/11994/log/0000000000002EDD_0000000000014DAE.log error: Unknown error 255: Unknown error 255"
}
errorLog: null
at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:221) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.TransactionTableRegion.commit(TransactionTableRegion.java:247) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:210) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
... 1 more
2024-03-23 03:05:10,911 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job d8f6239310e23f91eb911012a8dc0a8e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=4}]
2024-03-23 03:05:10,911 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_1.
2024-03-23 03:05:10,911 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 1 of source Source: KafkaSource-default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_src -> NotNullEnforcer(fields=[ORDER_ID]) -> Sink: Sink(table=[default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_sink], fields=[ORDER_ID, MERCH_ORDER_ID, MERCH_ID, ORDER_AMT, GOODS_INFO, CREATE_TM, PAY_TM, ORDER_STATUS, CAN_REFUND, REFUNDED_AMT, RESERVER1, RESERVER2, RESERVER3, RESERVER4, TRANS_TM, MBR_ID, BUSS_CODE, MERCH_FEE, CARD_NO_SM4, CARD_NAME_SM4, CARD_INFO_SM3, ACC_SPLIT_TAG, ORGAN_ID, ORGAN_NAME, CALLBACK, TRAN_CODE, REQ_DATE, PAYWAY, CHK_FLAG, BILL_ID, ROUTE_MERCH_CODE, CHNL_PAYWAY, ORI_ORDER_ID, SCENE, REQ_CNT, RET_TYPE, RET_CODE, RET_MSG, EXTEND1, EXTEND2, EXTEND3, SETTLE_TYPE, REMOTE_SN, CHANNEL_SN, RMT_CHANNEL_CODE, LIMIT_PAY, SETTLE_DATE, PAY_TYPE, EXPIRE_TIME, CARD_TYPE, ID_CARD_NO, IS_DISCOUNT, CARD_NO, REFER_NO, CHANNEL_MERCH_NO, CHANNEL_TRADE_NO, BUYER_ID, BUYER_ACCOUNT, SUB_APP_ID, USER_ID, TRADE_NO, RMT_EXT1, RMT_EXT2, RMT_EXT3, RMT_EXT4, POS_SN, PRE_ORDER_ID, PREAUTH_FLAG, SETTLE_RATE, SETTLE_MAX_FEE, SETTLE_MIN_FEE, SETTLE_SINGLE_FEE, SETTLE_ALGORITHM, BIZ_CODE, NITI_COUNT, STORE_ID, NOTI_STATUS, LIMIT_COUNT, ORDER_SETTLE_AMT, ORI_CREATE_TM, ORI_SETTLE_DATE, ACCOUNT_NO, ACCOUNT_NOTI_CODE]).
2024-03-23 03:05:10,911 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_1.
2024-03-23 03:05:10,911 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_sink (d8f6239310e23f91eb911012a8dc0a8e) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_7b5ce868-c889-452e-9a02-2fec84f6908f.jar:1.14.2]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_301]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) [?:1.8.0_301]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) [?:1.8.0_301]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) [?:1.8.0_301]
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: srods, table: onlpaydb_payment_netpay_order2, label: flink-c00096e4-dfcf-4e7a-8e3e-ddab686d8ff5,
responseBody: {
"Status": "INTERNAL_ERROR",
"Message": "10.3.40.83: starlet err Close hdfs file /user/starrocks/7dc34e9a-b5b1-4237-9451-85bb21e79db3/11995/11994/log/0000000000002EDD_0000000000014DAE.log error: Unknown error 255: Unknown error 255"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:427) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.write(StreamLoadManagerV2.java:252) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:198) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
The fe.warn and cn.out logs both show errors related to HDFS files.
fe.warn.log.20240323-1 (1.1 MB) jni.INFO.log.20240323 (29.1 KB)
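
To check whether the HDFS close failure is limited to one node or one file, the messages could be tallied with a small script. This is only a rough triage sketch: the function name `summarize_starlet_errors` is hypothetical, and the pattern assumes the other log files use the same "starlet err Close hdfs file" wording that appears in the Flink error above, which may not hold for fe.warn or cn.out.

```python
import re
from collections import Counter

# Assumed message shape, copied from the Flink error above:
#   <host>: starlet err Close hdfs file <path> error: <reason>
# The reason stops at a straight or curly double quote, if one follows.
STARLET_ERR = re.compile(
    r'(?P<host>\d+\.\d+\.\d+\.\d+): starlet err Close hdfs file '
    r'(?P<path>\S+) error: (?P<reason>[^"”]+)'
)


def summarize_starlet_errors(lines):
    """Count HDFS close failures per (host, reason) and collect distinct paths."""
    counts = Counter()
    paths = set()
    for line in lines:
        match = STARLET_ERR.search(line)
        if match:
            key = (match.group("host"), match.group("reason").strip())
            counts[key] += 1
            paths.add(match.group("path"))
    return counts, paths
```

It accepts any iterable of lines, so an open log file can be passed directly, for example `summarize_starlet_errors(open("cn.out", errors="replace"))`.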