Routine load where过滤条件失效

kafka数据格式:内有4个数据库下的8个表的binlog数据
{
“data”:{
“FID”:“xxxxx”
“FName”:“xxxxx”
“FNationalityID”:“xxxxx”
“FHeaderUrl”:“xxxxx”
“FStatus”:“xxxxx”
“FLanguage”:“xxxxx”
“FAge”:“xxxxx”
“FSex”:“xxxxx”
“FBirthday”:“xxxxx”
“FExperience”:“xxxxx”
“FCurrency”:“xxxxx”
“FDiamond”:“xxxxx”
“FLevelID”:“xxxxx”
“FSkinID”:“xxxxx”
“FVipID”:“xxxxx”
“FRegisterSource”:“xxxxx”
“FRegisterIP”:“xxxxx”
“FRegisterTime”:“xxxxx”
“FDevice”:“xxxxx”
“FDeviceType”:“xxxxx”
“FDownLoadChannelID”:“xxxxx”
“FIsDeleted”:“xxxxx”
“FModifyTime”:“xxxxx”
“FIsUsed”:“xxxxx”
“FAccountType”:“xxxxx”
“FVipValue”:“xxxxx”
“FIsRecharged”:“xxxxx”
“FPushToken”:“xxxxx”
“FPrettyId”:“xxxxx”
“FDeleteStatus”:“xxxxx”
},
“database”:“test_db1”,
“table”:“test_tb1”,
“type”:“INSERT”
}

我的starrocks端建表语句:
CREATE TABLE IF NOT EXISTS ods.test_tb1 (
FID STRING NOT NULL COMMENT “”,
FName STRING,
FNationalityID STRING,
FHeaderUrl STRING,
FStatus STRING,
FLanguage STRING,
FAge STRING,
FSex STRING,
FBirthday STRING,
FExperience STRING,
FCurrency STRING,
FDiamond STRING,
FLevelID STRING,
FSkinID STRING,
FVipID STRING,
FRegisterSource STRING,
FRegisterIP STRING,
FRegisterTime STRING,
FDevice STRING,
FDeviceType STRING,
FDownLoadChannelID STRING,
FIsDeleted STRING,
FModifyTime STRING,
FIsUsed STRING,
FAccountType STRING,
FVipValue STRING,
FIsRecharged STRING,
FPushToken STRING,
FPrettyId STRING,
FDeleteStatus STRING,
database STRING,
table STRING
) ENGINE=olap
PRIMARY KEY(FID)
COMMENT “”
DISTRIBUTED BY HASH(FID) BUCKETS 12
PROPERTIES (
“replication_num” = “3”
);

