[Shared-data] [3.3.2] Duplicate partitions appear when importing with Broker Load

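Symptom: when Broker Load imports data into a StarRocks table that uses column-expression partitioning, duplicate partitions appear. Is automatic partition creation missing a deduplication or uniqueness lock?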

The table uses column-expression partitioning. The DDL is as follows:

CREATE TABLE `test_dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733` (
  `userid` varchar(65533) NULL COMMENT "",
  `serviceid` varchar(65533) NULL COMMENT "",
  `total_pv` varchar(65533) NULL COMMENT "",
  `item_cnt` varchar(65533) NULL COMMENT "",
  `pv_ls` varchar(65533) NULL COMMENT "",
  `item_cnt_ls` varchar(65533) NULL COMMENT "",
  `spm_pv_info` array<varchar(65533)> NULL COMMENT "",
  `spm_item_info` array<varchar(65533)> NULL COMMENT "",
  `sessionid` varchar(65533) NULL COMMENT "",
  `referer_pv_info` array<varchar(65533)> NULL COMMENT "",
  `referer_item_info` array<varchar(65533)> NULL COMMENT "",
  `x_page_url_info` array<varchar(65533)> NULL COMMENT "",
  `x_page_url_item_info` array<varchar(65533)> NULL COMMENT "",
  `ds` varchar(65533) NULL COMMENT "",
  `src` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`userid`)
COMMENT "OLAP"
PARTITION BY (`ds`,`src`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"bucket_size" = "4294967296"
);

The following statement was executed:

LOAD LABEL dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733_0QkTIVBxR82fEdazmWhd1w
(
    DATA INFILE("oss://export/dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733_0QkTIVBxR82fEdazmWhd1w/*")
    INTO TABLE dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733
    FORMAT AS "csv"
    (
        enclose = "`"
    )
    (userid,serviceid,total_pv,item_cnt,pv_ls,item_cnt_ls,spm_pv_info,spm_item_info,sessionid,referer_pv_info,referer_item_info,x_page_url_info,x_page_url_item_info,ds,src)
)
WITH BROKER
(
"fs.oss.accessKeyId"="6DDI**************T5fG",
"fs.oss.accessKeySecret"="LTAI********************wm7f",
"fs.oss.endpoint"="xxx"
)
PROPERTIES
(
"timeout"="7200"
);

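As a result, multiple partitions with the same name appear, and the data is spread across them. Is there a way to improve this? Do the partitions have to be created in advance?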

@lvlouisaslia @trueeyu could you both take a look when you have a moment? :pray: :pray:

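This is the expected behavior of DISTRIBUTED BY RANDOM. All partitions that share a name (their partition IDs are distinct) hold data belonging to the same logical partition, and the optimizer treats them as a single partition at query time.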

As long as you confirm that no data was written to the wrong partition, this is fine.
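
To make the "same name, different partition ID" idea concrete, here is a minimal Python sketch of how same-named physical partitions fold into one logical partition. The rows are hypothetical sample data, not output from this cluster; in practice they would be taken from something like `SHOW PARTITIONS FROM <table>`. The partition names and the helper `group_by_logical_partition` are illustrative assumptions and not part of StarRocks.

```python
from collections import defaultdict

# Hypothetical (partition_id, partition_name, row_count) rows for illustration only.
physical_partitions = [
    (3958101, "p20250305_app", 1200),
    (3958102, "p20250305_app", 800),
    (3958103, "p20250305_web", 500),
]


def group_by_logical_partition(rows):
    """Merge physical partitions that share a name into one logical partition."""
    logical = defaultdict(lambda: {"ids": [], "rows": 0})
    for partition_id, name, row_count in rows:
        logical[name]["ids"].append(partition_id)
        logical[name]["rows"] += row_count
    return dict(logical)


logical_partitions = group_by_logical_partition(physical_partitions)
for name, info in sorted(logical_partitions.items()):
    # One line per logical partition: its name, the physical IDs behind it, total rows.
    print(name, info["ids"], info["rows"])
```

If the per-name totals match what a `GROUP BY ds, src` count on the table returns, that is one way to check that no rows landed in the wrong partition.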

2025-03-06 14:38:31.380+08:00 INFO (AutoStatistic|27) [DatabaseTransactionMgr.abortTransaction():550] transaction:[TransactionState. txn_id: 5908706, label: insert_9eb4fc07-fa55-11ef-b028-6e7f9fad8393, db id: 10007, table id list: 10009, callback id: -1, coordinator: FE: starrocks-share-data-sdata-fe-1.starrocks-share-data-sdata-fe-search.starrocks.svc.cluster.local, transaction status: ABORTED, error replicas num: 0, replica ids: , prepare time: 1741243111360, write end time: -1, allow commit time: -1, commit time: -1, finish time: 1741243111380, total cost: 20ms, reason: ] successfully rollback
2025-03-06 14:38:31.381+08:00 WARN (AutoStatistic|27) [StmtExecutor.execute():737] execute Exception, sql INSERT INTO _statistics_.table_statistic_v1 WITH base_cte_table as (SELECT * FROM (SELECT `userid` as col_1, `serviceid` as col_2, `total_pv` as col_3, `item_cnt` as col_4, `pv_ls` as col_5, `item_cnt_ls` as col_6, `sessionid` as col_7, `ds` as col_8, `src` as col_9 FROM `sdata`.`dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733` TABLET(3958022, 3958023, 3958019, 3958024, 3958021, 3958020) WHERE rand() <= 0.01 LIMIT 200000) t_medium_high) SELECT 3678711, 'userid', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_1 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'serviceid', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_2 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'total_pv', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) 
* IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_3 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'item_cnt', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_4 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'pv_ls', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_5 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'item_cnt_ls', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), 
IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_6 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'sessionid', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_7 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'ds', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_8 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1 UNION ALL SELECT 3678711, 'src', 10071, 'sdata.dx_behavior_s_sec_eco_pre_page_to_detail_fdt_186733', 'sdata', IFNULL(SUM(t1.count), 0), IFNULL(SUM(t1.count), 0) * IFNULL(SUM(CHAR_LENGTH(column_key)) / COUNT(1), 0), IFNULL(SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 
1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * 0.002067061425789518), COUNT(1)), IFNULL(SUM(IF(t1.column_key IS NULL, t1.count, 0)), 0), IFNULL(MAX(LEFT(column_key, 200)), ''), IFNULL(MIN(LEFT(column_key, 200)), ''), NOW() FROM (SELECT t0.`column_key`, COUNT(1) as count FROM (SELECT col_9 AS column_key FROM `base_cte_table`) as t0 GROUP BY t0.column_key) AS t1
java.lang.IllegalStateException: null
        at com.google.common.base.Preconditions.checkState(Preconditions.java:496) ~[spark-dpp-1.0.0.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalOlapScan(PlanFragmentBuilder.java:822) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalOlapScan(PlanFragmentBuilder.java:408) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator.accept(PhysicalOlapScanOperator.java:201) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visit(PlanFragmentBuilder.java:456) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalDistribution(PlanFragmentBuilder.java:2286) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalDistribution(PlanFragmentBuilder.java:408) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.optimizer.operator.physical.PhysicalDistributionOperator.accept(PhysicalDistributionOperator.java:75) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visit(PlanFragmentBuilder.java:456) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalLimit(PlanFragmentBuilder.java:3185) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalLimit(PlanFragmentBuilder.java:408) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.optimizer.operator.physical.PhysicalLimitOperator.accept(PhysicalLimitOperator.java:58) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visit(PlanFragmentBuilder.java:456) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalCTEProduce(PlanFragmentBuilder.java:3279) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalCTEProduce(PlanFragmentBuilder.java:408) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.optimizer.operator.physical.PhysicalCTEProduceOperator.accept(PhysicalCTEProduceOperator.java:46) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visit(PlanFragmentBuilder.java:456) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalCTEAnchor(PlanFragmentBuilder.java:3296) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visitPhysicalCTEAnchor(PlanFragmentBuilder.java:408) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.optimizer.operator.physical.PhysicalCTEAnchorOperator.accept(PhysicalCTEAnchorOperator.java:54) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.visit(PlanFragmentBuilder.java:456) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder$PhysicalPlanTranslator.translate(PlanFragmentBuilder.java:421) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.plan.PlanFragmentBuilder.createPhysicalPlan(PlanFragmentBuilder.java:240) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.InsertPlanner.buildExecPlan(InsertPlanner.java:527) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.InsertPlanner.plan(InsertPlanner.java:315) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.StatementPlanner.planInsertStmt(StatementPlanner.java:199) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.StatementPlanner.plan(StatementPlanner.java:140) ~[starrocks-fe.jar:?]
        at com.starrocks.sql.StatementPlanner.plan(StatementPlanner.java:93) ~[starrocks-fe.jar:?]
        at com.starrocks.qe.StmtExecutor.execute(StmtExecutor.java:534) ~[starrocks-fe.jar:?]
        at com.starrocks.statistic.StatisticsCollectJob.collectStatisticSync(StatisticsCollectJob.java:151) ~[starrocks-fe.jar:?]
        at com.starrocks.statistic.SampleStatisticsCollectJob.collect(SampleStatisticsCollectJob.java:79) ~[starrocks-fe.jar:?]
        at com.starrocks.statistic.StatisticExecutor.collectStatistics(StatisticExecutor.java:285) ~[starrocks-fe.jar:?]
        at com.starrocks.statistic.StatisticAutoCollector.runAfterCatalogReady(StatisticAutoCollector.java:86) ~[starrocks-fe.jar:?]
        at com.starrocks.common.util.FrontendDaemon.runOneCycle(FrontendDaemon.java:72) ~[starrocks-fe.jar:?]
        at com.starrocks.common.util.Daemon.run(Daemon.java:107) ~[starrocks-fe.jar:?]

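I am seeing a large number of failed txns that are related to the statistics collection job for this table. Is this exception related to the multiple partitions created by DISTRIBUTED BY RANDOM described above?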

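Please open a separate GitHub issue for this exception (the IllegalStateException in the stack trace above) so the developers can look into it. It only affects statistics collection and does not affect data loading or queries.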