Problem with Flink reading a Kafka stream and writing to StarRocks

【Description】Flink reads a Kafka stream and writes to StarRocks without reporting any errors, but the writes are delayed, and most of the time the data never appears in the table.
【Background】Operations performed:
1. Produce Kafka messages:
# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test
1001, "张三1", 18, "111@qq.com", "男"
2. Read the stream in the Flink job (a sketch of the MyKafkaUtil helper appears after the code below):
DataStreamSource<String> dataSource = env.addSource(MyKafkaUtil.getKafkaConsumer(topic_test, group_id));
3. Write to StarRocks:
dataSource.addSink(
        StarRocksSink.sink(
                StarRocksSinkOptions.builder()
                        .withProperty("jdbc-url", "jdbc:mysql://bigdata101:9030")
                        .withProperty("load-url", "bigdata101:8050")
                        .withProperty("username", "root")
                        .withProperty("password", "")
                        .withProperty("table-name", "flink_student")
                        .withProperty("database-name", "test")
                        // .withProperty("sink.properties.format", "json")
                        // .withProperty("sink.properties.column_separator", "\\x01")
                        .withProperty("sink.properties.row_delimiter", ",")
                        .withProperty("sink.properties.strip_outer_array", "true")
                        .build())
);

    env.execute("SinkStarrocksDemo");

【Business impact】
【StarRocks version】
StarRocks 2.1.2
【Cluster size】
3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-deployed on the same machines)
【Machine info】vCPU / memory / NIC
3 cores / 3 GB RAM

Are the data types correct when the data is sent to Kafka? Print both the records from a local source that writes successfully and the records from the Kafka source, and compare them to see what differs.
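
A minimal way to run that comparison, assuming string payloads; the labels passed to print() are hypothetical, and localSource stands for whichever source already works:

// Tag each stream so its records can be told apart in the TaskManager stdout.
dataSource.print("kafka-source");
localSource.print("local-source");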

sink.buffer-flush.interval-ms = 1000
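
That property controls how often the connector flushes its buffered rows to StarRocks via Stream Load; the connector's documented default is 300000 ms (5 minutes), which matches the symptom of "no error, but data shows up late or seems missing". Below is a sketch of where the property goes in the builder, combined with a CSV-consistent format: strip_outer_array only applies to JSON-format loads, and for the comma-separated sample row it is column_separator, not row_delimiter, that should be ",". Values are illustrative, not a confirmed fix:

StarRocksSink.sink(
        StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://bigdata101:9030")
                .withProperty("load-url", "bigdata101:8050")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("table-name", "flink_student")
                .withProperty("database-name", "test")
                // Fields in the sample row are comma-separated, so set the
                // column separator; rows are delimited by newlines by default.
                .withProperty("sink.properties.column_separator", ",")
                // Flush the sink buffer every second instead of the default
                // 300000 ms, so small test batches become visible quickly.
                .withProperty("sink.buffer-flush.interval-ms", "1000")
                .build());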