【详述】通过OGG将增量数据下发到kafka,然后通过flink将数据导入到SR,更新/新增的数据都能同步到SR,删除的数据存在问题
举个例子
Oracle 有10条数据,我先通过 datax 全量同步到 SR;然后在 Oracle 上插入一条数据,flink 消费后,SR 也会新增一条。但原本的这10条数据在 Oracle 端删除后,SR 不会删除,
只有后来新增/更新的数据,oracle端删除,SR才会删除
【业务影响】
后续想通过flink做实时同步的业务无法开展
【StarRocks版本】2.3.9/2.4.4两个版本都试过
【集群规模】3fe(1 follower+2observer)+6be
【机器信息】48C/192G/万兆
【表模型】主键模型
【导入或者导出方式】Flink
【联系方式】linjuqiu@sanfu.com
【附件】
flink-connector-starrocks 为1.2.6_flink-1.15
flink 代码 如下
package com.linjiuqiu.starrocks;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Instant;
//import org.apache.flink.table.api.EnvironmentSettings;
//import org.apache.flink.table.api.Table;
//import org.apache.flink.table.api.TableEnvironment;
/**
* @program: flink-study
* @description:
* @author: Akisan
* @create: 2023-04-26 10:48
*/
public class Ogg2SR_alc_allocs_t {

