什么时候支持flinkcdc通过stream load直接同步增删改的数据到StarRocks呢

在flink-sql-client创建flink cdc对oracle的映射表A,再创建对StarRocks的映射表B,执行insert into B select * from A即可实现实时同步oracle的增删改数据到StarRocks。

你好,当前flink-cdc是支持同步增删改数据的,您的问题是?

flink-connector-starrocks支持同步增删改数据吗

是用connector消费kafka写starrocks嘛

flink cdc直接通过connector写数据到starrocks,中间不用kafka。

参考flink-cdc,就可以实现从mysql实时同步到starrocks

没看见可以删除啊

参考https://docs.starrocks.com/zh-cn/main/loading/Flink_cdc_load

朋友针对flinkcdc删除的问题,你解决了吗?

您好,针对

引用 [U_1656639195598_2136]没看见可以删除啊
这个问题我也有遇见,历史和增量数据可以同步!但是source表数据有删除时,SR(已经通过增量同步过来了)作为sink表并不能通过flink cdc删除;我仔细查看了您推荐的文档,很惭愧没有解决,希望大佬指点下。

请问下写的是主键模型吗?

用smt工具生成的sr表结构,仅有一个唯一id,和mysql表一致;
CREATE TABLE aaa(

)ENGINE=olap
PRIMARY KEY( ORDER_DETAIL_ID )
COMMENT “订单明细”
DISTRIBUTED BY HASH( ORDER_DETAIL_ID ) BUCKETS 110
PROPERTIES (
“replication_num” = “2”
);

对了 补充一下,我使用的<groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId>作为source进行实时同步的,它返回的json格式是这面这种{ "before": null, "after": { "ORDER_DETAIL_ID": 37410227, ... }, "source": { "version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "db": "xcode", "table": "order_detail", ... }, "op": "r", "ts_ms": 1670570463353, "transaction": null }我在想是不是这种格式sr sink不兼容导致的?望指教

可以参考这个里面的https://forum.mirrorship.cn/t/topic/1680排查下,另外咱们用的不是flink sql模式?

另外flink需要和cdc版本匹配

多谢解答,确实是cdc版本问题,已解决。

您好,如果是用flink datastream api同步我需要处理cdc的格式吗?这种会报错 too many filter rows;
mysql cdc的source数据格式是:
{“before”: null,
“after”: {
},
“source”: {
“version”: “1.5.4.Final”,
“connector”: “mysql”,
“name”: “mysql_binlog_source”,。。。
},
“op”: “r”,
“ts_ms”: 1671006027569,
“transaction”: null
}如果以上的格式我只拿出来before或者after中的json数据是能够同步的,但是却无法删除了。

需要自己解析下cdc的source数据,根据op的类型来获取before或者after,然后再添加op是delete还是upsert写入starrocks

好的 已解决json中需要有{"__op":1}表示删除,{"__op":0}表示upsert

你好,请问最终写入starrocks ,你是使用flinksql的方式,还是用flink datastream api + stream load 写入的呢?