Flink DataStream 写入自定义JAVA对象数据,每秒数据就一条,fe崩掉

public static void sinkIntoTrackLogAdgroupInfo( DataStream parsedKeywordDataStream) {
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty(“jdbc-url”, StarRocksConfig.jdbcUrl)
.withProperty(“load-url”, StarRocksConfig.loadUrl)
.withProperty(“username”, StarRocksConfig.userName)
.withProperty(“password”, StarRocksConfig.password)
.withProperty(“database-name”, “xxx”)
.withProperty(“table-name”, “xxx”)
// .withProperty(“sink.properties.format”, “json”)
// .withProperty(“sink.properties.strip_outer_array”, “true”)
// 设置并行度,多并行度情况下需要考虑如何保证数据有序性
.withProperty(“sink.parallelism”, “30”)
// .withProperty(“sink.version”, “v1”)
.withProperty(“sink.buffer-flush.max-rows”, “100000”)
.withProperty(“sink.buffer-flush.interval-ms”, “5000”)
// .withProperty(“sink.properties.column_separator”, “\x01”)
// .withProperty(“sink.properties.row_delimiter”, “\x02”)
.build();

    // 定义数据转换器,将TrackLogEyeInfo转换为JSONObject
    DataStream<TrackLogSummaryData> jsonDataStream = parsedKeywordDataStream.map(trackLogEyeInfo -> {
        TrackLogSummaryData summaryData = TrackLogSummaryData.buildTrackLogSummaryData(trackLogEyeInfo);
        return summaryData;
    }).filter(Objects::nonNull);


     TableSchema schema = TableSchema.builder()
      .primaryKey("pv_id","pos_id","process_date_time")
      .field("pv_id", DataTypes.STRING().notNull()) // VARCHAR(255) NOT NULL。   注释是sr表的字段
      .field("pos_id", DataTypes.BIGINT().notNull()) // BIGINT NOT NULL
      .field("process_date_time", DataTypes.TIMESTAMP().notNull()) // DATETIME NOT NULL
      .field("process_time", DataTypes.BIGINT().notNull()) // BIGINT NOT NULL
      .field("wuid", DataTypes.STRING()) // VARCHAR(255)
      .field("connection_type", DataTypes.INT()) // INT
      .field("env", DataTypes.STRING()) // VARCHAR(50)
      .field("site_set", DataTypes.INT()) // INT
      .field("site_id", DataTypes.BIGINT()) // BIGINT
      .field("position_id", DataTypes.BIGINT()) // BIGINT
      .field("gender", DataTypes.INT()) // INT
      .field("age", DataTypes.INT()) // INT
      .field("os_type", DataTypes.INT()) // INT
      .field("carrier", DataTypes.INT()) // INT
      .field("area_targeting_ids", DataTypes.ARRAY(DataTypes.BIGINT())) // ARRAY<BIGINT>
      .field("retrieval_count", DataTypes.INT()) // INT
      .field("retrieval_error_codes", DataTypes.ARRAY(DataTypes.INT())) // ARRAY<INT>
      .field("om_get_ad_type", DataTypes.INT()) // INT
      .field("stage_ad_info", DataTypes.STRING()) // JSON
      .field("article_business_intention", DataTypes.INT()) // INT
      .field("relevance_factor", DataTypes.BIGINT()) // BIGINT
      .field("recall_strategy", DataTypes.ARRAY(DataTypes.STRING())) // ARRAY<STRING>
      .field("recall_strategy_new", DataTypes.ARRAY(DataTypes.STRING())) // ARRAY<STRING>
      .field("recall_ltr_queue_len", DataTypes.INT()) // INT
      .field("scoring_ltr_queue_len", DataTypes.INT()) // INT
      .field("in_wuid_white_list", DataTypes.BOOLEAN()) // BOOLEAN
      .field("recall_unified_strategy", DataTypes.ARRAY(DataTypes.STRING())) // ARRAY<STRING>
      .field("is_mix_rerank", DataTypes.BOOLEAN()) // BOOLEAN
      .field("doc_wash_ltr_queue_len", DataTypes.INT()) // INT
      .field("prediction_track_info", DataTypes.STRING()) // JSON
      .field("education", DataTypes.INT()) // INT
      .field("marriage_status", DataTypes.ARRAY(DataTypes.INT())) // ARRAY<INT>
      .field("consumption_level", DataTypes.INT()) // INT
      .build();

    TrackLogSummaryDataTransformer transformer = new TrackLogSummaryDataTransformer();
    SinkFunction<TrackLogSummaryData> starRockSink = StarRocksSink.sink(schema, options, transformer);
    jsonDataStream.addSink(starRockSink);
}

flink TPS=1. 每秒就导入一条数据,其他使用JSON导入的都没问题。 自定义java对象导入,fe容易挂掉。 请协助解决下谢谢。
3个fe 4个be,单机cpu 60核.
怀疑是java自定义对象导入的性能问题,或者String转JSON字段有过大的性能消耗? String字段length平均几百至几千

版本是3.1,flink版本1.5,使用的flink-connector-starrocks
1.2.10_flink-1.15

自顶一下,处理不了。又不想无谓加资源,因为数据量很小

解决了吗