Syncing OGG incremental data to StarRocks with Flink: pre-existing rows are not deleted

【Details】Incremental changes are published by OGG to Kafka and then loaded into StarRocks by Flink. Inserted and updated rows are synchronized to StarRocks correctly, but deleted rows are not always removed.
For example:
Oracle holds 10 rows, which I first bulk-load into StarRocks with DataX. When a new row is inserted in Oracle, Flink consumes the change and StarRocks gains the row as expected. But when one of the original 10 rows is deleted on the Oracle side, it is not deleted in StarRocks.
Only rows that were inserted or updated after the Flink job started syncing are removed from StarRocks when they are deleted in Oracle.
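
For reference, Flink's ogg-json format derives the change type from the op_type field of each OGG record ("I" = insert, "U" = update, "D" = delete). A delete of one of the original rows arrives on the Kafka topic roughly like this (a sketch only; the table name, timestamps, and key values are illustrative, and only the key columns are shown in "before"):

{
  "table": "LJQ82.ALC_ALLOCS",
  "op_type": "D",
  "op_ts": "2023-04-26 10:48:00.000000",
  "current_ts": "2023-04-26T10:48:01.000000",
  "primary_keys": ["ALLOC_ID", "SHO_ID", "GOODSID"],
  "before": {"ALLOC_ID": "A0001", "SHO_ID": "S001", "GOODSID": "G001"},
  "after": null
}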

【Business impact】
Planned real-time synchronization workloads based on Flink cannot move forward.
【StarRocks version】Tried both 2.3.9 and 2.4.4
【Cluster size】3 FE (1 follower + 2 observers) + 6 BE
【Machine specs】48 cores / 192 GB RAM / 10 GbE network
【Table model】Primary Key model
【Load/export method】Flink
【Contact】linjuqiu@sanfu.com
【Attachments】
flink-connector-starrocks version: 1.2.6_flink-1.15
The Flink code is as follows:

package com.linjiuqiu.starrocks;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Instant;
//import org.apache.flink.table.api.EnvironmentSettings;
//import org.apache.flink.table.api.Table;
//import org.apache.flink.table.api.TableEnvironment;

/**
 * @program: flink-study
 * @description: Load OGG change records from Kafka into a StarRocks primary key table via Flink SQL
 * @author: Akisan
 * @create: 2023-04-26 10:48
 */
public class Ogg2SR_alc_allocs_t {
    public static void main(String[] args) {
        // Create the stream execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 30 s
        env.enableCheckpointing(30000);
        // Set the checkpointing mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 500 ms between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//        env.setStateBackend(new FsStateBackend("hdfs://192.168.0.62:8020/flink/checkpoints"));

        // Checkpoints must complete within two minutes or they are discarded
        env.getCheckpointConfig().setCheckpointTimeout(120000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Use externalized checkpoints so they are retained after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Enable experimental unaligned checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
//                .newInstance()
//                .inStreamingMode()
        //.inBatchMode()
//                .build();
        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Set the job name
        configuration.setString("pipeline.name", "alc_allocs");


        tableEnv.executeSql("CREATE TABLE KafkaTable1 (\n" +
                "  `ALLOC_ID` varchar(45) NOT NULL COMMENT 'allocation order number',\n" +
                "  `SHO_ID` varchar(12) NOT NULL COMMENT 'store number',\n" +
                "  `GOODSID` varchar(39) NOT NULL COMMENT 'product',\n" +
                "  `GOO_ID` varchar(39) NULL COMMENT 'article number',\n" +
                "  `BARCODE` varchar(39) NULL COMMENT 'color code',\n" +
                "  `COMPUTE_ASSIGNAMOUNT` decimal NULL COMMENT 'calculated allocation quantity',\n" +
                "  `FINAL_ASSIGNAMOUNT` decimal NULL COMMENT 'final allocation quantity',\n" +
                "  `TRANSFERAMOUNT` decimal NULL COMMENT 'defaults to 0',\n" +
                "  `GOTAMOUNT` decimal NULL COMMENT 'received quantity (confirmed after inspection; compared with ALLOC_AMOUNT to decide completion)',\n" +
                "  `COMEAMOUNT` decimal NULL COMMENT 'incoming quantity',\n" +
                "  `IS_FREEZE` bigint NULL COMMENT 'frozen flag',\n" +
                "  `PLAN_ASSIGNAMOUNT` decimal NULL COMMENT 'planned allocation quantity',\n" +
                "  `SHO_GROUP` varchar(60) NULL COMMENT 'store group',\n" +
                "  `SHOPCLASSIFICATION` varchar(60) NULL COMMENT 'store classification',\n" +
                "  `ORDERBASEAMOUNT` decimal NULL COMMENT 'order base quantity, consistent with ALC_ALLOCITEM',\n" +
                "  `CANCELAMOUNT` decimal NULL COMMENT 'cancelled quantity (difference between shipped and allocated quantity when the order is closed)',\n" +
                "  `FUTURE_AMOUNT` decimal NULL COMMENT 'AI future fulfillment quantity',\n" +
                "  `AI_SUPPLY_WEEK` decimal NULL COMMENT 'AI supply weeks',\n" +
                "  `AI_RCOMPUTE_STIME` TIMESTAMP NULL COMMENT 'AI recompute start time',\n" +
                "  `AI_RCOMPUTE_ETIME` TIMESTAMP NULL COMMENT 'AI recompute finish time'\n" +
//                "  `__op` int NULL \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'ODS_LJQ82_ALC_ALLOCS',\n" +
                "  'properties.bootstrap.servers' = '172.28.xx.xx:9091',\n" +
                "  'properties.group.id' = 'FlinkGroup-ads-t',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
//                "  'scan.startup.mode' = 'timestamp',\n" +
//                "  'scan.startup.timestamp-millis' = '"+ halfHourAgo.toEpochMilli() +"',\n" +
                "  'format' = 'ogg-json',\n" +
                "  'ogg-json.ignore-parse-errors' = 'true'\n" +
                ")");
        // Sink table configuration

        tableEnv.executeSql(
                "CREATE TABLE USER_RESULT(" +
                        "  `ALLOC_ID` varchar(45) NOT NULL COMMENT 'allocation order number',\n" +
                        "  `SHO_ID` varchar(12) NOT NULL COMMENT 'store number',\n" +
                        "  `GOODSID` varchar(39) NOT NULL COMMENT 'product',\n" +
                        "  `GOO_ID` varchar(39) NULL COMMENT 'article number',\n" +
                        "  `BARCODE` varchar(39) NULL COMMENT 'color code',\n" +
                        "  `COMPUTE_ASSIGNAMOUNT` decimal NULL COMMENT 'calculated allocation quantity',\n" +
                        "  `FINAL_ASSIGNAMOUNT` decimal NULL COMMENT 'final allocation quantity',\n" +
                        "  `TRANSFERAMOUNT` decimal NULL COMMENT 'defaults to 0',\n" +
                        "  `GOTAMOUNT` decimal NULL COMMENT 'received quantity (confirmed after inspection; compared with ALLOC_AMOUNT to decide completion)',\n" +
                        "  `COMEAMOUNT` decimal NULL COMMENT 'incoming quantity',\n" +
                        "  `IS_FREEZE` bigint NULL COMMENT 'frozen flag',\n" +
                        "  `PLAN_ASSIGNAMOUNT` decimal NULL COMMENT 'planned allocation quantity',\n" +
                        "  `SHO_GROUP` varchar(60) NULL COMMENT 'store group',\n" +
                        "  `SHOPCLASSIFICATION` varchar(60) NULL COMMENT 'store classification',\n" +
                        "  `ORDERBASEAMOUNT` decimal NULL COMMENT 'order base quantity, consistent with ALC_ALLOCITEM',\n" +
                        "  `CANCELAMOUNT` decimal NULL COMMENT 'cancelled quantity (difference between shipped and allocated quantity when the order is closed)',\n" +
                        "  `FUTURE_AMOUNT` decimal NULL COMMENT 'AI future fulfillment quantity',\n" +
                        "  `AI_SUPPLY_WEEK` decimal NULL COMMENT 'AI supply weeks',\n" +
                        "  `AI_RCOMPUTE_STIME` TIMESTAMP NULL COMMENT 'AI recompute start time',\n" +
                        "  `AI_RCOMPUTE_ETIME` TIMESTAMP NULL COMMENT 'AI recompute finish time',\n" +
//                        "  `__op` int NULL, \n" +
                        " PRIMARY KEY (ALLOC_ID,SHO_ID,GOODSID) NOT ENFORCED \n" +
                        ") WITH ( " +
                        "'connector' = 'starrocks'," +
                        "'jdbc-url'='jdbc:mysql://172.28.40.64:9030?characterEncoding=utf-8&useSSL=false'," +
//                        "'jdbc-url'='jdbc:mysql://192.168.101.25:9030?characterEncoding=utf-8&useSSL=false'," +
//                        "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
//                        "'load-url'='192.168.xx.xx:8030'," +
                        "'load-url'='172.28.xx.xx:8030'," +
                        "'database-name' = 'test'," +
                        "'table-name' = 'alc_allocs'," +
                        "'username' = 'root'," +
                        "'password' = ''," +
                        "'sink.buffer-flush.max-rows' = '1000000'," +
                        "'sink.buffer-flush.max-bytes' = '300000000'," +
                        "'sink.buffer-flush.interval-ms' = '10000'," +
                        // Since v2.4, partial updates of columns in a Primary Key table are supported. The columns to update can be specified with the two properties below; the '__op' column must be explicitly appended at the end of 'sink.properties.columns'.
                        // "'sink.properties.partial_update' = 'true'," +
//                         "'sink.properties.columns' = '__op'," +
                        "'sink.properties.column_separator' = '\\x01'," +
                        "'sink.properties.row_delimiter' = '\\x02'," +
//                        "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'`
                        "'sink.max-retries' = '3'" +
                        ")"
        );



//        TableResult tableResult = tableEnv.executeSql("SELECT * FROM KafkaTable1; ");

        tableEnv.executeSql("insert into USER_RESULT select * from KafkaTable1;");
//        tableEnv.executeSql("select * from KafkaTable1;").print();


    }
}
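
For completeness, the StarRocks target table test.alc_allocs uses the Primary Key model with the same key as the PRIMARY KEY declared in the Flink sink. A minimal sketch of the DDL, abridged to a few columns and with types assumed to mirror the Flink DDL (bucket count and replication settings are illustrative):

CREATE TABLE test.alc_allocs (
  `ALLOC_ID` varchar(45) NOT NULL COMMENT 'allocation order number',
  `SHO_ID` varchar(12) NOT NULL COMMENT 'store number',
  `GOODSID` varchar(39) NOT NULL COMMENT 'product',
  `GOO_ID` varchar(39) NULL COMMENT 'article number',
  `IS_FREEZE` bigint NULL COMMENT 'frozen flag'
  -- remaining value columns follow the same pattern as the Flink DDL
) ENGINE = OLAP
PRIMARY KEY (`ALLOC_ID`, `SHO_ID`, `GOODSID`)
DISTRIBUTED BY HASH(`ALLOC_ID`, `SHO_ID`, `GOODSID`) BUCKETS 8
PROPERTIES ("replication_num" = "3");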

Which versions of Flink and StarRocks are you on? I'm using Flink 1.16.1, StarRocks 3.0, and flink-connector-starrocks 1.2.6_flink-1.15, and I've hit a lot of bugs; data synchronization is basically unusable.

Your versions are incompatible; Flink, Flink CDC, and StarRocks each come in many versions that have to be matched with one another.