【Details】With Flink's StarRocks sink, how can the tableName be read from the data stream, so that records carrying different tableNames are written to different StarRocks tables?
【Flink StarRocks sink】
StarRocksSink.sink(
        StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", Constant.STARROCKS_JDBC_URL)
                .withProperty("load-url", Constant.STARROCKS_LOAD_URL)
                .withProperty("username", username)
                .withProperty("password", password)
                .withProperty("database-name", databaseName)
                .withProperty("table-name", tableName)
                .withProperty("sink.buffer-flush.interval-ms", "10000")
                .withProperty("sink.buffer-flush.max-bytes", 1024 * 1024 * 64 + "")
                .withProperty("sink.max-retries", "5")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build()
);
Here, tableName can only be passed in as a constant; it cannot be derived from individual records in the data stream.
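For reference, a common workaround when the set of target tables is known up front is to split the stream and attach one StarRocksSink per table. The following is only a minimal sketch: it assumes stream is a DataStream<String> of JSON strings carrying the target table in a hypothetical "table" field, and that buildOptions(table) is a hypothetical helper that builds the StarRocksSinkOptions shown above with "table-name" set to the given table:

import java.util.Arrays;
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.starrocks.connector.flink.StarRocksSink;

// Fan the stream out: one filtered sub-stream and one sink per known table.
List<String> knownTables = Arrays.asList("table_a", "table_b");
for (String table : knownTables) {
    stream
            // keep only the records destined for this table
            .filter(element -> table.equals(
                    JSON.parseObject(element).getString("table")))
            // buildOptions(table) is a hypothetical helper wrapping the
            // StarRocksSinkOptions builder above with "table-name" = table
            .addSink(StarRocksSink.sink(buildOptions(table)));
}

The drawback is that the fan-out is fixed when the job graph is built, so tables that only appear at runtime cannot be picked up.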
【Flink Kafka sink】
Properties props = new Properties();
props.setProperty("bootstrap.servers", Constant.KAFKA_BOOTSTRAP_SERVERS);
props.setProperty("security.protocol", Constant.KAFKA_SECURITY_PROTOCOL);
props.setProperty("sasl.mechanism", Constant.KAFKA_SASL_MECHANISM);
props.setProperty("sasl.jaas.config", Constant.KAFKA_SASL_JAAS_CONFIG);
props.setProperty("transaction.timeout.ms", Constant.KAFKA_TRANSACTION_TIMEOUT_MS);
return new FlinkKafkaProducer<>(
        "default",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element,
                                                            @Nullable Long timestamp) {
                // Route each record to a topic derived from its own payload
                JSONObject obj = JSON.parseObject(element);
                JSONObject source = obj.getJSONObject("source");
                String topic = source.getString("db") + "." + source.getString("table");
                return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
            }
        },
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
The Kafka topic, by contrast, can be derived from individual records in the stream, so different records are written to different topics.
【Desired behavior】Something analogous to the Kafka sink: derive a different tableName from each record in the stream and write to the corresponding StarRocks table.
【StarRocks version】2.5.2
【Cluster size】3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-located)
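For tables that only become known at runtime, one possible direction (a sketch under assumptions, not an official connector feature) is a custom RichSinkFunction that calls the StarRocks Stream Load HTTP interface directly and chooses the target table per record. DynamicTableStreamLoadSink and the "table" field are hypothetical names; the endpoint http://<fe_host>:<http_port>/api/{db}/{table}/_stream_load follows the documented Stream Load API. Buffering, retries, labels, and the exactly-once guarantees the official connector provides are all omitted:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

// Sketch only: one HTTP call per record for clarity. A real implementation
// would buffer rows per table and flush micro-batches, as the connector does.
public class DynamicTableStreamLoadSink extends RichSinkFunction<String> {

    private final String feHttpUrl;  // e.g. "http://fe_host:8030"
    private final String database;
    private final String auth;       // HTTP Basic auth header value

    private transient CloseableHttpClient client;

    public DynamicTableStreamLoadSink(String feHttpUrl, String database,
                                      String username, String password) {
        this.feHttpUrl = feHttpUrl;
        this.database = database;
        this.auth = "Basic " + Base64.getEncoder().encodeToString(
                (username + ":" + password).getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void open(Configuration parameters) {
        // Stream Load answers with a redirect from FE to a BE, so the
        // client must follow redirects for PUT requests as well.
        client = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .build();
    }

    @Override
    public void invoke(String element, Context context) throws Exception {
        // Hypothetical record layout: target table travels in a "table" field.
        JSONObject record = JSON.parseObject(element);
        String table = record.getString("table");

        HttpPut put = new HttpPut(
                feHttpUrl + "/api/" + database + "/" + table + "/_stream_load");
        put.setHeader("Authorization", auth);
        put.setHeader("Expect", "100-continue");
        put.setHeader("format", "json");
        put.setHeader("strip_outer_array", "true");
        put.setEntity(new StringEntity("[" + element + "]", StandardCharsets.UTF_8));

        HttpResponse response = client.execute(put);
        JSONObject result = JSON.parseObject(EntityUtils.toString(response.getEntity()));
        // Stream Load reports the outcome in the JSON body's "Status" field.
        if (!"Success".equals(result.getString("Status"))) {
            throw new RuntimeException("Stream Load failed: " + result.toJSONString());
        }
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

It would be attached with stream.addSink(new DynamicTableStreamLoadSink(feHttpUrl, databaseName, username, password)). Batching rows per table before flushing would be essential in practice, since Stream Load is designed for micro-batches rather than single rows.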