java编写streamLoad导入速度慢

【详述】java编写streamLoad导入速度慢
【背景】使用官方案例,java读取mongodb数据,拼接字符串后,每1200条调用一次sendData方法执行写入,发现速度较慢,有时候1200条写入超过2s,请问怎么能提速
【业务影响】
【StarRocks版本】例如:2.5.6
【集群规模】例如:1fe(1 follower+2observer)+8be(fe与be混部)
【机器信息】8C 16G
【表模型】主键模型
【导入或者导出方式】java 编写streamload
【联系方式】ljw1209113096
【附件】

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;

public class mongoStreamLoad {
private final static String STARROCKS_HOST = “”;
private final static String STARROCKS_DB = “”;
private final static String STARROCKS_USER = “”;
private final static String STARROCKS_PASSWORD = “”;
private final static int STARROCKS_HTTP_PORT = 8030;
//
private static String url = “”;
private static Connection conn = null;
static final String JDBC_DRIVER = “com.starrocks.jdbc.Driver”;
public static Connection getConnection() throws SQLException {
if (null == conn || conn.isClosed()) {
try {
Class.forName(“com.mysql.jdbc.Driver”);
conn = DriverManager.getConnection(url, STARROCKS_USER, STARROCKS_PASSWORD);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    return conn;
}
private boolean sendData(String content,String STARROCKS_TABLE) throws Exception {
    long timeMillis = System.currentTimeMillis();
    String createTime = DateFormatUtil.toDate(timeMillis);
    System.out.println("3.5----接收到数据"+createTime);
    boolean flag=true;
    final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            STARROCKS_HOST,
            STARROCKS_HTTP_PORT,
            STARROCKS_DB,
            STARROCKS_TABLE);

    final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });

    try (CloseableHttpClient client = httpClientBuilder.build()) {
        HttpPut put = new HttpPut(loadUrl);
        StringEntity entity = new StringEntity(content, "UTF-8");
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
        // the label header is optional, not necessary
        // use label header can ensure at most once semantics
        //put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de");
        put.setEntity(entity);
        client.execute(put);
        
    }
    return flag;
}
private String basicAuthHeader(String username, String password) {
    final String tobeEncode = username + ":" + password;
    byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
    return "Basic " + new String(encoded);
}

//拼接写入的字符串
public static String splitStrBud(List<String> fieldNames, JSONObject jsonObject) {
    StringBuilder builder = new StringBuilder();
    //先遍历fieldNames,去json里取值,拼接,如果取不到,则用空填补
    for (String fieldName : fieldNames) {
        String values = jsonObject.getString(fieldName);
        if (values==null){
            values="";
        }
        builder.append(values+"\t");
    }
    if (builder.length()>1){
        builder.deleteCharAt(builder.length() - 1);
    }
    builder.append("\n");
    return builder.toString();
}

//写入记录表
public static void insertDataLog(String shopName, String jdDate,String tableName,String batch,String jdCount,String updateTime,String syncCount,String sr_batch) {
    PreparedStatement pst = null;
    Statement st = null;
    StringBuilder builder = new StringBuilder();
    long timeMillis = System.currentTimeMillis();
    String createTime = DateFormatUtil.toDate(timeMillis);
    builder.append(shopName+"\t"+jdDate+"\t"+tableName+"\t"+batch+"\t"+jdCount+"\t"+"1\t"+updateTime+"\t"+syncCount+"\t"+createTime+"\t"+"1"+"\t"+sr_batch);
    mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
    try {
        System.out.println(builder.toString());
        starrocksStreamLoad.sendData(builder.toString(),"mysql_template_data_log");
    }catch (Exception e){
        e.printStackTrace();
    }

}

