创建routine load通过json方式接入kafka数据,表结构里多加了个时间字段,需要将kafka接入数据里的时间戳字段值转换后存入这个默认值是CURRENT_TIMESTAMP的时间字段,但创建完成后,状态为暂停,问题是出在那里呢,怎么实现这个时间戳转换并存入另一个时间字段
【StarRocks版本】是3.3.0
【集群规模】3fe(1 follower+2observer)+30be
表结构
CREATE TABLE ods.resops_perf_o_mem_usage_delta_o (
bk_ip string COMMENT “bk_ip”,
hostname string COMMENT “hostname”,
datatime DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT “datatime”,
bk_biz_id int COMMENT “bk_biz_id”,
bk_cloud_id int COMMENT “bk_cloud_id”,
c_time bigint COMMENT “c_time”,
psc_used double COMMENT “psc_used”,
psc_total double COMMENT “psc_total”,
psc_usage double COMMENT “psc_usage”,
buffer double COMMENT “buffer”,
cached double COMMENT “cached”,
free double COMMENT “free”
)
PARTITION BY RANGE(datatime)()
DISTRIBUTED BY HASH(bk_ip)
ORDER BY (datatime,c_time)
PROPERTIES (
“replication_num” = “2”,
“dynamic_partition.enable” = “true”,
“dynamic_partition.time_unit” = “HOUR”,
“dynamic_partition.time_zone” = “Asia/Shanghai”,
“dynamic_partition.start” = “-12”,
“dynamic_partition.end” = “3”,
“dynamic_partition.prefix” = “p”,
“dynamic_partition.buckets” = “16”,
“dynamic_partition.history_partition_num” = “0”,
“in_memory” = “false”,
“enable_persistent_index” = “true”,
“replicated_storage” = “true”,
“compression” = “LZ4”
);
routine load创建语句
CREATE ROUTINE LOAD kafka_perf_mem_record_job ON resops_perf_o_mem_usage_delta_o
COLUMNS (bk_ip,hostname,datatime=from_unixtime(c_time,’%Y-%m-%d %H:%i:%s’),bk_biz_id,bk_cloud_id,c_time,psc_used,psc_total,
psc_usage,buffer,cached,free)
PROPERTIES
(
“desired_concurrent_number”=“5”,
“format” = “json”,
“jsonpaths” = “[”$.bk_ip","$.hostname","$.bk_biz_id","$.bk_cloud_id","$.c_time","$.psc_used","$.psc_total",
“$.psc_usage”,"$.buffer","$.cached","$.free"]",
“max_batch_interval”=“30”,
“max_error_number”=“1000000” )
FROM KAFKA
(
“kafka_broker_list” ="[ipv6地址]:9092,[ipv6地址]:9092,[ipv6地址]:9092",
“kafka_topic” = “perf_o_mem”,
“property.security.protocol”=“PLAINTEXT”,
“property.sasl.mechanism”=“GSSAPI”,
“property.group.id” = “starrocks_kafka_perf_mem_record”,
“property.kafka_default_offsets” = “OFFSET_END”
);
kafka数据
{“localTime”: “2024-09-13 17:25:00”, “psc_usage”: 64.1705683633558, “bk_biz_id”: 205, “c_time”: 1726218831, “bk_ip”: “10.0.0.0”, “pct_used”: 54.196444463239914, “used”: 1.0886889472E11, “psc_used”: 1.2890474496E11, “pct_usable”: 45.803555536760086, “usable”: 9.2009402368E10, “hostname”: “SHPDHL-Q18F27”, “psc_total”: 2.00878297088E11, “cached”: 2.4534106112E10, “bk_cloud_id”: 329, “buffer”: 7.0529024E7, “free”: 7.1973552128E10, “timestamp”: 1726218831, “dtEventTime”: “2024-09-13 17:13:51”, “dtEventTimeStamp”: 1726218831000}