StarRocks 版本: 3.0.5
环境: AWS线上环境
概述: 起初报错提示列数不匹配,通过在 DDL 中指定列分隔符和行分隔符解决了该问题;但之后仍会周期性地出现下面的空指针(java.lang.NullPointerException)错误.
DDL指定分隔符:
,'sink.properties.format' = 'csv'
,'sink.properties.column_separator' = '\x01'
,'sink.properties.row_delimiter' = '\x02'
报错内容:
2023-11-06 10:02:07,354 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job testbookng_data_sync (6e0c6a10e8911a4ccbb80e8316dd9cad) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) ~[flink-dist-1.15.1.jar:1.15.1]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) ~[flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) ~[flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_c902654b-9376-4f93-bfdf-dbd2cb1dc6c3.jar:1.15.1]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: java.lang.Exception: Could not perform checkpoint 100 for operator Source: auto_test_task_log[1] -> Calc[2] -> ConstraintEnforcer[3] -> Sink: auto_test_task_log_starrocks[3] (1/1)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of unknown exception, db: tuya_testbook_ng, table: auto_test_task_log, label: ee5d1166-4708-4ea4-87c0-c255e4bf2d88
at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.AssertNotException(StarRocksSinkManagerV2.java:368) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.flush(StarRocksSinkManagerV2.java:300) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:184) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:930) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of unknown exception, db: tuya_testbook_ng, table: auto_test_task_log, label: ee5d1166-4708-4ea4-87c0-c255e4bf2d88
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:292) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:118) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
… 1 more
Caused by: java.lang.NullPointerException
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:262) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:118) ~[flink-connector-starrocks-1.2.6_flink-1.15.jar:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
… 1 more