【需求详述】同一业务数据分配到了多个mysql库,库名不一样,表名、结构一样,现计划将这些表的数据汇总到starRocks里面,计划采用Routine Load”方案,但需要同步时增加自定义字段以区分数据来源,请问如何操作
【背景】mysql通过canal已将数据写入kafka,且记录里包含dabasename,现咨询下大家FE到BE如何增加字段
【需求详述】同一业务数据分配到了多个mysql库,库名不一样,表名、结构一样,现计划将这些表的数据汇总到starRocks里面,计划采用Routine Load”方案,但需要同步时增加自定义字段以区分数据来源,请问如何操作
【背景】mysql通过canal已将数据写入kafka,且记录里包含dabasename,现咨询下大家FE到BE如何增加字段
https://docs.starrocks.com/zh-cn/main/sql-reference/sql-statements/data-manipulation/ROUTINE%20LOAD#description
可以参照第五个例子,不需要增加自定义字段,只需要配置
jsonpaths将kafka数据中的dabasename取到即可。
感谢回复,但是根据示例设置了没有成功
我这边是通过canal将binlog写到kafka里面的,示例数据如下
能发一下报错的信息吗?
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]"
jsonpath应该是这种格式的
因为demo里面的数据格式和canal生成的数据格式还是有一点差异的
以下是demo中数据格式
demo中为单节点,canal中为多节点,不知道设置 json_root是否还有效果
columns (dbname, id, name, age, gender)
PROPERTIES (
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.database\",\"$.data.id\",\"$.data.name\",\"$.data.age\",\"$.data.gender\"]"
)
试下这样
columns (dbname, id, name, age, gender)
PROPERTIES (
“desired_concurrent_number”=“3”,
“max_batch_interval” = “20”,
“strict_mode” = “false”,
“format” = “json”,
“jsonpaths” = “[”$.database","$.data[0].id","$.data[0].name","$.data[0].age","$.data[0].gender"]"
)
是可以读出数据的,但是针对mysql中批量update的情况,这个就不行了,因为data里面是一组对象
columns (dbname="default_databases", id, name, age, gender)
PROPERTIES (
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonroot"= "$.data" ,
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.age\",\"$.gender\"]",
"strip_outer_array" = "true"
)
看下这样可不可以
这样的当时我也设想过,但是不太符合业务需求,这样的话就需要针对一张starrocks表做多个任务,我们目前是14个mysql数据库,需要订阅四五十张表,这个工作量就稍微有点大了
这块定义根节点为$.data后,还能从子节点$.id往上找父节点吗
可不可以把所有的数据都打到一个kafka_topic里面,然后只跑一个routine_load?
目前我是将不同库的同一张表打到了一个topic中
您提出的这个建议是有什么好的想法?
你是打算放到sr里面的一张表里吗?
是的
场景是:
目前我们根据会员,数据分配到了不同的mysql实例中(目前14个)的相同表结构的表中;
现需要将这些表存到starrocks中,并且存入sr中时添加一列dbname(该列可以从canal=》kafka中的database获取)
遇到类似问题
–源端
CREATE TABLE test1
(
app_id
varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
app_name
varchar(50) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
description
varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
cookie_domain
varchar(64) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
cookie_path
varchar(400) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
index_url
varchar(400) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
verify_name
varchar(64) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (app_id
) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=COMPACT
-SR 端
CREATE TABLE test1
(
app_id
varchar(65533) NOT NULL COMMENT “”,
app_name
varchar(65533) NOT NULL COMMENT “”,
description
varchar(65533) NULL COMMENT “”,
cookie_domain
varchar(65533) NULL COMMENT “”,
cookie_path
varchar(65533) NULL COMMENT “”,
index_url
varchar(65533) NULL COMMENT “”,
verify_name
varchar(65533) NULL COMMENT “”
) ENGINE=OLAP
PRIMARY KEY(app_id
)
COMMENT “OLAP”
DISTRIBUTED BY HASH(app_id
) BUCKETS 1
PROPERTIES (
“replication_num” = “1”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”
);
– routine load
create routine load test.test1 on test1
columns (app_id,app_name,description,cookie_domain,cookie_path,index_url,verify_name)
PROPERTIES (
“format”=“json”,
“jsonpaths” = “[”$.data[0].app_id","$.data[0].app_name","$.data[0].description","$.data[0].cookie_domain","$.data[0].cookie_path","$.data[0].index_url","$.data[0].index_url","$.data[0].verify_name"]",
“desired_concurrent_number”=“1”,
“max_error_number”=“0”
)
FROM KAFKA (
“kafka_broker_list”= “10.0.10.12:9092”,
“kafka_topic”=“test_test1”
);
– 数据1
{
“data”: [
{
“app_id”: “123123”,
“app_name”: “4444666666”,
“description”: “23123123”,
“cookie_domain”: “2312”,
“cookie_path”: “31231”,
“index_url”: “231”,
“verify_name”: “2312312”
}
],
“database”: “test1”,
“es”: 1647347567000,
“id”: 4380,
“isDdl”: false,
“mysqlType”: {
“app_id”: “varchar(10)”,
“app_name”: “varchar(50)”,
“description”: “varchar(100)”,
“app_password”: “varchar(32)”,
“cookie_domain”: “varchar(64)”,
“cookie_path”: “varchar(400)”,
“index_url”: “varchar(400)”,
“verify_name”: “varchar(64)”
},
“old”: null,
“pkNames”: [
“app_id”
],
“sql”: “”,
“sqlType”: {
“app_id”: 12,
“app_name”: 12,
“description”: 12,
“app_password”: 12,
“cookie_domain”: 12,
“cookie_path”: 12,
“index_url”: 12,
“verify_name”: 12
},
“table”: “test1”,
“ts”: 1647347567535,
“type”: “DELETE”
}
– 数据2
{
“data”: [
{
“app_id”: “12312”,
“app_name”: “12312323423”,
“description”: “12312123”,
“cookie_domain”: “1231”,
“cookie_path”: “231”,
“index_url”: “23123”,
“verify_name”: “1231231”
}
],
“database”: “test”,
“es”: 1647346751000,
“id”: 4241,
“isDdl”: false,
“mysqlType”: {
“app_id”: “varchar(10)”,
“app_name”: “varchar(50)”,
“description”: “varchar(100)”,
“cookie_domain”: “varchar(64)”,
“cookie_path”: “varchar(400)”,
“index_url”: “varchar(400)”,
“verify_name”: “varchar(64)”
},
“old”: [
{
“app_name”: “123123”
}
],
“pkNames”: [
“app_id”
],
“sql”: “”,
“sqlType”: {
“app_id”: 12,
“app_name”: 12,
“description”: 12,
“cookie_domain”: 12,
“cookie_path”: 12,
“index_url”: 12,
“verify_name”: 12
},
“table”: “test1”,
“ts”: 1647346751924,
“type”: “UPDATE”
}
sr表中需要定义一个database的字段,然后在jsonpath和columns中映射就可以
CREATE TABLE test1
(
app_id
varchar(65533) NOT NULL COMMENT “”,
app_name
varchar(65533) NOT NULL COMMENT “”,
description
varchar(65533) NULL COMMENT “”,
cookie_domain
varchar(65533) NULL COMMENT “”,
cookie_path
varchar(65533) NULL COMMENT “”,
index_url
varchar(65533) NULL COMMENT “”,
verify_name
varchar(65533) NULL COMMENT “”
database
varchar(65533) NULL COMMENT “”
) ENGINE=OLAP
PRIMARY KEY( app_id
)
COMMENT “OLAP”
DISTRIBUTED BY HASH( app_id
) BUCKETS 1
PROPERTIES (
“replication_num” = “1”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”
);
create routine load test.test1 on test1
columns (app_id,app_name,description,cookie_domain,cookie_path,index_url,verify_name,database)
PROPERTIES (
“format”=“json”,
“jsonpaths” = “[”$.data[0].app_id","$.data[0].app_name","$.data[0].description","$.data[0].cookie_domain","$.data[0].cookie_path","$.data[0].index_url","$.data[0].index_url","$.data[0].verify_name","$.database"]",
“desired_concurrent_number”=“1”,
“max_error_number”=“0”
)
FROM KAFKA (
“kafka_broker_list”= “10.0.10.12:9092”,
“kafka_topic”=“test_test1”
);