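【Details】When using the Stream Load partial update feature (partial_update), updates occasionally appear to take effect late. Business scenario: each round fetches 250,000 rows and splits them across 5 threads, 50,000 rows each, which run partial updates in parallel against the same table; the total data volume is 20 million rows. Each thread performs the partial update (status changed from 0 to 1), receives the result (Message: OK), and immediately verifies it by querying with status = 0 (the query condition includes the ID). Occasionally this query still returns rows with status = 0. The problem occurred 1 to 3 times over the whole run, and it reproduces every time the 20 million rows are rerun. To improve batch throughput, we use a loop of query plus batch update. Is Stream Load asynchronous by design?
【Background】Using the Stream Load partial update feature
【Business impact】Delayed updates cause data consistency problems
【StarRocks version】3.1.3
【Cluster size】3 FE (1 leader + 2 followers) + 3 BE (FE and BE co-located)
【Machine info】CPU vCores/memory/NIC: 8C/32G/10 GbE
【Table model】Primary Key table
【Import or export method】Stream Load
【Contact】2410413272@qq.com
【Attachments】None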
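For anyone trying to reproduce the scenario above, here is a minimal Java sketch of the batching pattern it describes: split one round of IDs into fixed-size chunks and hand each chunk to a worker thread. `ParallelBatchSketch`, `split`, `runBatch`, and the `updateAndVerify` callback are hypothetical names and not part of the original job; the callback stands in for "Stream Load partial update, then query by ID" and is assumed to return the IDs that still show status 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelBatchSketch {

    /** Splits ids into consecutive chunks of at most chunkSize elements. */
    static List<List<Long>> split(List<Long> ids, int chunkSize) {
        List<List<Long>> chunks = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, ids.size());
            chunks.add(new ArrayList<>(ids.subList(from, to)));
        }
        return chunks;
    }

    /**
     * Runs updateAndVerify on every chunk in a fixed thread pool and collects
     * the IDs each call reports as stale (still status 0 after the update).
     * updateAndVerify is a hypothetical stand-in for the real update + query.
     */
    static List<Long> runBatch(List<Long> ids, int threads, int chunkSize,
                               Function<List<Long>, List<Long>> updateAndVerify) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<Long>>> futures = new ArrayList<>();
            for (List<Long> chunk : split(ids, chunkSize)) {
                futures.add(pool.submit(() -> updateAndVerify.apply(chunk)));
            }
            List<Long> stale = new ArrayList<>();
            for (Future<List<Long>> f : futures) {
                stale.addAll(f.get()); // blocks until that chunk has finished
            }
            return stale;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a chunk", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A chunk failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With 250,000 IDs, `runBatch(ids, 5, 50000, callback)` would give each of the 5 threads one 50,000-row chunk, matching the numbers in the post.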
Has nobody else run into this problem?
Stream Load is a synchronous load method. Could you post screenshots showing the specific problem? I'll follow up on it.
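Synchronous here means the HTTP response carries the outcome of the load, so the response body is worth inspecting. As far as I know from the Stream Load documentation, the `Status` field of the response can be `Success`, `Publish Timeout`, `Label Already Exists`, or `Fail`, and `Publish Timeout` means the transaction was committed but the data may not be queryable yet. Below is a minimal sketch of a check on that field; the class, enum, and method names are hypothetical.

```java
public class StreamLoadStatusSketch {

    /** Coarse interpretation of the Status value in a Stream Load response. */
    enum Visibility {
        VISIBLE,                   // "Success": loaded and queryable
        COMMITTED_NOT_YET_VISIBLE, // "Publish Timeout": committed, may not be queryable yet
        NEEDS_ATTENTION            // "Fail", "Label Already Exists", or anything unexpected
    }

    /** Maps the Status string to a Visibility; unknown or null values need attention. */
    static Visibility classify(String status) {
        if ("Success".equals(status)) {
            return Visibility.VISIBLE;
        }
        if ("Publish Timeout".equals(status)) {
            return Visibility.COMMITTED_NOT_YET_VISIBLE;
        }
        return Visibility.NEEDS_ATTENTION;
    }
}
```

A caller that needs read-after-write behaviour would query immediately only on `VISIBLE`, and wait or re-check on `COMMITTED_NOT_YET_VISIBLE`.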
I simulated the scenario with the following code:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public static void main(String[] args) {
    try (Connection cnn = DriverManager.getConnection(url, user, pwd);
         Statement stmt = cnn.createStatement()) {
        // Wrapper class around StarRocks Stream Load
        StarRocksConnector sr = new StarRocksConnector(streamLoadAddr, streamLoadPort, db, user, pwd);
        List<Person> list = new ArrayList<>(50000);
        String[] arr = new String[]{"$.id", "$.status"};
        // Use <= so that the final batch (ending at id 20,000,000) is sent as well
        for (long i = 1L; i <= 20000000L; i++) {
            // The first argument is the id; the last argument is status, set to 1 (status is 0 in the table)
            Person p = new Person(i, String.valueOf(i), 1, new Date(), 1);
            list.add(p);
            if (i % 50000 == 0) {
                StarRocksOperationResult rslt = sr.update("t_person", JsonUtil.toJSONString(list), "id, status", arr);
                System.out.println(JSON.toJSONString(rslt));
                // After the update, query the first row of the batch; status == 0 means the update is not visible yet
                String sql = "select id, status from t_person where id = " + list.get(0).getId();
                try (ResultSet rs = stmt.executeQuery(sql)) {
                    if (!rs.next()) {
                        throw new Exception("Record not found, ID: " + list.get(0).getId());
                    }
                    if (rs.getInt("status") == 0) {
                        // id is a long, so read it with getLong rather than getInt
                        throw new Exception("Record ID: " + rs.getLong("id"));
                    }
                }
                list = new ArrayList<>(50000);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
{"beginTxnTimeMs":0,"commitAndPublishTimeMs":37,"label":"ae5a553d-4f1e-4e19-aa59-ec8ad3790ae1","loadBytes":4544980,"loadTimeMs":159,"message":"OK","numberFilteredRows":0,"numberLoadedRows":50000,"numberTotalRows":50000,"numberUnselectedRows":0,"readDataTimeMs":11,"status":"Success","streamLoadPlanTimeMs":0,"success":true,"txnId":3778781,"writeDataTimeMs":120}
{"beginTxnTimeMs":0,"commitAndPublishTimeMs":38,"label":"10b27425-e4de-430c-ab50-3aa2fdedae8e","loadBytes":4541478,"loadTimeMs":150,"message":"OK","numberFilteredRows":0,"numberLoadedRows":50000,"numberTotalRows":50000,"numberUnselectedRows":0,"readDataTimeMs":5,"status":"Success","streamLoadPlanTimeMs":0,"success":true,"txnId":3778782,"writeDataTimeMs":111}
java.lang.Exception: Record ID: 3350001
at com.xx.common.Test.main(Test.java:54)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
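To tell a short visibility delay apart from a lost update, one option is to re-read the row a few times instead of failing on the first read. Here is a rough sketch, assuming a hypothetical `readStatus` callback that runs the `select ... where id = ...` query from the test above and returns the `status` column; the class and method names are also made up for illustration.

```java
import java.util.function.IntSupplier;

public class VisibilityProbeSketch {

    /**
     * Calls readStatus up to maxAttempts times, sleeping sleepMillis between reads.
     * Returns the 1-based attempt on which status was no longer 0,
     * or -1 if it was still 0 after maxAttempts reads (or the thread was interrupted).
     */
    static int attemptsUntilUpdated(IntSupplier readStatus, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (readStatus.getAsInt() != 0) {
                return attempt;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }
}
```

A return value of 1 means the update was visible on the first read; a larger value suggests the row became visible only after a delay; -1 suggests the update never showed up within the probe window.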
// JSON / JSONObject are assumed to be fastjson; adjust the imports if another library is used
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Slf4j
public class StarRocksConnector {
    private final String host;
    private final int port;
    private final String database;
    private final String user;
    private final String passwd;
    private final String urlPrefix;

    public StarRocksConnector(String host, int port, String database, String user, String passwd) {
        this.host = host;
        this.port = port;
        this.database = database;
        this.user = user;
        this.passwd = passwd;
        this.urlPrefix = String.format("http://%s:%s/api/", host, port);
    }

    // Build the HTTP client builder
    private final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // If the connection target is an FE, the 307 redirect must be followed.
                    return true;
                }
            });

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    public StarRocksOperationResult update(String table, String jsonData, String columnNames, String[] jsonpaths) throws Exception {
        return update(this.database, table, jsonData, columnNames, jsonpaths);
    }

    public StarRocksOperationResult update(String database, String table, String jsonData, String columnNames, String[] jsonpaths) throws Exception {
        return loadData(UpdateType.PARTIAL_COLUMNS, database, table, jsonData, columnNames, jsonpaths);
    }

    public StarRocksOperationResult loadData(UpdateType type, String database, String table, String jsonData, String columnNames, String[] jsonpaths) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(urlPrefix + database + "/" + table + "/_stream_load");
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, passwd));
            // Stream Load properties are passed as headers; here we set the label and the JSON options.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("strip_outer_array", "true");
            put.setHeader("format", "json");
            put.setHeader("ignore_json_size", "true");
            if (null != columnNames && !"".equals(columnNames)) {
                put.setHeader("columns", columnNames);
            }
            if (type.equals(UpdateType.PARTIAL_COLUMNS)) {
                put.setHeader("partial_update", "true");
            }
            // jsonpaths is an array, so check its length rather than comparing it with ""
            if (null != jsonpaths && jsonpaths.length > 0) {
                put.setHeader("jsonpaths", JSON.toJSONString(jsonpaths));
            }
            // Set the request body. StringEntity can carry arbitrary data.
            StringEntity entity = new StringEntity(jsonData, StandardCharsets.UTF_8);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // HTTP 200 only means the request was handled; the load outcome is in the Status field of the body.
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                log.debug("Get load result:{} ", loadResult);
                return JSONObject.parseObject(loadResult, StarRocksOperationResult.class);
            }
        }
    }
}

enum UpdateType {
    PARTIAL_COLUMNS,
    ALL_COLUMNS
}
Has nobody else run into this problem?