Stream load的部分更新(partial_update)出现延时更新

【详述】在使用 Stream Load 部分更新功能(partial_update)时,偶尔出现延时更新的情况。业务场景:每次取 25 万条数据,分给 5 个线程(各 5 万条)并行对同一张表进行部分更新,数据总量是 2 千万条。每个线程完成部分更新(status 由 0 更新为 1)并拿到返回结果(Message:OK)后,立即以 status 为 0 为条件执行查询进行验证(查询条件带 ID),此时偶尔仍能以 status 为 0 查出数据。整个过程中会出现 1 到 3 次这样的问题,每次重跑 2 千万数据都能复现。为了提升批处理效率,使用了循环查询+批量更新的处理方式。请问 Stream Load 本身在设计上就是异步的吗?
【背景】使用Stream load部分更新功能
【业务影响】延时更新导致数据一致性问题
【StarRocks版本】3.1.3
【集群规模】3fe(1leader + 2follower)+ 3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,8C/32G/万兆
【表模型】主键模型
【导入或者导出方式】Stream load
【联系方式】2410413272@qq.com
【附件】无

没有人遇到这个问题吗?

Stream Load 是同步的导入方式。麻烦您贴一下具体问题的截图或相关内容,我来跟进一下。

1赞
模拟了下场景,代码如下:
/**
 * Reproduction driver: partially updates {@code t_person} in batches through
 * Stream Load and, right after each batch returns, reads back the first row
 * of that batch over JDBC to check that the new {@code status} is visible.
 *
 * <p>Any failure (including a stale row) is caught at the bottom and its
 * stack trace is printed; the method itself never throws.
 *
 * @param args unused
 */
public static void main(String[] args) {
	final int batchSize = 50000;
	final long totalRows = 20000000L;
	// Connection and Statement are both closed automatically, even on failure.
	try (Connection cnn = DriverManager.getConnection(url, user, pwd);
			Statement stmt = cnn.createStatement()) {
		// Wrapper class around StarRocks Stream Load
		StarRocksConnector sr = new StarRocksConnector(streamLoadAddr, streamLoadPort, db, user, pwd);
		List<Person> list = new ArrayList<>(batchSize);
		String[] arr = new String[]{"$.id", "$.status"};
		// Inclusive upper bound: with "<" the last batch would be filled but never flushed.
		for (long i = 1L; i <= totalRows; i++) {
			// First argument is the id; the last argument sets status to 1 (rows in the table have status 0)
			Person p = new Person(i, String.valueOf(i), 1, new Date(), 1);
			list.add(p);
			if (i % batchSize == 0) {
				StarRocksOperationResult rslt = sr.update("t_person", JsonUtil.toJSONString(list), "id, status", arr);
				System.out.println(JSON.toJSONString(rslt));
				// After the update, query the first row of the batch; status == 0 means the update is not visible yet
				String sql = "select id, status from t_person where id = " + list.get(0).getId();
				try (ResultSet rs = stmt.executeQuery(sql)) {
					if (!rs.next()) {
						throw new IllegalStateException("No row found for query: " + sql);
					}
					int status = rs.getInt("status");
					if (status == 0) {
						throw new IllegalStateException("记录ID:" + rs.getLong("id"));
					}
				}
				// The batch has already been serialized to JSON, so the list can be reused.
				list.clear();
			}
		}
	} catch (Exception e) {
		e.printStackTrace();
	}
}

输出结果:
{"beginTxnTimeMs":0,"commitAndPublishTimeMs":37,"label":"ae5a553d-4f1e-4e19-aa59-ec8ad3790ae1","loadBytes":4544980,"loadTimeMs":159,"message":"OK","numberFilteredRows":0,"numberLoadedRows":50000,"numberTotalRows":50000,"numberUnselectedRows":0,"readDataTimeMs":11,"status":"Success","streamLoadPlanTimeMs":0,"success":true,"txnId":3778781,"writeDataTimeMs":120}
{"beginTxnTimeMs":0,"commitAndPublishTimeMs":38,"label":"10b27425-e4de-430c-ab50-3aa2fdedae8e","loadBytes":4541478,"loadTimeMs":150,"message":"OK","numberFilteredRows":0,"numberLoadedRows":50000,"numberTotalRows":50000,"numberUnselectedRows":0,"readDataTimeMs":5,"status":"Success","streamLoadPlanTimeMs":0,"success":true,"txnId":3778782,"writeDataTimeMs":111}
java.lang.Exception: 记录ID:3350001
at com.xx.common.Test.main(Test.java:54)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
1赞
/**
 * Minimal HTTP client wrapper that sends JSON data to a StarRocks table via
 * the {@code _stream_load} endpoint.
 *
 * <p>Each call to {@link #loadData} builds a fresh HTTP client, issues one
 * {@code PUT} request and parses the JSON response body into a
 * {@link StarRocksOperationResult}.
 */
