A Flink job uses a lookup join against a StarRocks table to populate dimension fields; predicate pushdown is not applied and the job fails with an out-of-memory error

【Details】A Flink job populates dimension fields through a lookup join against a StarRocks table. The log shows that no predicate pushdown is performed, which exhausts memory, and the job fails at runtime with an out-of-memory error. Relevant log excerpt and stack trace:
2023-06-24 22:11:58,392 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /hadoop/yarn/local/usercache/root/appcache/application_1674975318075_66463/flink-io-e2bb8076-c33e-4ee7-853a-27a9ae7e93a8/job_e8562f41d44a49d82332d09cd554c01e_op_StreamingJoinOperator_8d15ade8b1e4809c335f24f6b7dba4a0__1_1__uuid_dde04e0e-2a6f-4483-b0d5-f8672761335a.
2023-06-24 22:11:58,436 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
2023-06-24 22:11:58,436 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
2023-06-24 22:11:58,437 INFO com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction [] - Populating lookup join cache
2023-06-24 22:11:58,437 INFO com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction [] - LookUpFunction SQL [select * from customer_field_value]
2023-06-24 22:11:58,437 INFO com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor [] - query sql [select * from customer_field_value]
2023-06-24 22:11:58,480 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - query data from be by using be-ip
2023-06-24 22:11:58,480 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - query data from be by using be-ip
2023-06-24 22:11:58,480 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - query data from be by using be-ip
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.mem_limit 1073741824 B
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.keep-alive-min 10 min
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.mem_limit 1073741824 B
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.keep-alive-min 10 min
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.mem_limit 1073741824 B
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.keep-alive-min 10 min
2023-06-24 22:11:59,140 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend. Cleaning up RocksDB working directory /hadoop/yarn/local/usercache/root/appcache/application_1674975318075_66463/flink-io-e2bb8076-c33e-4ee7-853a-27a9ae7e93a8/job_e8562f41d44a49d82332d09cd554c01e_op_StreamingJoinOperator_8d15ade8b1e4809c335f24f6b7dba4a0__1_1__uuid_dde04e0e-2a6f-4483-b0d5-f8672761335a.
2023-06-24 22:11:59,150 WARN org.apache.flink.runtime.taskmanager.Task [] - Join(joinType=[InnerJoin], where=[((coll_account = pay_account) AND (trans_amount0 = trans_amount) AND (trans_time0 = trans_time) AND (update_time0 >= (update_time - 3000:INTERVAL SECOND)) AND (update_time0 <= update_time))], select=[id_, trans_time, trans_amount, pay_account, update_time, trans_time0, trans_amount0, pay_object, coll_account, update_time0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id_, CAST(pay_object) AS pay_object00]) -> LookupJoin(table=[default_catalog.default_database.customer_field_value], joinType=[InnerJoin], async=[false], lookup=[field_value=pay_object00, field_name_id=17], where=[((field_name_id = 17:BIGINT) AND (CAST(del_flag) = 0) AND (customer_id > 0))], select=[id_, pay_object00, field_value, customer_id]) -> Calc(select=[id_, CAST(field_value) AS client_company_name, CAST(customer_id) AS client_company_id, UTF-16LE'prod_dwd_flow_record_cmbc_client_update':VARCHAR(255) CHARACTER SET "UTF-16LE" AS etl_channel_name, CAST(()) AS etl_time]) -> Sink: Sink(table=[default_catalog.default_database.dwd_client_update], fields=[id, client_company_name, client_company_id, etl_channel_name, etl_time]) (1/1)#109118 (5c887a6fbf3a22a1d4a8854fe52142a4) switched from RUNNING to FAILED.
com.starrocks.shade.org.apache.arrow.memory.OutOfMemoryException: com.starrocks.shade.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
at sun.reflect.GeneratedConstructorAccessor76.newInstance(Unknown Source) ~[?:?]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_144]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_144]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) ~[?:1.8.0_144]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_144]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_144]
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714) ~[?:1.8.0_144]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_144]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_144]
at com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction.reloadData(StarRocksDynamicLookupFunction.java:109) ~[flink-connector-starrocks-1.2.1_flink-1.12_2.11.jar:?]
at com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction.eval(StarRocksDynamicLookupFunction.java:68) ~[flink-connector-starrocks-1.2.1_flink-1.12_2.11.jar:?]
at LookupFunction$153.flatMap(Unknown Source) ~[?:?]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at StreamExecCalc$149.processElement(Unknown Source) ~[?:?]
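
Judging from the log above, StarRocksDynamicLookupFunction populates its cache with a plain select * from customer_field_value: neither the lookup keys (field_value, field_name_id) nor the WHERE predicates reach StarRocks, so the whole dimension table is pulled into Arrow buffers on the TaskManager, which is where the allocation fails. The "mem_limit 1073741824 B" lines match the connector's per-scan memory limit (1 GiB default). Below is a minimal sketch of raising that limit in the dimension-table DDL; the option name scan.params.mem-limit-byte is taken from the connector documentation and should be verified against flink-connector-starrocks 1.2.1, the table-name value is assumed because the original DDL elides it, and this only lets the full-table scan go further, it does not restore predicate pushdown. If the Arrow allocation failure persists, the TaskManager's off-heap memory (taskmanager.memory.task.off-heap.size) likely also needs to grow, since Arrow allocates direct memory.

-- Sketch only: dimension-table DDL with a larger scan memory limit.
-- 'scan.params.mem-limit-byte' and 'table-name' are assumptions to verify against
-- the flink-connector-starrocks docs; the other values are copied from the post.
CREATE TABLE customer_field_value (
id bigint,
field_value varchar(28000),
customer_id bigint,
field_name_id bigint,
del_flag tinyint
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://starrocks.com:9030',
'scan-url' = 'starrocks.com:8030',
'database-name' = 'ods',
'table-name' = 'customer_field_value',
'scan.params.mem-limit-byte' = '4294967296'  -- 4 GiB instead of the 1 GiB default
);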

【Background】The dimension-table join in the Flink job is implemented on top of flink-connector-starrocks-1.2.1_flink-1.12_2.11.jar.
【Business impact】The Flink job fails, so the business logic cannot run.
【StarRocks version】2.5.4
【Cluster size / machine specs】FE: 8C16G × 3, BE: 8C32G × 3, FE and BE deployed on separate machines
【Contact】StarRocks community group 8
【Attachments】

  • Profile information (how to obtain a profile; see the sketch after the variable listings below)
  • Parallelism: show variables like '%parallel_fragment_exec_instance_num%'; show variables like '%pipeline_dop%';
    |Variable_name|Value|
    |---|---|
    |parallel_fragment_exec_instance_num|1|
    |pipeline_dop|0|
  • Whether the pipeline engine is enabled: show variables like '%pipeline%';
    |Variable_name|Value|
    |---|---|
    |enable_pipeline_engine|true|
    |enable_pipeline_query_statistic|true|
    |pipeline_dop|0|
    |pipeline_profile_level|1|
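
For the profile item in the list above, a minimal sketch of capturing a query profile on StarRocks 2.5 is shown here; the variable name enable_profile is an assumption based on 2.x documentation (older releases used is_report_success), so please verify it on your cluster:

-- Sketch: turn on profile reporting for the current session, then rerun the slow query.
-- Variable name assumed to be enable_profile on StarRocks 2.5; verify before use.
SET enable_profile = true;
-- The resulting profile should then be retrievable from the FE web UI (port 8030) query list.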

The Flink job SQL is as follows:

CREATE TABLE dwd_record_kafka (
id_ varchar(129) comment 'id',
trans_time timestamp ,
trans_amount decimal(16,2) ,
acct_batch_no varchar(129) ,
pay_object varchar(128) ,
pay_account varchar(32) ,
pay_bank_name varchar(129) ,
coll_account varchar(32) ,
payee_bank_name varchar(65) ,
loan_ind tinyint ,
orders_type varchar(50) ,
client_company_name varchar(65) ,
client_company_id varchar(100) ,
update_time timestamp ,
status_ tinyint ,
proc_time AS PROCTIME()
) WITH(
'connector' = 'kafka',
'topic' = 'dwd_flow_record' …… ……
);

CREATE TABLE customer_field_value (
id bigint comment 'id',
field_value varchar(28000) comment 'custom template value',
customer_id bigint comment 'customer info id',
field_name_id bigint comment 'custom template id',
del_flag tinyint comment 'delete flag 0: not deleted, 1: deleted'
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://starrocks.com:9030',
'scan-url'='starrocks.com:8030',
'database-name' = 'ods' …… ……
);

CREATE TABLE dwd_client_update (
id_ varchar(129) comment 'id',
client_company_name varchar(65) comment 'customer name',
client_company_id varchar(100) comment 'customer id',
etl_channel_name varchar(255) COMMENT 'ETL sync job name',
etl_time timestamp ,
primary key(id_) not ENFORCED
) WITH (
'connector' = 'jdbc',
'url'='jdbc:mysql://' …… ……
);

insert into dwd_client_update
select a.id_,
b.field_value as client_company_name,
cast(b.customer_id as string) as client_company_id,
'dwd_client_update' etl_channel_name,
LOCALTIMESTAMP as etl_time
from dwd_record_kafka a
left join dwd_record_kafka aaa on (aaa.loan_ind=0 and aaa.coll_account=a.pay_account and aaa.trans_amount=a.trans_amount and aaa.trans_time=a.trans_time)
left join customer_field_value FOR SYSTEM_TIME AS OF a.proc_time AS b on (b.field_name_id = 17 AND b.del_flag = 0 and b.field_value = aaa.pay_object)
where a.status_=1 and a.orders_type='OTH' and a.loan_ind=1
and a.client_company_id is null
and a.client_company_name is null
and aaa.id_ is not null and CHAR_LENGTH(aaa.id_)>1
and aaa.pay_object is not null and CHAR_LENGTH(aaa.pay_object)>1
and aaa.update_time between a.update_time - INTERVAL '3' SECOND and a.update_time
and b.customer_id is not null and b.customer_id >0
;
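
Not part of the original post, but one possible workaround while the StarRocks lookup source lacks predicate pushdown is to declare the dimension table with Flink's JDBC connector against StarRocks' MySQL protocol: the JDBC lookup source issues a per-key SELECT ... WHERE for each probe row and caches only the keys it has seen, instead of materializing the whole table. A minimal sketch, assuming flink-connector-jdbc for Flink 1.12 and a MySQL driver on the classpath; the URL reuses the jdbc-url from the DDL above, while table-name and the cache settings are illustrative:

-- Sketch: JDBC-based lookup table; each lookup becomes a point query on StarRocks.
-- Connector options are from flink-connector-jdbc 1.12; values are illustrative.
CREATE TABLE customer_field_value (
id bigint,
field_value varchar(28000),
customer_id bigint,
field_name_id bigint,
del_flag tinyint
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://starrocks.com:9030/ods',
'table-name' = 'customer_field_value',
'lookup.cache.max-rows' = '10000',  -- cache only recently looked-up keys
'lookup.cache.ttl' = '10min'
);

With this definition the existing FOR SYSTEM_TIME AS OF a.proc_time join keeps working unchanged; the trade-off is one query per cache miss instead of one bulk load, so the cache settings matter for throughput.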

Hello, I ran into the same problem on Dinky + FlinkSQL + StarRocks. Have you solved it? Could I ask you for your solution? Thank you!