为了更快的定位您的问题,请提供以下信息,谢谢
【详述】问题详细描述
【背景】做过哪些操作?
【业务影响】
【是否存算分离】
【StarRocks版本】例如:3.2.11-1.1-35f6d2d
【集群规模】例如:3fe+ 2be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:48C/64G/万兆
【表模型】例如:明细模型
【导入或者导出方式】例如:routine load
【联系方式】为了在解决问题过程中能及时联系到您获取一些日志信息,请补充下您的联系方式,例如:社区群4-小李或者邮箱,谢谢
【附件】
- fe.log/be.INFO/相应截图
- 完整的报错异常栈
实验步骤:
模型设计:
create table if not exists tmp.ods_kfk__test_di (
uid int comment ‘用户ID’,
name varchar(12) comment ‘姓名’,
age int comment ‘年龄’,
ods_create_time datetime comment ‘ods创建时间’
)
DUPLICATE KEY(uid) – 用户主要根据uid去查,uid放前
COMMENT “ods测试数据”
DISTRIBUTED BY HASH(uid) BUCKETS 1
properties (
“replication_num” = “1”,
“in_memory” = “false”,
“enable_persistent_index” = “false”,
“replicated_storage” = “true”,
“compression” = “SNAPPY”
);
create routine load tmp.rload__ods_kfk__test_di_topic_test2 on ods_kfk__test_di
columns(
uid,
name,
age,
ods_create_time=current_timestamp()
)
PROPERTIES
(
"max_batch_rows" = "200000",
"desired_concurrent_number" = "2",
"jsonpaths" = "[\"$.uid\", \"$.name\", \"$.age\"]",
"format" = "json",
"max_error_number" = "0",
"max_filter_ratio" = "0"
)
FROM KAFKA
(
“kafka_broker_list” = “xxx:9092”,
“kafka_topic” = “test1”,
“property.kafka_default_offsets” = “OFFSET_BEGINNING”,
“property.group.id” = “group_starrock_260317_test”
);
测试数据
{“uid”: 1001, “name”: “张三”, “age”: 25} 正常写入
{“uid”: ‘1002’, “name”: “李四”, “age”: 30} uid 写入 null
{“uid”: ‘1003’, “name”: “王五”, “age”: ‘28’} uid 写入 null a
{“uid”: 1004, “name”: “赵六”, “age”: 32, } 正常写入
{“uid”: 1005, “name”: 钱七, “age”: 27} name 写入 null
{“uid”: 1006, “name”: “孙八”, “age”: 35, “test”} 正常写入
{“uid”: 1007, “name”: “周九”, “age”: 29, test} 正常写入
{“uid”: 1008, “name”: “吴十”, “age”: 31, “test”:} 正常写入
{“uid”: 1009, “name”: “郑十一”,“test”: “test”} age 写入 null
错误数据测试,触发 任务停止
{“uid”: 1010, “name”: “王十二”, “age”} 无写入
{“uid”: 1011, “name”: “李十三”, “age”:} 无写入
{“uid”: 1012, “name”: “张十四”, ”age“} 无写入
{“uid”: 1013, “name”: “刘十五”, “age”:} 无写入
操作步骤:
-
发送一条正确数据,{“uid”: 1001, “name”: “张三”, “age”: 25} 正常写入
-
发送一条错误数据,{“uid”: 1011, “name”: “李十三”, “age”:} 无法写入 状态变为 PAUSED
-
执行
ALTER ROUTINE LOAD FOR tmp.rload__ods_kfk__test_di_topic_test2
PROPERTIES (
“max_error_number” = “5”,
“max_filter_ratio” = “1”
); -
发送一条正确数据,{“uid”: 1016, “name”: “黄十八”, “age”: 27} 正常写入
-
发送一条错误数据,{“uid”: 1011, “name”: “李十三”, “age”:} 无法写入 状态running,
-
发送一条正确数据,{“uid”: 1017, “name”: “赵十九”, “age”: 30} 状态还是running,数据无法写入 hang住
如果源端数据就很脏,在routine load 任务之前 用starrocks 本身的特性,可以提前预处理吗?
也想知道下,正常情况下,设置这种错误数据,导致任务hang住 是正常的吗,starrocsk 3.2