Question: adding a custom field when syncing MySQL to StarRocks

[Requirement] The same business data is spread across multiple MySQL databases: the database names differ, but the table names and schemas are identical. We plan to aggregate the data from these tables into StarRocks using the "Routine Load" approach, but during the sync we need to add a custom field that identifies where each row came from. How should this be done?

[Background] The MySQL data is already being written to Kafka via Canal, and each record contains the source database name. I'd like to ask how to add this field on the FE/BE side during the load.

https://docs.starrocks.com/zh-cn/main/sql-reference/sql-statements/data-manipulation/ROUTINE%20LOAD#description
You can refer to the fifth example in the docs. There is no need to add a custom field; just configure
jsonpaths to extract the database name from the Kafka data.
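
For illustration, a minimal sketch of that idea, assuming a target table lcytest that already has a dbname column, Canal-format messages in the topic, and placeholder job, broker, and topic names (the $.data[0] paths read only the first element of the data array):

-- dbname is filled from $.database, the source database name in the Canal message
create routine load test.lcytest_src on lcytest
columns (dbname, id, name, age, gender)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.database\",\"$.data[0].id\",\"$.data[0].name\",\"$.data[0].age\",\"$.data[0].gender\"]",
"desired_concurrent_number" = "1"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092",
"kafka_topic" = "mytopic"
);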

Thanks for the reply, but it didn't work when I configured it according to the example.
I'm writing the binlog to Kafka via Canal; sample data is shown below.


Below are the Routine Load statements I tested.
Test 1:
create routine load test.lcytest on lcytest
columns (dbname, id, name, age, gender)
PROPERTIES (
"format"="json",
"jsonpaths" = "[\"$.name\",\"$.id\",\"$.name\",\"$.age\",\"$.gender\"]",
"desired_concurrent_number"="1",
"max_error_number"="0",
"strip_outer_array" = "true",
"json_root" = "$.data"
)
FROM KAFKA (
"kafka_broker_list"= "192.168.44.4:9092,192.168.44.5:9092,192.168.44.6:9092",
"kafka_topic" = "default",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);
Test 2:
create routine load test.lcytest on lcytest
columns (dbname, id, name, age, gender)
PROPERTIES (
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.database\",\"$.data.id\",\"$.data.name\",\"$.data.age\",\"$.data.gender\"]",
"strip_outer_array" = "true",
"json_root" = "$.data"
)
FROM KAFKA (
"kafka_broker_list"= "192.168.44.4:9092,192.168.44.5:9092,192.168.44.6:9092",
"kafka_topic" = "mytopic",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);
Neither one works. When you have time, could you help take a look at where the problem is?

Could you post the error message?


Test 2 does not produce an error URL, but abortedTaskNum keeps growing.
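
For reference, the overall job state, including abortedTaskNum and the ErrorLogUrls field once an error URL is generated, can be checked with something like the following (job name taken from the tests above):

SHOW ROUTINE LOAD FOR test.lcytest;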

The error message for Test 1 is below, but the parameters the error asks to be set have already been configured as instructed.

"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]"

jsonpaths should be in this format.

The data format in the demo differs a bit from the format Canal generates.
Below is the demo data format:


Below is the format generated by Canal:

In the demo the data is a single flat object, while the Canal output is nested; I'm not sure whether setting json_root still has any effect.

columns (dbname, id, name, age, gender)
PROPERTIES (
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.database\",\"$.data.id\",\"$.data.name\",\"$.data.age\",\"$.data.gender\"]"
)

Try it like this.


After trying that there is some progress, but the nested data still isn't matched.

columns (dbname, id, name, age, gender)
PROPERTIES (
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.database\",\"$.data[0].id\",\"$.data[0].name\",\"$.data[0].age\",\"$.data[0].gender\"]"
)

This does read data, but it doesn't work for batch UPDATEs in MySQL, because in that case data contains an array of multiple objects.

columns (dbname="default_databases", id, name, age, gender)
PROPERTIES (
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonroot"= "$.data" ,
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.age\",\"$.gender\"]",
"strip_outer_array" = "true"
)

See whether this works for you.

I had considered this approach too, but it doesn't quite fit the business requirement: it would mean creating multiple jobs per StarRocks table. We currently have 14 MySQL databases and need to subscribe to forty or fifty tables, so the workload would be quite large.

Once the root node is defined as $.data, can a jsonpath still reach back up from a child node such as $.id to the parent node?

Could you write all of the data into a single kafka_topic and run just one routine load?

Currently I'm already writing the same table from the different databases into a single topic.

Do you have a specific idea in mind with this suggestion?

Are you planning to put it all into a single table in SR?

Yes.
The scenario is:
Right now, data is partitioned by member across different MySQL instances (currently 14), into tables with the same schema.
We now need to load these tables into StarRocks and, when writing into SR, add a dbname column (which can be taken from the database field in the Canal => Kafka messages).

Then I'd recommend you give flink-cdc a try:
https://docs.starrocks.com/zh-cn/main/loading/Flink-connector-starrocks
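
A rough sketch of what that could look like in Flink SQL, assuming the flink-cdc MySQL connector and the StarRocks Flink connector; all host names, ports, credentials, and the literal dbname value are placeholders, and with 14 source instances you would declare one source table per instance and write each into the same sink with its own constant:

-- source: one of the MySQL instances, captured via flink-cdc (placeholder connection info)
CREATE TABLE mysql_src_db1 (
    id INT,
    name STRING,
    age INT,
    gender STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host-1',
    'port' = '3306',
    'username' = 'user',
    'password' = 'pass',
    'database-name' = 'db1',
    'table-name' = 'lcytest'
);

-- sink: the StarRocks table that carries the extra dbname column
CREATE TABLE sr_sink (
    dbname STRING,
    id INT,
    name STRING,
    age INT,
    gender STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe-host:9030',
    'load-url' = 'fe-host:8030',
    'database-name' = 'test',
    'table-name' = 'lcytest',
    'username' = 'root',
    'password' = ''
);

-- tag rows from this instance with a constant database name on the way in
INSERT INTO sr_sink
SELECT 'db1' AS dbname, id, name, age, gender FROM mysql_src_db1;

Since the source database is known per Flink source table, the extra column can simply be a constant in the INSERT ... SELECT.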

I ran into a similar problem.

-- source side (MySQL)

CREATE TABLE test1 (
app_id varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
app_name varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
description varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
cookie_domain varchar(64) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
cookie_path varchar(400) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
index_url varchar(400) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
verify_name varchar(64) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (app_id) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=COMPACT

-- SR side

CREATE TABLE test1 (
app_id varchar(65533) NOT NULL COMMENT "",
app_name varchar(65533) NOT NULL COMMENT "",
description varchar(65533) NULL COMMENT "",
cookie_domain varchar(65533) NULL COMMENT "",
cookie_path varchar(65533) NULL COMMENT "",
index_url varchar(65533) NULL COMMENT "",
verify_name varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(app_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(app_id) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

-- routine load

create routine load test.test1 on test1
columns (app_id,app_name,description,cookie_domain,cookie_path,index_url,verify_name)
PROPERTIES (
"format"="json",
"jsonpaths" = "[\"$.data[0].app_id\",\"$.data[0].app_name\",\"$.data[0].description\",\"$.data[0].cookie_domain\",\"$.data[0].cookie_path\",\"$.data[0].index_url\",\"$.data[0].verify_name\"]",
"desired_concurrent_number"="1",
"max_error_number"="0"
)
FROM KAFKA (
"kafka_broker_list"= "10.0.10.12:9092",
"kafka_topic"="test_test1"
);

-- sample message 1
{
  "data": [
    {
      "app_id": "123123",
      "app_name": "4444666666",
      "description": "23123123",
      "cookie_domain": "2312",
      "cookie_path": "31231",
      "index_url": "231",
      "verify_name": "2312312"
    }
  ],
  "database": "test1",
  "es": 1647347567000,
  "id": 4380,
  "isDdl": false,
  "mysqlType": {
    "app_id": "varchar(10)",
    "app_name": "varchar(50)",
    "description": "varchar(100)",
    "app_password": "varchar(32)",
    "cookie_domain": "varchar(64)",
    "cookie_path": "varchar(400)",
    "index_url": "varchar(400)",
    "verify_name": "varchar(64)"
  },
  "old": null,
  "pkNames": [
    "app_id"
  ],
  "sql": "",
  "sqlType": {
    "app_id": 12,
    "app_name": 12,
    "description": 12,
    "app_password": 12,
    "cookie_domain": 12,
    "cookie_path": 12,
    "index_url": 12,
    "verify_name": 12
  },
  "table": "test1",
  "ts": 1647347567535,
  "type": "DELETE"
}

-- sample message 2
{
  "data": [
    {
      "app_id": "12312",
      "app_name": "12312323423",
      "description": "12312123",
      "cookie_domain": "1231",
      "cookie_path": "231",
      "index_url": "23123",
      "verify_name": "1231231"
    }
  ],
  "database": "test",
  "es": 1647346751000,
  "id": 4241,
  "isDdl": false,
  "mysqlType": {
    "app_id": "varchar(10)",
    "app_name": "varchar(50)",
    "description": "varchar(100)",
    "cookie_domain": "varchar(64)",
    "cookie_path": "varchar(400)",
    "index_url": "varchar(400)",
    "verify_name": "varchar(64)"
  },
  "old": [
    {
      "app_name": "123123"
    }
  ],
  "pkNames": [
    "app_id"
  ],
  "sql": "",
  "sqlType": {
    "app_id": 12,
    "app_name": 12,
    "description": 12,
    "cookie_domain": 12,
    "cookie_path": 12,
    "index_url": 12,
    "verify_name": 12
  },
  "table": "test1",
  "ts": 1647346751924,
  "type": "UPDATE"
}

You need to define a database column in the SR table, and then map it in jsonpaths and columns.

CREATE TABLE test1 (
app_id varchar(65533) NOT NULL COMMENT "",
app_name varchar(65533) NOT NULL COMMENT "",
description varchar(65533) NULL COMMENT "",
cookie_domain varchar(65533) NULL COMMENT "",
cookie_path varchar(65533) NULL COMMENT "",
index_url varchar(65533) NULL COMMENT "",
verify_name varchar(65533) NULL COMMENT "",
`database` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(app_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(app_id) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

create routine load test.test1 on test1
columns (app_id,app_name,description,cookie_domain,cookie_path,index_url,verify_name,`database`)
PROPERTIES (
"format"="json",
"jsonpaths" = "[\"$.data[0].app_id\",\"$.data[0].app_name\",\"$.data[0].description\",\"$.data[0].cookie_domain\",\"$.data[0].cookie_path\",\"$.data[0].index_url\",\"$.data[0].verify_name\",\"$.database\"]",
"desired_concurrent_number"="1",
"max_error_number"="0"
)
FROM KAFKA (
"kafka_broker_list"= "10.0.10.12:9092",
"kafka_topic"="test_test1"
);
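
Once the job is created, the load state and the contents of the new column can be checked with something like the following (names as above):

SHOW ROUTINE LOAD FOR test.test1;

SELECT `database`, count(*) FROM test1 GROUP BY `database`;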