PG通过flink-cdc sink到Starrocks 无法删除数据

【详述】PG通过flink-cdc sink到Starrocks 无法删除数据
【导入/导出方式】 fink-cdc导入 sink导出

可以详细描述下您的问题么?

通过flinkcdc 转换成json写入starrocks 只能覆写,不知道如何删除数据


26890e5dd874d7999faf5a4d5eb483b

1赞

老师有完整demo在git上共享下吗

这个是SR flink-connector的源码呀 :joy:

在搞schema时添加图一的信息就可以了。

意思就是用上面这个吗

FlinkApp.java (3.8 KB)
可以参考下

最后仅覆写了 没有将主键为 xxx的数据删除呢

:joy:兄弟检查下代码吧,这个代码测试过是没问题的,能够做到删除。

不需要创建__op的字段吗

兄弟 你这个没测试过的呀 你这个是更新字段

:joy: 不好意思发错code了 ,你在schema 最后一行添加 slots[slots.length - 1] = StarRocksSinkOP.xxxx.ordinal() ,应该就可以啦 xxxx 可以是 UPSERT 或者DElETE

1赞

public void accept(Object[] objects, flinkCdcTestDO f) {
objects[0] = f.getId();
objects[1] = f.getName();
objects[2] = f.getAge();
objects[3] = f.getCreate_at();
objects[4] = f.getUpdate_at();
if(f.getOp()!= POSTGRESQL_DELETE)
{
objects[objects.length-1] = StarRocksSinkOP.UPSERT.ordinal();
}
else {
objects[objects.length-1] = StarRocksSinkOP.DELETE.ordinal();

                            }

我通过这样指定 但是操作还是更新操作, 是前面还需要设置什么吗

噢噢噢 已经解决了 建表模型没选对, 太憨憨了 多谢大佬