[Draft] Loading data from Kafka into StarRocks with flink-connector-starrocks: connection refused + checkpoint expired

1. Overview

Data is loaded into StarRocks using the flink-connector-starrocks connector provided by StarRocks.
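
For orientation, below is a minimal sketch in Java (the versions in section 2 target JDK 1.8) of how such a Kafka-to-StarRocks job is typically wired. It is an assumed example, not the original job code: the Kafka broker, topic, FE address, database, table, and credentials are placeholders, and the StarRocksSinkOptions package path can differ between connector releases. The load-tuning properties listed in section 5 would be added to the same options builder.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;  // package path may vary by connector version

public class KafkaToStarRocksJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source reading raw JSON strings; broker, topic, and group id are placeholders.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092")
                .setTopics("example_topic")
                .setGroupId("flink-to-starrocks")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // StarRocks sink loading the JSON records via Stream Load; connection values are placeholders.
        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030")
                .withProperty("load-url", "fe-host:8030")
                .withProperty("database-name", "example_db")
                .withProperty("table-name", "example_table")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .addSink(StarRocksSink.sink(options));

        env.execute("kafka-to-starrocks");
    }
}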

2. Versions

JDK: 1.8
Scala: 2.11
Flink: 1.14.6
flink-connector-starrocks: 1.1.16_flink-1.14_2.11
flink-connector-kafka_2.11: 1.14.6

3. Kafka partitions and data volume

Kafka partitions: 12
Peak: 35,000 rows/s per partition (about 420,000 rows/s in total across the 12 partitions)

4. Flink configuration

Flink parallelism: 12
checkpoint-interval: 5000 (ms)
checkpoint-mode: exactly-once

Startup parameters:

-ytm 3072mb
-ys 3
-yD yarn.containers.vcores=3
-yD state.checkpoints.num-retained=8
-yD taskmanager.memory.managed.fraction=0.05
-yD taskmanager.memory.network.fraction=0.05
-yD env.java.opts="-XX:+UseG1GC -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
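
Expressed in code, the checkpoint settings above (interval 5000 ms, exactly-once) correspond to the Java sketch below. The checkpoint-timeout line is only an illustrative assumption, not a setting from the original job; a checkpoint is reported as "expired" when it does not complete within this timeout.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint-interval: 5000 ms, checkpoint-mode: exactly-once (as configured above)
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // A checkpoint expires when it exceeds this timeout (Flink's default is 10 minutes).
        // The value here is an assumption for illustration, not taken from the original job.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
    }
}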

5. StarRocks load configuration

sink.properties.format: json
sink.properties.strip_outer_array: true
sink.buffer-flush.max-rows: 100000
sink.buffer-flush.interval-ms: 10000
sink.connect.timeout-ms: 5000
sink.max-retries: 5
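
For reference, a sketch of how these load parameters are passed to the connector through StarRocksSinkOptions.builder().withProperty(...). The connection values (FE address, database, table, credentials) are placeholders, and the package path is assumed; only the properties in the list above come from the original job.

// Package path is assumed; it differs between flink-connector-starrocks releases.
import com.starrocks.connector.flink.table.StarRocksSinkOptions;

public class SinkOptionsExample {
    public static StarRocksSinkOptions build() {
        return StarRocksSinkOptions.builder()
                // Connection settings: placeholders, not from the original job.
                .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030")
                .withProperty("load-url", "fe-host:8030")
                .withProperty("database-name", "example_db")
                .withProperty("table-name", "example_table")
                .withProperty("username", "root")
                .withProperty("password", "")
                // Load parameters listed in this section.
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .withProperty("sink.buffer-flush.max-rows", "100000")
                .withProperty("sink.buffer-flush.interval-ms", "10000")
                .withProperty("sink.connect.timeout-ms", "5000")
                .withProperty("sink.max-retries", "5")
                .build();
    }
}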

6. BE configuration

cumulative_compaction_num_threads_per_disk = 4
base_compaction_num_threads_per_disk = 2
cumulative_compaction_check_interval_seconds = 2

7. Node information

8. Partial CREATE TABLE statement

DUPLICATE KEY(productid,_td_sdk_source,eventtype,eventid)
PARTITION BY RANGE(partitionday)(
PARTITION p20230904 VALUES LESS THAN ("2023-09-05"),
PARTITION p20230905 VALUES LESS THAN ("2023-09-06"),
PARTITION p20230906 VALUES LESS THAN ("2023-09-07"),
PARTITION p20230907 VALUES LESS THAN ("2023-09-08"),
PARTITION p20230908 VALUES LESS THAN ("2023-09-09"),
PARTITION p20230909 VALUES LESS THAN ("2023-09-10"),
PARTITION p20230910 VALUES LESS THAN ("2023-09-11")
)
DISTRIBUTED BY HASH(distinctid)
PROPERTIES (
"replication_num" = "3",
"compression" = "LZ4",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.history_partition_num" = "7"
);

9. Problem screenshots and notes

1) Connection refused