版本:StarRocks 2.5(非存算分离部署)
mysql建表
-- MySQL source table; the Kafka sample message in this post reports "table": "zzztest".
-- String literals use plain ASCII single quotes: the typographic quotes in the
-- original paste are not valid string delimiters in MySQL.
CREATE TABLE zzztest (
    ID         bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    CREATED_BY varchar(128)        NOT NULL DEFAULT 'SYS'  COMMENT '创建人',
    PRIMARY KEY (ID)
);
sr建表:
-- StarRocks Primary Key table that is the target of the Routine Load job.
-- COMMENT literals use plain ASCII double quotes; the typographic quotes in the
-- original paste made the statement unparsable.
-- NOTE(review): no BUCKETS clause is given. StarRocks releases before 2.5.7 are
-- believed to require an explicit bucket count -- confirm against the cluster version.
CREATE TABLE zzztest_oms (
    id         bigint(20)     NOT NULL COMMENT "主键 ",
    CREATED_BY varchar(65533) NOT NULL COMMENT "创建人"
)
ENGINE=OLAP
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
kafka内数据
{
"database": "market_limit_db",
"sqlType": {
"CREATED_BY": 12,
"ID": -5
},
"data": [{
"CREATED_BY": "zzz1457",
"ID": 1
}],
"pkNames": ["ID"],
"old": null,
"mysqlType": {
"CREATED_BY": "varchar",
"ID": "bigint"
},
"type": "INSERT",
"table": "zzztest",
"es": 1709535445000,
"isDdl": false,
"ts": 1709535445711,
"sql": ""
}
创建routine load 语句
-- Routine Load job that reads JSON change events from Kafka into zzztest_oms.
--
-- Each entry of "jsonpaths" is bound, by position, to a name in COLUMNS. The
-- original statement extracted three values ($.data[0].ID, $.data[0].CREATED_BY,
-- $.type) but named only two columns, so the CASE expression referred to a column
-- that was never declared and the job paused with:
--   "unknown reference column, column=__op, reference=type".
--
-- op_type is a temporary column: it receives the value of $.type, is not a column
-- of the target table, and exists only so that __op can be derived from it.
-- __op = 1 marks the row as a delete, __op = 0 as an upsert.
--
-- The inner double quotes of the jsonpaths value are backslash-escaped so that
-- the whole JSON array is passed as a single string property.
CREATE ROUTINE LOAD dbatest.zzztest_oms ON zzztest_oms
COLUMNS (
    ID,
    CREATED_BY,
    op_type,
    __op = (CASE op_type WHEN 'DELETE' THEN 1 ELSE 0 END)
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.data[0].ID\",\"$.data[0].CREATED_BY\",\"$.type\"]",
    "desired_concurrent_number" = "3",
    "max_error_number" = "0"
)
FROM KAFKA (
    "kafka_broker_list" = "xxxxxxx",
    "kafka_topic" = "oms_kafka_flink",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING"
);
报错:
mysql> show routine load\G
*************************** 1. row ***************************
Id: 61216
Name: zzztest_oms
CreateTime: 2024-03-04 15:44:10
PauseTime: 2024-03-04 15:44:21
EndTime: NULL
DbName: dbatest
TableName: zzztest_oms
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {“partitions”:"",“partial_update”:“false”,“columnToColumnExpr”:“ID,CREATED_BY,__op=CASE type WHEN ‘DELETE’ THEN 1 ELSE 0 END”,“maxBatchIntervalS”:“10”,“whereExpr”:"",“dataFormat”:“json”,“timezone”:“Asia/Shanghai”,“format”:“json”,“json_root”:"",“maxFilterRatio”:“1.0”,“strict_mode”:“false”,“jsonpaths”:"["$.data[0].ID","$.data[0].CREATED_BY","$.type"]",“desireTaskConcurrentNum”:“3”,“maxErrorNum”:“0”,“strip_outer_array”:“false”,“currentTaskConcurrentNum”:“3”,“maxBatchRows”:“200000”}
DataSourceProperties: {“topic”:“oms_kafka_flink”,“currentKafkaPartitions”:“0,1,2”,“brokerList”:“xxxxx”}
CustomProperties: {“group.id”:“zzztest_oms_d01a967d-9556-464a-83df-aeb6ec55f464”}
Statistic: {“receivedBytes”:0,“errorRows”:0,“committedTaskNum”:0,“loadedRows”:0,“loadRowsRate”:0,“abortedTaskNum”:0,“totalRows”:0,“unselectedRows”:0,“receivedBytesRate”:0,“taskExecuteTimeMs”:1}
Progress: {“0”:“OFFSET_ZERO”,“1”:“OFFSET_ZERO”,“2”:“OFFSET_ZERO”}
ReasonOfStateChanged: ErrorReason{errCode = 2, msg=‘failed to create task: unknown reference column, column=__op, reference=type’}
ErrorLogUrls:
参考链接:
mysql同步到starRocks时增加自定义字段问题咨询 - StarRocks 用户问答 - StarRocks中文社区论坛 (mirrorship.cn)