After Flink finishes its computation and writes the data to SR, the values come out wrong. What causes this, and how can it be fixed?
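Could you describe the problem in more detail? Which version are you on, what exactly did you do, and what are the relevant configuration settings and other basic information?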
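The version is 2.4.0.
I am using flink-connector-starrocks to write from Flink to SR. The code is below. The job runs without reporting any error, but the final result in SR looks like what is shown in the screenshot.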
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
        }
    }
    env.fromElements(
        new RowData[]{
            new RowData(99, "张三"),
            new RowData(100, "李四")
        }
    ).addSink(
        StarRocksSink.sink(
            // the table structure
            TableSchema.builder()
                .field("score", DataTypes.INT())
                .field("name", DataTypes.VARCHAR(20))
                .build(),
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://192.168.190.102:9030")
                .withProperty("load-url", "192.168.190.102:8030")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("table-name", "test2")
                .withProperty("database-name", "flink_to_SR")
                // .withProperty("sink.properties.partial_update", "true")
                // .withProperty("sink.properties.columns", "k1,k2,k3")
                // .withProperty("sink.properties.column_separator", "\x01")
                // .withProperty("sink.properties.row_delimiter", "\x02")
                .build(),
            // set the slots with streamRowData
            (slots, streamRowData) -> {
                slots[0] = streamRowData.score;
                slots[1] = streamRowData.name;
            }
        )
    );
    env.execute();
}
Could we see the CREATE TABLE statement for the table inside StarRocks?
CREATE TABLE test2
(
    score INT(11) NOT NULL COMMENT "",
    name STRING NULL COMMENT ""
) ENGINE=OLAP
COMMENT "OLAP"
DISTRIBUTED BY HASH(name) BUCKETS 1
PROPERTIES (
    "replication_num" = "1",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
);
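This method isn't actually implemented, is it? The RowData(int score, String name) constructor in your code has an empty body, so score and name are never assigned. For a working reference, see https://github.com/StarRocks/demo/blob/master/FlinkDemo/src/main/java/com/starrocks/funcs/BeanDataJava.java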
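To illustrate the point about the constructor, here is a minimal sketch in plain Java with no Flink or StarRocks dependency. The class names `EmptyCtorRow` and `AssignedCtorRow` and the helper `toSlots` are hypothetical, introduced only for this illustration; `toSlots` mimics the slot-filling lambda from the question. It assumes the empty constructor is what produces the wrong values, which is consistent with the posted code but cannot be checked against the screenshot here.

```java
import java.util.Arrays;

// Hypothetical reproduction of the class from the question: the constructor
// accepts its arguments but never stores them.
class EmptyCtorRow {
    public int score;
    public String name;

    public EmptyCtorRow(int score, String name) {
        // Empty body: the fields keep their Java defaults (0 and null).
    }
}

// Hypothetical corrected version: the constructor assigns both fields.
class AssignedCtorRow {
    public int score;
    public String name;

    public AssignedCtorRow(int score, String name) {
        this.score = score;
        this.name = name;
    }
}

class RowSlotsDemo {
    // Mimics the row-builder lambda from the question: copy fields into slots.
    static Object[] toSlots(int score, String name) {
        Object[] slots = new Object[2];
        slots[0] = score;
        slots[1] = name;
        return slots;
    }

    public static void main(String[] args) {
        EmptyCtorRow broken = new EmptyCtorRow(99, "张三");
        AssignedCtorRow fixed = new AssignedCtorRow(99, "张三");

        // The empty constructor loses the values passed to it.
        System.out.println(Arrays.toString(toSlots(broken.score, broken.name))); // [0, null]
        // The assigning constructor keeps them.
        System.out.println(Arrays.toString(toSlots(fixed.score, fixed.name)));   // [99, 张三]
    }
}
```

In the job from the question, the equivalent change would be to add `this.score = score;` and `this.name = name;` to the body of the `RowData` constructor.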