    /**
     * Reads OGG change events (ogg-json format) for table ALC_ALLOCS from Kafka
     * and streams them into a StarRocks primary-key table through the StarRocks
     * Flink connector.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job submission fails, the streaming job terminates
     *                   exceptionally, or waiting for it is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 30 seconds.
        env.enableCheckpointing(30000);
        // Exactly-once checkpointing (this is the default mode).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 500 ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // env.setStateBackend(new FsStateBackend("hdfs://192.168.0.62:8020/flink/checkpoints"));
        // Discard a checkpoint if it does not complete within 2 minutes.
        env.getCheckpointConfig().setCheckpointTimeout(120000);
        // Allow only one checkpoint to be in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain externalized checkpoints so they survive job cancellation.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Enable the (experimental) unaligned checkpoints feature.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Job name shown in the Flink web UI.
        configuration.setString("pipeline.name", "alc_allocs");

        // Kafka source table: OGG change events decoded by the ogg-json format,
        // which emits INSERT/UPDATE/DELETE row kinds. The primary-key sink below
        // relies on those row kinds to propagate deletes to StarRocks.
        tableEnv.executeSql("CREATE TABLE KafkaTable1 (\n" +
                " `ALLOC_ID` varchar(45) NOT NULL COMMENT '配货单号',\n" +
                " `SHO_ID` varchar(12) NOT NULL COMMENT '店号',\n" +
                " `GOODSID` varchar(39) NOT NULL COMMENT '商品',\n" +
                " `GOO_ID` varchar(39) NULL COMMENT '货号',\n" +
                " `BARCODE` varchar(39) NULL COMMENT '色码',\n" +
                " `COMPUTE_ASSIGNAMOUNT` decimal NULL COMMENT '计算调配量',\n" +
                " `FINAL_ASSIGNAMOUNT` decimal NULL COMMENT '最终调配量',\n" +
                " `TRANSFERAMOUNT` decimal NULL COMMENT '默认值0',\n" +
                " `GOTAMOUNT` decimal NULL COMMENT '到货量(结束验货后确认的数量,用此数量与ALLOC_AMOUNT判断是否完成)',\n" +
                " `COMEAMOUNT` decimal NULL COMMENT '收货量',\n" +
                " `IS_FREEZE` bigint NULL COMMENT '是否冻结',\n" +
                " `PLAN_ASSIGNAMOUNT` decimal NULL COMMENT '计划调配量',\n" +
                " `SHO_GROUP` varchar(60) NULL COMMENT '店群',\n" +
                " `SHOPCLASSIFICATION` varchar(60) NULL COMMENT '店分类',\n" +
                " `ORDERBASEAMOUNT` decimal NULL COMMENT '订货基数,与ALC_ALLOCITEM一致',\n" +
                " `CANCELAMOUNT` decimal NULL COMMENT '取消量(记录单据关闭时发货量与配货量的差异)',\n" +
                " `FUTURE_AMOUNT` decimal NULL COMMENT 'AI将来履行量',\n" +
                " `AI_SUPPLY_WEEK` decimal NULL COMMENT 'AI供应周数',\n" +
                " `AI_RCOMPUTE_STIME` TIMESTAMP NULL COMMENT 'AI重新计算开始时间',\n" +
                " `AI_RCOMPUTE_ETIME` TIMESTAMP NULL COMMENT 'AI重新计算完成时间'\n" +
                // " `__op` int NULL \n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'ODS_LJQ82_ALC_ALLOCS',\n" +
                " 'properties.bootstrap.servers' = '172.28.xx.xx:9091',\n" +
                " 'properties.group.id' = 'FlinkGroup-ads-t',\n" +
                " 'scan.startup.mode' = 'latest-offset',\n" +
                // Alternative: start from a timestamp instead of the latest offset.
                // " 'scan.startup.mode' = 'timestamp',\n" +
                // " 'scan.startup.timestamp-millis' = '"+ halfHourAgo.toEpochMilli() +"',\n" +
                " 'format' = 'ogg-json',\n" +
                " 'ogg-json.ignore-parse-errors' = 'true'\n" +
                ")");

        // StarRocks sink table. PRIMARY KEY ... NOT ENFORCED marks the upsert key
        // the connector uses to apply upserts/deletes on the primary-key model.
        // NOTE(review): credentials (root / empty password) and endpoints are
        // hard-coded; move them to external configuration or secret management.
        tableEnv.executeSql(
                "CREATE TABLE USER_RESULT(" +
                " `ALLOC_ID` varchar(45) NOT NULL COMMENT '配货单号',\n" +
                " `SHO_ID` varchar(12) NOT NULL COMMENT '店号',\n" +
                " `GOODSID` varchar(39) NOT NULL COMMENT '商品',\n" +
                " `GOO_ID` varchar(39) NULL COMMENT '货号',\n" +
                " `BARCODE` varchar(39) NULL COMMENT '色码',\n" +
                " `COMPUTE_ASSIGNAMOUNT` decimal NULL COMMENT '计算调配量',\n" +
                " `FINAL_ASSIGNAMOUNT` decimal NULL COMMENT '最终调配量',\n" +
                " `TRANSFERAMOUNT` decimal NULL COMMENT '默认值0',\n" +
                " `GOTAMOUNT` decimal NULL COMMENT '到货量(结束验货后确认的数量,用此数量与ALLOC_AMOUNT判断是否完成)',\n" +
                " `COMEAMOUNT` decimal NULL COMMENT '收货量',\n" +
                " `IS_FREEZE` bigint NULL COMMENT '是否冻结',\n" +
                " `PLAN_ASSIGNAMOUNT` decimal NULL COMMENT '计划调配量',\n" +
                " `SHO_GROUP` varchar(60) NULL COMMENT '店群',\n" +
                " `SHOPCLASSIFICATION` varchar(60) NULL COMMENT '店分类',\n" +
                " `ORDERBASEAMOUNT` decimal NULL COMMENT '订货基数,与ALC_ALLOCITEM一致',\n" +
                " `CANCELAMOUNT` decimal NULL COMMENT '取消量(记录单据关闭时发货量与配货量的差异)',\n" +
                " `FUTURE_AMOUNT` decimal NULL COMMENT 'AI将来履行量',\n" +
                " `AI_SUPPLY_WEEK` decimal NULL COMMENT 'AI供应周数',\n" +
                " `AI_RCOMPUTE_STIME` TIMESTAMP NULL COMMENT 'AI重新计算开始时间',\n" +
                " `AI_RCOMPUTE_ETIME` TIMESTAMP NULL COMMENT 'AI重新计算完成时间',\n" +
                // " `__op` int NULL, \n" +
                " PRIMARY KEY (ALLOC_ID,SHO_ID,GOODSID) NOT ENFORCED \n" +
                ") WITH ( " +
                "'connector' = 'starrocks'," +
                "'jdbc-url'='jdbc:mysql://172.28.40.64:9030?characterEncoding=utf-8&useSSL=false'," +
                // 'load-url' accepts multiple FEs: 'fe1_ip:http_port;fe2_ip:http_port;...'
                "'load-url'='172.28.xx.xx:8030'," +
                "'database-name' = 'test'," +
                "'table-name' = 'alc_allocs'," +
                "'username' = 'root'," +
                "'password' = ''," +
                "'sink.buffer-flush.max-rows' = '1000000'," +
                "'sink.buffer-flush.max-bytes' = '300000000'," +
                "'sink.buffer-flush.interval-ms' = '10000'," +
                // Since StarRocks 2.4 partial updates of primary-key tables are
                // supported: set the two properties below and append the '__op'
                // column at the end of 'sink.properties.columns'.
                // "'sink.properties.partial_update' = 'true'," +
                // "'sink.properties.columns' = '__op'," +
                "'sink.properties.column_separator' = '\\x01'," +
                "'sink.properties.row_delimiter' = '\\x02'," +
                "'sink.max-retries' = '3'" +
                ")"
        );

        // Submit the continuous INSERT job and block until it terminates.
        // executeSql() only submits the job and returns immediately; without
        // await() the client exits right away, which tears the pipeline down
        // when running on a local mini-cluster (e.g. inside an IDE).
        tableEnv.executeSql("insert into USER_RESULT select * from KafkaTable1;").await();
    }
}