[Details] Error when loading data with the official flink-connector-starrocks example
[Background] Data loading
[Business impact] None
[StarRocks version] e.g., 2.3.4
[Cluster size] e.g., 3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-deployed)
[Machine info] CPU vCores / memory / NIC, e.g., 6C/32G/10GbE
[Attachments]
While testing StarRocks recently, I ran into a problem using flink-connector-starrocks by following the official example at
Continuously load from Apache Flink® @ Flink-connector-starrocks @ StarRocks Docs.
Loading into a Duplicate Key table works fine, but with a Primary Key table I hit the issue below: my bean has four fields and each row has four fields, yet the resulting slots array has length 5, so there is always one extra column and the rows cannot be inserted.
I have not tested the other two table types yet; the relevant details are below.
How should the sink be used with a Primary Key table?
My CREATE TABLE statement:
CREATE TABLE `t_erp_raw_repository_day_totalprice` (
`id` varchar(50) NOT NULL COMMENT 'id',
`tenant_id` varchar(50) DEFAULT NULL COMMENT '租户id',
`total_price` varchar(50) DEFAULT NULL COMMENT '总金额',
`create_date` varchar(50) NOT NULL COMMENT '统计日期'
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
"replication_num" = “1”
);
My code:
DataStreamSource<RowData> rowDataDataStreamSource = env.fromElements(
new RowData[]{
new RowData("99", "stephen", "323.3", "stephen"),
new RowData("98", "stephen", "323.3", "stephen")
}
);
rowDataDataStreamSource.addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.field("id", DataTypes.VARCHAR(20).notNull())
.field("tenant_id", DataTypes.VARCHAR(50))
.field("total_price", DataTypes.VARCHAR(20))
.field("create_date", DataTypes.VARCHAR(20))
.primaryKey("id")
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://XXXXXXXXXXXXX")
.withProperty("load-url", "XXXXXXXXXXXXXXXX")
.withProperty("username", "XXXXX")
.withProperty("password", "XXXXX")
.withProperty("table-name", "XXXXX")
.withProperty("database-name", "XXXXX")
                        // Since version 2.4, partial updates of columns in a Primary Key
                        // table are supported. The columns to update can be specified
                        // via the following two properties.
                        // .withProperty("sink.properties.partial_update", "true")
                        // .withProperty("sink.properties.columns", "id,tenant_id,total_price,create_date")
.withProperty("sink.properties.column_separator", "\\x01")
.withProperty("sink.properties.row_delimiter", "\\x02")
.build(),
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.getId();
slots[1] = streamRowData.getTenant_id();
slots[2] = streamRowData.getTotal_price();
slots[3] = streamRowData.getCreate_date();
System.out.println(streamRowData);
System.out.println(slots.length);
}
)
);
env.execute();
The error output:
2022-11-22 22:42:52 WARN [starrocks-flush] c.s.connector.flink.manager.StarRocksSinkManager Failed to flush batch data to StarRocks, retry times = 0
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":1,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"4ab4791f-ebab-4a72-b98b-1687821eab1b","LoadBytes":56,"StreamLoadPutTimeMs":3,"NumberTotalRows":2,"WriteDataTimeMs":64,"TxnId":125,"LoadTimeMs":69,"ErrorURL":"http://192.168.50.156:8043/api/_load_error_log?file=error_log_474b33951714411d_f665d99a21e442b3","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":2}
{"streamLoadErrorLog":"Error: NULL value in non-nullable column '__op'. Row: ['99', 'stephen', '323.3', 'stephen', NULL]\nError: NULL value in non-nullable column '__op'. Row: ['98', 'stephen', '323.3', 'stephen', NULL]\n"}
at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:104) ~[flink-connector-starrocks-1.2.3_flink-1.12_2.12.jar:na]
at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:324) [flink-connector-starrocks-1.2.3_flink-1.12_2.12.jar:na]
at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:159) [flink-connector-starrocks-1.2.3_flink-1.12_2.12.jar:na]
at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
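For reference, the error log suggests the extra fifth slot is the hidden `__op` column that StarRocks Primary Key tables use to mark each row as an upsert or a delete, which is why a four-column table yields slots of length 5. The following is a minimal sketch in plain Java (no Flink dependencies; the `UPSERT`/`DELETE` ordinals mirror the connector's `StarRocksSinkOP` enum, and treating the last slot as `__op` is an assumption drawn from the error message, not verified against this exact connector version) of how the row transformer would fill all five slots:

```java
import java.util.Arrays;

public class OpSlotSketch {

    // Assumed to mirror com.starrocks.connector.flink.row.sink.StarRocksSinkOP
    enum SinkOp { UPSERT, DELETE }

    // Mimics the (slots, rowData) transformer lambda: four data fields
    // plus the hidden __op column appended for Primary Key tables.
    static Object[] fillSlots(String id, String tenantId,
                              String totalPrice, String createDate) {
        Object[] slots = new Object[5];     // 4 table columns + hidden __op
        slots[0] = id;
        slots[1] = tenantId;
        slots[2] = totalPrice;
        slots[3] = createDate;
        // Without this line the last slot stays null, matching the
        // "NULL value in non-nullable column '__op'" error above.
        slots[4] = SinkOp.UPSERT.ordinal(); // 0 = upsert, 1 = delete
        return slots;
    }

    public static void main(String[] args) {
        Object[] slots = fillSlots("99", "stephen", "323.3", "stephen");
        System.out.println(slots.length);   // 5
        System.out.println(Arrays.toString(slots));
    }
}
```

In the original sink code this would correspond to adding one line to the transformer lambda, e.g. `slots[4] = StarRocksSinkOP.UPSERT.ordinal();` — again, a hypothesis based on the error log, not a confirmed fix.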