我的routine load任务语句:
CREATE ROUTINE LOAD job_test_tb1 ON test_tb1
COLUMNS(FID,FName,FNationalityID,FHeaderUrl,FStatus,FLanguage,FAge,FSex,FBirthday,FExperience,FCurrency,FDiamond,FLevelID,FSkinID,FVipID,FRegisterSource,FRegisterIP,FRegisterTime,FDevice,FDeviceType,FDownLoadChannelID,FIsDeleted,FModifyTime,FIsUsed,FAccountType,FVipValue,FIsRecharged,FPushToken,FPrettyId,FDeleteStatus,database,table,temp,__op=(CASE temp WHEN “DELETE” THEN 1 ELSE 0 END)),where database = ‘test_db1’ and table = ‘test_tb1’
PROPERTIES
(
“desired_concurrent_number”=“3”,
“max_batch_interval” = “20”,
“strict_mode” = “false”,
“max_error_number”=“1000”,
“format” = “json”,
“jsonpaths” = “[”$.data.FID","$.data.FName","$.data.FNationalityID","$.data.FHeaderUrl","$.data.FStatus","$.data.FLanguage","$.data.FAge","$.data.FSex","$.data.FBirthday","$.data.FExperience","$.data.FCurrency","$.data.FDiamond","$.data.FLevelID","$.data.FSkinID","$.data.FVipID","$.data.FRegisterSource","$.data.FRegisterIP","$.data.FRegisterTime","$.data.FDevice","$.data.FDeviceType","$.data.FDownLoadChannelID","$.data.FIsDeleted","$.data.FModifyTime","$.data.FIsUsed","$.data.FAccountType","$.data.FVipValue","$.data.FIsRecharged","$.data.FPushToken","$.data.FPrettyId","$.data.FDeleteStatus","$.database","$.table","$.type"]"
)
FROM KAFKA
(
“kafka_broker_list” =“xxxxxx:9092,xxxxxx:9092,xxxxxxxx:9092”,
“kafka_topic” = “test_binlog_explode”,
“property.group.id” = “SR_test_tb1”,
“property.client.id” = “starrocks-client”,
“property.kafka_default_offsets” = “OFFSET_END”
);

任务运行一段时间后报错,报错信息为:

routine load报错信息为:

current error rows is more than max error num,因为我的最大错误条数为1000条,也就是说超过了最大的报错条数。
但是我在routine load任务中设置了where database = ‘test_db1’ and table = ‘test_tb1’,看报错截图的话也就是过滤条件没有生效。把database = yallagame_chatroom, table='t_roomintolog’的数据也导过来了,因为字段不匹配所以所有的字段都变成了null,而FID主键是not null…
所以很疑惑,是过滤条件不应该这样配置吗?

https://docs.starrocks.com/zh-cn/main/sql-reference/sql-statements/data-manipulation/ROUTINE%20LOAD
参考一下这里的例子,jsonpath里的字段引号转义一下尼

你好,jsonpath里的字段引号转义指的是?


我应该应该添加了转义符了

我们的任务是导入了7万多条数据之后才报错的,也就是数据是导入成功了一部分的

这里的过滤没有生效,再load任务里加上data再试下。 COLUMNS( data,FID , FName , FNationalityID,xxx)这样

test.txt (634 字节)
改写了导入的任务,改下这个例子其中的参数,看看这四个字段能否符合预期导入,能导入的话data字段中其他字段用类似的方法取


加上data之后,还是过滤条件没有生效,来了其他库表的数据

按照你提供的方案在导入13123条数据之后,还是报了跟之前一样的错误


也就是说过滤条件where database = ‘test_db1’ and table = 'test_tb1’依旧没有生效

另外导入成功的数据中,database和table 字段都是test_db1,test_tb1。也就是说database和table识别出来了,但是不知道为什么过滤的时候没有生效…

建表的时候把database和table放最前列看下。

因为是主键模型(我们需要主键模型的增删查改功能),所以fid一定是第一列。我把database和table放到第二三列了。过滤条件还是没有生效…

把key设置为database,table,和fid把,这张表写入成功的话前两个字段值都是一致的,不影响主键模型的数据一致性。限制的过滤是在写入的时候才过滤,哪些错误数据还没到database和table字段,就因为第一字段的非空约束已经呗当作错误数据处理了,这块你先这么改下表结构和写入任务,这块的写入逻辑我们内部讨论优化下。

现在建表时指定了database,table,fid作为主键了,好消息是现在过滤条件生效了,坏消息就是但是字段解析不了了


它现在把整个data当做一个字段写入了,占据了fid的位置。因此现在fid报null异常了
我的routine load任务如下:
CREATE ROUTINE LOAD job_yallagame__t_account_test ON ods_yallagame__t_account_full_test
COLUMNS(data,database,table,FID=get_json_string(data, ‘$.FID’),FName=get_json_string(data, ‘$.FName’),FNationalityID=get_json_string(data, ‘$.FNationalityID’),temp,__op=(CASE temp WHEN “DELETE” THEN 1 ELSE 0 END)),where table = ‘t_account’
PROPERTIES
(
“desired_concurrent_number”=“3”,
“max_batch_interval” = “20”,
“strict_mode” = “false”,
“max_error_number”=“10”,
“format” = “json”,
“jsonpaths” = “[”$.database","$.table","$.data","$.type"]"
)
FROM KAFKA
(
“kafka_broker_list” =“xxxxx:9092,xxxxx:9092,xxxxx:9092”,
“kafka_topic” = “ludo_binlog_explode”,
“property.group.id” = “SR_yallagame__t_account”,
“property.client.id” = “starrocks-client”,
“property.kafka_default_offsets” = “OFFSET_END”
);

检查下json格式?或者任务重建一下,注意下单引号是否是英文输入单引号。

都试过了,where过滤和字段解析好像互斥一样,只能同时一个成功。我们还是决定加个flink任务,一个表一个topic算了,这样不用过滤。其实我感觉我们这种场景还是挺常见的(多表的binlog在一个topic里面,配置routine load通过wher条件过滤,或者一个routine load任务可以动态解析过滤多张表),不管是一个表一个topic一个routine load任务还是一个表一个flinkcdc任务,终究还是资源占用过大了。多表一个任务这功能应该还是挺重要的吧个人觉得

有整库单任务同步的demo,需要的话可以私信 @Natsume729 获取

flink-sync-job.tar(1).gz (3.1 MB)
这里有整库同步的demo。这个cdc应该能满足你的需求,按照以下配置设置一下

1. Put the `flink-connector-starrocks-dev_flink-1.13_2.11.jar` into `flink/lib/`.

2. ./flink run -c com.starrocks.CDCJob cdc-sync-job-1.0.1.jar \
--source.host 127.0.0.1 \
--source.port 3306 \
--source.db db \
--source.table db.* \
--source.user root \
--source.password 123456 \
--source.jobname mysqlsource \
--source.parallelism 10 \
--sink.jdbcUrl jdbc:mysql://fe:9030 \
--sink.loadUrl fe:8030 \
--sink.user root \
--sink.password 123456 \
--sink.buffer-flush.intervalMS 15000 \
--sink.jobname starrocks-cdc-sync \
--sink.parallelism 10

多谢老哥~ :heart:

你的8个表的binlog是不是在kafka的同一个主题?我之前遇到这个问题的解决方法是,每个表用独立主题。

多谢老哥,我们也打算一个表一个topic一个routine load任务了。对了,另外请问一下你们现在有多少个routine load任务呢,routine load任务多了之后会有什么问题或者注意事项嘛 :grin: