[导入]Flink-connector-starrocks使用指南

一 背景

本文主要在官方文档从 Apache Flink® 持续导入 @ Flink-connector-starrocks基础上,分享下参数调优和问题排查的思路。

二 Flink-connector-starrocks 导入逻辑

众所周知,Flink-connector-starrocks底层封装的stream load,通过flink-connector-starrocks导入数据,底层实际是通过HTTP协议导入数据。所以大家了解这个之后,就比较容易定位和解决遇到的问题了。

Flink-connector-starrocks 触发写 StarRocks 的策略目前有两种:

1. Flink-connector-starrocks 1.2.3及其之前版本

  • 开启at least-once情况下(sink.semanti=“at-least-once”),当前默认策略

当未开启checkpoint的时候,会按照下面参数,哪个先满足条件会按照哪个触发写 StarRocks

sink.buffer-flush.max-bytes=94371840
sink.buffer-flush.max-rows=500000
sink.buffer-flush.interval-ms=300000
//如果开启了checkpoint,还需要考虑execution.checkpointing.interval时间

怎么查看实际是根据哪个条件触发写StarRocks的,可以通过flink的taskmanager日志查看,例如下面就是以“sink.buffer-flush.interval-ms”触发的写入

2022-12-08 10:34:03,670 INFO  com.starrocks.connector.flink.manager.StarRocksSinkManager   [] - StarRocks interval Sinking triggered.
2022-12-08 10:34:03,671 INFO  com.starrocks.connector.flink.manager.StarRocksSinkManager   [] - Async stream load: db[db] table[table] rows[30967] bytes[28062223] label[627b476b-de30-4aec-bfdf-f46d88782da7].
2022-12-08 10:34:03,671 INFO  com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Start to join batch data: label[627b476b-de30-4aec-bfdf-f46d88782da7].
2022-12-08 10:34:03,682 INFO  com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Executing stream load to: 'http://10.0.0.2:8030/api/db/table/_stream_load', size: '28093190', thread: 14342

目前日志中可以根据如下关键字区分哪种条件满足触发的写入

触发写的条件 taskmanager日志关键字
sink.buffer-flush.max-bytes
sink.buffer-flush.max-rows
StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s]
sink.buffer-flush.interval-ms
execution.checkpointing.interval
StarRocks interval Sinking triggered

可以根据sink的间隔判断是checkpoint 触发导致还是sink.buffer-flush.interval-ms触发导致|

  • 开启exactly once(sink.semanti=“exactly-once”)

开启exactly once之后,会按照checkpoint的周期写入StarRocks

execution.checkpointing.interval: 300000

2. Flink-connector-starrocks 1.2.4及其之后版本

Flink-connector-starrocks 1.2.4版本开始,取消了sink.buffer-flush.max-rows策略

  • 开启at least-once情况下(sink.semanti=“at-least-once”),当前默认策略

当未开启checkpoint的时候,会按照下面参数,哪个先满足条件会按照哪个触发写 StarRocks

sink.buffer-flush.max-bytes=94371840
sink.buffer-flush.interval-ms=300000
//如果开启了checkpoint,还需要考虑execution.checkpointing.interval时间execution.checkpointing.interval: 300000

怎么查看实际是根据哪个条件触发写StarRocks的,可以通过flink的taskmanager日志查看,目前日志中可以根据如下关键字区分哪种条件满足触发的写入

触发写的条件 taskmanager日志关键字
sink.buffer-flush.max-bytes Cache full, wait flush, currentBytes
execution.checkpointing.interval Stream load manager flush
  • 开启exactly once(sink.semanti=“exactly-once”)

    • 对于SR >=2.4.0,exactly-once默认基于StarRocks的transaction stream load实现,两次checkpoint之间的数据会作为一个事务提交,但是会将数据按照buffer配置切割,多次发送

    • 对于SR < 2.4,exactly-once基于普通的stream load实现,两次checkpoint之间的数据也是作为一个事务提交,但是只按照checkpoint周期一次写入

      execution.checkpointing.interval: 300000
      

