【详述】
使用 flink-connector-starrocks(版本:1.2.6_flink-1.13_2.11)写入 StarRocks 表时报错,
报错信息见下方截图:
我已经在下面的源码里指定了数据库(database-name),不清楚为什么仍然报这个错,请大佬指导一下,不胜感激!
【建表语句】
-- Primary Key table keyed and hash-distributed on id.
-- COMMENT literals must use straight double quotes; typographic quotes
-- are not valid string delimiters.
CREATE TABLE table1
(
    id int(11) NOT NULL COMMENT "用户 ID",
    name varchar(65533) NULL COMMENT "用户姓名",
    score int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id);
【Java源码】
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class ss_p {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.enableCheckpointing(60 * 1000L);
env.fromElements(
new RowData[]{
new RowData(1, 99, “stephen”),
new RowData(2, 100, “lebron1”),
new RowData(3, 101, “lebron2”),
new RowData(4, 102, “lebron3”),
new RowData(5, 1043, “lebron4”),
}
).addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.primaryKey(“id”, new String[]{“id”})
.field(“id”, DataTypes.INT().notNull())
.field(“score”, DataTypes.INT())
.field(“name”, DataTypes.VARCHAR(20))
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty(“jdbc-url”, “jdbc:mysql://10.178.151.161:31720”)
.withProperty(“load-url”, “10.178.151.161:31773”)
.withProperty(“username”, “root”)
.withProperty(“password”, “”)
.withProperty(“table-name”, “table1”)
.withProperty(“database-name”, “test”)
.withProperty(“sink.properties.format”, “csv”)
.withProperty(“sink.properties.database-name”, “test”)
.withProperty(“sink.properties.column_separator”, “\x01”)
.withProperty(“sink.properties.row_delimiter”, “\x02”)
.withProperty(“sink.parallelism”, “1”)
.build(),
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.id;
slots[1] = streamRowData.score;
slots[2] = streamRowData.name;
}
)
);
env.execute();
}
static class RowData {
public int id;
public int score;
public String name;
public RowData(int id, int score, String name) {
this.id = id;
this.score = score;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}




