为了更快的定位您的问题,请提供以下信息,谢谢
【详述】我们在使用routine load 导入kafka数据过程中,发现异常的数据导致routine load不消费kafka数据了。routine load状态还是正常的。
【背景】
【业务影响】
【是否存算分离】否
【StarRocks版本】3.3.3,3.1.4,3.2.6
【集群规模】例如:3fe(1 follower+2observer)+5be(fe与be混部)
【机器信息】
【联系方式】社区1群
【附件】
-- Target table of the routine load job used in this reproduction.
-- Restored from the forum paste: typographic quotes are replaced by ASCII
-- double quotes, and each column name is rejoined with its type.
-- Identifiers are backtick-quoted because some of them (e.g. `time`, `user`)
-- may collide with SQL keywords.
-- The explicit partition list is kept exactly as reported, including the
-- missing days; dynamic partitioning is enabled through the properties below.
CREATE TABLE `starrocks_audit_log`
(
    `cdate`          date           NOT NULL COMMENT "分区日期",
    `slow_time`      datetime       NULL COMMENT "slow time",
    `igid`           varchar(40)    NULL COMMENT "",
    `db_name`        varchar(300)   NULL COMMENT "db name",
    `fe_ip`          varchar(30)    NULL COMMENT "fe ip",
    `query_id`       varchar(200)   NULL COMMENT "QueryId",
    `time`           bigint(20)     NULL COMMENT "SQL run time",
    `client`         varchar(30)    NULL COMMENT "client ip",
    `user`           varchar(200)   NULL COMMENT "user name",
    `state`          varchar(200)   NULL COMMENT "State",
    `scan_bytes`     bigint(20)     NULL COMMENT "ScanBytes",
    `scan_rows`      bigint(20)     NULL COMMENT "ScanRows",
    `return_rows`    bigint(20)     NULL COMMENT "ReturnRows",
    `stmt_id`        bigint(20)     NULL COMMENT "StmtId",
    `is_query`       varchar(50)    NULL COMMENT "IsQuery",
    `stmt`           varchar(65533) NULL COMMENT "Stmt,Query Detail",
    `cpu_cost_ns`    bigint(20)     NULL COMMENT "",
    `mem_cost_bytes` bigint(20)     NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`cdate`, `slow_time`, `igid`, `db_name`, `fe_ip`, `query_id`)