三 支持导入的数据格式

  • CSV(默认)
'sink.properties.column_separator' = '\\x01',
'sink.properties.row_delimiter' = '\\x02'
  • JSON
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true'

四 参数配置

这里只列举下常用的配置,其他的请参考官方文档

配置项目 默认值 类型 说明
sink.semantic at-least-once String at-least-once 或者 exactly-once
sink.buffer-flush.max-bytes 94371840(90M) String 一次flush的最大数据大小,满足设置的值后会触发写入Starrocks,具体写入见第二部分“Flink-connector-starrocks 导入逻辑”

Flink-connector-starrocks 1.2.3及其之前版本 取值范围: [64MB, 1.9GB]

Flink-connector 1.2.4及其之后版本 取值范围: [64MB, 10GB].
sink.buffer-flush.max-rows 500000 String 一次flush的最大数据行,满足设置的值后会触发写入Starrocks,具体写入见第二部分“Flink-connector-starrocks 导入逻辑”

取值范围: [64,000, 5000,000]
sink.buffer-flush.interval-ms 300000 String 写数据间隔,满足设置的值后会触发写入Starrocks,具体写入见第二部分“Flink-connector-starrocks 导入逻辑”

取值范围: [1000ms, 3600000ms].
sink.properties.* NONE String 指定导入参数,这里指定的是stream load相关的参数

例如stream load中header中指定-H “columns:k1,k2,k3”,在flink-connector-starrocks中需要这样指定 ‘sink.properties.columns’ = ‘k1, k2, k3’,其他的参数请参考 STREAM LOAD

StarRocks 2.4版本开始, flink-connector-starrocks 支持 主键模型 部分列更新,可以通过’sink.properties.partial_update’='true’实现
sink.properties.ignore_json_size false String StarRocks 2.1版本开始支持忽略json一次导入100MB 数据量限制,但是会给内存带来压力,如果开启后请观察内存负载

五 最佳实践

1.如何合理配置导入参数和并行度,达到比较高的导入效率?

目前从我们接触的用户case分析来看,导致flink-connector-starrocks写入效率不高的原因主要是以下几点:

  • StarRocks每次导入都是一个事务,导入频率高,compaction和事务交互压力大

    • Flink sink 并行度高

    • 单次导入数据量小(攒批小)

以上问题主要基于攒批来优化导入

  • 开启at leaset once情况下,尽量提高单次导入数据量,降低并发和频率

Flink-connector-starrocks 1.2.3及其之前版本

通过taskmanager中搜索下面关键字,查看是哪种触发逻辑,每次flush触发的间隔多久(多并行度的话需要查看对应thread号间隔多久触发)

触发写的条件 taskmanager日志关键字
sink.buffer-flush.max-bytes
sink.buffer-flush.max-rows
StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s]
sink.buffer-flush.interval-ms
execution.checkpointing.interval
StarRocks interval Sinking triggered

可以根据sink的间隔判断是checkpoint 触发导致还是sink.buffer-flush.interval-ms触发导致|

由于以下参数任意一个满足条件就会触发StarRocks写入,所以建议配合调整,一般建议最低10s+一次写入

sink.buffer-flush.max-bytes
sink.buffer-flush.max-rows
sink.buffer-flush.interval-ms
//如果开启了checkpoint,还需要考虑execution.checkpointing.interval时间

Flink-connector-starrocks 1.2.4及其之后版本

通过taskmanager中搜索下面关键字,查看是哪种触发逻辑,每次flush触发的间隔多久(多并行度的话需要查看对应thread号间隔多久触发)

触发写的条件 taskmanager日志关键字
sink.buffer-flush.max-bytes Cache full, wait flush, currentBytes
execution.checkpointing.interval Stream load manager flush
sink.buffer-flush.interval-ms 当前没有关键字,可以在日志中搜索_stream_load

