routine load 实时任务创建

【详述】SR 创建 routine load 实时任务后,由于mysql 端存在 DDL(添加索引、添加字段、修改字段) 等操作,从kafka 消费数据到SR 落表时报错(如图),出现丢数据情况,这种该如何解决呢?现在存在大量的实时任务,这个问题变得尤为突出
【导入】mysql -> canal -> kafka -> SR
【背景】实时同步任务需要
【业务影响】严重影响数据服务的数据质量
【StarRocks版本】2.0.5
【集群规模】3fe + 6be

mysql 源表
CREATE TABLE look_up (
id int(11) NOT NULL AUTO_INCREMENT,
db_name_prefix varchar(50) NOT NULL,
table_name_prefix varchar(50) NOT NULL,
ip varchar(200) NOT NULL,
test1 varchar(100) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=127 DEFAULT CHARSET=utf8;
SR 表
CREATE TABLE ods_test_look_up (
id bigint(20) NOT NULL COMMENT “”,
db_name_prefix varchar(65533) NULL COMMENT “”,
table_name_prefix varchar(65533) NULL COMMENT “”
) ENGINE=OLAP
UNIQUE KEY(id)
COMMENT “ods_test_look_up”
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
“replication_num” = “1”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”
);
1、不指定 json_root
create ROUTINE LOAD ods_test.routine_ods_test_look_up_1 ON ods_test_look_up
COLUMNS (
id,
db_name_prefix,
table_name_prefix
PROPERTIES
(
“strict_mode” = “false”,
“strip_outer_array” = “false”,
“format” = “json”,
“jsonpaths” = “[”$.data.id", “$.data.db_name_prefix”, “$.data.table_name_prefix”]"
)
FROM KAFKA
(
“kafka_broker_list”= “xxx.xxx.xxx.xxx:9092”,
“kafka_topic” = “test_look_up”,
“property.group.id” = “doris_group_ods_tmp_look_up”,
“property.kafka_default_offsets” = “OFFSET_END”
);
mysql 添加数据,
image
kafka 展示


SR 报错

修改 “strip_outer_array” = “true”
mysql 同样添加数据
image
SR 报错

在"strict_mode" = “ture” 时,报错类似

2、指定 json_root
create ROUTINE LOAD ods_test.routine_ods_test_look_up_5 ON ods_test_look_up
COLUMNS (
id,
db_name_prefix,
table_name_prefix
)
PROPERTIES
(
“strict_mode” = “false”,
“strip_outer_array” = “false”,
“format” = “json”,
“jsonpaths” = “[”$.id", “$.db_name_prefix”, “$.table_name_prefix”]",
“json_root”= “$.data”
)
FROM KAFKA
(
“kafka_broker_list”= “xxx.xxx.xxx.xxx:9092”,
“kafka_topic” = “test_look_up”,
“property.group.id” = “doris_group_ods_tmp_look_up”,
“property.kafka_default_offsets” = “OFFSET_END”
);
mysql 添加数据
image
SR 报错


设置 “strip_outer_array”=“true”
mysql 添加数据
image
SR 同步正常
image
mysql 添加列(DDL 操作)

kafka 数据

SR 报错

在 “strict_mode” = "true"时 报错类似

即 存在 DDL 操作时 data 为null 导致同步出错

设置的 jsonpath 和 jsonroot 都是要解析 data 字段。当该字段为 null 时,解析报错是符合预期的。这种情况,是希望想跳过该数据行么?

按照我们这边的逻辑 是这样的 当有ddl 操作的时候 data 字段确实为 null, 若是为null ,则跳过该条数据行

或是过滤的话 可以通过判断 isDDL再 过滤 这样会不会好一点?

衍生出来的一个问题就是 这个错误发生后 这个实时任务会挂掉,等自动重启脚本重启这个任务后,会发生丢失数据的情况,这个应该是更严重的点

能否尝试一下:

  1. columns 改为
COLUMNS (
id,
db_name_prefix,
table_name_prefix,
is_ddl
)
  1. jsonpath 改为
“jsonpaths” = “[”$.data[0].id", “$.data[0].db_name_prefix”, “$.data[0].table_name_prefix”], "$.isDdl"
  1. 增加where语句
WHERE is_ddl = false

最终修改的结果为

create ROUTINE LOAD ods_test.routine_ods_test_look_up_1 ON ods_test_look_up
COLUMNS (
id,
db_name_prefix,
table_name_prefix,
id_ddl),
WHERE is_ddl = false
PROPERTIES
(
“strict_mode” = “false”,
“strip_outer_array” = “false”,
“format” = “json”,
“jsonpaths” = “[”$.data[0].id", “$.data[0].db_name_prefix”, “$.data[0].table_name_prefix”], "$.isDdl""
)
FROM KAFKA
(
“kafka_broker_list”= “xxx.xxx.xxx.xxx:9092”,
“kafka_topic” = “test_look_up”,
“property.group.id” = “doris_group_ods_tmp_look_up”,
“property.kafka_default_offsets” = “OFFSET_END”
);

这个问题会在下个版本修复

按照您说的方法 尝试了一下 发现还是不行

嗯嗯 好的呢 感谢大佬关注,十分期待,然后请问 kafka 消费这块是自动提交吗?这个是实时任务失败后,会导致丢数据情况,为何会这样呢?

哈喽呀 请问一下近期什么时候发版2.0.7呢?

@Jon_qj
抱歉,之前的jsonpath有点问题,应该更正为

"jsonpaths" = "["$.data[0].id", "$.data[0].db_name_prefix", "$.data[0].table_name_prefix", "$.isDdl"]