[Details] Spark reads files from HDFS and then writes them to StarRocks; the core code follows the example from the official documentation:
resultDf.write.format("starrocks")
  .option("starrocks.fe.http.url", "192.168.168.23:18030")
  .option("starrocks.fe.jdbc.url", "jdbc:mysql://192.168.168.23:9030")
  .option("starrocks.table.identifier", "app.event_raw_log")
  // .option("starrocks.write.num.partitions", 300)
  .option("user", "root")
  .option("password", "=X}-&?c>5H-V")
  .mode("append")
  .save()
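
For completeness, a minimal sketch of the job skeleton around this save call, since the question only shows the write; the HDFS path, input format, and object name are illustrative assumptions, not details given in the question:

import org.apache.spark.sql.SparkSession

object HdfsToStarRocks {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hdfs-to-starrocks").getOrCreate()

    // Assumption: the raw events are newline-delimited JSON on HDFS;
    // this path is hypothetical, the question does not state it.
    val resultDf = spark.read.json("hdfs://namenode:8020/data/event_raw_log/")

    resultDf.write.format("starrocks")
      .option("starrocks.fe.http.url", "192.168.168.23:18030")
      .option("starrocks.fe.jdbc.url", "jdbc:mysql://192.168.168.23:9030")
      .option("starrocks.table.identifier", "app.event_raw_log")
      .option("user", "root")
      .option("password", "***") // redacted
      .mode("append")
      .save()

    spark.stop()
  }
}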
The job fails once the data volume grows a bit. Caught exception: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=0, partition=2) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(java.io.IOException,Failed to commit, partitionId: 2, taskId: 0, epochId: -1,[Ljava.lang.StackTraceElement;@690cfa52,java.io.IOException: Failed to commit, partitionId: 2, taskId: 0, epochId: -1
at com.starrocks.connector.spark.sql.write.StarRocksDataWriter.commit(StarRocksDataWriter.java:69)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:482)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Failed to prepare transaction, please check taskmanager log for details, Transaction{database='app', table='wlw_xdr_log', label='5eef52d5-4c62-4906-8325-3eba67c86244', finish=false}
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:395)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:327)
at com.starrocks.connector.spark.sql.write.StarRocksDataWriter.commit(StarRocksDataWriter.java:63)
... 15 more
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Failed to prepare transaction, please check taskmanager log for details, Transaction{database='app', table='wlw_xdr_log', label='5eef52d5-4c62-4906-8325-3eba67c86244', finish=false}
at com.starrocks.data.load.stream.v2.TransactionTableRegion.commit(TransactionTableRegion.java:254)
at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:172)
... 1 more
,Some(org.apache.spark.ThrowableSerializationWrapper@362cb668),Vector(AccumulableInfo(7,None,Some(24093),None,false,true,None), AccumulableInfo(9,None,Some(0),None,false,true,None), AccumulableInfo(10,None,Some(712),None,false,true,None), AccumulableInfo(36,None,Some(17213830),None,false,true,None), AccumulableInfo(37,None,Some(251016),None,false,true,None)),Vector(LongAccumulator(id: 7, name: Some(internal.metrics.executorRunTime), value: 24093), LongAccumulator(id: 9, name: Some(internal.metrics.resultSize), value: 0), LongAccumulator(id: 10, name: Some(internal.metrics.jvmGCTime), value: 712), LongAccumulator(id: 36, name: Some(internal.metrics.input.bytesRead), value: 17213830), LongAccumulator(id: 37, name: Some(internal.metrics.input.recordsRead), value: 251016)),WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
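
The failing frame is StarRocksDataWriter.commit: each Spark write task prepares its own Stream Load transaction, so more input data means more partitions and more concurrent transactions against the FE. A minimal mitigation sketch, assuming the prepare is failing under that concurrency — it uses the starrocks.write.num.partitions option that is already present, commented out, in the code above; the value 32 is an illustrative assumption, not a tested setting:

// Sketch: bound the number of writer tasks, and therefore the number of
// Stream Load transactions prepared at task commit time.
resultDf.write.format("starrocks")
  .option("starrocks.fe.http.url", "192.168.168.23:18030")
  .option("starrocks.fe.jdbc.url", "jdbc:mysql://192.168.168.23:9030")
  .option("starrocks.table.identifier", "app.event_raw_log")
  .option("starrocks.write.num.partitions", 32) // assumption: far fewer writers than input splits
  .option("user", "root")
  .option("password", "***") // redacted
  .mode("append")
  .save()

Whatever value is chosen, the actual prepare-failure reason for label 5eef52d5-4c62-4906-8325-3eba67c86244 should be visible in the StarRocks BE logs, as the exception message itself suggests.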