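【Details】When routine load loads data from Kafka into StarRocks with the now() function in the COLUMNS clause, the FE reports "schedule routine load task failed"
【Background】
【Business impact】
【Shared-data (storage-compute separation)】Yes
【StarRocks version】Present in both 3.3.5 and 3.3.6; 3.2.12 works correctly
【Cluster size】3 FE (1 follower + 2 observers) + 3 CN; this is a test environment, so the cluster is small
【Machine info】
【Table model】Duplicate Key table
【Contact】StarRocks 3.0 shared-data user group: 可以自然点嘛
Table schema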
CREATE TABLE dwd.aaa (
`__dt` datetime NULL COMMENT "分区字段,入湖时间",
`ts` bigint(20) NULL COMMENT "Client time in milliseconds",
`tz` varchar(65533) NULL COMMENT "Time zone name"
) ENGINE=OLAP
DUPLICATE KEY(`__dt`)
COMMENT "原 ssp_spin_event 表"
PARTITION BY date_trunc('hour', __dt)
DISTRIBUTED BY HASH(`tz`)
PROPERTIES (
"replication_num" = "1",
"datacache.enable" = "true",
"datacache.partition_duration" = "1 DAY",
"storage_volume" = "builtin_storage_volume",
"enable_async_write_back" = "false",
"enable_persistent_index" = "false",
"partition_live_number" = "2400",
"compression" = "LZ4"
);
Routine load job
CREATE ROUTINE LOAD dwd.aaa ON aaa
COLUMNS(ts,tz,__dt=now())
PROPERTIES
(
"format" ="json",
"strict_mode" = "false",
"max_batch_interval" = "5",
"task_consume_second" = "20",
"desired_concurrent_number"="1",
"jsonpaths" = "[\"$.ts\",\"$.tz\"]"
)
FROM KAFKA
(
"kafka_topic" = "xxx",
"property.group.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_END",
"kafka_broker_list" = "xxx"
);
Symptoms:
- Data cannot be written to the table.
- The routine load job's state shows that it is not running.
- The FE log reports:
2024-11-28 08:50:53.881Z WARN (pool-19-thread-3|688) [RoutineLoadTaskScheduler.lambda$submitToSchedule$1():170] schedule routine load task failed
java.lang.NullPointerException: null
at com.starrocks.analysis.FunctionParams.<init>(FunctionParams.java:64) ~[starrocks-fe.jar:?]
at com.starrocks.analysis.FunctionParams.<init>(FunctionParams.java:86) ~[starrocks-fe.jar:?]
at com.starrocks.load.Load.transformHadoopFunctionExpr(Load.java:1078) ~[starrocks-fe.jar:?]
at com.starrocks.load.Load.initColumns(Load.java:547) ~[starrocks-fe.jar:?]
at com.starrocks.planner.StreamLoadScanNode.initColumns(StreamLoadScanNode.java:242) ~[starrocks-fe.jar:?]
at com.starrocks.planner.StreamLoadScanNode.initParams(StreamLoadScanNode.java:236) ~[starrocks-fe.jar:?]
at com.starrocks.planner.StreamLoadScanNode.init(StreamLoadScanNode.java:191) ~[starrocks-fe.jar:?]
at com.starrocks.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:194) ~[starrocks-fe.jar:?]
at com.starrocks.load.routineload.RoutineLoadJob.plan(RoutineLoadJob.java:876) ~[starrocks-fe.jar:?]
at com.starrocks.load.routineload.KafkaTaskInfo.plan(KafkaTaskInfo.java:242) ~[starrocks-fe.jar:?]
at com.starrocks.load.routineload.KafkaTaskInfo.createRoutineLoadTask(KafkaTaskInfo.java:196) ~[starrocks-fe.jar:?]
at com.starrocks.load.routineload.RoutineLoadTaskScheduler.scheduleOneTask(RoutineLoadTaskScheduler.java:245) ~[starrocks-fe.jar:?]
at com.starrocks.load.routineload.RoutineLoadTaskScheduler.lambda$submitToSchedule$1(RoutineLoadTaskScheduler.java:168) ~[starrocks-fe.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
- Changing now() to now(1) does not help; the same error is still reported.
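For anyone trying to reproduce this, the job state can be inspected with a statement along these lines. This is only a sketch using the job name from this post; the State and ReasonOfStateChanged columns of the output are the ones to look at.

```sql
-- Show the routine load job created in this post (database dwd, job aaa).
-- Check the State and ReasonOfStateChanged columns in the result.
SHOW ROUTINE LOAD FOR dwd.aaa;
```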
Solution:
Replacing the now() function with current_timestamp() fixes the problem.
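For reference, the job with this change applied would look roughly like the following. It is the same statement as the one earlier in this post, with only the __dt expression in COLUMNS changed; the "xxx" placeholders are kept as they were. It assumes the old job with the same name has already been stopped, since the job has to be recreated under that name.

```sql
-- Same routine load job as before; only __dt=now() is replaced
-- with __dt=current_timestamp().
CREATE ROUTINE LOAD dwd.aaa ON aaa
COLUMNS(ts,tz,__dt=current_timestamp())
PROPERTIES
(
"format" ="json",
"strict_mode" = "false",
"max_batch_interval" = "5",
"task_consume_second" = "20",
"desired_concurrent_number"="1",
"jsonpaths" = "[\"$.ts\",\"$.tz\"]"
)
FROM KAFKA
(
"kafka_topic" = "xxx",
"property.group.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_END",
"kafka_broker_list" = "xxx"
);
```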
Running select now(), now(1) on its own works fine.
Personally, I think this may be a small bug in how routine load handles the now() function.