A Routine Load job has max_error_number set to the same value as the error detection window (2,000,000), with the expectation that all error rows would be ignored. In practice, however, when the job reaches a bad record in Kafka it gets stuck and cannot continue consuming.

【StarRocks version】3.1.2-4f3a2ee
【Cluster size】3 FEs, 3 BEs, co-located on the same machines, combined storage and compute (no storage/compute separation)
【Server spec】16 cores, 64 GB RAM, 500 GB disk, 10 GbE network, 3 machines in total
【Contact】via this forum
【Problem】The Routine Load job sets max_error_number (the maximum number of error data rows allowed) to 2,000,000, the same as the error detection window, expecting it to ignore all error rows and never stop consuming from Kafka because of bad data.
In practice, however, when the job reaches a non-JSON (bad) record in Kafka, it does not skip it automatically; it gets stuck and cannot consume any further data. How can this be handled so that consumption continues?

【StarRocks table creation statement】
CREATE TABLE test_db.message_table (
event_time DATETIME NOT NULL,
jid INT NOT NULL,
act SMALLINT NOT NULL,
fid TINYINT NOT NULL,
uid INT NOT NULL DEFAULT '0',
cid INT NOT NULL,
view_id INT NOT NULL,
view_type SMALLINT NOT NULL,
keyword VARCHAR(1200) NOT NULL,
INDEX index_act (act) USING BITMAP,
INDEX index_fid (fid) USING BITMAP,
INDEX index_view_type (view_type) USING BITMAP
)
DUPLICATE KEY(event_time,jid)
PARTITION BY date_trunc('day', event_time)
DISTRIBUTED BY HASH(jid)
PROPERTIES(
"replication_num" = "2",
"bloom_filter_columns" = "jid,uid,cid,view_id"
);

【Routine Load job creation statement】
CREATE ROUTINE LOAD test_db.kafka2sr_ods_message_2023111221 ON message_table
COLUMNS(tmp_event_time,jid,act,fid,uid,cid,view_id,view_type,keyword,event_time=from_unixtime(tmp_event_time,'yyyy-MM-dd HH:mm:ss'))
PROPERTIES
(
"desired_concurrent_number" = "3",
"format" = "json",
"strip_outer_array" = "true",
"json_root" = "$.records",
"jsonpaths" = "[\"$.event_time\",\"$.jid\",\"$.act\",\"$.fid\",\"$.uid\",\"$.cid\",\"$.view_id\",\"$.view_type\",\"$.keyword\"]",
"max_error_number" = "2000000",
"strict_mode" = "true",
"log_rejected_record_num" = "-1"
)
FROM KAFKA
(
"kafka_broker_list" = "node-103:9092,node-104:9092,node-105:9092",
"kafka_topic" = "ods_message",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

【Sample data in Kafka】
1. Valid data:
{"records":[{"event_time":1640966401,"jid":11110000,"act":2,"fid":1,"uid":3030882,"cid":3030882,"view_id":7684468,"view_type":1,"keyword":"设计师"}]}

2. Bad data (non-JSON); the following is the full content of one Kafka message:
--------------------------32155b7eb38861fe
Content-Disposition: attachment; name="records"

Array
--------------------------32155b7eb38861fe--
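
As a quick check (a standalone Python sketch, not part of the ingestion setup; the payload strings are just trimmed copies of the two examples above), the valid message parses as JSON, while the multipart fragment fails to parse at all, and its first byte is '-' (ASCII 45), which is exactly what the task error messages further down refer to:

import json

# Trimmed copies of the two Kafka payloads shown above.
good = '{"records":[{"event_time":1640966401,"jid":11110000,"act":2,"keyword":"设计师"}]}'
bad = ('--------------------------32155b7eb38861fe\r\n'
       'Content-Disposition: attachment; name="records"\r\n\r\n'
       'Array\r\n'
       '--------------------------32155b7eb38861fe--')

json.loads(good)      # parses fine; its rows then go through the normal column checks
print(ord(bad[0]))    # 45, i.e. '-', matching "illegal json started with 45" below
try:
    json.loads(bad)
except json.JSONDecodeError as err:
    print("rejected before any column check:", err)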

【SHOW ROUTINE LOAD TASK output】
mysql> SHOW ROUTINE LOAD TASK WHERE JobName = "kafka2sr_ods_message_2023111221"\G
*************************** 1. row ***************************
TaskId: a10c3c5f-a8e8-4149-be61-b2abc1754d59
TxnId: -1
TxnStatus: UNKNOWN
JobId: 12456
CreateTime: 2023-11-12 21:29:26
LastScheduledTime: 2023-11-14 16:50:15
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: Progress:{"1":270},LatestOffset:null
Message: there is no new data in kafka/pulsar, wait for 10 seconds to schedule again
*************************** 2. row ***************************
TaskId: 9d2e6798-2545-4314-8e48-4dc7281e3747
TxnId: -1
TxnStatus: UNKNOWN
JobId: 12456
CreateTime: 2023-11-12 21:29:37
LastScheduledTime: NULL
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: Progress:{"0":257},LatestOffset:null
Message: previous task aborted because of illegal json started with 45
*************************** 3. row ***************************
TaskId: 38d05739-1369-4fe2-9043-f07aa61e7002
TxnId: -1
TxnStatus: UNKNOWN
JobId: 12456
CreateTime: 2023-11-12 21:29:37
LastScheduledTime: NULL
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: Progress:{"2":253},LatestOffset:null
Message: previous task aborted because of illegal json started with 45
3 rows in set (0.00 sec)

max_error_number only applies after the JSON has been parsed successfully: rows that parse but do not satisfy the column definitions, or otherwise do not match, are what gets counted into error_number. In your case the record is not valid JSON at all, so you need to clean this non-conforming data out upstream, before it gets into Kafka.
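
To illustrate that suggestion, below is a minimal sketch of one possible way to filter non-JSON messages out before they reach the topic the Routine Load job consumes. It assumes a hypothetical raw topic (ods_message_raw) that the upstream producers write to, the kafka-python client, and the broker list from the job above; adapt the names to the actual pipeline.

# pip install kafka-python   (library choice is an assumption, not part of the original setup)
import json
from kafka import KafkaConsumer, KafkaProducer

BROKERS = "node-103:9092,node-104:9092,node-105:9092"
RAW_TOPIC = "ods_message_raw"   # hypothetical topic that upstream producers write to
CLEAN_TOPIC = "ods_message"     # topic the Routine Load job consumes

consumer = KafkaConsumer(
    RAW_TOPIC,
    bootstrap_servers=BROKERS,
    group_id="json_filter",      # hypothetical consumer group for this filter
    auto_offset_reset="earliest",
)
producer = KafkaProducer(bootstrap_servers=BROKERS)

for msg in consumer:
    try:
        json.loads(msg.value)    # forward only payloads that are valid JSON
    except (json.JSONDecodeError, UnicodeDecodeError):
        continue                 # drop non-JSON payloads such as the multipart fragment above
    producer.send(CLEAN_TOPIC, msg.value)

The idea is that anything failing json.loads here would also fail StarRocks' JSON parsing (and would not be counted toward max_error_number), so it never reaches the job.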

OK, thanks for your reply.