Background: Alibaba Cloud SLS log data is consumed by Flink and written to StarRocks via Flink-connector-StarRocks, and the sink fails with the error below.
Shared-data (storage-compute separation): yes
Version: StarRocks 3.3.7
Code:
```java
public static void main(String[] args) throws Exception {
    ParameterTool parameters = null;
    ParameterTool parameters_tool = ParameterTool.fromArgs(args);
    String local_path = parameters_tool.get("configfile", null);
    if (StringUtils.isBlank(local_path)) {
        parameters = parameters_tool;
    } else {
        parameters = ParameterTool.fromPropertiesFile(local_path);
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(6000);
    Configuration config = new Configuration();
    config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
    config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
    config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
    config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://mycluster/checkpints");
    env.configure(config);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(
            RestartStrategies.failureRateRestart(3,
                    Time.of(2, TimeUnit.MINUTES),
                    Time.of(5, TimeUnit.SECONDS))
    );

    Properties configProps = new Properties();
    String endpoint = parameters.get("endpoint");
    String accessKeyId = parameters.get("accessKeyId");
    String accessKey = parameters.get("accessKey");
    String project = parameters.get("project");
    String logstore = parameters.get("logstore");
    configProps.put(ConfigConstants.LOG_ENDPOINT, endpoint);
    configProps.put(ConfigConstants.LOG_ACCESSKEYID, accessKeyId);
    configProps.put(ConfigConstants.LOG_ACCESSKEY, accessKey);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);

    FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
    DataStream<FastLogGroupList> stream = env.addSource(
            new FlinkLogConsumer<>(project, logstore, deserializer, configProps));

    SingleOutputStreamOperator<String> resultStream = stream.flatMap(
            (FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
                for (FastLogGroup logGroup : value.getLogGroups()) {
                    int logCount = logGroup.getLogsCount();
                    for (int i = 0; i < logCount; i++) {
                        FastLog log = logGroup.getLogs(i);
                        JSONObject jsonObject = new JSONObject();
                        for (int j = 0; j < log.getContentsCount(); j++) {
                            jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
                        }
                        out.collect(jsonObject.toJSONString());
                    }
                }
            }).returns(String.class);

    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://ip:9030")
            .withProperty("load-url", "ip:8041")
            .withProperty("username", "root")
            .withProperty("password", "Mdz@123456")
            .withProperty("database-name", "cloud_db")
            .withProperty("table-name", "ods_sls_back_data_test")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x01")
            .withProperty("sink.buffer-flush.interval-ms", "60000")
            .build();
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    resultStream.addSink(starRockSink);

    env.execute();
}
```
The error output is as follows:
```
[com.starrocks.data.load.stream.v2.StreamLoadManagerV2] INFO - Receive load response, cacheByteBeforeFlush: 0, currentCacheBytes: 0, totalFlushRows : 72359
[com.starrocks.data.load.stream.TransactionStreamLoader] INFO - Transaction commit, label: flink-f800d245-ea6d-437e-8c4e-aa845c345c22, request : POST http://bigdata01:8041/api/transaction/commit HTTP/1.1
[com.starrocks.data.load.stream.TransactionStreamLoader] INFO - Transaction committed, lable: flink-f800d245-ea6d-437e-8c4e-aa845c345c22, body : {
    "Status": "TXN_NOT_EXISTS",
    "Message": "transaction with label flink-f800d245-ea6d-437e-8c4e-aa845c345c22 not exists"
}
[com.starrocks.data.load.stream.DefaultStreamLoader] INFO - Response for get_load_state, label: flink-f800d245-ea6d-437e-8c4e-aa845c345c22, response status code: 404, response body :
```
Note: loading the same table with a plain Java Stream Load request against ip:8041 works fine.
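For reference, this is a minimal sketch of what that manual Stream Load check can look like, assuming Apache HttpClient 4.x on the classpath. It reuses the host, database, table, and credentials from the job config above; the label and the column name in the JSON payload are placeholders for the real schema.

```java
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class StreamLoadCheck {
    public static void main(String[] args) throws Exception {
        // Same host/credentials as the Flink job above; adjust to your environment.
        String loadUrl = "http://ip:8041/api/cloud_db/ods_sls_back_data_test/_stream_load";
        String auth = Base64.getEncoder()
                .encodeToString("root:Mdz@123456".getBytes(StandardCharsets.UTF_8));

        try (CloseableHttpClient client = HttpClients.custom()
                // Keep the PUT method if the server answers with a 307 redirect.
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader("Authorization", "Basic " + auth);
            put.setHeader("Expect", "100-continue");
            put.setHeader("label", "manual-check-" + UUID.randomUUID());
            put.setHeader("format", "json");
            put.setHeader("strip_outer_array", "true");
            // One-row JSON array; "col1" is a placeholder for a real column name.
            put.setEntity(new StringEntity("[{\"col1\":\"test\"}]", StandardCharsets.UTF_8));

            try (CloseableHttpResponse resp = client.execute(put)) {
                System.out.println(EntityUtils.toString(resp.getEntity()));
            }
        }
    }
}
```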