【详述】PG通过flink-cdc sink到Starrocks 无法删除数据
【导入/导出方式】 fink-cdc导入 sink导出
可以详细描述下您的问题么?
通过flinkcdc 转换成json写入starrocks 只能覆写,不知道如何删除数据
老师有完整demo在git上共享下吗
这个是SR flink-connector的源码呀
在搞schema时添加图一的信息就可以了。
最后仅覆写了 没有将主键为 xxx的数据删除呢
兄弟检查下代码吧,这个代码测试过是没问题的,能够做到删除。
不需要创建__op的字段吗
兄弟 你这个没测试过的呀 你这个是更新字段
不好意思发错code了 ,你在schema 最后一行添加 slots[slots.length - 1] = StarRocksSinkOP.xxxx.ordinal() ,应该就可以啦 xxxx 可以是 UPSERT 或者DElETE
public void accept(Object[] objects, flinkCdcTestDO f) {
objects[0] = f.getId();
objects[1] = f.getName();
objects[2] = f.getAge();
objects[3] = f.getCreate_at();
objects[4] = f.getUpdate_at();
if(f.getOp()!= POSTGRESQL_DELETE)
{
objects[objects.length-1] = StarRocksSinkOP.UPSERT.ordinal();
}
else {
objects[objects.length-1] = StarRocksSinkOP.DELETE.ordinal();
}
我通过这样指定 但是操作还是更新操作, 是前面还需要设置什么吗
噢噢噢 已经解决了 建表模型没选对, 太憨憨了 多谢大佬