主键模型导入数据报错

【详述】使用官方提供的 flink-connector-starrocks 案例导入数据报错
【背景】数据导入
【业务影响】无
【StarRocks版本】例如:2.3.4
【集群规模】例如:3fe(1 follower+2observer)+3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:6C/32G/万兆
【附件】

最近在测试starrocks的时候,遇到问题,在使用 flink-connector-starrocks 按照官方的示例

从 Apache Flink® 持续导入 @ Flink-connector-starrocks @ StarRocks Docs

导入数据的时候,明细模型没问题,在使用主键模型的时候,遇到如下问题,我的BEAN对象是四个字段,数据也是四个字段,但是出来的slots结果是5,导致一直多出来一个列,数据插入不进去。
其他两类模型还未测试,相关信息如下提供。
请问如果要使用主键模型的时候,要如何使用呢

我的建表语句如下

CREATE TABLE `t_erp_raw_repository_day_totalprice` (
  `id` varchar(50) NOT NULL COMMENT 'id',
  `tenant_id` varchar(50) DEFAULT NULL COMMENT '租户id',
  `total_price` varchar(50) DEFAULT NULL COMMENT '总金额',
  `create_date` varchar(50) NOT NULL COMMENT '统计日期'
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
    "replication_num" = “1” 
);

我的代码如下

   DataStreamSource<RowData> rowDataDataStreamSource = env.fromElements(
            new RowData[]{
                    new RowData("99", "stephen", "323.3", "stephen"),
                    new RowData("98", "stephen", "323.3", "stephen")
            }
    );

    rowDataDataStreamSource.addSink(
            StarRocksSink.sink(
                    // the table structure
                    TableSchema.builder()
                            .field("id", DataTypes.VARCHAR(20).notNull())
                            .field("tenant_id", DataTypes.VARCHAR(50))
                            .field("total_price", DataTypes.VARCHAR(20))
                            .field("create_date", DataTypes.VARCHAR(20))
                            .primaryKey("id")
                            .build(),
                    // the sink options
                    StarRocksSinkOptions.builder()
                            .withProperty("jdbc-url", "jdbc:mysql://XXXXXXXXXXXXX")
                            .withProperty("load-url", "XXXXXXXXXXXXXXXX")
                            .withProperty("username", "XXXXX")
                            .withProperty("password", "XXXXX")
                            .withProperty("table-name", "XXXXX")
                            .withProperty("database-name", "XXXXX")
                            // 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列。
                            // .withProperty("sink.properties.partial_update", "true")
                            //.withProperty("sink.properties.columns", "id,tenant_id,total_price,create_date")
                            .withProperty("sink.properties.column_separator", "\\x01")
                            .withProperty("sink.properties.row_delimiter", "\\x02")
                            .build(),
                    // set the slots with streamRowData
                    (slots, streamRowData) -> {
                        slots[0] = streamRowData.getId();
                        slots[1] = streamRowData.getTenant_id();
                        slots[2] = streamRowData.getTotal_price();
                        slots[3] = streamRowData.getCreate_date();
                        System.out.println(streamRowData);
                        System.out.println(slots.length);
                    }
            )
    );
    env.execute();

报错内容如下:

2022-11-22 22:42:52 WARN  [starrocks-flush] c.s.connector.flink.manager.StarRocksSinkManager Failed to flush batch data to StarRocks, retry times = 0
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: 
{"Status":"Fail","BeginTxnTimeMs":1,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"4ab4791f-ebab-4a72-b98b-1687821eab1b","LoadBytes":56,"StreamLoadPutTimeMs":3,"NumberTotalRows":2,"WriteDataTimeMs":64,"TxnId":125,"LoadTimeMs":69,"ErrorURL":"http://192.168.50.156:8043/api/_load_error_log?file=error_log_474b33951714411d_f665d99a21e442b3","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":2}
{"streamLoadErrorLog":"Error: NULL value in non-nullable column '__op'. Row: ['99', 'stephen', '323.3', 'stephen', NULL]\nError: NULL value in non-nullable column '__op'. Row: ['98', 'stephen', '323.3', 'stephen', NULL]\n"}
	at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:104) ~[flink-connector-starrocks-1.2.3_flink-1.12_2.12.jar:na]
	at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:324) [flink-connector-starrocks-1.2.3_flink-1.12_2.12.jar:na]
	at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:159) [flink-connector-starrocks-1.2.3_flink-1.12_2.12.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

这个问题看起来是不能为null的列为null了

sr默认会在导入后面加一列标记列,这是正常现象

那如可以如何处理呢,我在使用明细模型没这个问题,就是主键模型报错

我的主键模型,是有加一列,但是明细模型,没有加,如果要使用主键模型,该如何呢

嗯嗯,主键模型会加一列__op,目的是标记数据是否删除的,可以忽略

可以手动添加下__op列。来处理下。

另外

curl http://192.168.50.156:8043/api/_load_error_log?file=error_log_474b33951714411d_f665d99a21e442b3
看下具体的报错信息呢?

1赞

好的,谢谢,后面我再试一下

意思是主键模型会自动在业务列后加一列"__op",然后在sink对各列赋值时,除了业务列,还需要对"__op"列赋值 StarRocksSinkOP.UPSERT.ordinal()对吧?
为什么不能有个默认值,一定要手工添加才能处理呢…

业务json数据没有__op的,需要在flink中加 StarRocksSinkOP.UPSERT.ordinal() 或者 json数据中增加{’__op’:‘0’} 字段
业务json数据中自带 __op 的(比如 debezium-json),这种就不需要加,flink connector 会自动补齐

了解。 非常感谢~