flink connector 中为什么每次提交都会创建一个httpclient

com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor#doHttpPut
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });
        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            Map<String, String> props = sinkOptions.getSinkStreamLoadProperties();
            for (Map.Entry<String,String> entry : props.entrySet()) {
                httpPut.setHeader(entry.getKey(), entry.getValue());
            }
          ...

如题。为啥不公用一个httpclient,复用连接呢?而且也没有开放任何跟httpclient相关的设置参数。请教下为啥这么考虑?

链接是无法复用的,fe会把每一个请求重定向到be上,所以每次请求fe都是要重新链接的(即使指定了connection: keep-alive)。并且考虑到整个系统应该尽量减少stream load的次数,增加每个批次的数据量,所以每次重新创建httpclient也是可以接受的。
当然也可以全局只用一个httpclient,欢迎提PR(这里可能要考虑将httpclient初始化操作移到实例序列化下发至task并open之后)。

好的。 感谢。 :+1:。 pr可能实力还不够。。。

麻烦再请教下
1.既然每次都是新建连接 为啥不用HttpURLConnection 而是要选择httpclient。
2. 我看文档说可以直接提交给BE。 如果不经过FE是不是性能会有所提升?
3. 直接提交给BE是否就更好实现连接复用

出于负载考虑,一般不直接提交给be,避免造成单点故障,而是提交给fe,由fe统一协调转发任务到不同be实现负载均衡。还有就是考虑高可用性,在be单节点宕机情况下,fe可以让其他节点retry。

  1. httpurlconnection 当然也可以,但是需要自己实现 307动作和相应的expect: 100-continue操作。考虑选择依赖现有的成熟方案比较可靠。
  2. 直接提交个be会有性能提升但会有均衡问题,因为fe内部有一些均衡逻辑会判断当前stream load发到哪个be比较合适。
  3. 如果不考虑be均衡问题,是可以复用链接的。

好的 感谢大佬。 学到了

感谢大佬。 学到了 学到了