flink cdc直接通过connector写数据到starrocks,中间不用kafka。
没看见可以删除啊
参考https://docs.starrocks.com/zh-cn/main/loading/Flink_cdc_load
朋友针对flinkcdc删除的问题,你解决了吗?
您好,针对
引用 [U_1656639195598_2136]没看见可以删除啊
这个问题我也有遇见,历史和增量数据可以同步!但是source表数据有删除时,SR(已经通过增量同步过来了)作为sink表并不能通过flink cdc删除;我仔细查看了您推荐的文档,很惭愧没有解决,希望大佬指点下。
请问下写的是主键模型吗?
用smt工具生成的sr表结构,仅有一个唯一id,和mysql表一致;
CREATE TABLE aaa(
…
)ENGINE=olap
PRIMARY KEY( ORDER_DETAIL_ID
)
COMMENT “订单明细”
DISTRIBUTED BY HASH( ORDER_DETAIL_ID
) BUCKETS 110
PROPERTIES (
“replication_num” = “2”
);
对了 补充一下,我使用的<groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId>
作为source进行实时同步的,它返回的json格式是这面这种{ "before": null, "after": { "ORDER_DETAIL_ID": 37410227, ... }, "source": { "version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "db": "xcode", "table": "order_detail", ... }, "op": "r", "ts_ms": 1670570463353, "transaction": null }
我在想是不是这种格式sr sink不兼容导致的?望指教
可以参考这个里面的https://forum.mirrorship.cn/t/topic/1680排查下,另外咱们用的不是flink sql模式?
多谢解答,确实是cdc版本问题,已解决。
您好,如果是用flink datastream api同步我需要处理cdc的格式吗?这种会报错 too many filter rows;
mysql cdc的source数据格式是:
{“before”: null,
“after”: {
},
“source”: {
“version”: “1.5.4.Final”,
“connector”: “mysql”,
“name”: “mysql_binlog_source”,。。。
},
“op”: “r”,
“ts_ms”: 1671006027569,
“transaction”: null
}如果以上的格式我只拿出来before或者after中的json数据是能够同步的,但是却无法删除了。
需要自己解析下cdc的source数据,根据op的类型来获取before或者after,然后再添加op是delete还是upsert写入starrocks
好的 已解决json中需要有{"__op":1}表示删除,{"__op":0}表示upsert
你好,请问最终写入starrocks ,你是使用flinksql的方式,还是用flink datastream api + stream load 写入的呢?
你好,我是用datastream操作的,直接在JSON添加字段,没法实现flink cdc执行删除操作,可以借鉴下你的代码么
JSON里面可以加个key,{’__op’:‘1’},1表示删除,默认是0,表示upsert
您好我遇见了新的问题,还望指教;
问题:在使用streamload导入数据的时候,我使用的模拟csv字符串使用httpClient请求api:http://ip:port/api/db/table/_stream_load
当ip和port(8030)是FE的时候总是出现一次成功一次失败(必现),失败的信息Bad Request.
text is empty (possibly HTTP/0.9);如果我把ip和port使用指定BE的时候port(8040)是可以正常执行的;
环境:java1.8,starrocks:2.5.14
关键代码:
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(content, StandardCharsets.UTF_8);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(starrocksUser, starrocksPassword));
put.setHeader("column_separator", ",");
put.setHeader("label", label);
put.setEntity(entity);