public static void main(String[] args) throws Exception {

int i=0;
while (true){
long timeMillisst = System.currentTimeMillis();
String start = DateFormatUtil.toDate(timeMillisst);
System.out.println(“第”+i+“次开始”+start);
//1.查询starrocks中的导入记录表,并保存为一个map
Connection conn = getConnection(); //(连接需要的三个参数)
PreparedStatement pst = null;
Statement st = null;
String sql = “select * from ods.mysql_template_data_log”;
Map<String, String> mysqlMap = new HashMap<String, String>();
try {
pst =conn.prepareStatement(sql);
ResultSet rs =pst.executeQuery();

    ResultSetMetaData meta = rs.getMetaData();
    int numColumns = meta.getColumnCount();
    while (rs.next()) {
        String jingdongShopname = rs.getString("jingdong_shopname");
        String jingdongDate = rs.getString("jingdong_date");
        String jingdongTablename = rs.getString("jingdong_tablename");
        String jingdongBatch = rs.getString("jingdong_batch");
        String sr_batch = rs.getString("sr_batch");
        if (sr_batch==null||"".equals(sr_batch)){
            sr_batch="0";
        }
        mysqlMap.put(jingdongShopname+"%%"+jingdongDate+"%%"+jingdongTablename,jingdongBatch+","+sr_batch);
    }
    rs.close();
    pst.close();
    conn.close();
} catch (Exception e) {
    e.printStackTrace();
}
// 2.读取mongoDb中配置表信息,遍历成一个map
Map<String, Integer> mongoMap = mongoUtil.getSettingCollection("jingdong_template_data_log");
// 3.遍历两个集合,如果存在同shopname-date-tablename 比一下batch大小,batch更大,放入待处理集合,如果没有,则放入待处理集合
Map<String, String> resultMap = new HashMap<String, String>();
Map<String, String> resultMap2 = new HashMap<String, String>();
for (Map.Entry<String, Integer> entry : mongoMap.entrySet()) {
    String mapKey = entry.getKey();
    String[] split = mapKey.split("&&");
    int mapValue = entry.getValue();
    //判断mysql记录表中是否有该记录,如果有比较batch大小,如果没有默认需要处理
    if(mysqlMap.containsKey(split[0])){
        String mysqlValue = mysqlMap.get(split[0]);
        String[] batchs = mysqlValue.split(",");
        String jingdong_batch = batchs[0];
        String sr_batch = batchs[1];
        if (mapValue>Integer.parseInt(jingdong_batch)){
            resultMap.put(mapKey,String.valueOf(mapValue)+","+sr_batch);
        }
    }else {
        resultMap2.put(mapKey,String.valueOf(mapValue)+",0");
    }
}
//4.遍历待处理的集合,去mongo对应表取数,返回一个jsonArray,先处理还没有入库的,再处理版本更新的
for (Map.Entry<String, String> entry : resultMap2.entrySet()) {
    String entryKey = entry.getKey();
    ArrayList<Boolean> resultarr = new ArrayList<Boolean>();
    String[] split = entryKey.split("&&");
    String querymessage = split[0];
    //System.out.println(querymessage);
    String[] messageSplit = querymessage.split("%%");
    String shopName = messageSplit[0];
    String jddate = messageSplit[1];
    String tableName = messageSplit[2];
    String jdCount = split[1];
    String updateTime = split[2];
    String batch2 = entry.getValue();
    String[] batchSplit = batch2.split(",");
    String jingdong_batch = batchSplit[0];
    String sr_batch = batchSplit[1];

    Integer batch = Integer.parseInt(jingdong_batch);
    long timeMilli1 = System.currentTimeMillis();
    String time1 = DateFormatUtil.toDate(timeMilli1);
    System.out.println("1----处理开始"+time1);
    JSONArray detailCollection = mongoUtil.getDetailCollection(querymessage, batch);
    long timeMilli2 = System.currentTimeMillis();
    String time2 = DateFormatUtil.toDate(timeMilli2);
    System.out.println("2----拿到jsonArray"+time2);
    Iterator<Object> it = detailCollection.iterator();
    StringBuilder loadBuilder = new StringBuilder();
    int flag =0; int syncCount =0;
    while (it.hasNext()) {
        //String tableName="jingdong_ads_dsp_rtb_featured_recommend_dmp_list_v2";
        JSONObject jsonObject = JSONObject.parseObject(it.next().toString());
        JSONObject flattenJSONObject = mongoUtil.flattenJSONObject(jsonObject);
        long timeMillis = System.currentTimeMillis();
        String kc_dt = DateFormatUtil.toDate(timeMillis);
        String date = flattenJSONObject.getString("date");String _id = flattenJSONObject.getString("_id");
        String sellerId = flattenJSONObject.getString("sellerId");String shopNames = flattenJSONObject.getString("shopName");
        String batchs = flattenJSONObject.getString("batch");
        loadBuilder.append(_id+"\t"+tableName+"\t"+kc_dt+"\t"+sellerId+"\t"+shopNames+"\t"+batchs+"\t"+flattenJSONObject.toJSONString()+"\t"+date+"\n");
        flag++; syncCount++;
        if (flag>1200){
            long timeMilli3 = System.currentTimeMillis();
            String time3 = DateFormatUtil.toDate(timeMilli3);
            System.out.println("3----发送写入请求"+time3);
            mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
            boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
            long timeMilli4 = System.currentTimeMillis();
            String time4 = DateFormatUtil.toDate(timeMilli4);
            System.out.println("4----写入完毕"+flag+"条"+time4);
            resultarr.add(result);
            loadBuilder=new StringBuilder();
            flag=0;
        }

    }

    //System.out.println("loadBuilder="+loadBuilder.toString());
    //执行写入
    long timeMilli3 = System.currentTimeMillis();
    String time3 = DateFormatUtil.toDate(timeMilli3);
    System.out.println("3----拼写完毕发送写入请求"+time3);
    mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
    boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
    long timeMilli4 = System.currentTimeMillis();
    String time4 = DateFormatUtil.toDate(timeMilli4);
    System.out.println("4----写入完毕"+flag+"条"+time4);
    resultarr.add(result);
    //本次写入,留下一条记录,写入记录表中
    if (!resultarr.contains(false)){
        insertDataLog(shopName,jddate,tableName,String.valueOf(batch),jdCount,updateTime,String.valueOf(syncCount),sr_batch);
    }
}


for (Map.Entry<String, String> entry : resultMap.entrySet()) {
    String entryKey = entry.getKey();
    ArrayList<Boolean> resultarr = new ArrayList<Boolean>();
    String[] split = entryKey.split("&&");
    String querymessage = split[0];
    //System.out.println(querymessage);
    String[] messageSplit = querymessage.split("%%");
    String shopName = messageSplit[0];
    String jddate = messageSplit[1];
    String tableName = messageSplit[2];
    String jdCount = split[1];
    String updateTime = split[2];
    String batch2 = entry.getValue();
    String[] batchSplit = batch2.split(",");
    String jingdong_batch = batchSplit[0];
    String sr_batch = batchSplit[1];

    Integer batch = Integer.parseInt(jingdong_batch);
    long timeMilli1 = System.currentTimeMillis();
    String time1 = DateFormatUtil.toDate(timeMilli1);
    System.out.println("1----处理开始"+time1);
    JSONArray detailCollection = mongoUtil.getDetailCollection(querymessage, batch);
    long timeMilli2 = System.currentTimeMillis();
    String time2 = DateFormatUtil.toDate(timeMilli2);
    System.out.println("2----拿到jsonArray"+time2);
    Iterator<Object> it = detailCollection.iterator();
    StringBuilder loadBuilder = new StringBuilder();
    int flag =0; int syncCount =0;
    while (it.hasNext()) {
        //String tableName="jingdong_ads_dsp_rtb_featured_recommend_dmp_list_v2";
        JSONObject jsonObject = JSONObject.parseObject(it.next().toString());
        JSONObject flattenJSONObject = mongoUtil.flattenJSONObject(jsonObject);
        long timeMillis = System.currentTimeMillis();
        String kc_dt = DateFormatUtil.toDate(timeMillis);
        String date = flattenJSONObject.getString("date");String _id = flattenJSONObject.getString("_id");
        String sellerId = flattenJSONObject.getString("sellerId");String shopNames = flattenJSONObject.getString("shopName");
        String batchs = flattenJSONObject.getString("batch");
        loadBuilder.append(_id+"\t"+tableName+"\t"+kc_dt+"\t"+sellerId+"\t"+shopNames+"\t"+batchs+"\t"+flattenJSONObject.toJSONString()+"\t"+date+"\n");
        flag++; syncCount++;
        if (flag>=1200){
            mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
            long timeMilli3 = System.currentTimeMillis();
            String time3 = DateFormatUtil.toDate(timeMilli3);
            System.out.println("3----拼写完毕发送写入请求"+time3);
            boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
            long timeMilli4 = System.currentTimeMillis();
            String time4 = DateFormatUtil.toDate(timeMilli4);
            System.out.println("4----写入完毕"+flag+"条"+time4);
            resultarr.add(result);
            loadBuilder=new StringBuilder();
            flag=0;
        }

    }
    //System.out.println("loadBuilder="+loadBuilder.toString());
    //执行写入
    long timeMilli3 = System.currentTimeMillis();
    String time3 = DateFormatUtil.toDate(timeMilli3);
    System.out.println("3----拼写完毕发送写入请求"+time3);
    mongoStreamLoad starrocksStreamLoad = new mongoStreamLoad();
    boolean result = starrocksStreamLoad.sendData(loadBuilder.toString(), "mongo_api_data_all");
    long timeMilli4 = System.currentTimeMillis();
    String time4 = DateFormatUtil.toDate(timeMilli4);
    System.out.println("4----写入完毕"+flag+"条"+time4);
    resultarr.add(result);
    //本次写入,留下一条记录,写入记录表中
    if (!resultarr.contains(false)){
        insertDataLog(shopName,jddate,tableName,String.valueOf(batch),jdCount,updateTime,String.valueOf(syncCount),sr_batch);
    }
}
long timeMillisend = System.currentTimeMillis();
String end = DateFormatUtil.toDate(timeMillisend);
System.out.println("第"+i+"次结束"+end);
i++;

}

}

}

