一 背景
本文主要在官方文档从 Apache Flink® 持续导入 @ Flink-connector-starrocks基础上,分享下参数调优和问题排查的思路。
二 Flink-connector-starrocks 导入逻辑
众所周知,Flink-connector-starrocks底层封装的stream load,通过flink-connector-starrocks导入数据,底层实际是通过HTTP协议导入数据。所以大家了解这个之后,就比较容易定位和解决遇到的问题了。
Flink-connector-starrocks 触发写 StarRocks 的策略目前有两种:
1. Flink-connector-starrocks 1.2.3及其之前版本
- 开启at least-once情况下(sink.semanti=“at-least-once”),当前默认策略
当未开启checkpoint的时候,会按照下面参数,哪个先满足条件会按照哪个触发写 StarRocks
sink.buffer-flush.max-bytes=94371840
sink.buffer-flush.max-rows=500000
sink.buffer-flush.interval-ms=300000
//如果开启了checkpoint,还需要考虑execution.checkpointing.interval时间
怎么查看实际是根据哪个条件触发写StarRocks的,可以通过flink的taskmanager日志查看,例如下面就是以“sink.buffer-flush.interval-ms”触发的写入
2022-12-08 10:34:03,670 INFO com.starrocks.connector.flink.manager.StarRocksSinkManager [] - StarRocks interval Sinking triggered.
2022-12-08 10:34:03,671 INFO com.starrocks.connector.flink.manager.StarRocksSinkManager [] - Async stream load: db[db] table[table] rows[30967] bytes[28062223] label[627b476b-de30-4aec-bfdf-f46d88782da7].
2022-12-08 10:34:03,671 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Start to join batch data: label[627b476b-de30-4aec-bfdf-f46d88782da7].
2022-12-08 10:34:03,682 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Executing stream load to: 'http://10.0.0.2:8030/api/db/table/_stream_load', size: '28093190', thread: 14342
目前日志中可以根据如下关键字区分哪种条件满足触发的写入
触发写的条件 | taskmanager日志关键字 |
---|---|
sink.buffer-flush.max-bytes sink.buffer-flush.max-rows |
StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s] |
sink.buffer-flush.interval-ms execution.checkpointing.interval |
StarRocks interval Sinking triggered |
可以根据sink的间隔判断是checkpoint 触发导致还是sink.buffer-flush.interval-ms触发导致|
- 开启exactly once(sink.semanti=“exactly-once”)
开启exactly once之后,会按照checkpoint的周期写入StarRocks
execution.checkpointing.interval: 300000
2. Flink-connector-starrocks 1.2.4及其之后版本
Flink-connector-starrocks 1.2.4版本开始,取消了sink.buffer-flush.max-rows策略
- 开启at least-once情况下(sink.semanti=“at-least-once”),当前默认策略
当未开启checkpoint的时候,会按照下面参数,哪个先满足条件会按照哪个触发写 StarRocks
sink.buffer-flush.max-bytes=94371840
sink.buffer-flush.interval-ms=300000
//如果开启了checkpoint,还需要考虑execution.checkpointing.interval时间execution.checkpointing.interval: 300000
怎么查看实际是根据哪个条件触发写StarRocks的,可以通过flink的taskmanager日志查看,目前日志中可以根据如下关键字区分哪种条件满足触发的写入
触发写的条件 | taskmanager日志关键字 |
---|---|
sink.buffer-flush.max-bytes | Cache full, wait flush, currentBytes |
execution.checkpointing.interval | Stream load manager flush |
-
开启exactly once(sink.semanti=“exactly-once”)
-
对于SR >=2.4.0,exactly-once默认基于StarRocks的transaction stream load实现,两次checkpoint之间的数据会作为一个事务提交,但是会将数据按照buffer配置切割,多次发送
-
对于SR < 2.4,exactly-once基于普通的stream load实现,两次checkpoint之间的数据也是作为一个事务提交,但是只按照checkpoint周期一次写入
execution.checkpointing.interval: 300000
-
三 支持导入的数据格式
- CSV(默认)
'sink.properties.column_separator' = '\\x01',
'sink.properties.row_delimiter' = '\\x02'
- JSON
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true'
四 参数配置
这里只列举下常用的配置,其他的请参考官方文档
配置项目 | 默认值 | 类型 | 说明 |
---|---|---|---|
sink.semantic | at-least-once | String | at-least-once 或者 exactly-once |
sink.buffer-flush.max-bytes | 94371840(90M) | String | 一次flush的最大数据大小,满足设置的值后会触发写入Starrocks,具体写入见第二部分“Flink-connector-starrocks 导入逻辑” Flink-connector-starrocks 1.2.3及其之前版本 取值范围: [64MB, 1.9GB] Flink-connector 1.2.4及其之后版本 取值范围: [64MB, 10GB]. |
sink.buffer-flush.max-rows | 500000 | String | 一次flush的最大数据行,满足设置的值后会触发写入Starrocks,具体写入见第二部分“Flink-connector-starrocks 导入逻辑” 取值范围: [64,000, 5000,000] |
sink.buffer-flush.interval-ms | 300000 | String | 写数据间隔,满足设置的值后会触发写入Starrocks,具体写入见第二部分“Flink-connector-starrocks 导入逻辑” 取值范围: [1000ms, 3600000ms]. |
sink.properties.* | NONE | String | 指定导入参数,这里指定的是stream load相关的参数 例如stream load中header中指定-H “columns:k1,k2,k3”,在flink-connector-starrocks中需要这样指定 ‘sink.properties.columns’ = ‘k1, k2, k3’,其他的参数请参考 STREAM LOAD StarRocks 2.4版本开始, flink-connector-starrocks 支持 主键模型 部分列更新,可以通过’sink.properties.partial_update’='true’实现 |
sink.properties.ignore_json_size | false | String | StarRocks 2.1版本开始支持忽略json一次导入100MB 数据量限制,但是会给内存带来压力,如果开启后请观察内存负载 |
五 最佳实践
1.如何合理配置导入参数和并行度,达到比较高的导入效率?
目前从我们接触的用户case分析来看,导致flink-connector-starrocks写入效率不高的原因主要是以下几点:
-
StarRocks每次导入都是一个事务,导入频率高,compaction和事务交互压力大
-
Flink sink 并行度高
-
单次导入数据量小(攒批小)
-
以上问题主要基于攒批来优化导入
- 开启at leaset once情况下,尽量提高单次导入数据量,降低并发和频率
Flink-connector-starrocks 1.2.3及其之前版本
通过taskmanager中搜索下面关键字,查看是哪种触发逻辑,每次flush触发的间隔多久(多并行度的话需要查看对应thread号间隔多久触发)
触发写的条件 | taskmanager日志关键字 |
---|---|
sink.buffer-flush.max-bytes sink.buffer-flush.max-rows |
StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s] |
sink.buffer-flush.interval-ms execution.checkpointing.interval |
StarRocks interval Sinking triggered |
可以根据sink的间隔判断是checkpoint 触发导致还是sink.buffer-flush.interval-ms触发导致|
由于以下参数任意一个满足条件就会触发StarRocks写入,所以建议配合调整,一般建议最低10s+一次写入
sink.buffer-flush.max-bytes
sink.buffer-flush.max-rows
sink.buffer-flush.interval-ms
//如果开启了checkpoint,还需要考虑execution.checkpointing.interval时间
Flink-connector-starrocks 1.2.4及其之后版本
通过taskmanager中搜索下面关键字,查看是哪种触发逻辑,每次flush触发的间隔多久(多并行度的话需要查看对应thread号间隔多久触发)
触发写的条件 | taskmanager日志关键字 |
---|---|
sink.buffer-flush.max-bytes | Cache full, wait flush, currentBytes |
execution.checkpointing.interval | Stream load manager flush |
sink.buffer-flush.interval-ms | 当前没有关键字,可以在日志中搜索_stream_load |
由于以下参数任意一个满足条件就会触发StarRocks写入,所以建议配合调整,一般建议最低10s+一次写入
sink.buffer-flush.max-bytes
sink.buffer-flush.interval-ms
//如果开启了checkpoint,还需要考虑上面execution.checkpointing.interval时间
-
开启exactly once的情况下,尽量调大checkpoint的时间
-
Flink sink的并行度:最大控制在8*be个数以内,如果数据量比较小的话用1即可
2.如果导入失败了,如何排查?
-
确认端口是否正确,
-
是否正常发起了 stream load任务
3.如果要实现主键模型的upsert和delete,如何写代码
查看下面黄色标注的部分, 注:这里列举了一个demo,实际生产建议参考最佳实践1合理攒批写入StarRocks
package com.starrocks.flink;
...
public class Bean {
public static void main(String[] args) throws Exception {
class RowData {
public int userId;
public String name;
public String email;
public String address;
public String opType;
public RowData(int userId, String name, String email, String address, String Optype) {
...
}
}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<RowData> rowDataDataStreamSource = env.fromElements(
new RowData[]{
new RowData(99, "stephen", "stephen@example.com", "N.35", "I"),
...
}
);
DataStreamSink<RowData> rowDataDataStreamSink = rowDataDataStreamSource.addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.field("user_id", DataTypes.INT().notNull())
...
.primaryKey("user_id")
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
...
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.getUserId();
slots[1] = streamRowData.getName();
slots[2] = streamRowData.getEmail();
slots[3] = streamRowData.getAddress();
// slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); //主键模型写入需要指定下是upsert还是delete,这里以upsert举例
//如果说原始数据里面有字段可以表示upsert还是delete,则可以在这里判断一下,例如原始字段中有一个字段"type",取值分别为insert,update,delete
//则在这里可以加一个判断
if (streamRowData.getType() == "D") {
slots[slots.length - 1] = StarRocksSinkOP.DELETE.ordinal();
} else {
slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
}
}
)
);
env.execute();
}
}
具体代码见
Bean.java (4.7 KB)
StarRocks中建表
CREATE TABLE `users` (
`user_id` bigint(20) NOT NULL COMMENT "",
`name` varchar(65533) NOT NULL COMMENT "",
`email` varchar(65533) NULL COMMENT "",
`address` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`user_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1;
4.结合CDC实现多种数据源到StarRocks的实时同步
目前已支持MySQL、PostgreSQL、Oracle、Hive、SQLServer和TiDB数据实时同步到StarRocks。
使用方式可参考从 MySQL 实时同步 @ Flink_cdc_load
其他数据库的实时同步可下载对应的cdc的jar包(https://github.com/ververica/flink-cdc-connectors/releases)结合flink-connector-starrocks使用。
六 FAQ
Q:导入json数据的时候报错The size of this batch exceed the max size [104857600] of json type data data [ 118170895 ]. Set ignore_json_size to skip the check, although it may lead huge memory consuming.`
A:这是超过单次导入json 100MB的限制,可以配置’sink.properties.ignore_json_size’='true’解决,但是会给内存带来压力,建议调整完观察下BE节点内存负载。
Q:flink-cdc+flink-connector-starrocks没有实现update
A:确认flink和flink-cdc版本是否一致,参考CDC Connectors for Apache Flink