Using the now() function causes "schedule routine load task failed"

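【Details】When a routine load job loads data from Kafka into StarRocks and the column mapping uses the now() function, the FE reports "schedule routine load task failed"
【Background】
【Business impact】
【Shared-data mode】Yes
【StarRocks version】Occurs in both 3.3.5 and 3.3.6; 3.2.12 works correctly
【Cluster size】3 FE (1 follower + 2 observers) + 3 CN; a small test-environment cluster
【Machine info】
【Table model】Duplicate Key table
【Contact】StarRocks 3.0 shared-data user group: 可以自然点嘛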

Table schema

CREATE TABLE dwd.aaa (
  `__dt` datetime NULL COMMENT "Partition column, ingestion time",
  `ts` bigint(20) NULL COMMENT "Client time in milliseconds",
  `tz` varchar(65533) NULL COMMENT "Time zone name"

) ENGINE=OLAP
DUPLICATE KEY(`__dt`)
COMMENT "Originally the ssp_spin_event table"
PARTITION BY date_trunc('hour', __dt)
DISTRIBUTED BY HASH(`tz`)
PROPERTIES (
"replication_num" = "1",
"datacache.enable" = "true",
"datacache.partition_duration" = "1 DAY",
"storage_volume" = "builtin_storage_volume",
"enable_async_write_back" = "false",
"enable_persistent_index" = "false",
"partition_live_number" = "2400",
"compression" = "LZ4"
);

routine load

CREATE ROUTINE LOAD dwd.aaa ON aaa
COLUMNS(ts,tz,__dt=now())
PROPERTIES
(
    "format" ="json",
    "strict_mode" = "false",
    "max_batch_interval" = "5",
    "task_consume_second" = "20",
    "desired_concurrent_number"="1",
    "jsonpaths" = "[\"$.ts\",\"$.tz\"]"

 )
FROM KAFKA
(
    "kafka_topic" = "xxx",
    "property.group.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_END",
    "kafka_broker_list" = "xxx"
);

Symptoms:

  1. Data cannot be written to the table.
  2. The routine load job is not in the RUNNING state.
  3. The FE log reports:
2024-11-28 08:50:53.881Z WARN (pool-19-thread-3|688) [RoutineLoadTaskScheduler.lambda$submitToSchedule$1():170] schedule routine load task failed
java.lang.NullPointerException: null
	at com.starrocks.analysis.FunctionParams.<init>(FunctionParams.java:64) ~[starrocks-fe.jar:?]
	at com.starrocks.analysis.FunctionParams.<init>(FunctionParams.java:86) ~[starrocks-fe.jar:?]
	at com.starrocks.load.Load.transformHadoopFunctionExpr(Load.java:1078) ~[starrocks-fe.jar:?]
	at com.starrocks.load.Load.initColumns(Load.java:547) ~[starrocks-fe.jar:?]
	at com.starrocks.planner.StreamLoadScanNode.initColumns(StreamLoadScanNode.java:242) ~[starrocks-fe.jar:?]
	at com.starrocks.planner.StreamLoadScanNode.initParams(StreamLoadScanNode.java:236) ~[starrocks-fe.jar:?]
	at com.starrocks.planner.StreamLoadScanNode.init(StreamLoadScanNode.java:191) ~[starrocks-fe.jar:?]
	at com.starrocks.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:194) ~[starrocks-fe.jar:?]
	at com.starrocks.load.routineload.RoutineLoadJob.plan(RoutineLoadJob.java:876) ~[starrocks-fe.jar:?]
	at com.starrocks.load.routineload.KafkaTaskInfo.plan(KafkaTaskInfo.java:242) ~[starrocks-fe.jar:?]
	at com.starrocks.load.routineload.KafkaTaskInfo.createRoutineLoadTask(KafkaTaskInfo.java:196) ~[starrocks-fe.jar:?]
	at com.starrocks.load.routineload.RoutineLoadTaskScheduler.scheduleOneTask(RoutineLoadTaskScheduler.java:245) ~[starrocks-fe.jar:?]
	at com.starrocks.load.routineload.RoutineLoadTaskScheduler.lambda$submitToSchedule$1(RoutineLoadTaskScheduler.java:168) ~[starrocks-fe.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
  4. Changing now() to now(1) does not help; the same error is still reported.
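
To check the job state described in the symptoms, the job and its tasks can be inspected from a MySQL client. This is only a sketch, assuming the job name dwd.aaa from the statement above; the exact columns shown may vary by version.

```sql
-- Show the job's State and, if populated, ReasonOfStateChanged / ErrorLogUrls.
SHOW ROUTINE LOAD FOR dwd.aaa;

-- List the load tasks currently associated with the job.
SHOW ROUTINE LOAD TASK FROM dwd WHERE JobName = "aaa";
```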

Workaround:
Replace the now() function with current_timestamp().
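
A sketch of the same job with only the column expression changed. It assumes the failing job is stopped first so that the job name can be reused; the topic, group id, and broker list remain the placeholders from the original statement.

```sql
-- Assumption: stop the failing job before recreating it under the same name.
STOP ROUTINE LOAD FOR dwd.aaa;

CREATE ROUTINE LOAD dwd.aaa ON aaa
-- current_timestamp() replaces now(); everything else is unchanged.
COLUMNS(ts, tz, __dt = current_timestamp())
PROPERTIES
(
    "format" = "json",
    "strict_mode" = "false",
    "max_batch_interval" = "5",
    "task_consume_second" = "20",
    "desired_concurrent_number" = "1",
    "jsonpaths" = "[\"$.ts\",\"$.tz\"]"
)
FROM KAFKA
(
    "kafka_topic" = "xxx",
    "property.group.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_END",
    "kafka_broker_list" = "xxx"
);
```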

Running select now(), now(1) on its own works fine.
Personally, I think this may be a small bug in how routine load handles the now() function.
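
For comparison, the standalone check mentioned above can be run as an ordinary query. It only shows that the functions evaluate outside of routine load; it does not exercise the load planning path that appears in the stack trace.

```sql
-- All three expressions evaluate normally in a plain SELECT.
SELECT now(), now(1), current_timestamp();
```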