COMMENT "StarRocks audit log"
PARTITION BY RANGE(`cdate`)
(PARTITION p20240915 VALUES [("2024-09-15"), ("2024-09-16")),
PARTITION p20240916 VALUES [("2024-09-16"), ("2024-09-17")),
PARTITION p20240917 VALUES [("2024-09-17"), ("2024-09-18")),
PARTITION p20240918 VALUES [("2024-09-18"), ("2024-09-19")),
PARTITION p20240919 VALUES [("2024-09-19"), ("2024-09-20")),
PARTITION p20240920 VALUES [("2024-09-20"), ("2024-09-21")),
PARTITION p20240921 VALUES [("2024-09-21"), ("2024-09-22")),
PARTITION p20240922 VALUES [("2024-09-22"), ("2024-09-23")),
PARTITION p20240923 VALUES [("2024-09-23"), ("2024-09-24")),
PARTITION p20240924 VALUES [("2024-09-24"), ("2024-09-25")),
PARTITION p20240925 VALUES [("2024-09-25"), ("2024-09-26")),
PARTITION p20240926 VALUES [("2024-09-26"), ("2024-09-27")),
PARTITION p20240927 VALUES [("2024-09-27"), ("2024-09-28")),
PARTITION p20240928 VALUES [("2024-09-28"), ("2024-09-29")),
PARTITION p20240929 VALUES [("2024-09-29"), ("2024-09-30")),
PARTITION p20240930 VALUES [("2024-09-30"), ("2024-10-01")),
PARTITION p20241001 VALUES [("2024-10-01"), ("2024-10-02")),
PARTITION p20241002 VALUES [("2024-10-02"), ("2024-10-03")),
PARTITION p20241003 VALUES [("2024-10-03"), ("2024-10-04")),
PARTITION p20241008 VALUES [("2024-10-08"), ("2024-10-09")),
PARTITION p20241009 VALUES [("2024-10-09"), ("2024-10-10")),
PARTITION p20241010 VALUES [("2024-10-10"), ("2024-10-11")),
PARTITION p20241011 VALUES [("2024-10-11"), ("2024-10-12")),
PARTITION p20241012 VALUES [("2024-10-12"), ("2024-10-13")),
PARTITION p20241015 VALUES [("2024-10-15"), ("2024-10-16")),
PARTITION p20241016 VALUES [("2024-10-16"), ("2024-10-17")),
PARTITION p20241017 VALUES [("2024-10-17"), ("2024-10-18")),
PARTITION p20241018 VALUES [("2024-10-18"), ("2024-10-19")))
DISTRIBUTED BY HASH(`igid`) BUCKETS 4
PROPERTIES (
    "compression" = "LZ4",
    "dynamic_partition.buckets" = "4",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.end" = "3",
    "dynamic_partition.history_partition_num" = "0",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.start" = "-30",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Etc/UTC",
    "fast_schema_evolution" = "true",
    "replicated_storage" = "false",
    "replication_num" = "1"
);
-------- routine load --------
-- Routine load job that feeds starrocks_audit_log from Kafka in this
-- reproduction. Restored from the forum paste: typographic quotes are
-- replaced by ASCII double quotes, and the JSON array in "jsonpaths" is
-- written as a single string literal with its inner quotes escaped.
CREATE ROUTINE LOAD webull_rt_db.routine_load_starrocks_audit_log_c1 ON starrocks_audit_log
COLUMNS (
    -- Source fields, in the same order as the entries of "jsonpaths" below.
    `ts`,
    `igid`,
    `db_name`,
    `fe_ip`,
    `query_id`,
    `time`,
    `client`,
    `user`,
    `state`,
    `scan_bytes`,
    `scan_rows`,
    `return_rows`,
    `stmt_id`,
    `is_query`,
    `stmt`,
    `cpu_cost_ns`,
    `mem_cost_bytes`,
    -- Derived columns: ts is divided by 1000 before from_unixtime
    -- (presumably ts is epoch milliseconds; confirm against the producer).
    `slow_time` = from_unixtime(`ts` / 1000),
    `cdate` = from_unixtime(`ts` / 1000)
)
PROPERTIES
(
    "jsonpaths" = "[\"$.Timestamp\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\",\"$.CpuCostNs\",\"$.MemCostBytes\"]",
    "json_root" = "$.entity",
    "max_batch_interval" = "30",
    "max_batch_rows" = "200000",
    -- Scenario 1 of the report uses 0; scenario 2 uses 100.
    "max_error_number" = "0",
    "strict_mode" = "false",
    "timezone" = "Etc/UTC",
    "format" = "json",
    "strip_outer_array" = "false",
    "partial_update" = "false",
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    -- Broker list is empty in the report (presumably redacted); fill in
    -- host:port pairs before running.
    "kafka_broker_list" = "",
    "kafka_topic" = "starrocks_log_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "starrocks_audit_log_c1"
);
模拟异常数据
发送:111
真实效果
任务有报错信息,但是任务卡住了,不再消费
这里有2种场景
设置了最大失败条数是0:"max_error_number" = "0"
理论上,任务应该pause,但是没有
设置了最大失败条数是100:"max_error_number" = "100"
理论上,任务应该正常过滤这条异常数据,然后继续消费;实际上,任务已经卡住,不再进行数据消费了。
两种场景状态都是running,但是不进行消费数据。
业务诉求:
当设置 "max_error_number" = "0" 时,任务可以pause
当设置 "max_error_number" = "100" 时,任务过滤异常数据并继续消费