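[Details] I need to load data from Kafka into StarRocks. The Kafka topic carries data for two tables, A and B, and I only need table A's data. Table A receives a few hundred rows per day; table B receives several hundred thousand. After I added the condition that filters for table A to the WHERE clause, every task listed by SHOW ROUTINE LOAD TASK WHERE JobName = 'table_to_sr' reports the error "previous task aborted because of all partitions have no load data". If I remove source_table_name != 'tableA', the error does not occur. The other conditions (the IS NOT NULL checks) are only a safeguard; null values do not normally occur.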
[Background] A WHERE clause was added to the Routine Load job. source_table_name is not part of the primary key.
CREATE ROUTINE LOAD table_to_sr ON table_to_sr
COLUMNS(id,name,user_id,source_table_name),
WHERE
id IS NOT NULL
AND name IS NOT NULL
AND user_id IS NOT NULL
AND source_table_name != 'tableA'
PROPERTIES
(
"strict_mode" = "TRUE",
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.data.id\",\"$.data.name\",\"$.data.user_id\",\"$.table\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "",
"kafka_topic" = "",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.mechanism" = "SCRAM-SHA-256",
"property.sasl.username" = "",
"property.sasl.password" = "",
"property.group.id" = "topic_to_sr"
);
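[Business impact] The consumer offsets recorded in Kafka no longer match the offsets shown by the Routine Load job, so our monitoring no longer works.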
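To compare what the Routine Load job has consumed against the Kafka side, you can inspect the job's own progress in StarRocks. The following is a minimal sketch that assumes the job name table_to_sr from the statement above. The Progress column of SHOW ROUTINE LOAD lists, per Kafka partition, the offset the job has consumed up to. The exact set of columns differs between StarRocks versions.

```sql
-- Job-level state, including per-partition consumed offsets (Progress column)
SHOW ROUTINE LOAD FOR table_to_sr;

-- Task-level state; this is where the "all partitions have no load data" message shows up
SHOW ROUTINE LOAD TASK WHERE JobName = "table_to_sr";
```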
[StarRocks version] 2.5
[Table model] Primary Key model
CREATE TABLE table_to_sr
(
`id` varchar(400) NOT NULL COMMENT "id",
`name` varchar(400) NOT NULL COMMENT "name",
`user_id` varchar(400) NOT NULL COMMENT "user_id",
`source_table_name` varchar(400) NULL COMMENT "source_table_name"
) ENGINE=OLAP
PRIMARY KEY(`id`, `name`, `user_id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
);
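[Load or export method] Routine Load
Resolved: https://docs.starrocks.io/zh/docs/2.5/administration/Configuration/#配置-fe-静态参数
Set the FE parameter empty_load_as_error to false. With its default value (true), a load that ends up with zero rows is reported as the error "all partitions have no load data". Because the WHERE clause filters out almost every message in a batch, most tasks load nothing and were therefore reported as failures.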
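The following is a sketch of applying the setting. It assumes empty_load_as_error can be changed at runtime in your StarRocks version. If it cannot, add empty_load_as_error = false to fe.conf on each FE and restart the FEs instead.

```sql
-- Stop treating tasks that load zero rows as errors.
-- Assumes the parameter is mutable at runtime. A runtime change is not written
-- back to fe.conf, so also add "empty_load_as_error = false" there to keep it after a restart.
ADMIN SET FRONTEND CONFIG ("empty_load_as_error" = "false");

-- Check the value currently in effect
ADMIN SHOW FRONTEND CONFIG LIKE "empty_load_as_error";
```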