flink-connector-starrocks reports "Source table schema should contain primary keys" when loading data into a primary key table

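【Details】Loading data into a primary key table with flink-connector-starrocks fails with "Source table schema should contain primary keys". The SR table has a primary key defined, so the likely cause is that no primary key was specified when calling StarRocksSink.sink. I could not find a parameter for setting the primary key on the sink.
【Business impact】
【StarRocks version】e.g. 1.18.2
【Cluster size】e.g. 3 FE (1 follower + 2 observers) + 5 BE (FE and BE co-located)
【Machine info】CPU virtual cores / memory / NIC, e.g. 48C/64G/10 GbE
【Attachments】
The SR table is as follows: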
CREATE TABLE IF NOT EXISTS test.sign
(
staff_guid VARCHAR(64) not null,
customer_guid VARCHAR(64) not null,
sign_date VARCHAR(20) not null,
tenant_guid VARCHAR(64) not null,
sign_time VARCHAR(20),
in_time VARCHAR(20),
out_time VARCHAR(20),
interval_time int,
is_deleted char(2),
create_time datetime,
update_time datetime
)
PRIMARY KEY(staff_guid,customer_guid,sign_date,tenant_guid)
DISTRIBUTED BY HASH(staff_guid) BUCKETS 5
PROPERTIES(
"replication_num" = "1"
);

Flink code that writes to SR:
process.process(new ProcessFunction<Tuple8<String, String, Integer, String, String, Timestamp, String, String>, SignInfo>() {
@Override
public void processElement(Tuple8<String, String, Integer, String, String, Timestamp, String, String> value, Context ctx, Collector<SignInfo> out) throws Exception {

            SignInfo signInfo = new SignInfo();
            signInfo.setStaffGuid(value.f0);
            signInfo.setCustomerGuid(value.f1);
            signInfo.setIntervalTime(Math.round(value.f2 / 60f)); // convert seconds to minutes, rounded to the nearest minute
            signInfo.setInTime(value.f3.substring(11, 19));
            signInfo.setOutTime(value.f4.substring(11, 19));
            signInfo.setCreateTime(value.f5);
            signInfo.setUpdateTime(new Timestamp(new Date().getTime()));
            signInfo.setSignDate(value.f6.substring(0, 10));
            signInfo.setSignTime(value.f6);
            signInfo.setTenantGuid(value.f7);
            out.collect(signInfo);
        }
    }).addSink(StarRocksSink.sink(
            TableSchema.builder()
                    .field("staff_guid", DataTypes.VARCHAR(32))
                    .field("customer_guid", DataTypes.VARCHAR(32))
                    .field("sign_date", DataTypes.VARCHAR(20))
                    .field("tenant_guid", DataTypes.VARCHAR(32))
                    .field("sign_time", DataTypes.VARCHAR(20))
                    .field("in_time", DataTypes.VARCHAR(20))
                    .field("out_time", DataTypes.VARCHAR(20))
                    .field("interval_time", DataTypes.INT())
                    .field("is_deleted", DataTypes.CHAR(2))
                    .field("create_time", DataTypes.TIMESTAMP())
                    .field("update_time", DataTypes.TIMESTAMP())
                    .build(),
            // the sink options
            StarRocksSinkOptions.builder()
                    .withProperty("connector", "starrocks")
                    .withProperty("jdbc-url", "jdbc:mysql://starrocks-fe.doris:9030")
                    .withProperty("load-url", "starrocks-fe.doris:8030")
                    .withProperty("username", "root")
                    .withProperty("password", "root")
                    .withProperty("table-name", "sign")
                    .withProperty("database-name", "test")
                    .withProperty("sink.buffer-flush.max-rows","100000")
                    .withProperty("sink.buffer-flush.max-bytes","100000000")
                    .withProperty("sink.buffer-flush.interval-ms", "3000")
                    .withProperty("sink.max-retries","3")
                    .withProperty("sink.properties.column_separator", "\\x01")
                    .withProperty("sink.properties.row_delimiter", "\\x02")
                    .build(),
            // set the slots with streamRowData
            (slots, streamRowData) -> {
                slots[0] = streamRowData.getStaffGuid();
                slots[1] = streamRowData.getCustomerGuid();
                slots[2] = streamRowData.getSignDate();
                slots[3] = streamRowData.getTenantGuid();
                slots[4] = streamRowData.getSignTime();
                slots[5] = streamRowData.getInTime();
                slots[6] = streamRowData.getOutTime();
                slots[7] = streamRowData.getIntervalTime();
                slots[8] = "N";
                slots[9] = streamRowData.getCreateTime();
                slots[10] = streamRowData.getUpdateTime();
            }
    ));

Detailed log:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Source table schema should contain primary keys.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    ... 10 more
Caused by: java.lang.IllegalArgumentException: Source table schema should contain primary keys.
    at com.starrocks.connector.flink.manager.StarRocksSinkManager.validateTableStructure(StarRocksSinkManager.java:314)
    at com.starrocks.connector.flink.manager.StarRocksSinkManager.<init>(StarRocksSinkManager.java:103)
    at com.starrocks.connector.flink.table.StarRocksDynamicSinkFunction.<init>(StarRocksDynamicSinkFunction.java:67)
    at com.starrocks.connector.flink.StarRocksSink.sink(StarRocksSink.java:42)
    at com.csbr.cloud.flinkserver.salesMap.MyIntervalJoin.main(MyIntervalJoin.java:342)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 13 more

Hello, in the DataStream job you need to define it like this:
[screenshot: middle_img_v2_7af1350d-1550-4146-9295-11b29d4d769g]
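
The image attached to this reply is not recoverable, so the following is only a sketch of what such a definition probably looks like, not the replier's exact code. It assumes the fix is to declare the primary key on the Flink `TableSchema` passed to `StarRocksSink.sink`, using Flink's `TableSchema.Builder.primaryKey(...)`. Flink rejects a primary key on nullable columns, so the key columns are marked `notNull()`. The class name `SignSinkSchema` is hypothetical, and the VARCHAR(64) lengths are taken from the SR table definition rather than from the original Flink code.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Hypothetical helper: builds the sink schema for test.sign with a declared primary key.
public class SignSinkSchema {
    public static TableSchema build() {
        return TableSchema.builder()
                // Primary key columns must be NOT NULL on the Flink side.
                .field("staff_guid", DataTypes.VARCHAR(64).notNull())
                .field("customer_guid", DataTypes.VARCHAR(64).notNull())
                .field("sign_date", DataTypes.VARCHAR(20).notNull())
                .field("tenant_guid", DataTypes.VARCHAR(64).notNull())
                .field("sign_time", DataTypes.VARCHAR(20))
                .field("in_time", DataTypes.VARCHAR(20))
                .field("out_time", DataTypes.VARCHAR(20))
                .field("interval_time", DataTypes.INT())
                .field("is_deleted", DataTypes.CHAR(2))
                .field("create_time", DataTypes.TIMESTAMP())
                .field("update_time", DataTypes.TIMESTAMP())
                // Same key columns, in the same order, as PRIMARY KEY(...) in the SR DDL.
                .primaryKey("staff_guid", "customer_guid", "sign_date", "tenant_guid")
                .build();
    }
}
```

The result of `SignSinkSchema.build()` would replace the first argument of `StarRocksSink.sink(...)` in the question; the sink options and the slot-filling lambda stay unchanged.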

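Resolved, thanks. One more question: the SR table uses the datetime type. Which type on the stream side corresponds to it? When I use TIMESTAMP, the value written to SR is null.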
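
The thread does not record an answer to this follow-up. One workaround worth trying, stated here as an unconfirmed assumption, is to keep the declared type as is and put a string in `yyyy-MM-dd HH:mm:ss` form into the slot instead of a `java.sql.Timestamp` object, since a `Timestamp` prints with fractional seconds that the load may not parse. The helper name `DatetimeSlot` is hypothetical and uses only the JDK.

```java
import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: renders a Timestamp as a second-precision datetime string.
public class DatetimeSlot {
    private static final DateTimeFormatter SR_DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns null for a null input so the slot stays empty; fractional seconds are dropped.
    public static String toDatetimeString(Timestamp ts) {
        return ts == null ? null : ts.toLocalDateTime().format(SR_DATETIME);
    }

    public static void main(String[] args) {
        System.out.println(toDatetimeString(Timestamp.valueOf("2022-03-01 10:20:30.123")));
    }
}
```

In the slot-filling lambda this would be used as `slots[9] = DatetimeSlot.toDatetimeString(streamRowData.getCreateTime());`, and likewise for `slots[10]`.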