Stream Load alternates between success and failure on every other request

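[Details] When importing data with Stream Load, I send a simulated CSV string through httpClient to the API: http://ip:port/api/db/table/_stream_load
When ip and port (8030) point to the FE, the requests alternate between one success and one failure (always reproducible). The failure message is: Bad Request. text is empty (possibly HTTP/0.9)
If I point ip and port at a specific BE (port 8040) instead, the load runs normally.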
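[Background]
[Business impact]
[Shared-data (storage-compute separation) deployment?]
[StarRocks version] 2.5.14
[Cluster size] 1 FE + 3 BE (the FE and BEs are all on different machines)
[Machine info] 8C/16G/Gigabit network
[Table type] Primary Key table
[Import or export method] Stream Load via httpClient
[Attachments]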
// Data format
col1\tcol2\tcol3\tcol4\n
// Corresponds to the four columns in the table
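
For illustration only, a row in this layout can be assembled with String.join. This is a minimal sketch; the class name RowFormat and the sample values are hypothetical and do not come from the original post.

```java
class RowFormat {
    // Joins the field values with tabs and terminates the row with a newline,
    // matching the col1\tcol2\tcol3\tcol4\n layout described above.
    static String toRow(String... fields) {
        return String.join("\t", fields) + "\n";
    }

    public static void main(String[] args) {
        // Sample values are placeholders, not real table data.
        System.out.print(toRow("v1", "v2", "v3", "v4"));
    }
}
```

Tab and newline are Stream Load's default column and row delimiters, so a body in this layout should not need a column_separator header.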

// Request
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(content, StandardCharsets.UTF_8);

put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(starrocksUser, starrocksPassword));
put.setHeader("label", label);
put.setEntity(entity);
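
The basicAuthHeader helper called in the request builds a standard HTTP Basic Authorization value. A minimal JDK-only sketch of such a helper follows; it uses java.util.Base64 rather than commons-codec, and the class name BasicAuth is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuth {
    // Builds an HTTP Basic Authorization header value: "Basic " + base64(user:password).
    static String basicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // The credentials are the example pair from RFC 7617, not real ones.
        System.out.println(basicAuthHeader("Aladdin", "open sesame"));
    }
}
```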

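Label names must be unique. Once a load has succeeded with a label, that label cannot be used for a second load.

The label is a UUID generated fresh for every request.

Could you share the error messages from fe.log at the time of the failure?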

2024-01-05 16:43:55,709 INFO (PUBLISH_VERSION|20) [PublishVersionDaemon.publishVersionForOlapTable():125] send publish tasks for txn_id: 1228325
2024-01-05 16:43:55,719 INFO (PUBLISH_VERSION|20) [DatabaseTransactionMgr.finishTransaction():1061] finish transaction TransactionState. txn_id: 1228325, label: b365244d-9b9f-4a58-9dbd-bd7180968d8f, db id: 14028, table id list: 1847668, callback id: -1, coordinator: BE: 192.168.10.193, transaction status: VISIBLE, error replicas num: 0, replica ids: , prepare time: 1704444235687, commit time: 1704444235701, finish time: 1704444235719, write cost: 14ms, wait for publish cost: 7ms, publish rpc cost: 2ms, finish txn cost: 9ms, publish total cost: 18ms, total cost: 32ms, reason: attachment: com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment@1e5a525b successfully
2024-01-05 16:43:56,217 WARN (nioEventLoopGroup-7-10|9206) [HttpServerHandler.channelRead():70] accept bad request: /bad-request, error: text is empty (possibly HTTP/0.9)
com.starrocks.http.HttpRequestException: text is empty (possibly HTTP/0.9)
at com.starrocks.http.HttpServerHandler.validateRequest(HttpServerHandler.java:90) ~[starrocks-fe.jar:?]
at com.starrocks.http.HttpServerHandler.channelRead(HttpServerHandler.java:68) ~[starrocks-fe.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-all-4.1.61.Final.jar:4.1.61.Final]
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) ~[netty-all-4.1.61.Final.jar:4.1.61.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-all-4.1.61.Final.jar:4.1.61.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-all-4.1.61.Final.jar:4.1.61.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-all-4.1.61.Final.jar:4.1.61.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.87.Final.jar:4.1.87.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.87.Final.jar:4.1.87.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.87.Final.jar:4.1.87.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.87.Final.jar:4.1.87.Final]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
2024-01-05 16:43:59,779 INFO (starrocks-mysql-nio-pool-28695|155119) [ConnectScheduler.unregisterConnection():144] Connection closed. remote=192.168.10.243:58470, connectionId=24529

Do you have a complete demo? We would like to test it.

package com.efarm.api.util;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

@Configuration
public class StartRocksStreamLogUtils {

private final static Logger log = LoggerFactory.getLogger(StartRocksStreamLogUtils.class);

public String starrocksHost;

public String starrocksDb;

public String starrocksTable;

public String starrocksUser;

public String starrocksPassword;

public int starrocksHttpPort;

@Value("${db.sr.host}")
public void setStarrocksHost(String starrocksHost) {
    this.starrocksHost = starrocksHost;
}

@Value("${db.sr.db}")
public void setStarrocksDb(String starrocksDb) {
    this.starrocksDb = starrocksDb;
}

@Value("${db.sr.log.table}")
public void setStarrocksTable(String starrocksTable) {
    this.starrocksTable = starrocksTable;
}

@Value("${db.sr.username}")
public void setStarrocksUser(String starrocksUser) {
    this.starrocksUser = starrocksUser;
}

@Value("${db.sr.password}")
public void setStarrocksPassword(String starrocksPassword) {
    this.starrocksPassword = starrocksPassword;
}

@Value("${db.sr.port}")
public void setStarrocksHttpPort(int starrocksHttpPort) {
    this.starrocksHttpPort = starrocksHttpPort;
}

private final int MAX_CONTAINER_SIZE = 100;

/**
 * uuid,entry_id,customer_id,belong_type,object_id,customer_login_name,token,request_url,request_param,create_date,IP,SOURCE,explorer_info
 */
private final int TABLE_FIELD_TOTAL = 13;

private static final String LOAD_URL_FORMAT = "http://%s:%s/api/%s/%s/_stream_load";

private final BlockingQueue<String> logQueue = new ArrayBlockingQueue<>(500);

private static final PoolingHttpClientConnectionManager CONNECTION_MANAGER = new PoolingHttpClientConnectionManager();


// Manage the CloseableHttpClient with a connection pool or as a singleton
private static final CloseableHttpClient HTTP_CLIENT = HttpClients.custom()
        .setRedirectStrategy(new DefaultRedirectStrategy() {
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        })
        .setConnectionManager(CONNECTION_MANAGER)
        .setDefaultRequestConfig(RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(10000)
                .setSocketTimeout(10000)
                .build())
        .build();

private final ExecutorService executor = new ThreadPoolExecutor(1, 5,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1000));


public StartRocksStreamLogUtils() {

    CONNECTION_MANAGER.setMaxTotal(5);
    CONNECTION_MANAGER.setDefaultMaxPerRoute(5);

    // ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    // scheduler.scheduleAtFixedRate(() -> {
    //     log.info("StarRocks stream load task running,queue size:{}", logQueue.size());
    //     sendHttpRequestInBackground();
    // }, 1, 10, TimeUnit.MINUTES);
}

/**
 * uuid,entry_id,customer_id,belong_type,object_id,customer_login_name,token,request_url,request_param,create_date,IP,SOURCE,explorer_info
 *
 * @param data fields in table-column order, formatted as: field + "\t" + field2 + "\t" + field3 + "\n"
 */
public void addLog(String data) throws InterruptedException {
    if (StringUtils.isBlank(data)){
        return;
    }
    int columnCount = data.split("\t").length;
    // Check whether the number of columns matches the expected count
    if (columnCount < TABLE_FIELD_TOTAL) {
        // Number of missing columns
        int missingColumns = TABLE_FIELD_TOTAL - columnCount;

        // Pad the missing columns
        StringBuilder adjustedParams = new StringBuilder(data);
        for (int i = 0; i < missingColumns; i++) {
            adjustedParams.insert(adjustedParams.lastIndexOf("\n"), "\tnull");
        }
        data = adjustedParams.toString();
    }

    // logQueue.put(data);
    // if (logQueue.size() >= MAX_CONTAINER_SIZE) {
    //     sendHttpRequestInBackground();
    // }
    sendData(data, UUID.randomUUID().toString());
}

private void sendHttpRequestInBackground() {
    executor.submit(() -> {
        String label = UUID.randomUUID().toString();

        // List<String> logEntries = Collections.synchronizedList(new ArrayList<>(MAX_CONTAINER_SIZE));
        List<String> logEntries = new ArrayList<>();
        logQueue.drainTo(logEntries, 500);
        StringBuilder sb = new StringBuilder();
        for (String logEntry : logEntries) {
            sb.append(logEntry);
        }
        try {
            sendData(sb.toString(), label);
        } catch (Exception e) {
            log.error("Failed to write ratelimit records to StarRocks! label:{}, ERROR MSG:{}", label, e.toString());
        }
    });
}

/**
 * @param content fields in table-column order, formatted as: field + "\t" + field2 + "\t" + field3 + "\n"
 * @param label   Stream Load transaction label, used to trace the load in the logs
 */
public void sendData(String content, String label) {
    if (StringUtils.isBlank(content)) {
        return;
    }

    String loadUrl = String.format(LOAD_URL_FORMAT,
            starrocksHost,
            starrocksHttpPort,
            starrocksDb,
            starrocksTable);

    HttpPut put = new HttpPut(loadUrl);
    StringEntity entity = new StringEntity(content, StandardCharsets.UTF_8);

    put.setHeader(HttpHeaders.EXPECT, "100-continue");
    put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(starrocksUser, starrocksPassword));
    put.setHeader("column_separator", ",");
    put.setHeader("label", label);
    put.setEntity(entity);

    try (CloseableHttpResponse response = HTTP_CLIENT.execute(put)) {
        String loadResult = "";
        if (response.getEntity() != null) {
            loadResult = EntityUtils.toString(response.getEntity());
        }
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != HttpStatus.SC_OK) {

            log.error("Stream load failed, labelId:{}, statusCode={}, load result={}", label,statusCode, loadResult);
        }
        log.info(loadResult);
    } catch (IOException e) {
        e.printStackTrace();
        // log.error("StarRocks Exception occurred while sending data: {}", e.getMessage());
    }
}

private String basicAuthHeader(String username, String password) {
    final String tobeEncode = username + ":" + password;
    byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
    return "Basic " + new String(encoded);
}

}
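// Call the endpoint below to simulate an insert
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RequestMapping("/testssss")
@RestController
public class TestCtrl {

final StartRocksStreamLogUtils startRocksStreamLogUtils;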

public TestCtrl(StartRocksStreamLogUtils startRocksStreamLogUtils) {
    this.startRocksStreamLogUtils = startRocksStreamLogUtils;
}

@PostMapping("test")
public Object test11(){
    String s = UUID.randomUUID().toString()+
    ",18,800,21,102,Y052,5b2c7e89-f573-43fc-88a4-751cb6103afe,/efarm_war_exploded/notdinggoodsinfo,utp:4;nt:1701135002751;start:1;length:3;startDate:2023-11-27;endDate:2023-11-28;,1701163844221,127.0.0.1,test_stream_load,Apache-HttpClient%2F4.5.12+%28Java%2F1.8.0_172%29\n";
    try {
        startRocksStreamLogUtils.addLog(s);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "ok";
}

}
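
One detail in the demo that may be worth checking separately: the test row is comma-separated and contains no tab, so data.split("\t") in addLog returns a single element and "\tnull" is inserted twelve times before the trailing newline. Whether this has anything to do with the alternating failures is not established in this thread. The sketch below shows the padding step in isolation, assuming the separator passed in is the same one sent in the column_separator header; the class name RowPadder and its method are hypothetical.

```java
import java.util.regex.Pattern;

class RowPadder {
    // Pads one newline-terminated row with "null" fields until it has `expected` columns.
    static String pad(String row, String separator, int expected) {
        String body = row.endsWith("\n") ? row.substring(0, row.length() - 1) : row;
        // Pattern.quote treats the separator literally; limit -1 keeps trailing empty fields.
        int count = body.split(Pattern.quote(separator), -1).length;
        StringBuilder sb = new StringBuilder(body);
        for (int i = count; i < expected; i++) {
            sb.append(separator).append("null");
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(pad("a,b\n", ",", 4));
    }
}
```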

Is there a bug in my code?

You could turn on debug logging, run it again, and post the logs so we can look at the HTTP client output.
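
As a rough sketch of how that could be done for Apache HttpClient 4.x: when Commons Logging is the active logging backend, header and wire logging can be switched on with system properties set before the client is created. The class name HttpClientDebugLogging is hypothetical. The demo logs through SLF4J, so if Commons Logging is bridged to SLF4J or Logback in that application, these properties will likely have no effect, and the DEBUG level for the org.apache.http and org.apache.http.wire loggers would need to be set in the logging framework's own configuration instead.

```java
class HttpClientDebugLogging {
    // Sets the Commons Logging SimpleLog properties that HttpClient 4.x reads.
    // Must run before any HttpClient class creates its loggers.
    static void enableWireLogging() {
        System.setProperty("org.apache.commons.logging.Log",
                "org.apache.commons.logging.impl.SimpleLog");
        System.setProperty("org.apache.commons.logging.simplelog.showdatetime", "true");
        // Request/response headers and connection management events
        System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.http", "DEBUG");
        // Raw bytes sent and received on the connection
        System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.http.wire", "DEBUG");
    }

    public static void main(String[] args) {
        enableWireLogging();
        System.out.println(System.getProperty(
                "org.apache.commons.logging.simplelog.log.org.apache.http.wire"));
    }
}
```

The wire log shows exactly which bytes go out on each connection, which makes it possible to compare a succeeding request with a failing one.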

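Have you run into a similar problem too?

No, I just want to analyze the cause.

Hello, has this problem been resolved? We have hit it as well, on v3.1.5.

@trueeyu Have you come across this problem before?

What problem? Please open a new thread and describe it.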