starrocks的sink报错,求解

参考文档,在run时报错,如下图,求解

帮忙发下完整的sink代码

package spendreport;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCDC {
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp");
    env.fromElements("{\"score\": \"99\", \"name\": \"stephen\"}",
            "{\"score\": \"100\", \"name\": \"lebron\"}").addSink(
            StarRocksSink.sink(
                    // the sink options
                    StarRocksSinkOptions.builder()
                            .withProperty("jdbc-url", "jdbc:mysql://localhost:16893,localhost:16993,localhost:17193?swifts")
                            .withProperty("load-url", "localhost:16830;localhost:16930;localhost:17130")
                            .withProperty("username", "root")
                            .withProperty("password", "")
                            .withProperty("table-name", "ogg_table")
                            .withProperty("database-name", "swifts")
                            .withProperty("sink.properties.format", "json")
                            .withProperty("sink.properties.strip_outer_array", "true")
                            .build()
            )
    );



    env.execute("Print MySQL Snapshot + Binlog");
}

}

确认下集群是起来的吗?端口都是存活的吗?

集群起来了,改成一台的地址,错误信息不一样

这个是warn的日志,可以忽略,数据应该是写入进去了,可以看下表里面的数据,这块没有打印日志信息。

请问下,用flinkcdc拿到的多表数据怎么放入starrock?感谢!

请问下为啥要代码实现这块呢?是flink-cdcd之后的数据做一些复杂etl吗?如果没有复杂etl,可以使用smt+flink-cdc通过flinkl-sql实现,具体可参考 mysql cdc

我看过这个文档,有些细节问题没搞明白:
1.每个表需要建独立的作业,数仓可能有几千上万张表需要同步,那就需要很多作业,会不会很占用flink的资源?还是说资源其实消耗很小
2.对于同一个库,如果一张表对应一个作业,那在读binlog时是所有表读一次还是一张表读一次?如果是一张表读一次binlog,看起来不合理了,无端增加磁盘io
3.文档没写清楚作业如果停止后怎么从上次同步的点恢复,flinkapi好像是有个checkpoint,用starrocks里的smt工具就不知道怎么解决这个问题?

1.目前smt是一个表会起一个job,多个表同步需要咱们利用flink-connector的stream api写入starrocks,cdc多表写入到starrocks的思路是source按cdc配置读取多表,mapfunc转化成StarRocksSinkRowDataWithMeta,然后写入到sr sink
2.读取binlog也是看job的数量的来决定读取多少次
3.smt只是帮助创建flink sql job,停止恢复还是利用的flink ckp的机制,开启ckp就可以

非常感谢!用了stream api,快成功了,还有个疑问,starrocks的sink是怎么处理删除操作的昵?业务表删除后怎么处理才能把目标表的数据删一下昵?

@jingdan 你好,目前我也在使用flink-connector实现多表sink到starrocks,看到前面你说的先把数据转化成 StarRocksSinkRowDataWithMeta,但是我看官网并没有给出这种方式的案例,想问一下这里有什么案例没有?如果没有,StarRocksSinkRowDataWithMeta要求的数据格式是什么?源码里的dataRows字段要求什么样的数据格式?json是否可以?还是csv格式?

请问你最终使用stream api成功了吗?

csv和json都支持

惭愧,并没有,在看hudi,这里解决个问题太难了,好多坑

我跟你正好相反,放弃了hudi,hudi坑也挺多的。

那你搞定后在这分享下?我是用分流的方式一个表调一个sink,但数据删除和ddl操作不知道怎么搞。另外分流的话我猜测是一个流占用一个slot,这个成本也够高的,感觉不是好方案

参考了文档https://docs.starrocks.com/zh-cn/main/loading/PrimaryKeyLoad,
数据里头加一列__op,值为1时就能够删除数据
{“name”:“111”,“id”:1,"__op":“1”}