1. Overview
Import data into StarRocks using the flink-connector-starrocks provided by StarRocks.
2. Versions
jdk:1.8
scala:2.11
flink:1.14.6
flink-connector-starrocks:1.1.16_flink-1.14_2.11
flink-connector-kafka_2.11:1.14.6
3. Kafka partitions and data volume
Kafka partitions: 12
Peak: 35,000 rows/s per partition
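For reference, the aggregate peak rate implied by these two figures can be worked out as below. This is a back-of-the-envelope sketch: it assumes all 12 partitions reach the stated per-partition peak at the same time, which the figures above do not confirm, and the function name is purely illustrative.

```python
# Figures taken from the section above.
KAFKA_PARTITIONS = 12
PEAK_ROWS_PER_SEC_PER_PARTITION = 35_000


def aggregate_peak_rows_per_sec(partitions: int, rows_per_partition: int) -> int:
    """Upper bound on rows/s, ASSUMING every partition peaks simultaneously."""
    return partitions * rows_per_partition


total = aggregate_peak_rows_per_sec(KAFKA_PARTITIONS, PEAK_ROWS_PER_SEC_PER_PARTITION)
print(f"aggregate peak: {total} rows/s")
```

Under that assumption the sink side would need to absorb up to 420,000 rows/s across the cluster.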
4. Flink configuration
Flink parallelism (partitions): 12
checkpoint-interval:5000
checkpoint-mode:exactly-once
Startup parameters:
-ytm 3072mb
-ys 3
-yD yarn.containers.vcores=3
-yD state.checkpoints.num-retained=8
-yD taskmanager.memory.managed.fraction=0.05
-yD taskmanager.memory.network.fraction=0.05
-yD env.java.opts="-XX:+UseG1GC -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
5. StarRocks (SR) import configuration
sink.properties.format:json
sink.properties.strip_outer_array:true
sink.buffer-flush.max-rows:100000
sink.buffer-flush.interval-ms:10000
sink.connect.timeout-ms:5000
sink.max-retries:5
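A rough way to see which flush trigger is likely to fire first at peak load is sketched below. It rests on several assumptions not stated in this post: each sink subtask consumes exactly one Kafka partition (12 and 12), a subtask's buffer is flushed as soon as either the row threshold or the interval is reached, and the byte-size threshold and any checkpoint-triggered flushes are ignored. The names are illustrative only.

```python
# Values copied from the sink configuration above.
SINK_BUFFER_FLUSH_MAX_ROWS = 100_000
SINK_BUFFER_FLUSH_INTERVAL_MS = 10_000
# Peak rate per Kafka partition, as reported in the post.
PEAK_ROWS_PER_SEC = 35_000


def ms_to_fill_buffer(max_rows: int, rows_per_sec: int) -> float:
    """Milliseconds needed to buffer max_rows at a constant input rate."""
    return max_rows / rows_per_sec * 1000


fill_ms = ms_to_fill_buffer(SINK_BUFFER_FLUSH_MAX_ROWS, PEAK_ROWS_PER_SEC)
# ASSUMPTION: whichever threshold is reached first triggers the flush.
first_trigger = "max-rows" if fill_ms < SINK_BUFFER_FLUSH_INTERVAL_MS else "interval-ms"
print(f"buffer fills in about {fill_ms:.0f} ms; first trigger: {first_trigger}")
```

If these assumptions hold, each subtask would flush roughly every 2.9 seconds at peak, well before the 10-second interval, so the load frequency on the BE side is driven by the row limit rather than the timer.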
6. BE configuration
cumulative_compaction_num_threads_per_disk = 4
base_compaction_num_threads_per_disk = 2
cumulative_compaction_check_interval_seconds = 2
7. Node description
8. Partial CREATE TABLE statement
DUPLICATE KEY(productid,_td_sdk_source,eventtype,eventid)
PARTITION BY RANGE(partitionday)(
PARTITION p20230904 VALUES LESS THAN ("2023-09-05"),
PARTITION p20230905 VALUES LESS THAN ("2023-09-06"),
PARTITION p20230906 VALUES LESS THAN ("2023-09-07"),
PARTITION p20230907 VALUES LESS THAN ("2023-09-08"),
PARTITION p20230908 VALUES LESS THAN ("2023-09-09"),
PARTITION p20230909 VALUES LESS THAN ("2023-09-10"),
PARTITION p20230910 VALUES LESS THAN ("2023-09-11")
)
DISTRIBUTED BY HASH(distinctid)
PROPERTIES (
"replication_num" = "3",
"compression" = "LZ4",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.history_partition_num" = "7"
);
9. Problem screenshots and description
1) connect refused