通过spark connector 读取 spark数据
版本为 1.1.2
不限定分区查询成功
限定分区查询
Duplicate key Field{name=’’, type=‘BIGINT’, comment=’’, precision=0, scale=0}
请问是什么原因
sr的版本以及spark的版本也提供下。
查询sql发一下,报错时的fe.log 和be.INFO 日志也提供下
spark 3.2
starRocks 2.12
create temporary view sr_temp_table USING starrocks OPTIONS
(
“starrocks.filter.query” = “dt=‘2024-02-01’”
)
sql:
select user_id,user_name
from sr_temp_table
error:
Caused by: java.lang.IllegalStateException: Duplicate key Field{name=’’, type=‘BIGINT’, comment=’’, precision=0, scale=0}
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1245)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.starrocks.connector.spark.serialization.RowBatch.(RowBatch.java:108)
at com.starrocks.connector.spark.rdd.ScalaValueReader.hasNext(ScalaValueReader.scala:201)
at com.starrocks.connector.spark.rdd.AbstractStarrocksRDDIterator.hasNext(AbstractStarrocksRDDIterator.scala:58)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:234)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1463)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
版本是不是写错了,sr目前还没有 2.12的版本,select current_versions(); 看下完整的版本信息
2.5.12
发一下starrocks.table.identifier的表 的建表语句
CREATE TABLE test_user
(
user_id
bigint(20) NOT NULL DEFAULT “0” COMMENT “id 。”,
user_name
varchar(65533) NOT NULL DEFAULT “0” COMMENT “name 。”,
state_time
date NOT NULL DEFAULT “1971-01-01” COMMENT “time。”,
) ENGINE=OLAP
PRIMARY KEY(user_id
, state_time
)
COMMENT “test”
PARTITION BY RANGE(state_time
)
(PARTITION p20231218 VALUES [(“2023-12-18”), (“2023-12-19”)),
)
DISTRIBUTED BY HASH(user_id
) BUCKETS 10
PROPERTIES (
“replication_num” = “3”,
“dynamic_partition.enable” = “true”,
“dynamic_partition.time_unit” = “DAY”,
“dynamic_partition.time_zone” = “Asia/Shanghai”,
“dynamic_partition.start” = “-66”,
“dynamic_partition.end” = “1”,
“dynamic_partition.prefix” = “p”,
“dynamic_partition.buckets” = “10”,
“dynamic_partition.history_partition_num” = “0”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”,
“enable_persistent_index” = “false”,
“compression” = “LZ4”
);
分区字段是 state_time的话,starrocks.filter.query 这里应该用 state_time 来做过滤
“starrocks.filter.query” = “state_time=‘2024-02-01’”
是用state_time的
“starrocks.filter.query” = “state_time=‘2024-02-01’”
然后就有上面的报错信息
如果是这样的条件
“starrocks.filter.query” = “user_id=‘123” 就没问题
确认了下,这个问题已经修复了,需要升级到 2.5 最新版 2.5.19
好的 是需要从2.5.12升级到2.5.19吗
是的,sr集群从 2.5.12升级到2.5.19,2.5.19安装包可以在官网下载
好的 谢谢
我也存在这个异常:
Caused by: java.lang.IllegalStateException: Duplicate key Field{name=’’, type=‘BIGINT’, comment=’’, precision=0, scale=0}
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1255)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at com.starrocks.connector.spark.serialization.RowBatch.(RowBatch.java:108)
at com.starrocks.connector.spark.rdd.ScalaValueReader.hasNext(ScalaValueReader.scala:201)
at com.starrocks.connector.spark.rdd.AbstractStarrocksRDDIterator.hasNext(AbstractStarrocksRDDIterator.scala:58)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:892)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:892)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1556)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Spark版本:3.3,starrocks集群使用版本是3.1.2.
请问这个版本有修复吗?
3.1.2版本上存在该问题,可以升级到3.1最新版。
该问题修复的pr合入 2.5.19+, 3.1.8+,3.2.3+ 版本,对应pr: https://github.com/StarRocks/starrocks/pull/39078
多谢大佬。
这个问题跟什么有关系,是表的schema有关系嘛?目前升级集群比较难,有没有能其他避免的办法?
大佬,不升版本的情况下解决了这个问题嘛?