sr2.5版本,kafka connect sink mongodb 数据(debezium source),无法处理删除事件

sr2.5版本,kafka connect sink mongodb 数据(debezium source),无法处理删除事件,配置如下:
“transforms”: “dropPrefix,dropPrefix2,addfield,unwrap”,
“transforms.addfield.type”: “com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord”,
“transforms.unwrap.type”: “io.debezium.connector.mongodb.transforms.ExtractNewDocumentState”,
“transforms.unwrap.drop.tombstones”: “true”,
“transforms.unwrap.delete.handling.mode”: “rewrite”

sr是通过什么方式sink数据的?mongodb里的delete事件写到kafka里是什么样子的?

sr 通过kafka connect 的 sink [starrocks-kafka-connector]进行导入,value数据格式为json,可以正常插入和修改,但是无法删除,删除事件如图:


后面尝试添加op字段,“transforms.unwrap.add.headers”: “op”,没有生效


可以参考下文档这块试试

image
试过了,对于mongodb 的source 来说,transforms.unwrap.type设置为ExtractNewDocumentState 才能成功sink,否则会报错,“transforms.unwrap.type”: “io.debezium.connector.mongodb.transforms.ExtractNewDocumentState”, 其他设置和文档一样,依然不能删除,但是能够插入和更新

com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord可能是这个类对于mongo 的删除事件无法添加__op,经测试,对删除事件依然进行了更新

AddOpFieldForDebeziumRecord这个应该是已经内置加了__op所以没法再加,所以kafka connect sink mongodb是能处理删除事件的?

com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord 和“transforms.unwrap.add.headers”: “op”这两个配置都处理不了删除事件,删除事件被处理成了更新事件,现在不太清楚应该怎么做 :joy:下面是debezium 对于官网添加op 的说明

删除事件被处理成了更新事件
请问这个在数据处理层面,比如上游mongodb删了一条数据,下游sr sink之后这条数据是怎样的?

上游mongodb正常删除数据,op字段为”d“, after 字段为null,下游数据sink 后无变化
通过kafka consloe 发送op字段为”d“, after 字段不为null,下游数据sink 后进行了数据更新

再mongodb source 端进行了实验,com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord无法添加__op,请问这个转换是实现什么操作呢,想看下能否配置其他transform 来间接实现这个功能
经试验,“transforms.unwrap.add.fields”: “op"可以添加”__op": “d”,但是在消费时会被处理为NULL,“transforms.unwrap.add.headers”: “op"无法添加”__op"

__op的作用可以参考这篇文档https://docs.starrocks.io/zh/docs/2.5/loading/Load_to_Primary_Key_tables
另外这个问题我问下相关同学

我也遇到相同的问题

请问能否有个针对mongodb的配置示例,我看 导入 Debezium CDC 格式数据 的配置并不能用于mongodb