使用ROUTINE LOAD从kafka消费数据,当加入where条件时任务停止

【详述】使用ROUTINE LOAD从kafka的topic消费数据,当加入where条件时任务停止,且在sr表的数据都没有了
【kafka数据示例】
{
“database”: “vvtest”,
“table”: “fa_collection_payment”,
“type”: “insert”,
“ts”: 1684291750,
“xid”: 82778153,
“xoffset”: 0,
“data”: {
“id”: 4,
“name”: “现金”,
“f_number”: “JSFS01_SYS”,
“status”: “1”,
“is_delete”: “0”,
“orders”: 1,
“create_date_time”: null,
“create_user_id”: null,
“update_date_time”: null,
“update_user_id”: null
}
}
【sr建表语句】
CREATE TABLE fa_collection_payment (
id bigint(20) NOT NULL COMMENT ‘id’,
name varchar(100) DEFAULT NULL COMMENT ‘名称’,
f_number varchar(20) DEFAULT NULL COMMENT ‘金蝶编码’,
status char(1) DEFAULT ‘1’ COMMENT ‘状态(0禁用,1可用,默认1)’,
is_delete char(1) DEFAULT ‘0’ COMMENT ‘是否删除(1删除,0可用,默认0)’,
orders int(11) DEFAULT ‘1’ COMMENT ‘排序(默认1)’,
create_date_time datetime DEFAULT NULL COMMENT ‘创建时间’,
create_user_id bigint(20) DEFAULT NULL COMMENT ‘创建人’,
update_date_time datetime DEFAULT NULL COMMENT ‘更新时间’,
update_user_id bigint(20) DEFAULT NULL COMMENT ‘更新人’
) ENGINE=OLAP
unique KEY(id)
COMMENT “OLAP”
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”,
“enable_persistent_index” = “false”,
“compression” = “LZ4”
);
【ROUTINE LOAD语句】
CREATE ROUTINE LOAD test.fa_collection_payment ON fa_collection_payment
COLUMNS(id,name,f_number,status,is_delete,orders,create_date_time,create_user_id,update_date_time,update_user_id),
WHERE type = ‘delete’
PROPERTIES
(
“desired_concurrent_number”=“3”,
“format” =“json”,
“jsonpaths” ="["$.data.id","$.data.name","$.data.f_number","$.data.status","$.data.is_delete","$.data.orders","$.data.create_date_time","$.data.create_user_id","$.data.update_date_time","$.data.update_user_id"]"
)
FROM KAFKA
(
“kafka_broker_list” =“SASL_PLAINTEXT://10.206.67.60:9092,SASL_PLAINTEXT://10.206.65.16:9092,SASL_PLAINTEXT://10.206.64.244:9092”,
“property.security.protocol” = “SASL_PLAINTEXT”,
“property.sasl.mechanism” = “PLAIN”,
“property.sasl.username” = “kafkaadmin”,
“property.sasl.password” = “KafkaAdmin-secret”,
“kafka_topic” = “vvtest.fa_collection_payment”,
“kafka_partitions” =“0,1,2”,
“property.kafka_default_offsets” = “OFFSET_BEGINNING”
);
【业务影响】
当没有WHERE type = 'delete’这个条件时,可以从kafka消费20条数据写入sr表中,但是加上这个条件后,sr表的数据都没有了,而且任务也停止了
【StarRocks版本】2.5.2
【集群规模】例如:3fe(1 follower+2observer)+3be(fe与be混部)
【表模型】unique 模型
【导入或者导出方式】kafka -> ROUTINE LOAD

where过滤的字段需要是routine load的columns中有的列,routine load任务中没有接type这个字段的数据,starrocks 中的表没有type 字段,所以where这块不能通过type过滤。

可以试一下在columns和jsonpaths中加上type字段。CREATE ROUTINE LOAD test.fa_collection_payment ON fa_collection_payment
COLUMNS(id,name,f_number,status,is_delete,orders,create_date_time,create_user_id,update_date_time,update_user_id,type),
WHERE type = ‘delete’
PROPERTIES
(
“desired_concurrent_number”=“3”,
“format” =“json”,
“jsonpaths” ="["$.data.id","$.data.name","$.data.f_number","$.data.status","$.data.is_delete","$.data.orders","$.data.create_date_time","$.data.create_user_id","$.data.update_date_time","$.data.update_user_id","$.type"]"
)

昨天看了报错信息,发现是这个原因.使用CREATE ROUTINE LOAD这种方式从kafka获取数据,怎么查看这个任务消耗的资源

您好,这块是写入先前有数据的sr表么,routine load用错了加上where type=xxx 导致原来有数据的表现在没有数据了么,还是说本来就是个空表,加上where type=xxx后没有正常写入数据

routine load是后台按照设置的时间自动起子任务,到kafka中拉取数据,目前暂时没有routine load单个任务资源消耗情况的监控

那有所有的routine load的总任务消耗的资源和监控吗?还有就是能知道starrocks所消耗的总资源吗?

一开始本来就是空表,之前表没有type列的话,使用where type =xxx的时候相当于数据直接据写不进去,后面看了你的回复和去查看报错的原因,知道了要使用where type =xxx这个条件,那表需要type列.

routine load任务资源消耗的监控暂时没有,有消费kafka的lag监控项 https://github.com/StarRocks/starrocks/pull/1210
starrocks集群资源监控可以自己搭个Grafana监控,参考 https://docs.starrocks.io/zh-cn/latest/administration/Monitor_and_Alert