参考文档,在run时报错,如下图,求解
帮忙发下完整的sink代码
package spendreport;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MysqlCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp");
env.fromElements("{\"score\": \"99\", \"name\": \"stephen\"}",
"{\"score\": \"100\", \"name\": \"lebron\"}").addSink(
StarRocksSink.sink(
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://localhost:16893,localhost:16993,localhost:17193?swifts")
.withProperty("load-url", "localhost:16830;localhost:16930;localhost:17130")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "ogg_table")
.withProperty("database-name", "swifts")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build()
)
);
env.execute("Print MySQL Snapshot + Binlog");
}
}
确认下集群是起来的吗?端口都是存活的吗?
这个是warn的日志,可以忽略,数据应该是写入进去了,可以看下表里面的数据,这块没有打印日志信息。
我看过这个文档,有些细节问题没搞明白:
1.每个表需要建独立的作业,数仓可能有几千上万张表需要同步,那就需要很多作业,会不会很占用flink的资源?还是说资源其实消耗很小
2.对于同一个库,如果一张表对应一个作业,那在读binlog时是所有表读一次还是一张表读一次?如果是一张表读一次binlog,看起来不合理了,无端增加磁盘io
3.文档没写清楚作业如果停止后怎么从上次同步的点恢复,flinkapi好像是有个checkpoint,用starrocks里的smt工具就不知道怎么解决这个问题?
1.目前smt是一个表会起一个job,多个表同步需要咱们利用flink-connector的stream api写入starrocks,cdc多表写入到starrocks的思路是source按cdc配置读取多表,mapfunc转化成StarRocksSinkRowDataWithMeta,然后写入到sr sink
2.读取binlog也是看job的数量的来决定读取多少次
3.smt只是帮助创建flink sql job,停止恢复还是利用的flink ckp的机制,开启ckp就可以
非常感谢!用了stream api,快成功了,还有个疑问,starrocks的sink是怎么处理删除操作的昵?业务表删除后怎么处理才能把目标表的数据删一下昵?
@jingdan 你好,目前我也在使用flink-connector实现多表sink到starrocks,看到前面你说的先把数据转化成 StarRocksSinkRowDataWithMeta,但是我看官网并没有给出这种方式的案例,想问一下这里有什么案例没有?如果没有,StarRocksSinkRowDataWithMeta要求的数据格式是什么?源码里的dataRows字段要求什么样的数据格式?json是否可以?还是csv格式?
请问你最终使用stream api成功了吗?
惭愧,并没有,在看hudi,这里解决个问题太难了,好多坑
我跟你正好相反,放弃了hudi,hudi坑也挺多的。
那你搞定后在这分享下?我是用分流的方式一个表调一个sink,但数据删除和ddl操作不知道怎么搞。另外分流的话我猜测是一个流占用一个slot,这个成本也够高的,感觉不是好方案
参考了文档https://docs.starrocks.com/zh-cn/main/loading/PrimaryKeyLoad,
数据里头加一列__op,值为1时就能够删除数据
{“name”:“111”,“id”:1,"__op":“1”}