package com.efarm.api.util;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.;
import java.util.concurrent.;
@Configuration
public class StartRocksStreamLogUtils {
private final static Logger log = LoggerFactory.getLogger(StartRocksStreamLogUtils.class);
public String starrocksHost;
public String starrocksDb;
public String starrocksTable;
public String starrocksUser;
public String starrocksPassword;
public int starrocksHttpPort;
@Value("${db.sr.host}")
public void setStarrocksHost(String starrocksHost) {
this.starrocksHost = starrocksHost;
}
@Value("${db.sr.db}")
public void setStarrocksDb(String starrocksDb) {
this.starrocksDb = starrocksDb;
}
@Value("${db.sr.log.table}")
public void setStarrocksTable(String starrocksTable) {
this.starrocksTable = starrocksTable;
}
@Value("${db.sr.username}")
public void setStarrocksUser(String starrocksUser) {
this.starrocksUser = starrocksUser;
}
@Value("${db.sr.password}")
public void setStarrocksPassword(String starrocksPassword) {
this.starrocksPassword = starrocksPassword;
}
@Value("${db.sr.port}")
public void setStarrocksHttpPort(int starrocksHttpPort) {
this.starrocksHttpPort = starrocksHttpPort;
}
private final int MAX_CONTAINER_SIZE = 100;
/**
* uuid,entry_id,customer_id,belong_type,object_id,customer_login_name,token,request_url,request_param,create_date,IP,SOURCE,explorer_info
*/
private final int TABLE_FIELD_TOTAL = 13;
private static final String LOAD_URL_FORMAT = "http://%s:%s/api/%s/%s/_stream_load";
private final BlockingQueue<String> logQueue = new ArrayBlockingQueue<>(500);
private static final PoolingHttpClientConnectionManager CONNECTION_MANAGER = new PoolingHttpClientConnectionManager();
// 使用连接池或者单例模式管理 CloseableHttpClient
private static final CloseableHttpClient HTTP_CLIENT = HttpClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
})
.setConnectionManager(CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(5000)
.setConnectionRequestTimeout(10000)
.setSocketTimeout(10000)
.build())
.build();
private final ExecutorService executor = new ThreadPoolExecutor(1, 5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000));
public StartRocksStreamLogUtils() {
CONNECTION_MANAGER.setMaxTotal(5);
CONNECTION_MANAGER.setDefaultMaxPerRoute(5);
// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// scheduler.scheduleAtFixedRate(() -> {
// log.info(“StarRocks stream load task running,queue size:{}”, logQueue.size());
// sendHttpRequestInBackground();
// }, 1, 10, TimeUnit.MINUTES);
}
/**
* uuid,entry_id,customer_id,belong_type,object_id,customer_login_name,token,request_url,request_param,create_date,IP,SOURCE,explorer_info
*
* @param data 顺序与表字段对应,格式:field + "\t" + field2 + "\t" + field3 + "\n"
*/
public void addLog(String data) throws InterruptedException {
if (StringUtils.isBlank(data)){
return;
}
int columnCount = data.split("\t").length;
// 检查参数的列数是否符合预期
if (columnCount < TABLE_FIELD_TOTAL) {
// 不足的列数
int missingColumns = TABLE_FIELD_TOTAL - columnCount;
// 补充缺失的列
StringBuilder adjustedParams = new StringBuilder(data);
for (int i = 0; i < missingColumns; i++) {
adjustedParams.insert(adjustedParams.lastIndexOf("\n"), "\tnull");
}
data = adjustedParams.toString();
}
// logQueue.put(data);
// if (logQueue.size() >= MAX_CONTAINER_SIZE) {
// sendHttpRequestInBackground();
// }
sendData(data,UUID.randomUUID().toString());
}
private void sendHttpRequestInBackground() {
executor.submit(() -> {
String label = UUID.randomUUID().toString();
// List logEntries = Collections.synchronizedList(new ArrayList<>(MAX_CONTAINER_SIZE));
List logEntries = new ArrayList<>();
logQueue.drainTo(logEntries, 500);
StringBuilder sb = new StringBuilder();
for (String logEntry : logEntries) {
sb.append(logEntry);
}
try {
sendData(sb.toString(), label);
} catch (Exception e) {
log.error(“ratelimit记录StarRocks失败!label:{},ERROR MSG:{}”, label, e.toString());
}
});
}
/**
* @param content 顺序与表字段对应,格式:field + "\t" + field2 + "\t" + field3 + "\n"
* @param label stream load提交事务id,可追溯日志
* @throws Exception
*/
public void sendData(String content, String label) {
if (StringUtils.isBlank(content)) {
return;
}
String loadUrl = String.format(LOAD_URL_FORMAT,
starrocksHost,
starrocksHttpPort,
starrocksDb,
starrocksTable);
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(content, StandardCharsets.UTF_8);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(starrocksUser, starrocksPassword));
put.setHeader("column_separator", ",");
put.setHeader("label", label);
put.setEntity(entity);
try (CloseableHttpResponse response = HTTP_CLIENT.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
log.error("Stream load failed, labelId:{}, statusCode={}, load result={}", label,statusCode, loadResult);
}
log.info(loadResult);
} catch (IOException e) {
e.printStackTrace();
// log.error(“StarRocks Exception occurred while sending data: {}”, e.getMessage());
}
}
private String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
}
// 调用下方接口模拟插入
@RequestMapping("/testssss")
@RestController
public class TestCtrl {
final
StartRocksStreamLogUtils startRocksStreamLogUtils;
public TestCtrl(StartRocksStreamLogUtils startRocksStreamLogUtils) {
this.startRocksStreamLogUtils = startRocksStreamLogUtils;
}
@PostMapping("test")
public Object test11(){
String s = UUID.randomUUID().toString()+
",18,800,21,102,Y052,5b2c7e89-f573-43fc-88a4-751cb6103afe,/efarm_war_exploded/notdinggoodsinfo,utp:4;nt:1701135002751;start:1;length:3;startDate:2023-11-27;endDate:2023-11-28;,1701163844221,127.0.0.1,test_stream_load,Apache-HttpClient%2F4.5.12+%28Java%2F1.8.0_172%29\n";
try {
startRocksStreamLogUtils.addLog(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ok";
}
}