Json2StarRocks: Implementing Insert, Update, and Delete

Background

This requires the Primary Key table model available in StarRocks 1.19.0 and later.

This article describes how to load JSON data into StarRocks and implement insert, update, and delete.

Procedure

StarRocks DDL

CREATE TABLE cdc_db.`cdc_table` (
  `pk` int(20) NOT NULL COMMENT "",
  `col0` varchar(64) NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`pk`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`pk`) BUCKETS 1 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
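
For context, on a Primary Key table the two load modes shown below behave like ordinary DML. A minimal SQL sketch of the equivalent statements, using the table above:

-- Upsert: INSERT INTO a Primary Key table replaces the row if pk already exists
INSERT INTO cdc_db.cdc_table VALUES (1, 'aaca');
-- Delete: removes the row with the given primary key
DELETE FROM cdc_db.cdc_table WHERE pk = 1;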

Stream Load

JSON data (saved locally as json.text, the file uploaded by the curl commands below)

[
        {"pk":1,"col0":"aaca"},
        {"pk":2,"col0":"aaba"},
        {"pk":3,"col0":"aa4a"},
        {"pk":4,"col0":"aa33"},
        {"pk":5,"col0":"aaa3"}
]

Upsert

curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.pk\", \"$.col0\"]" -H "strip_outer_array: true" -T json.text http://172.26.194.184:9011/api/cdc_db/cdc_table/_stream_load
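
If the load succeeds against an initially empty table, a quick check should return the five rows from json.text:

-- expect pk 1..5 with the col0 values shown above
SELECT * FROM cdc_db.cdc_table ORDER BY pk;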

Delete

curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.pk\", \"$.col0\"]" -H "strip_outer_array: true" -H "columns:__op='delete'" -T json.text http://172.26.194.184:9011/api/cdc_db/cdc_table/_stream_load
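
Because the same json.text is uploaded with __op='delete', all five primary keys are removed; assuming no other writes happened in between, the table should now be empty:

-- expect 0
SELECT COUNT(*) FROM cdc_db.cdc_table;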

Routine Load

The Kafka records carry a flag: op_type 0 means upsert, 1 means delete.

{"pk":1,"col0":"aaca","op_type":0}
{"pk":2,"col0":"aaba","op_type":0}
{"pk":3,"col0":"aa4a","op_type":0}
{"pk":1,"col0":"aaca","op_type":1}
{"pk":3,"col0":"aa4a","op_type":1}
CREATE ROUTINE LOAD cdc_db.cdc_task1 ON cdc_table
COLUMNS(pk,col0,__op)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.data.pk\",\"$.data.col0\",\"$.op_type\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "cdc-data",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
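
Once the job is created, its state can be checked with:

-- State should be RUNNING; inspect ErrorLogUrls if rows are being filtered
SHOW ROUTINE LOAD FOR cdc_db.cdc_task1;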

MySQL binlog (Canal JSON format)

{"data":[{"col1":12,"col2":"adag"}],"database":"mysql_source","es":1634222249000,"id":644,"isDdl":false,"mysqlType":{"col1":"int(20)","col0":"varchar(64)"},"old":[{"col0":"dagdg"}],"pkNames":["host"],"sql":"","sqlType":{"host":12,"collect_time":-5},"table":"query_collect_time","ts":1634222249860,"type":"UPDATE"}
CREATE ROUTINE LOAD cdc_db.task2 ON cdc_table
COLUMNS(pk,col0,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.data[0].col1\",\"$.data[0].col2\",\"$.type\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "mysql-data",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
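
If the job needs maintenance, routine load jobs can be paused, resumed, or stopped:

PAUSE ROUTINE LOAD FOR cdc_db.task2;
RESUME ROUTINE LOAD FOR cdc_db.task2;
STOP ROUTINE LOAD FOR cdc_db.task2;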

If you want to implement INSERT for a JSON array, you can use the following approach.

{"data":[{"col1":12,"col2":"adag"},{"col1":13,"col2":"adag3"],"database":"mysql_source","es":1634222249000,"id":644,"isDdl":false,"mysqlType":{"col1":"int(20)","col0":"varchar(64)"},"old":[{"col0":"dagdg"}],"pkNames":["host"],"sql":"","sqlType":{"host":12,"collect_time":-5},"table":"query_collect_time","ts":1634222249860,"type":"INSERT"}
CREATE ROUTINE LOAD cdc_db.task3 ON cdc_table
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "json_root" = "$.data"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "mysql-data-2",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
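
With "json_root" = "$.data" and "strip_outer_array" = "true", the message above is effectively reduced to the array below, and each element becomes one row; since no jsonpaths are given, fields are matched to table columns by name, so the target table's column names must match the JSON keys:

[{"col1":12,"col2":"adag"},{"col1":13,"col2":"adag3"}]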

If you want to implement upsert and delete for a JSON array, each element of the array must contain {"__op":0} or {"__op":1}, indicating upsert and delete respectively.

{"data":[{"col1":12,"col2":"adag","__op":0},{"col1":13,"col2":"adag3","__op":1}],"database":"mysql_source","es":1634222249000,"id":644,"isDdl":false,"mysqlType":{"col1":"int(20)","col0":"varchar(64)"},"old":[{"col0":"dagdg"}],"pkNames":["host"],"sql":"","sqlType":{"host":12,"collect_time":-5},"table":"query_collect_time","ts":1634222249860,"type":"UPDATE"}
CREATE ROUTINE LOAD cdc_db.task4 ON cdc_table
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "json_root" = "$.data"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "mysql-data-3",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
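
With this configuration, the message above produces one upsert (the row with col1 = 12) and one delete (the key col1 = 13), because each array element carries its own __op flag.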

Oracle log

Note: this mode cannot currently support delete (in OGG, delete records typically carry the row data in the before field rather than after, so jsonpaths like $.after.ID cannot extract it).

Standard format

CREATE ROUTINE LOAD cdc_db.task5 ON cdc_table
COLUMNS(pk,col0,temp,__op=(CASE temp WHEN "D" THEN 1 ELSE 0 END))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.after.ID\",\"$.after.NAME\",\"$.op_type\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "test_ogg",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
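
For reference, a hypothetical OGG JSON record matching the jsonpaths above could look like the following (the exact field set depends on the OGG Kafka handler configuration):

{"table":"SCOTT.TEST_OGG","op_type":"I","op_ts":"2021-10-14 14:37:29.000000","current_ts":"2021-10-14T14:37:30.000000","pos":"00000000000000001","after":{"ID":1,"NAME":"aaca"}}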

When routine loading MySQL binlog data, the data field is an array that can contain multiple records. How is that handled?


Also, would there be a problem with taking $.data.col1 directly like this?

Hi, if you only need insert, this can currently be supported as follows:

"strip_outer_array" = "true",
"json_root" = "$.data"

To implement upsert and delete, each element of the JSON array needs {"__op":0} or {"__op":1}, indicating upsert and delete respectively:

{"data":[{"col1":12,"col2":"adag","__op":0},{"col1":13,"col2":"adag3","__op":1}]}

I found a demo in the official documentation and tested it as described, but it did not succeed. Could this be the same cause?
demo link

Could you share which demo it is?

docs link

May I ask which version you are currently using?