【详述】使用 flink-connector-starrocks导入大表数据报错(1千万数据)
【StarRocks版本】例如:2.5.0
【集群规模】例如:3fe(1 follower+2observer)+3be(fe与be独立部署)
【机器信息】CPU虚拟核/内存/网卡,例如:4C/16G/万兆
Flink Task的错误日志如下(3个异常):
java.io.InterruptedIOException: Call interrupted
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1556) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.ipc.Client.call(Client.java:1508) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.ipc.Client.call(Client.java:1405) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at com.sun.proxy.$Proxy34.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:523) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:431) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at com.sun.proxy.$Proxy35.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1113) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1866) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1668) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716) [flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
java.util.concurrent.CancellationException: null
at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_181]
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:60) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [flink-dist_2.11-1.14.5.jar:1.14.5]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
java.lang.InterruptedException: null
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_181]
at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.flush(StarRocksSinkManagerV2.java:315) ~[flink-cdc-starrocks-1.0-SNAPSHOT.jar:?]
at data.zanxiang.cdc.starrocks.sink.StarRocksStreamLoadSink.snapshotState(StarRocksStreamLoadSink.java:101) ~[flink-cdc-starrocks-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1163) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.5.jar:1.14.5]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]