【详述】flink-connector-starrocks向primary key表导入数据报 Source table schema should contain primary keys ,SR表设置了主键,应该是StarRocksSink.sink的时候没有指定主键,我没有找到sink时如何设定主键的参数
【业务影响】
【StarRocks版本】例如:1.18.2
【集群规模】例如:3fe(1 follower+2observer)+5be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:48C/64G/万兆
【附件】
SR表如下:
CREATE TABLE IF NOT EXISTS test.sign
(
staff_guid
VARCHAR(64) not null,
customer_guid
VARCHAR(64) not null,
sign_date
VARCHAR(20) not null,
tenant_guid
VARCHAR(64) not null,
sign_time
VARCHAR(20),
in_time VARCHAR(20),
out_time VARCHAR(20),
interval_time
int,
is_deleted
char(2),
create_time
datetime,
update_time
datetime
)
PRIMARY KEY(staff_guid,customer_guid
,sign_date,tenant_guid
)
DISTRIBUTED BY HASH(staff_guid
) BUCKETS 5
PROPERTIES(
“replication_num” = “1”
);
输入到SR的flink代码:
process.process(new ProcessFunction<Tuple8<String, String, Integer, String, String, Timestamp, String, String>, SignInfo>() {
@Override
public void processElement(Tuple8<String, String, Integer, String, String, Timestamp, String, String> value, Context ctx, Collector out) throws Exception {
SignInfo signInfo = new SignInfo();
signInfo.setStaffGuid(value.f0);
signInfo.setCustomerGuid(value.f1);
signInfo.setIntervalTime(Math.round(value.f2 / 60f)); //秒转分钟,四舍五入
signInfo.setInTime(value.f3.substring(11, 19));
signInfo.setOutTime(value.f4.substring(11, 19));
signInfo.setCreateTime(value.f5);
signInfo.setUpdateTime(new Timestamp(new Date().getTime()));
signInfo.setSignDate(value.f6.substring(0, 10));
signInfo.setSignTime(value.f6);
signInfo.setTenantGuid(value.f7);
out.collect(signInfo);
}
}).addSink(StarRocksSink.sink(
TableSchema.builder()
.field("staff_guid", DataTypes.VARCHAR(32))
.field("customer_guid", DataTypes.VARCHAR(32))
.field("sign_date", DataTypes.VARCHAR(20))
.field("tenant_guid", DataTypes.VARCHAR(32))
.field("sign_time", DataTypes.VARCHAR(20))
.field("in_time", DataTypes.VARCHAR(20))
.field("out_time", DataTypes.VARCHAR(20))
.field("interval_time", DataTypes.INT())
.field("is_deleted", DataTypes.CHAR(2))
.field("create_time", DataTypes.TIMESTAMP())
.field("update_time", DataTypes.TIMESTAMP())
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("connector", "starrocks")
.withProperty("jdbc-url", "jdbc:mysql://starrocks-fe.doris:9030")
.withProperty("load-url", "starrocks-fe.doris:8030")
.withProperty("username", "root")
.withProperty("password", "root")
.withProperty("table-name", "sign")
.withProperty("database-name", "test")
.withProperty("sink.buffer-flush.max-rows","100000")
.withProperty("sink.buffer-flush.max-bytes","100000000")
.withProperty("sink.buffer-flush.interval-ms", "3000")
.withProperty("sink.max-retries","3")
.withProperty("sink.properties.column_separator", "\\x01")
.withProperty("sink.properties.row_delimiter", "\\x02")
.build(),
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.getStaffGuid();
slots[1] = streamRowData.getCustomerGuid();
slots[2] = streamRowData.getSignDate();
slots[3] = streamRowData.getTenantGuid();
slots[4] = streamRowData.getSignTime();
slots[5] = streamRowData.getInTime();
slots[6] = streamRowData.getOutTime();
slots[7] = streamRowData.getIntervalTime();
slots[8] = "N";
slots[9] = streamRowData.getCreateTime();
slots[10] = streamRowData.getUpdateTime();
}
));