【详述】Flink 1.14.2 使用 exactly-once 语义时,数据无法写入 StarRocks(SR)
【背景】使用 at-least-once(至少一次)语义时数据可以正常写入;改用 exactly-once(精确一次)语义后,Flink 无报错,checkpoint 正常,但 SR 侧没有任何日志,数据也没有写入。
checkpoint 配置:间隔 10s,两次 checkpoint 之间的间隔为 10,状态后端为 RocksDB。
【业务影响】
【StarRocks版本】2.15
【集群规模】3fe(1 follower+2observer)+3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:8C/32G/万兆
【附件】
// Reproduction job for the report above: a `datagen` source table is streamed
// into a StarRocks sink table through Flink SQL.
// `FlinkTool` and `prefix` are defined outside this snippet.
//
// All string delimiters use plain ASCII quotes (" and '). The typographic quotes
// produced by rich-text rendering are not valid Scala string delimiters, and the
// SQL option quotes must be ASCII single quotes as well.
val tabEnv = FlinkTool.tabEnv
val conf = tabEnv.getConfig.getConfiguration
conf.setString("pipeline.name", prefix)
conf.setString("table.exec.resource.default-parallelism", "5")

// Source: synthetic rows from the datagen connector (3000 rows per second);
// `dt` is a computed column set to CURRENT_DATE.
tabEnv.executeSql(
  """
    |create table dwd_event(
    | dt as CURRENT_DATE,
    | id int,
    | city_code string,
    | user_name string,
    | pv bigint
    |) with (
    | 'connector' = 'datagen',
    | 'rows-per-second' = '3000',
    | 'fields.id.min'='1',
    | 'fields.id.max'='10000000',
    | 'fields.id.kind' = 'random',
    | 'fields.pv.kind' = 'random',
    | 'fields.user_name.length' = '50',
    | 'fields.city_code.length' = '10'
    |)
    |""".stripMargin)

// Sink: StarRocks table olap_test.test_doplicate_tab.
// NOTE(review): 'sink.semantic' is shown with 'at-least-once', the setting the
// report describes as working; the failure is reported after switching it to
// 'exactly-once'. The connector documentation states that in exactly-once mode
// data is flushed only on checkpoint and the 'sink.buffer-flush.*' options do
// not take effect -- confirm against the connector version in use.
tabEnv.executeSql(
  """
    |CREATE TABLE test_doplicate_tab(
    | dt date,
    | id int,
    | city_code string,
    | user_name string,
    | pv bigint
    |) WITH (
    | 'connector' = 'starrocks',
    | 'jdbc-url'='jdbc:mysql://test-hadoop1:9030,test-hadoop2:9030,test-hadoop3:9030',
    | 'load-url'='test-hadoop1:8030;test-hadoop2:8030;test-hadoop3:8030',
    | 'database-name' = 'olap_test',
    | 'table-name' = 'test_doplicate_tab',
    | 'username' = 'olap_test01',
    | 'password' = 'xxxxx$.123',
    | 'sink.semantic' = 'at-least-once',
    | 'sink.buffer-flush.max-rows' = '1000000',
    | 'sink.buffer-flush.max-bytes' = '300000000',
    | 'sink.buffer-flush.interval-ms' = '5000',
    | 'sink.max-retries' = '3'
    |)
    |""".stripMargin)

// Submit the streaming insert from the generated source into the StarRocks sink.
tabEnv.executeSql(
  """
    |insert into test_doplicate_tab
    |select * from dwd_event
    |""".stripMargin)
明细表:

