3.1.4物化视图设置自动更新未更新

为了更快的定位您的问题,请提供以下信息,谢谢
【详述】flink-sql实时写入sr中,创建异步物化视图设置为自动刷新,flink-sql写入基表发生变化,但是物化视图没有变化。使用客户端更新基表,物化视图发生变化。
【背景】
【业务影响】 影响
【是否存算分离】 否
【StarRocks版本】3.1.4
【集群规模】3fe+3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:48C/64G/万兆
【联系方式】v:sun7891011
【附件】
物化视图:
CREATE MATERIALIZED VIEW mv_test5
REFRESH IMMEDIATE ASYNC
PROPERTIES (
“replicated_storage” = “true”,
“replication_num” = “3”,
“storage_medium” = “HDD”
)
AS
SELECT a,b,c,loaded_time from testsun;

基表:
create table testsun(
a varchar(200)
,b varchar(200)
,c varchar(200)
,loaded_time datetime
)
ENGINE=OLAP
PRIMARY KEY(a)
DISTRIBUTED BY HASH(a)
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“enable_persistent_index” = “false”,
“replicated_storage” = “true”,
“compression” = “LZ4”
);

flink-sql写入发现testsun表中数据实时更新,但是物化视图没有更新。
在客户端中手动更新testsun表,发生物化视图发生了更新。

通过information_schema.task_runs表看一下该物化视图的刷新历史

看看只是创建的时候刷新了,后面就没有刷新。
select * from information_schema.materialized_views结果:


sr版本3.1.14

flink-sql写入方式:
create table source_trade_order (

timestamp STRING ,

schema STRING ,

table STRING ,

action string,

columns ARRAY<ROW<name STRING,type STRING,value STRING>>,

identity ARRAY<ROW<name STRING,type STRING,value STRING>>

) WITH (

‘connector’ = ‘kafka’, --kafka连接器

‘topic’ = ‘ods_base_trade’,

‘properties.bootstrap.servers’ = ‘xxxx:9092’,

‘properties.group.id’ = ‘ods_trade_order4’,

‘scan.startup.mode’ = ‘latest-offset’,

‘format’ = ‘json’ --指定格式

);

CREATE TABLE trade_order_sink (

a varchar(200) NOT NULL ,

b varchar(200) NOT NULL ,

c varchar(200) NULL ,

loaded_time TIMESTAMP NULL ,

PRIMARY KEY(a, b) NOT ENFORCED

) WITH (

‘jdbc-url’ = ‘jdbc:mysql://xxxx:9030’,

‘connector’ = ‘starrocks’,

‘database-name’ = ‘xxxx’,

‘table-name’ = ‘testsun’,

‘password’ = ‘xxxx’,

‘load-url’ = ‘xxxxx:8036’,

‘username’ = ‘xxxx’,

‘sink.properties.format’ = ‘json’,

‘sink.properties.strip_outer_array’ = ‘true’,

‘sink.max-retries’ = ‘10’,

‘sink.buffer-flush.interval-ms’ = ‘15000’,

‘sink.parallelism’ = ‘1’

);

upsert into trade_order_sink

select

columns[1].value as a,

columns[1].value as b,

columns[1].value as c,

CURRENT_TIMESTAMP

from source_trade_order

where table=‘testsun’