Version 2.3.7: Incorrect data when inserting into StarRocks from Flink

[Details] Writing to StarRocks with flink-connector-starrocks produces incorrect data.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment env = TableEnvironment.create(settings);
// Register the StarRocks sink table. `user` and `database` are reserved words in Flink SQL, so they are quoted with backticks.
env.executeSql(
    "CREATE TABLE ass_monitoring_test(" +
        "digest STRING not null," +
        "exec_time STRING not null," +
        "`user` STRING," +
        "ts STRING," +
        "tag STRING," +
        "`database` STRING," +
        "sqltext STRING," +
        "primary key(digest, exec_time) not enforced" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://xxxx'," +
        "'load-url'='xxxxxxx'," +
        "'database-name' = 'test'," +
        "'table-name' = 'ass_monitoring_test'," +
        "'username' = 'test'," +
        "'password' = '123456'," +
        "'sink.buffer-flush.max-rows' = '1000000'," +
        "'sink.buffer-flush.max-bytes' = '300000000'," +
        "'sink.buffer-flush.interval-ms' = '5000'," +
        // Since version 2.4, partial column updates are supported on the primary key model.
        // Use the two properties below to list the columns to update; '__op' must be
        // explicitly appended as the last entry of 'sink.properties.columns'.
        // "'sink.properties.partial_update' = 'true'," +
        // "'sink.properties.columns' = 'digest,exec_time,database,__op'," +
        // The backslash is doubled so that the Java string contains the literal text \x01 / \x02.
        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +
        "'sink.max-retries' = '3'" +
    ")"
);
// Insert a single test row; the last four values are the literal string 'NULL', not SQL NULL.
env.executeSql(
    "INSERT INTO ass_monitoring_test SELECT '5', '2021-03-01 12:00:00', '5', 'NULL', 'NULL', 'NULL', 'NULL'");

After the insert, querying the database shows that the first field appears to have a newline character added to it.
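One way to confirm whether a stray newline or delimiter byte really ended up in the stored value is to print it with control characters made visible. The helper below is only a sketch: the class name `ControlCharInspector` and the sample value are hypothetical and not taken from this thread, and in practice the string would come from whatever client you use to read the `digest` column back.

```java
public class ControlCharInspector {

    /** Returns the input with every ISO control character replaced by a visible Unicode-style escape. */
    public static String escapeControlChars(String value) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (Character.isISOControl(c)) {
                // Render e.g. a line feed as backslash + u000a so it shows up in logs.
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical value as it might be read back from the table (assumption, not real output).
        String digest = "\n5";
        System.out.println(escapeControlChars(digest));
    }
}
```

If the printed value contains an escape next to the expected `5`, the extra character was written during the load rather than added by the query client's display.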

Which version of the connector are you using?

com.starrocks:flink-connector-starrocks:1.2.5_flink-1.13_2.12

Table creation statement:
CREATE TABLE ass_monitoring_test (
  digest varchar(200) NOT NULL COMMENT "primary key",
  exec_time datetime NOT NULL COMMENT "execution time",
  `user` varchar(100) NULL COMMENT "user",
  ts varchar(1048576) NULL COMMENT "",
  tag varchar(100) NULL COMMENT "tag",
  `database` varchar(100) NULL COMMENT "database",
  sqltext varchar(1048576) NULL COMMENT "executed SQL"
) ENGINE=OLAP
PRIMARY KEY(digest, exec_time)
COMMENT "OLAP"
PARTITION BY RANGE(exec_time)
(PARTITION p202103 VALUES [('2021-03-01 00:00:00'), ('2021-04-01 00:00:00')),
PARTITION p202104 VALUES [('2021-04-01 00:00:00'), ('2021-05-01 00:00:00')),
PARTITION p202105 VALUES [('2021-05-01 00:00:00'), ('2021-06-01 00:00:00')),
PARTITION p202106 VALUES [('2021-06-01 00:00:00'), ('2021-07-01 00:00:00')),
PARTITION p202107 VALUES [('2021-07-01 00:00:00'), ('2021-08-01 00:00:00')),
PARTITION p202108 VALUES [('2021-08-01 00:00:00'), ('2021-09-01 00:00:00')),
PARTITION p202109 VALUES [('2021-09-01 00:00:00'), ('2021-10-01 00:00:00')),
PARTITION p202110 VALUES [('2021-10-01 00:00:00'), ('2021-11-01 00:00:00')),
PARTITION p202111 VALUES [('2021-11-01 00:00:00'), ('2021-12-01 00:00:00')),
PARTITION p202112 VALUES [('2021-12-01 00:00:00'), ('2022-01-01 00:00:00')),
PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')),
PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')),
PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00')),
PARTITION p202204 VALUES [('2022-04-01 00:00:00'), ('2022-05-01 00:00:00')),
PARTITION p202205 VALUES [('2022-05-01 00:00:00'), ('2022-06-01 00:00:00')),
PARTITION p202206 VALUES [('2022-06-01 00:00:00'), ('2022-07-01 00:00:00')),
PARTITION p202207 VALUES [('2022-07-01 00:00:00'), ('2022-08-01 00:00:00')),
PARTITION p202208 VALUES [('2022-08-01 00:00:00'), ('2022-09-01 00:00:00')),
PARTITION p202209 VALUES [('2022-09-01 00:00:00'), ('2022-10-01 00:00:00')),
PARTITION p202210 VALUES [('2022-10-01 00:00:00'), ('2022-11-01 00:00:00')),
PARTITION p202211 VALUES [('2022-11-01 00:00:00'), ('2022-12-01 00:00:00')),
PARTITION p202212 VALUES [('2022-12-01 00:00:00'), ('2023-01-01 00:00:00')),
PARTITION p202301 VALUES [('2023-01-01 00:00:00'), ('2023-02-01 00:00:00')),
PARTITION p202302 VALUES [('2023-02-01 00:00:00'), ('2023-03-01 00:00:00')),
PARTITION p202303 VALUES [('2023-03-01 00:00:00'), ('2023-03-27 00:00:00')),
PARTITION p202304 VALUES [('2023-04-01 00:00:00'), ('2023-05-01 00:00:00')),
PARTITION p202305 VALUES [('2023-05-01 00:00:00'), ('2023-06-01 00:00:00')))
DISTRIBUTED BY HASH(digest) BUCKETS 3
PROPERTIES (
  "replication_num" = "3",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.time_unit" = "MONTH",
  "dynamic_partition.time_zone" = "Asia/Shanghai",
  "dynamic_partition.start" = "-24",
  "dynamic_partition.end" = "2",
  "dynamic_partition.prefix" = "p",
  "dynamic_partition.buckets" = "3",
  "dynamic_partition.start_day_of_month" = "1",
  "in_memory" = "false",
  "storage_format" = "DEFAULT",
  "enable_persistent_index" = "false",
  "storage_medium" = "SSD"
);

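This is a known issue, and a release with the fix is coming soon. If you do not need a special row delimiter, you can work around it for now by removing the sink.properties.row_delimiter setting and letting the connector use its default.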
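As a rough sketch of that workaround, the snippet below assembles the same sink options as in the question but leaves out 'sink.properties.row_delimiter'. The class and method names (`SinkOptionsWorkaround`, `sinkOptions`, `toWithClause`) are made up for illustration, the URLs are the placeholders from the original post, and the generated text is meant to be pasted into the `WITH (...)` part of the Flink `CREATE TABLE` statement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SinkOptionsWorkaround {

    /** Sink options from the question, minus the custom row delimiter. */
    public static Map<String, String> sinkOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "starrocks");
        opts.put("jdbc-url", "jdbc:mysql://xxxx"); // placeholder from the thread
        opts.put("load-url", "xxxxxxx");           // placeholder from the thread
        opts.put("database-name", "test");
        opts.put("table-name", "ass_monitoring_test");
        opts.put("username", "test");
        opts.put("password", "123456");
        opts.put("sink.buffer-flush.max-rows", "1000000");
        opts.put("sink.buffer-flush.max-bytes", "300000000");
        opts.put("sink.buffer-flush.interval-ms", "5000");
        opts.put("sink.properties.column_separator", "\\x01");
        // The custom row delimiter option is deliberately not set, so the default applies.
        opts.put("sink.max-retries", "3");
        return opts;
    }

    /** Renders the options as a Flink SQL WITH (...) clause. */
    public static String toWithClause(Map<String, String> opts) {
        return opts.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", ", "WITH (", ")"));
    }

    public static void main(String[] args) {
        System.out.println(toWithClause(sinkOptions()));
    }
}
```

Keeping the options in a map makes it easy to add the delimiter back once a connector release containing the fix is in use.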