Abnormal values after writing from Flink to StarRocks (SR) with flink-connector-starrocks

After Flink computes the data and writes it to SR, the values in the table are abnormal. What causes this, and how can it be fixed?
[Screenshot of the abnormal values in the SR table]

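Could you describe this in more detail? Which version are you on, what exactly did you do, and what are the relevant configuration settings and other basic information?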

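The version is 2.4.0.
I use flink-connector-starrocks to write from Flink to SR with the code below. It runs without any errors, and the final result in SR is what the screenshot shows.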
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
        }
    }
    env.fromElements(
            new RowData[]{
                    new RowData(99, "张三"),
                    new RowData(100, "李四")
            }
    ).addSink(
            StarRocksSink.sink(
                    // the table structure
                    TableSchema.builder()
                            .field("score", DataTypes.INT())
                            .field("name", DataTypes.VARCHAR(20))
                            .build(),
                    // the sink options
                    StarRocksSinkOptions.builder()
                            .withProperty("jdbc-url", "jdbc:mysql://192.168.190.102:9030")
                            .withProperty("load-url", "192.168.190.102:8030")
                            .withProperty("username", "root")
                            .withProperty("password", "")
                            .withProperty("table-name", "test2")
                            .withProperty("database-name", "flink_to_SR")
                            // .withProperty("sink.properties.partial_update", "true")
                            // .withProperty("sink.properties.columns", "k1,k2,k3")
                            // .withProperty("sink.properties.column_separator", "\x01")
                            // .withProperty("sink.properties.row_delimiter", "\x02")
                            .build(),
                    // set the slots with streamRowData
                    (slots, streamRowData) -> {
                        slots[0] = streamRowData.score;
                        slots[1] = streamRowData.name;
                    }
            )
    );

    env.execute();
}

Could you share the CREATE TABLE statement used in StarRocks?

CREATE TABLE test2 (
    score INT(11) NOT NULL COMMENT "",
    name STRING NULL COMMENT ""
) ENGINE=OLAP
COMMENT "OLAP"
DISTRIBUTED BY HASH(name) BUCKETS 1
PROPERTIES (
    "replication_num" = "1",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
);

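The RowData constructor isn't actually implemented, is it? Its body is empty, so score and name are never assigned. For a working example, see https://github.com/StarRocks/demo/blob/master/FlinkDemo/src/main/java/com/starrocks/funcs/BeanDataJava.java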
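
To make the point concrete, here is a minimal sketch that runs without Flink or StarRocks. BrokenRow copies the RowData class from the question, and FixedRow adds the two missing assignments. Both class names and the RowDataDemo wrapper are made up for this illustration. The slots arrays only imitate what the sink's row builder fills in, so this shows the Java behavior rather than the connector itself.

```java
import java.util.Arrays;

public class RowDataDemo {
    // Same shape as RowData in the question: the constructor ignores its arguments.
    static class BrokenRow {
        public int score;
        public String name;

        public BrokenRow(int score, String name) {
            // Empty body: the fields keep their defaults (0 for int, null for String).
        }
    }

    // Same class with the assignments added.
    static class FixedRow {
        public int score;
        public String name;

        public FixedRow(int score, String name) {
            this.score = score;
            this.name = name;
        }
    }

    public static void main(String[] args) {
        BrokenRow broken = new BrokenRow(99, "张三");
        FixedRow fixed = new FixedRow(99, "张三");

        // Imitates slots[0] = row.score; slots[1] = row.name; from the row builder.
        Object[] brokenSlots = {broken.score, broken.name};
        Object[] fixedSlots = {fixed.score, fixed.name};

        System.out.println(Arrays.toString(brokenSlots)); // [0, null]
        System.out.println(Arrays.toString(fixedSlots));  // [99, 张三]
    }
}
```

With the empty constructor, every row handed to the sink would carry score 0 and a null name, which is presumably what ends up in test2. Adding the two `this.` assignments to RowData should be enough, and the rest of the job can stay as it is.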
