【详述】
使用DSG或者CDC工具从DB中采集日志,投递到Kafka中,然后使用Flink消费kafka中的数据写入到Starrocks,由于写入表数过多,我们打算一个DB库对应一个sink实现,目前已经实现了,但是更新操作遇到问题
【背景】
假如有一张表,有50个字段,数据库做了如下连续变更以及写入starrocks的现象
第一次更新:变更3个字段,kafka传递消息包括主键在内有4个字段,写入starrocks时,其他46个字段被赋值为null
第二次更新:变更13个字段,kafka传递消息包括主键在内有14个字段,写入starrocks,其他36个字段被赋值为null
第三次更新:变更46个字段,kafka传递消息包括主键在内有47个字段,写入starrocks,其他3个字段被赋值为null
…
【设置sink参数】
“sink.properties.partial_update”, “true”
“sink.properties.columns”, “50个字段”
【求助】
我是从kudu切换到starrocks出现的新问题,按理解针对同一张表有哪些字段变更就更新哪些字段,每次更新的字段可能不一样,但不应该覆盖未变更字段值,实际测试下来starrocks并没有很好的支持。或者有一些特性不清楚,发帖请教一下
部分列更新应该理解为:比如你这个表有50个字段,你这一批次只需要更新20个字段,那么sink.properties.columns这参数配置上你需要更新的这20个字段;对于整个这张表来说就是部分列更新
实时流式处理应用场景中,是不是sink.properties.columns不能根据消息动态变更?
因为我不知道DB端都变更哪些字段,可能有多个业务在使用,每个业务更新的字段不一样,考虑DB性能又不能开启全列附加日志,只获取有变更的字段
目前我也有相同问题,但是目前解决方案是,动态拼SQL实现了,但是效率会很低
“sink.properties.partial_update”, “true” 可实现部分列更新,sink.properties.columns 要带上更新的主键字段,然后只需要写上你更新的那几列就可以实现了