@Slf4j
public class StarRocksConnector {
    private final String host;
    private final int port;
    private final String database;
    private final String user;
    private final String passwd;
    private final String urlPrefix;

    // Shared builder; every request builds its own client from it.
    private final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // If the connection target is FE, you need to deal with 307 redirect.
                    return true;
                }
            });

    /**
     * @param host     host name or address of the Stream Load HTTP endpoint
     * @param port     HTTP port of the endpoint
     * @param database default database used by {@link #update(String, String, String, String[])}
     * @param user     user name for HTTP basic authentication
     * @param passwd   password for HTTP basic authentication
     */
    public StarRocksConnector(String host, int port, String database, String user, String passwd) {
        this.host = host;
        this.port = port;
        this.database = database;
        this.user = user;
        this.passwd = passwd;
        this.urlPrefix = String.format("http://%s:%s/api/", host, port);
    }

    /** Builds the value of the HTTP {@code Authorization} header for basic authentication. */
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        // Decode with an explicit charset instead of the platform default.
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    /**
     * Partially updates {@code table} in the default database.
     *
     * @see #loadData(UpdateType, String, String, String, String, String[])
     */
    public StarRocksOperationResult update(String table, String jsonData, String columnNames, String[] jsonpaths) throws Exception {
        return update(this.database, table, jsonData, columnNames, jsonpaths);
    }

    /**
     * Partially updates {@code table} in {@code database}.
     *
     * @see #loadData(UpdateType, String, String, String, String, String[])
     */
    public StarRocksOperationResult update(String database, String table, String jsonData, String columnNames, String[] jsonpaths) throws Exception {
        return loadData(UpdateType.PARTIAL_COLUMNS, database, table, jsonData, columnNames, jsonpaths);
    }

    /**
     * Sends {@code jsonData} to {@code database.table} with a single Stream Load request.
     *
     * @param type        {@link UpdateType#PARTIAL_COLUMNS} adds the {@code partial_update: true} header
     * @param database    target database
     * @param table       target table
     * @param jsonData    request body; sent with {@code format: json} and {@code strip_outer_array: true}
     * @param columnNames value of the {@code columns} header; omitted when null or empty
     * @param jsonpaths   JSON paths serialized into the {@code jsonpaths} header; omitted when null or empty
     * @return the response body parsed into a {@link StarRocksOperationResult}
     * @throws IOException if the HTTP status code is not 200
     * @throws Exception   on any other client or parsing failure
     */
    public StarRocksOperationResult loadData(UpdateType type, String database, String table, String jsonData, String columnNames, String[] jsonpaths) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(urlPrefix + database + "/" + table + "/_stream_load");
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, passwd));

            // Stream load properties are passed as HTTP headers; a random label is used per request.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("strip_outer_array", "true");
            put.setHeader("format", "json");
            put.setHeader("ignore_json_size", "true");
            if (null != columnNames && !columnNames.isEmpty()) {
                put.setHeader("columns", columnNames);
            }
            // Identity comparison is null-safe for enums.
            if (type == UpdateType.PARTIAL_COLUMNS) {
                put.setHeader("partial_update", "true");
            }
            // The previous check compared the array against "" and was therefore always true.
            if (null != jsonpaths && jsonpaths.length > 0) {
                put.setHeader("jsonpaths", JSON.toJSONString(jsonpaths));
            }

            // Set up the request body. StringEntity can carry arbitrary text data.
            StringEntity entity = new StringEntity(jsonData, StandardCharsets.UTF_8);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    // UTF-8 is used when the response does not declare a charset.
                    loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                // NOTE(review): HTTP 200 only means the request was handled. The JSON body
                // carries its own status field, which callers should check before assuming
                // the data is visible — confirm the possible values against the StarRocks
                // Stream Load documentation.
                log.debug("Get load result:{} ", loadResult);
                return JSONObject.parseObject(loadResult, StarRocksOperationResult.class);
            }
        }
    }
}

/**
 * Kind of load requested from {@code StarRocksConnector.loadData}.
 */
enum UpdateType {
    /** {@code loadData} adds the {@code partial_update: true} header for this type. */
    PARTIAL_COLUMNS,

    /** {@code loadData} adds no extra header for this type. */
    ALL_COLUMNS;
}

没有其他人遇到这个问题吗?