To help us locate your issue faster, please provide the following information. Thanks.
[Details] Flink streams data into StarRocks; errors occur when the HDFS cluster is busy.
[Background] What operations were performed?
[Business impact]
[Shared-data or shared-nothing] Shared-data (storage-compute separation)
[StarRocks version] 3.2.2
[Cluster size] 3 FE + 5 CN (FE and CN co-located)
[Machine spec] CPU vcores / memory / NIC: 48C / 64G / 10 GbE
Flink streams data into StarRocks and runs smoothly under normal conditions, but errors occur when the HDFS cluster is busy.
Error on the StarRocks side:
2024-03-26 03:06:22,819 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource-default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_src -> NotNullEnforcer(fields=[ORDER_ID]) -> Sink: Sink(table=[default_catalog.olpycdg.ONLINE_PYC__NETPAY_ORDER_sink], fields=[ORDER_ID, MERCH_ORDER_ID, MERCH_ID, ORDER_AMT, GOODS_INFO, CREATE_TM, PAY_TM, ORDER_STATUS, CAN_REFUND, REFUNDED_AMT, RESERVER1, RESERVER2, RESERVER3, RESERVER4, TRANS_TM, MBR_ID, BUSS_CODE, MERCH_FEE, CARD_NO_SM4, CARD_NAME_SM4, CARD_INFO_SM3, ACC_SPLIT_TAG, ORGAN_ID, ORGAN_NAME, CALLBACK, TRAN_CODE, REQ_DATE, PAYWAY, CHK_FLAG, BILL_ID, ROUTE_MERCH_CODE, CHNL_PAYWAY, ORI_ORDER_ID, SCENE, REQ_CNT, RET_TYPE, RET_CODE, RET_MSG, EXTEND1, EXTEND2, EXTEND3, SETTLE_TYPE, REMOTE_SN, CHANNEL_SN, RMT_CHANNEL_CODE, LIMIT_PAY, SETTLE_DATE, PAY_TYPE, EXPIRE_TIME, CARD_TYPE, ID_CARD_NO, IS_DISCOUNT, CARD_NO, REFER_NO, CHANNEL_MERCH_NO, CHANNEL_TRADE_NO, BUYER_ID, BUYER_ACCOUNT, SUB_APP_ID, USER_ID, TRADE_NO, RMT_EXT1, RMT_EXT2, RMT_EXT3, RMT_EXT4, POS_SN, PRE_ORDER_ID, PREAUTH_FLAG, SETTLE_RATE, SETTLE_MAX_FEE, SETTLE_MIN_FEE, SETTLE_SINGLE_FEE, SETTLE_ALGORITHM, BIZ_CODE, NITI_COUNT, STORE_ID, NOTI_STATUS, LIMIT_COUNT, ORDER_SETTLE_AMT, ORI_CREATE_TM, ORI_SETTLE_DATE, ACCOUNT_NO, ACCOUNT_NOTI_CODE]) (4/5) (7cde6077b64f992343003849fac688b8) switched from RUNNING to FAILED on container_e14_1643018116527_1858_01_000016 @ bigdata05 (dataPort=1318).
java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: srods, table: onlpaydb_payment_netpay_order2, label: flink-dd54854f-d0e0-45fd-a346-178c10f25fa1,
responseBody: {
"Status": "INTERNAL_ERROR",
"Message": "10.3.40.84: starlet err Close hdfs file /user/starrocks/7dc34e9a-b5b1-4237-9451-85bb21e79db3/11995/11994/log/0000000000002EDE_000000000001B2C4.log error: Unknown error 255: Unknown error 255"
}
errorLog: null
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:427) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.write(StreamLoadManagerV2.java:252) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:198) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-connector-kafka_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-connector-kafka_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-table_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: srods, table: onlpaydb_payment_netpay_order2, label: flink-dd54854f-d0e0-45fd-a346-178c10f25fa1,
responseBody: {
"Status": "INTERNAL_ERROR",
"Message": "10.3.40.84: starlet err Close hdfs file /user/starrocks/7dc34e9a-b5b1-4237-9451-85bb21e79db3/11995/11994/log/0000000000002EDE_000000000001B2C4.log error: Unknown error 255: Unknown error 255"
}
errorLog: null
at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:221) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.TransactionTableRegion.commit(TransactionTableRegion.java:247) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:210) ~[flink-connector-starrocks-1.2.9_flink-1.14_2.11.jar:?]
… 1 more
Further investigation shows the problem originates on the HDFS side.
HDFS NameNode log:
2024-03-26 03:06:41,320 INFO org.apache.hadoop.ipc.Server: IPC Server handler 48 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 10.3.40.81:14828 Call#34485017 Retry#0
org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException: Not replicated yet: /user/starrocks/7dc34e9a-b5b1-4237-9451-85bb21e79db3/11995/11994/data/000000000001b2c7_a4ca485e-9274-497c-b7d6-aa5ce5cf40e7.dat.fd737921-7f53-47a4-a2bb-85623ea28db9.TEMP.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3623)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3412)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:688)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:217)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:506)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220)
2024-03-26 03:06:41,727 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: BLOCK* checkFileProgress: blk_1780026042_706535163{blockUCState=COMMITTED, primaryNodeIndex=-1, replicas=[ReplicaUnderConstruction[[DISK]DS-a2b1a4a6-63b3-4d94-9bb5-17bd4ffc9b06:NORMAL:10.3.40.23:50010|RBW], ReplicaUnderConstruction[[DISK]DS-e0c70bcd-fd99-4302-8939-24ebe2a7ca3f:NORMAL:10.3.40.31:50010|RBW]]} has not reached minimal replication 2
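The `NotReplicatedYetException` and the "has not reached minimal replication" warning both point to DataNodes acknowledging blocks too slowly during the busy window. As a sketch (host names and web UI port are placeholders that depend on the deployment), a few standard HDFS commands can confirm DataNode pressure and block health; this is an ops checklist, not output from this cluster:

```shell
# Summarize DataNode liveness, remaining capacity, and under-replicated block counts
hdfs dfsadmin -report | head -n 30

# Check block health under the StarRocks storage root seen in the error log
hdfs fsck /user/starrocks -blocks -locations | tail -n 30

# Inspect NameNode RPC queue/processing time via JMX (host and port are deployment-specific)
curl -s "http://<namenode-host>:50070/jmx?qry=Hadoop:service=NameNode,name=RpcActivityForPort8020"
```

If `fsck` reports many under-replicated blocks, or the JMX RPC metrics show long queue times at 03:00-03:10, that corroborates HDFS saturation rather than a StarRocks-side bug.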
This time window is also when HDFS is at its busiest.
It looks like HDFS write throughput cannot keep up with the ingestion load. On the StarRocks side, consider lowering the sink concurrency and batching over a longer interval.
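As a sketch of that tuning (the table name, column list, and values below are placeholders; the option names are from the flink-connector-starrocks documentation, at-least-once mode assumed), the sink DDL could reduce parallelism and enlarge the batch window:

```sql
CREATE TABLE netpay_order_sink (
    ORDER_ID STRING,
    -- ... remaining columns elided ...
    PRIMARY KEY (ORDER_ID) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://<fe-host>:9030',
    'load-url' = '<fe-host>:8030',
    'database-name' = 'srods',
    'table-name' = 'onlpaydb_payment_netpay_order2',
    'sink.parallelism' = '2',                     -- fewer concurrent load jobs against SR/HDFS
    'sink.buffer-flush.interval-ms' = '60000',    -- flush at most once a minute
    'sink.buffer-flush.max-bytes' = '134217728',  -- ~128 MB batches
    'sink.max-retries' = '3'                      -- retry transient failures before failing the task
);
```

Note that if the job runs with `sink.semantic = 'exactly-once'` (transactional stream load, as the `flink-...` labels in the stack trace suggest), the flush is tied to Flink checkpoints, so lengthening the checkpoint interval is the lever for larger batches rather than `sink.buffer-flush.interval-ms`.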