FLINK插件写入通过nginx代理的be节点时提示空指针

【详述】flink插件flink-connector-starrocks版本1.2.5_flink-1.13_2.12,写入数据时提示空指针。
【代码】 EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment env = TableEnvironment.create(settings);
env.executeSql(
“CREATE TABLE ass_monitoring_test(” +
“digest STRING not null,” +
“exec_time STRING not null,” +
"user STRING," +
// “ts STRING,” +
// “tag STRING,” +
// "database STRING," +
// “sqltext STRING,” +
“primary key(digest, exec_time) not enforced” +
") WITH ( " +
“‘connector’ = ‘starrocks’,” +
“‘jdbc-url’=‘jdbc:mysql://xxxx:9033’,” +
“‘load-url’=‘xxxxx:8033’,” +
“‘database-name’ = ‘hdop’,” +
“‘table-name’ = ‘ass_monitoring_test’,” +
“‘username’ = ‘hdop’,” +
“‘password’ = ‘123456’,” +
“‘sink.buffer-flush.max-rows’ = ‘1000000’,” +
“‘sink.buffer-flush.max-bytes’ = ‘300000000’,” +
“‘sink.buffer-flush.interval-ms’ = ‘5000’,” +
// 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列,需要在’sink.properties.columns’的最后显示添加’__op’列。
// “‘sink.properties.partial_update’ = ‘true’,” +
“‘sink.properties.columns’ = ‘digest,exec_time,user’,” +
“‘sink.properties.column_separator’ = ‘\x01’,” +
“‘sink.properties.row_delimiter’ = ‘\x02’,” +
“‘sink.max-retries’ = ‘3’” +
“)”
);
env.executeSql(
“INSERT INTO ass_monitoring_test SELECT ‘6’, ‘2021-03-01 12:00:00’, cast(null as varchar) union all SELECT ‘6’, ‘2021-03-02 13:00:00’, ‘6’”);
【问题】Caused by: java.lang.NullPointerException: null
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257)
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
… 1 common frames omitted
【问题分析】
我直接将load-url改成be的8040端口是可以执行成功的,用以前的flink-connector-doris插件也是没有问题的,好像是httpclient请求nginx的时候返回307重定向,但是为啥以前的插件是正常返回200,请帮忙定位下原因,谢谢啦

能提供一下taskmanager日志吗,或者搜一下这个 Stream load completed, label


返回的body就是空的 什么都没有

你说的 flink-connector-doris 是什么版本,还是指jdbc的方式?现在connector用的是stream load,连接FE时会重定向到BE,如果直接连BE则不会重定向,可以看下stream load的原理


这个是当时starrocks还没有官方的包时候的用的

我直接连接fe的端口程序没有问题,但是连接nginx代理的fe的端口就出现问题了

两个包不同的地方就是在这里


抓包发现两个版本的不同,请问下第一张图片里面的nginx代理应该如何配置


1.2.5里stream load用的是chunked http,如果怀疑是这个问题,可以先用最新的connector 1.2.6再验证下,这个版本改回了非chunked模式

找到原因了,新包里面的httpentity是自己实现的,当第一次发起nginx请求是把inputstream里面的数据读取出来之后,重定向发起的第二次请求就读取不了数据了

请问下你们后面是怎么处理的?在nginx代理上修改了配置?还是说要求connector不能自定义httpentity

这个地方我也不知道怎么处理,我就这么替换了一下

可以用connector 1.2.6版本验证下是否还有这个问题吗

flink-connector-starrocks-1.2.6_flink-1.13_2.11.jar 我就是用的这个版本