最好每个批次攒的多一些,另外写入慢的问题需要case by case看日志分析。另外我看flink-cdc已经支持了mongodb,咱们为啥没用cdc同步mongodb的数据到sr

因为String长度限制原因,我把每批次改成1200一次,怕太多了超过String长度,我日志这边输出了一下,就是前面读mongo不慢,就是到调用streamload 一次写入要很久,然后因为技术原因,这部分以后要移交给后端同学,所以请问下 在现有基础上这块还有更好的解决办法吗

be日志里面搜下PUBLISH看下task_count_in_queue这个队列大吗

老师你好,我这边是早上跑的时候特别慢,但是我白天测的时候 速度又比较正常,请问我明天早上如果再慢的话,我应该去看哪些日志去找到问题呢,据我了解 那个时间应该是没有其他任务在写入starrocks的,看be的这个task_count_in_queue是吧,现在看了下今天早上这个参数大部分是1,其中偶尔有的时候到了10

主要看下be日志里面是不是很多的PUBLISH_VERSION的,另外看下task_count_in_queue有成百上千的吗?如果没有的话,需要看下机器cpu、内存和io负载是不是打满了

I0609 00:33:02.971318 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635473, task_count_in_queue=1
I0609 00:34:03.498868 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635551, task_count_in_queue=1
I0609 00:34:06.815630 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635552, task_count_in_queue=1
I0609 00:34:09.600961 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635554, task_count_in_queue=1
I0609 00:34:12.559091 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635555, task_count_in_queue=1
I0609 00:34:15.242552 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635556, task_count_in_queue=1
I0609 00:34:19.300001 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635557, task_count_in_queue=1
I0609 00:34:40.252934 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635558, task_count_in_queue=1
I0609 00:35:01.775885 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635586, task_count_in_queue=1
I0609 00:35:08.090173 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635592, task_count_in_queue=1
I0609 00:35:27.358098 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635636, task_count_in_queue=1
I0609 00:35:27.358117 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635633, task_count_in_queue=2
I0609 00:35:27.380654 5959 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635632, task_count_in_queue=1
I0609 00:35:27.388803 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635591, task_count_in_queue=1
I0609 00:35:29.799994 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635634, task_count_in_queue=1
I0609 00:35:33.184767 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635635, task_count_in_queue=1
I0609 00:35:39.361122 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635588, task_count_in_queue=1
I0609 00:35:51.338990 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635644, task_count_in_queue=1
I0609 00:36:02.901707 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635653, task_count_in_queue=1
I0609 00:36:06.905151 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635652, task_count_in_queue=1
I0609 00:36:10.199834 20933 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635654, task_count_in_queue=1
I0609 00:36:13.553503 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635655, task_count_in_queue=1
I0609 00:36:18.424670 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635721, task_count_in_queue=1
I0609 00:36:20.904420 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635722, task_count_in_queue=1
I0609 00:36:23.811228 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635723, task_count_in_queue=1
I0609 00:36:28.396907 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635726, task_count_in_queue=1
I0609 00:36:28.571539 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635728, task_count_in_queue=1
I0609 00:36:30.807363 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635725, task_count_in_queue=1
I0609 00:36:39.102576 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635739, task_count_in_queue=1
I0609 00:36:40.408375 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635740, task_count_in_queue=1
I0609 00:36:46.071121 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635741, task_count_in_queue=1
I0609 00:36:47.268703 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635724, task_count_in_queue=1
I0609 00:36:56.271167 2850 task_worker_pool.cpp:158] Submit task success. type=PUBLISH_VERSION, signature=34635727, task_count_in_queue=1

老师 好像是有很多PUBLISH_VERSION,我那边是每1000多条就调一次streamLoad执行写入