【详述】用 Java 编写的 Stream Load 导入速度慢
【背景】参考官方示例，用 Java 读取 MongoDB 数据并拼接成字符串，每 1200 条调用一次 sendData 方法执行写入。发现速度较慢，有时 1200 条写入就超过 2 秒，请问如何提速？
【业务影响】
【StarRocks版本】例如:2.5.6
【集群规模】例如:1fe(1 follower+2observer)+8be(fe与be混部)
【机器信息】8C 16G
【表模型】主键模型
【导入或者导出方式】java 编写streamload
【联系方式】ljw1209113096
【附件】
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
public class mongoStreamLoad {
private final static String STARROCKS_HOST = “”;
private final static String STARROCKS_DB = “”;
private final static String STARROCKS_USER = “”;
private final static String STARROCKS_PASSWORD = “”;
private final static int STARROCKS_HTTP_PORT = 8030;
//
private static String url = “”;
private static Connection conn = null;
static final String JDBC_DRIVER = “com.starrocks.jdbc.Driver”;
public static Connection getConnection() throws SQLException {
if (null == conn || conn.isClosed()) {
try {
Class.forName(“com.mysql.jdbc.Driver”);
conn = DriverManager.getConnection(url, STARROCKS_USER, STARROCKS_PASSWORD);
} catch (Exception e) {
e.printStackTrace();
}
}
return conn;
}
private boolean sendData(String content,String STARROCKS_TABLE) throws Exception {
long timeMillis = System.currentTimeMillis();
String createTime = DateFormatUtil.toDate(timeMillis);
System.out.println("3.5----接收到数据"+createTime);
boolean flag=true;
final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
STARROCKS_HOST,
STARROCKS_HTTP_PORT,
STARROCKS_DB,
STARROCKS_TABLE);
final HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(content, "UTF-8");
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
// the label header is optional, not necessary
// use label header can ensure at most once semantics
//put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de");
put.setEntity(entity);
client.execute(put);
}
return flag;
}
private String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
//拼接写入的字符串
public static String splitStrBud(List<String> fieldNames, JSONObject jsonObject) {
StringBuilder builder = new StringBuilder();
//先遍历fieldNames,去json里取值,拼接,如果取不到,则用空填补
for (String fieldName : fieldNames) {
String values = jsonObject.getString(fieldName);
if (values==null){
values="";
}
builder.append(values+"\t");
}
if (builder.length()>1){
builder.deleteCharAt(builder.length() - 1);
}
builder.append("\n");
return builder.toString();
}
//写入记录表
public static void insertDataLog(String shopName, String jdDate,String tableName,String batch,String jdCount,String updateTime,String syncCount,String sr_batch) {
PreparedStatement pst = null;
Statement st = null;
StringBuilder builder = new StringBuilder();
long timeMillis = System.currentTimeMillis();
String createTime = DateFormatUtil.toDate(timeMillis);
builder.append(shopName+"\t"+jdDate+"\t"+tableName+"\t"+batch+"\t"+jdCount+"\t"+"1\t"+updateTime+"\t"+syncCount+"\t"+createTime+"\t"+"1"+"\t"+sr_batch);
mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
try {
System.out.println(builder.toString());
starrocksStreamLoad.sendData(builder.toString(),"mysql_template_data_log");
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
int i=0;
while (true){
long timeMillisst = System.currentTimeMillis();
String start = DateFormatUtil.toDate(timeMillisst);
System.out.println(“第”+i+“次开始”+start);
//1.查询starrocks中的导入记录表,并保存为一个map
Connection conn = getConnection(); //(连接需要的三个参数)
PreparedStatement pst = null;
Statement st = null;
String sql = “select * from ods.mysql_template_data_log”;
Map<String, String> mysqlMap = new HashMap<String, String>();
try {
pst =conn.prepareStatement(sql);
ResultSet rs =pst.executeQuery();
ResultSetMetaData meta = rs.getMetaData();
int numColumns = meta.getColumnCount();
while (rs.next()) {
String jingdongShopname = rs.getString("jingdong_shopname");
String jingdongDate = rs.getString("jingdong_date");
String jingdongTablename = rs.getString("jingdong_tablename");
String jingdongBatch = rs.getString("jingdong_batch");
String sr_batch = rs.getString("sr_batch");
if (sr_batch==null||"".equals(sr_batch)){
sr_batch="0";
}
mysqlMap.put(jingdongShopname+"%%"+jingdongDate+"%%"+jingdongTablename,jingdongBatch+","+sr_batch);
}
rs.close();
pst.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
// 2.读取mongoDb中配置表信息,遍历成一个map
Map<String, Integer> mongoMap = mongoUtil.getSettingCollection("jingdong_template_data_log");
// 3.遍历两个集合,如果存在同shopname-date-tablename 比一下batch大小,batch更大,放入待处理集合,如果没有,则放入待处理集合
Map<String, String> resultMap = new HashMap<String, String>();
Map<String, String> resultMap2 = new HashMap<String, String>();
for (Map.Entry<String, Integer> entry : mongoMap.entrySet()) {
String mapKey = entry.getKey();
String[] split = mapKey.split("&&");
int mapValue = entry.getValue();
//判断mysql记录表中是否有该记录,如果有比较batch大小,如果没有默认需要处理
if(mysqlMap.containsKey(split[0])){
String mysqlValue = mysqlMap.get(split[0]);
String[] batchs = mysqlValue.split(",");
String jingdong_batch = batchs[0];
String sr_batch = batchs[1];
if (mapValue>Integer.parseInt(jingdong_batch)){
resultMap.put(mapKey,String.valueOf(mapValue)+","+sr_batch);
}
}else {
resultMap2.put(mapKey,String.valueOf(mapValue)+",0");
}
}
//4.遍历待处理的集合,去mongo对应表取数,返回一个jsonArray,先处理还没有入库的,再处理版本更新的
for (Map.Entry<String, String> entry : resultMap2.entrySet()) {
String entryKey = entry.getKey();
ArrayList<Boolean> resultarr = new ArrayList<Boolean>();
String[] split = entryKey.split("&&");
String querymessage = split[0];
//System.out.println(querymessage);
String[] messageSplit = querymessage.split("%%");
String shopName = messageSplit[0];
String jddate = messageSplit[1];
String tableName = messageSplit[2];
String jdCount = split[1];
String updateTime = split[2];
String batch2 = entry.getValue();
String[] batchSplit = batch2.split(",");
String jingdong_batch = batchSplit[0];
String sr_batch = batchSplit[1];
Integer batch = Integer.parseInt(jingdong_batch);
long timeMilli1 = System.currentTimeMillis();
String time1 = DateFormatUtil.toDate(timeMilli1);
System.out.println("1----处理开始"+time1);
JSONArray detailCollection = mongoUtil.getDetailCollection(querymessage, batch);
long timeMilli2 = System.currentTimeMillis();
String time2 = DateFormatUtil.toDate(timeMilli2);
System.out.println("2----拿到jsonArray"+time2);
Iterator<Object> it = detailCollection.iterator();
StringBuilder loadBuilder = new StringBuilder();
int flag =0; int syncCount =0;
while (it.hasNext()) {
//String tableName="jingdong_ads_dsp_rtb_featured_recommend_dmp_list_v2";
JSONObject jsonObject = JSONObject.parseObject(it.next().toString());
JSONObject flattenJSONObject = mongoUtil.flattenJSONObject(jsonObject);
long timeMillis = System.currentTimeMillis();
String kc_dt = DateFormatUtil.toDate(timeMillis);
String date = flattenJSONObject.getString("date");String _id = flattenJSONObject.getString("_id");
String sellerId = flattenJSONObject.getString("sellerId");String shopNames = flattenJSONObject.getString("shopName");
String batchs = flattenJSONObject.getString("batch");
loadBuilder.append(_id+"\t"+tableName+"\t"+kc_dt+"\t"+sellerId+"\t"+shopNames+"\t"+batchs+"\t"+flattenJSONObject.toJSONString()+"\t"+date+"\n");
flag++; syncCount++;
if (flag>1200){
long timeMilli3 = System.currentTimeMillis();
String time3 = DateFormatUtil.toDate(timeMilli3);
System.out.println("3----发送写入请求"+time3);
mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
long timeMilli4 = System.currentTimeMillis();
String time4 = DateFormatUtil.toDate(timeMilli4);
System.out.println("4----写入完毕"+flag+"条"+time4);
resultarr.add(result);
loadBuilder=new StringBuilder();
flag=0;
}
}
//System.out.println("loadBuilder="+loadBuilder.toString());
//执行写入
long timeMilli3 = System.currentTimeMillis();
String time3 = DateFormatUtil.toDate(timeMilli3);
System.out.println("3----拼写完毕发送写入请求"+time3);
mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
long timeMilli4 = System.currentTimeMillis();
String time4 = DateFormatUtil.toDate(timeMilli4);
System.out.println("4----写入完毕"+flag+"条"+time4);
resultarr.add(result);
//本次写入,留下一条记录,写入记录表中
if (!resultarr.contains(false)){
insertDataLog(shopName,jddate,tableName,String.valueOf(batch),jdCount,updateTime,String.valueOf(syncCount),sr_batch);
}
}
for (Map.Entry<String, String> entry : resultMap.entrySet()) {
String entryKey = entry.getKey();
ArrayList<Boolean> resultarr = new ArrayList<Boolean>();
String[] split = entryKey.split("&&");
String querymessage = split[0];
//System.out.println(querymessage);
String[] messageSplit = querymessage.split("%%");
String shopName = messageSplit[0];
String jddate = messageSplit[1];
String tableName = messageSplit[2];
String jdCount = split[1];
String updateTime = split[2];
String batch2 = entry.getValue();
String[] batchSplit = batch2.split(",");
String jingdong_batch = batchSplit[0];
String sr_batch = batchSplit[1];
Integer batch = Integer.parseInt(jingdong_batch);
long timeMilli1 = System.currentTimeMillis();
String time1 = DateFormatUtil.toDate(timeMilli1);
System.out.println("1----处理开始"+time1);
JSONArray detailCollection = mongoUtil.getDetailCollection(querymessage, batch);
long timeMilli2 = System.currentTimeMillis();
String time2 = DateFormatUtil.toDate(timeMilli2);
System.out.println("2----拿到jsonArray"+time2);
Iterator<Object> it = detailCollection.iterator();
StringBuilder loadBuilder = new StringBuilder();
int flag =0; int syncCount =0;
while (it.hasNext()) {
//String tableName="jingdong_ads_dsp_rtb_featured_recommend_dmp_list_v2";
JSONObject jsonObject = JSONObject.parseObject(it.next().toString());
JSONObject flattenJSONObject = mongoUtil.flattenJSONObject(jsonObject);
long timeMillis = System.currentTimeMillis();
String kc_dt = DateFormatUtil.toDate(timeMillis);
String date = flattenJSONObject.getString("date");String _id = flattenJSONObject.getString("_id");
String sellerId = flattenJSONObject.getString("sellerId");String shopNames = flattenJSONObject.getString("shopName");
String batchs = flattenJSONObject.getString("batch");
loadBuilder.append(_id+"\t"+tableName+"\t"+kc_dt+"\t"+sellerId+"\t"+shopNames+"\t"+batchs+"\t"+flattenJSONObject.toJSONString()+"\t"+date+"\n");
flag++; syncCount++;
if (flag>=1200){
mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
long timeMilli3 = System.currentTimeMillis();
String time3 = DateFormatUtil.toDate(timeMilli3);
System.out.println("3----拼写完毕发送写入请求"+time3);
boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
long timeMilli4 = System.currentTimeMillis();
String time4 = DateFormatUtil.toDate(timeMilli4);
System.out.println("4----写入完毕"+flag+"条"+time4);
resultarr.add(result);
loadBuilder=new StringBuilder();
flag=0;
}
}
//System.out.println("loadBuilder="+loadBuilder.toString());
//执行写入
long timeMilli3 = System.currentTimeMillis();
String time3 = DateFormatUtil.toDate(timeMilli3);
System.out.println("3----拼写完毕发送写入请求"+time3);
mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
long timeMilli4 = System.currentTimeMillis();
String time4 = DateFormatUtil.toDate(timeMilli4);
System.out.println("4----写入完毕"+flag+"条"+time4);
resultarr.add(result);
//本次写入,留下一条记录,写入记录表中
if (!resultarr.contains(false)){
insertDataLog(shopName,jddate,tableName,String.valueOf(batch),jdCount,updateTime,String.valueOf(syncCount),sr_batch);
}
}
long timeMillisend = System.currentTimeMillis();
String end = DateFormatUtil.toDate(timeMillisend);
System.out.println("第"+i+"次结束"+end);
i++;
}
}
}