flink-connector

为了更快的定位您的问题,请提供以下信息,谢谢
【详述】问题详细描述
用flink-connector写入sr,三台集群,三个fe,三个be。
source是自定义内存造数据,固定条数和大小。
为何加大flink任务的并发不能提升整体的写入速率???
【背景】做过哪些操作?
【业务影响】
【是否存算分离】 存算一体
【StarRocks版本】最新
【connector版本】flink-connector 1.2 或者spark-connector 1.2
【集群规模】例如:3fe(1 follower+2observer)+5be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:48C/64G/万兆
【联系方式】例如:社区群24-张义传
【附件】
图片

flink 是standlone集群,根据具体并发数调节tm中slot个数
代码如下:

`
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> source = env.addSource(new MySource3(Integer.parseInt(args[0])));


    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://10.0.81.195:9030,10.0.81.50:9030,10.0.81.63:9030")
            .withProperty("load-url", "10.0.81.195:8030;10.0.81.50:8030;10.0.81.63:8030")
            .withProperty("database-name", "ssb")
            .withProperty("table-name", args[1])
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "csv")
            .withProperty("sink.properties.column_separator", "|")
            .withProperty("sink.buffer-flush.interval-ms", "30000")
            .build();

    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
    env.execute("StarRocks flink source");

}`