public static void sinkIntoTrackLogAdgroupInfo(DataStream<TrackLogEyeInfo> parsedKeywordDataStream) {
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", StarRocksConfig.jdbcUrl)
            .withProperty("load-url", StarRocksConfig.loadUrl)
            .withProperty("username", StarRocksConfig.userName)
            .withProperty("password", StarRocksConfig.password)
            .withProperty("database-name", "xxx")
            .withProperty("table-name", "xxx")
            // .withProperty("sink.properties.format", "json")
            // .withProperty("sink.properties.strip_outer_array", "true")
            // Sink parallelism; with multiple parallel subtasks, consider how to keep the data ordered
            .withProperty("sink.parallelism", "30")
            // .withProperty("sink.version", "v1")
            .withProperty("sink.buffer-flush.max-rows", "100000")
            .withProperty("sink.buffer-flush.interval-ms", "5000")
            // .withProperty("sink.properties.column_separator", "\\x01")
            // .withProperty("sink.properties.row_delimiter", "\\x02")
            .build();
    // Convert each TrackLogEyeInfo record into a TrackLogSummaryData and drop nulls
    DataStream<TrackLogSummaryData> jsonDataStream = parsedKeywordDataStream
            .map(trackLogEyeInfo -> TrackLogSummaryData.buildTrackLogSummaryData(trackLogEyeInfo))
            .filter(Objects::nonNull);
    // The trailing comments show the corresponding column types in the StarRocks table
    TableSchema schema = TableSchema.builder()
            .primaryKey("pv_id", "pos_id", "process_date_time")
            .field("pv_id", DataTypes.STRING().notNull()) // VARCHAR(255) NOT NULL
            .field("pos_id", DataTypes.BIGINT().notNull()) // BIGINT NOT NULL
            .field("process_date_time", DataTypes.TIMESTAMP().notNull()) // DATETIME NOT NULL
            .field("process_time", DataTypes.BIGINT().notNull()) // BIGINT NOT NULL
            .field("wuid", DataTypes.STRING()) // VARCHAR(255)
            .field("connection_type", DataTypes.INT()) // INT
            .field("env", DataTypes.STRING()) // VARCHAR(50)
            .field("site_set", DataTypes.INT()) // INT
            .field("site_id", DataTypes.BIGINT()) // BIGINT
            .field("position_id", DataTypes.BIGINT()) // BIGINT
            .field("gender", DataTypes.INT()) // INT
            .field("age", DataTypes.INT()) // INT
            .field("os_type", DataTypes.INT()) // INT
            .field("carrier", DataTypes.INT()) // INT
            .field("area_targeting_ids", DataTypes.ARRAY(DataTypes.BIGINT())) // ARRAY<BIGINT>
            .field("retrieval_count", DataTypes.INT()) // INT
            .field("retrieval_error_codes", DataTypes.ARRAY(DataTypes.INT())) // ARRAY<INT>
            .field("om_get_ad_type", DataTypes.INT()) // INT
            .field("stage_ad_info", DataTypes.STRING()) // JSON
            .field("article_business_intention", DataTypes.INT()) // INT
            .field("relevance_factor", DataTypes.BIGINT()) // BIGINT
            .field("recall_strategy", DataTypes.ARRAY(DataTypes.STRING())) // ARRAY<STRING>
            .field("recall_strategy_new", DataTypes.ARRAY(DataTypes.STRING())) // ARRAY<STRING>
            .field("recall_ltr_queue_len", DataTypes.INT()) // INT
            .field("scoring_ltr_queue_len", DataTypes.INT()) // INT
            .field("in_wuid_white_list", DataTypes.BOOLEAN()) // BOOLEAN
            .field("recall_unified_strategy", DataTypes.ARRAY(DataTypes.STRING())) // ARRAY<STRING>
            .field("is_mix_rerank", DataTypes.BOOLEAN()) // BOOLEAN
            .field("doc_wash_ltr_queue_len", DataTypes.INT()) // INT
            .field("prediction_track_info", DataTypes.STRING()) // JSON
            .field("education", DataTypes.INT()) // INT
            .field("marriage_status", DataTypes.ARRAY(DataTypes.INT())) // ARRAY<INT>
            .field("consumption_level", DataTypes.INT()) // INT
            .build();
    // Build the row transformer for the custom Java object and attach the StarRocks sink
    TrackLogSummaryDataTransformer transformer = new TrackLogSummaryDataTransformer();
    SinkFunction<TrackLogSummaryData> starRocksSink = StarRocksSink.sink(schema, options, transformer);
    jsonDataStream.addSink(starRocksSink);
}
Flink TPS = 1: only about one row per second is loaded. Other jobs that load data as JSON work fine; with this custom-Java-object load, the FE tends to crash. Please help us troubleshoot, thanks.
Cluster: 3 FEs and 4 BEs, each machine with 60 CPU cores.
We suspect a performance problem with loading custom Java objects, or that converting the String fields to JSON is too expensive? The String fields average a few hundred to a few thousand characters in length.
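To help isolate the problem, here is a minimal sketch of the JSON-string load path that the commented-out sink.properties.format / strip_outer_array options hint at: the job serializes each TrackLogSummaryData to a JSON string itself and feeds a plain DataStream<String> to StarRocksSink.sink(options), bypassing the TableSchema/row-transformer path entirely. If this variant reaches normal throughput, the bottleneck is in the custom-object transformer rather than in StarRocks. The method name sinkTrackLogAdgroupInfoAsJson is made up for illustration, it reuses the classes and imports from the snippet above, and the fastjson call (JSON.toJSONString) is only an assumption; any JSON library would work.

public static void sinkTrackLogAdgroupInfoAsJson(DataStream<TrackLogEyeInfo> parsedKeywordDataStream) {
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", StarRocksConfig.jdbcUrl)
            .withProperty("load-url", StarRocksConfig.loadUrl)
            .withProperty("username", StarRocksConfig.userName)
            .withProperty("password", StarRocksConfig.password)
            .withProperty("database-name", "xxx")
            .withProperty("table-name", "xxx")
            // Stream Load in JSON format, one JSON object per record
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .withProperty("sink.buffer-flush.max-rows", "100000")
            .withProperty("sink.buffer-flush.interval-ms", "5000")
            .build();

    parsedKeywordDataStream
            // same conversion as in the original method
            .map(trackLogEyeInfo -> TrackLogSummaryData.buildTrackLogSummaryData(trackLogEyeInfo))
            .filter(Objects::nonNull)
            // serialize in the Flink job; com.alibaba.fastjson.JSON is an assumption here
            .map(summaryData -> JSON.toJSONString(summaryData))
            // the String variant of the sink loads each record as one JSON row
            .addSink(StarRocksSink.sink(options));
}

If this JSON path also stalls at one row per second, the next place to look would be the buffer-flush settings and the FE logs around each Stream Load transaction, rather than the Java object conversion.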