create table tmp.bu_order_doris (
l_id string ,
l_code string ,
r_code string ,
elt_time timestamp
) with (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://172.26.136.211:9030/tmp’,
‘table-name’ = ‘bu_order’,
‘sink.buffer-flush.max-rows’ = ‘100’,
‘sink.buffer-flush.interval’ = ‘1s’,
‘lookup.cache.max-rows’ = ‘10000’,
‘lookup.cache.ttl’ = ‘300s’,
‘username’ = ‘root’,
‘password’ = ‘root123!m’
)
insert into tmp.bu_order_doris
select
l.in_store_id as l_id ,
l.in_store_code as l_code,
r.in_store_d_id as r_code,
LOCALTIMESTAMP
from dg_ve.t_ve_bu_in_store_order_kafka l
left join dg_ve.t_ve_bu_in_store_order_d_kafka r
on l.in_store_id=r.in_store_id
dg_ve.t_ve_bu_in_store_order_kafka和
dg_ve.t_ve_bu_in_store_order_d_kafka是在flinksql的环境中建立kafka的数据源
tmp.bu_order_doris是flinksql环境中对starocks的主键表tmp.bu_order的映射,建表语句如下
CREATE TABLE bu_order
(
l_id
varchar(65533) NOT NULL COMMENT “”,
l_code
varchar(65533) NULL COMMENT “”,
r_code
varchar(65533) NULL COMMENT “”,
elt_time
datetime NULL COMMENT “”
) ENGINE=OLAP
PRIMARY KEY(l_id
)
COMMENT “OLAP”
DISTRIBUTED BY HASH(l_id
) BUCKETS 10
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”,
“enable_persistent_index” = “false”
)