flink 读取sls 数据到starrocks,sink数据失败。

背景:阿里云sls 日志数据通过 Flink-connector-StarRocks 写入到starrocks 报错。
是否存算分离:是
版本:starrocks 3.3.7
代码:

`public static void main(String[] args) throws Exception {
    ParameterTool parameters = null;
    ParameterTool parameters_tool = ParameterTool.fromArgs(args);
    String local_path = parameters_tool.get("configfile", null);
    if (StringUtils.isBlank(local_path)) {
        parameters = parameters_tool;
    } else {
        parameters = ParameterTool.fromPropertiesFile(local_path);
    }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(6000);
    Configuration config = new Configuration();
    config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
    config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
    config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
    config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://mycluster/checkpints");
    env.configure(config);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(
            RestartStrategies.failureRateRestart(3,
                    Time.of(2, TimeUnit.MINUTES),
                    Time.of(5,TimeUnit.SECONDS))
    );
    Properties configProps = new Properties();
    String endpoint = parameters.get("endpoint");
    String accessKeyId = parameters.get("accessKeyId");
    String accessKey = parameters.get("accessKey");
    String project = parameters.get("project");
    String logstore = parameters.get("logstore");
    configProps.put(ConfigConstants.LOG_ENDPOINT,endpoint);
    configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId);
    configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKey);        configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,Consts.LOG_BEGIN_CURSOR);
    FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
    DataStream<FastLogGroupList> stream = env.addSource(
            new FlinkLogConsumer<>(project, logstore, deserializer, configProps));
    SingleOutputStreamOperator<String> resultStream = stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
        for (FastLogGroup logGroup : value.getLogGroups()) {
            int logCount = logGroup.getLogsCount();
            for (int i = 0; i < logCount; i++) {
                FastLog log = logGroup.getLogs(i);
                JSONObject jsonObject = new JSONObject();
                for (int j = 0; j < log.getContentsCount(); j++) {
                    jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
                }
                out.collect(jsonObject.toJSONString());
            }
        }
    }).returns(String.class);
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://ip:9030")
            .withProperty("load-url","ip:8041")
            .withProperty("username", "root")
            .withProperty("password", "Mdz@123456")
            .withProperty("database-name", "cloud_db")
            .withProperty("table-name", "ods_sls_back_data_test")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .withProperty("sink.properties.column_separator","\\x01")
            .withProperty("sink.properties.row_delimiter","\\x01")
            .withProperty("sink.buffer-flush.interval-ms", "60000")
            .build();
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    resultStream.addSink(starRockSink);
    env.execute();

}`

报错如下:


信息:
[com.starrocks.data.load.stream.v2.StreamLoadManagerV2] INFO - Receive load response, cacheByteBeforeFlush: 0, currentCacheBytes: 0, totalFlushRows : 72359
[com.starrocks.data.load.stream.TransactionStreamLoader] INFO - Transaction commit, label: flink-f800d245-ea6d-437e-8c4e-aa845c345c22, request : POST http://bigdata01:8041/api/transaction/commit HTTP/1.1
[com.starrocks.data.load.stream.TransactionStreamLoader] INFO - Transaction committed, lable: flink-f800d245-ea6d-437e-8c4e-aa845c345c22, body : {
“Status”: “TXN_NOT_EXISTS”,
“Message”: “transaction with label flink-f800d245-ea6d-437e-8c4e-aa845c345c22 not exists”
}
[com.starrocks.data.load.stream.DefaultStreamLoader] INFO - Response for get_load_state, label: flink-f800d245-ea6d-437e-8c4e-aa845c345c22, response status code: 404, response body :

404 Not Found

说明:java stream load 方式:ip:8041是能正常导入数据的。

sink的并行度调成1会有这个问题吗,有个sink.parallelism参数设置一下看看