由于以下参数任意一个满足条件就会触发StarRocks写入,所以建议配合调整,一般建议最低10s+一次写入

sink.buffer-flush.max-bytes
sink.buffer-flush.interval-ms
//如果开启了checkpoint,还需要考虑上面execution.checkpointing.interval时间
  • 开启exactly once的情况下,尽量调大checkpoint的时间

  • Flink sink的并行度:最大控制在8*be个数以内,如果数据量比较小的话用1即可

2.如果导入失败了,如何排查?

  • 确认端口是否正确,

  • 是否正常发起了 stream load任务

参考[问题排查]导入失败相关

3.如果要实现主键模型的upsert和delete,如何写代码

查看下面黄色标注的部分, 注:这里列举了一个demo,实际生产建议参考最佳实践1合理攒批写入StarRocks

package com.starrocks.flink;
...

public class Bean {
    public static void main(String[] args) throws Exception {
        class RowData {
            public int userId;
            public String name;
            public String email;
            public String address;
            public String opType;

            public RowData(int userId, String name, String email, String address, String Optype) {
            ...
            }
        }

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<RowData> rowDataDataStreamSource = env.fromElements(
                new RowData[]{
                        new RowData(99, "stephen", "stephen@example.com", "N.35", "I"),
                        ...
                }
        );

        DataStreamSink<RowData> rowDataDataStreamSink = rowDataDataStreamSource.addSink(
                StarRocksSink.sink(
                        // the table structure
                        TableSchema.builder()
                                .field("user_id", DataTypes.INT().notNull())
                                ...
                                .primaryKey("user_id")
                                .build(),
                        // the sink options
                        StarRocksSinkOptions.builder()
                                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                                ...
                        // set the slots with streamRowData
                        (slots, streamRowData) -> {
                            slots[0] = streamRowData.getUserId();
                            slots[1] = streamRowData.getName();
                            slots[2] = streamRowData.getEmail();
                            slots[3] = streamRowData.getAddress();
//                            slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); //主键模型写入需要指定下是upsert还是delete,这里以upsert举例
                            //如果说原始数据里面有字段可以表示upsert还是delete,则可以在这里判断一下,例如原始字段中有一个字段"type",取值分别为insert,update,delete
                            //则在这里可以加一个判断
                            if (streamRowData.getType() == "D") {
                                slots[slots.length - 1] = StarRocksSinkOP.DELETE.ordinal();
                            } else {
                                slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
                            }
                        }
                )
        );
        env.execute();
    }
}

具体代码见

Bean.java (4.7 KB)

StarRocks中建表

CREATE TABLE `users` (
  `user_id` bigint(20) NOT NULL COMMENT "",
  `name` varchar(65533) NOT NULL COMMENT "",
  `email` varchar(65533) NULL COMMENT "",
  `address` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`user_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1;

4.结合CDC实现多种数据源到StarRocks的实时同步

目前已支持MySQL、PostgreSQL、Oracle、Hive、SQLServer和TiDB数据实时同步到StarRocks。

使用方式可参考从 MySQL 实时同步 @ Flink_cdc_load

其他数据库的实时同步可下载对应的cdc的jar包(https://github.com/ververica/flink-cdc-connectors/releases)结合flink-connector-starrocks使用。

六 FAQ

Q:导入json数据的时候报错The size of this batch exceed the max size [104857600] of json type data data [ 118170895 ]. Set ignore_json_size to skip the check, although it may lead huge memory consuming.`

A:这是超过单次导入json 100MB的限制,可以配置’sink.properties.ignore_json_size’='true’解决,但是会给内存带来压力,建议调整完观察下BE节点内存负载。

Q:flink-cdc+flink-connector-starrocks没有实现update

A:确认flink和flink-cdc版本是否一致,参考CDC Connectors for Apache Flink

1赞

flink写入的时候,7be,手动down掉1be,flink写入的时候,全打在这个down掉的be上