【Details】A Flink job uses a lookup join on a StarRocks table to fill in dimension fields. The logs show that predicates are not pushed down — the connector issues `select * from customer_field_value` — so the entire table is loaded into the lookup cache and the job fails at runtime with an out-of-memory error.
2023-06-24 22:11:58,392 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /hadoop/yarn/local/usercache/root/appcache/application_1674975318075_66463/flink-io-e2bb8076-c33e-4ee7-853a-27a9ae7e93a8/job_e8562f41d44a49d82332d09cd554c01e_op_StreamingJoinOperator_8d15ade8b1e4809c335f24f6b7dba4a0__1_1__uuid_dde04e0e-2a6f-4483-b0d5-f8672761335a.
2023-06-24 22:11:58,436 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
2023-06-24 22:11:58,436 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
2023-06-24 22:11:58,437 INFO com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction [] - Populating lookup join cache
2023-06-24 22:11:58,437 INFO com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction [] - LookUpFunction SQL [select * from customer_field_value
]
2023-06-24 22:11:58,437 INFO com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor [] - query sql [select * from customer_field_value
]
2023-06-24 22:11:58,480 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - query data from be by using be-ip
2023-06-24 22:11:58,480 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - query data from be by using be-ip
2023-06-24 22:11:58,480 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - query data from be by using be-ip
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.mem_limit 1073741824 B
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.keep-alive-min 10 min
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.mem_limit 1073741824 B
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.keep-alive-min 10 min
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.mem_limit 1073741824 B
2023-06-24 22:11:58,492 INFO com.starrocks.connector.flink.table.source.StarRocksSourceBeReader [] - open Scan params.keep-alive-min 10 min
2023-06-24 22:11:59,140 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend. Cleaning up RocksDB working directory /hadoop/yarn/local/usercache/root/appcache/application_1674975318075_66463/flink-io-e2bb8076-c33e-4ee7-853a-27a9ae7e93a8/job_e8562f41d44a49d82332d09cd554c01e_op_StreamingJoinOperator_8d15ade8b1e4809c335f24f6b7dba4a0__1_1__uuid_dde04e0e-2a6f-4483-b0d5-f8672761335a.
2023-06-24 22:11:59,150 WARN org.apache.flink.runtime.taskmanager.Task [] - Join(joinType=[InnerJoin], where=[((coll_account = pay_account) AND (trans_amount0 = trans_amount) AND (trans_time0 = trans_time) AND (update_time0 >= (update_time - 3000:INTERVAL SECOND)) AND (update_time0 <= update_time))], select=[id_, trans_time, trans_amount, pay_account, update_time, trans_time0, trans_amount0, pay_object, coll_account, update_time0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id_, CAST(pay_object) AS pay_object00]) -> LookupJoin(table=[default_catalog.default_database.customer_field_value], joinType=[InnerJoin], async=[false], lookup=[field_value=pay_object00, field_name_id=17], where=[((field_name_id = 17:BIGINT) AND (CAST(del_flag) = 0) AND (customer_id > 0))], select=[id_, pay_object00, field_value, customer_id]) -> Calc(select=[id_, CAST(field_value) AS client_company_name, CAST(customer_id) AS client_company_id, UTF-16LE’prod_dwd_flow_record_cmbc_client_update’:VARCHAR(255) CHARACTER SET “UTF-16LE” AS etl_channel_name, CAST(()) AS etl_time]) -> Sink: Sink(table=[default_catalog.default_database.dwd_client_update], fields=[id, client_company_name, client_company_id, etl_channel_name, etl_time]) (1/1)#109118 (5c887a6fbf3a22a1d4a8854fe52142a4) switched from RUNNING to FAILED.
com.starrocks.shade.org.apache.arrow.memory.OutOfMemoryException: com.starrocks.shade.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
at sun.reflect.GeneratedConstructorAccessor76.newInstance(Unknown Source) ~[?:?]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_144]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_144]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) ~[?:1.8.0_144]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_144]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_144]
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714) ~[?:1.8.0_144]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_144]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_144]
at com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction.reloadData(StarRocksDynamicLookupFunction.java:109) ~[flink-connector-starrocks-1.2.1_flink-1.12_2.11.jar:?]
at com.starrocks.connector.flink.table.source.StarRocksDynamicLookupFunction.eval(StarRocksDynamicLookupFunction.java:68) ~[flink-connector-starrocks-1.2.1_flink-1.12_2.11.jar:?]
at LookupFunction$153.flatMap(Unknown Source) ~[?:?]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at StreamExecCalc$149.processElement(Unknown Source) ~[?:?]
【Background】Dimension-table lookup joins in a Flink job, implemented with flink-connector-starrocks-1.2.1_flink-1.12_2.11.jar.
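For reference, the lookup join follows roughly this pattern. The table and column names are taken from the failed operator's plan above; the fact-table name (`fact_stream`), its processing-time column (`proc_time`), and all connector option values are placeholders, not the actual job configuration:

```sql
-- Dimension table served by the StarRocks connector (option values are placeholders)
CREATE TABLE customer_field_value (
    field_value   STRING,
    field_name_id BIGINT,
    customer_id   BIGINT,
    del_flag      TINYINT
) WITH (
    'connector'     = 'starrocks',
    'scan-url'      = '<fe_host>:<http_port>',
    'jdbc-url'      = 'jdbc:mysql://<fe_host>:<query_port>',
    'database-name' = '<db>',
    'table-name'    = 'customer_field_value',
    'username'      = '<user>',
    'password'      = '<password>'
);

-- Lookup join: the filter conditions (field_name_id = 17, del_flag = 0,
-- customer_id > 0) are present in the Flink plan but, per the connector log,
-- are NOT pushed into the StarRocks scan, which runs
-- "select * from customer_field_value" and loads the whole table.
SELECT f.id_, d.field_value, d.customer_id
FROM fact_stream AS f
JOIN customer_field_value FOR SYSTEM_TIME AS OF f.proc_time AS d
  ON d.field_value = f.pay_object00
WHERE d.field_name_id = 17
  AND d.del_flag = 0
  AND d.customer_id > 0;
```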
【Business impact】The Flink job fails, so the business pipeline cannot run.
【StarRocks version】2.5.4
【Cluster size / hardware】FE: 3 × 8C16G; BE: 3 × 8C32G; FE and BE deployed separately.
【Contact】StarRocks community group 8
【Attachments】
- Profile information (how to obtain a profile)
- Parallelism: `show variables like '%parallel_fragment_exec_instance_num%';` `show variables like '%pipeline_dop%';`

|Variable_name|Value|
|---|---|
|parallel_fragment_exec_instance_num|1|
|pipeline_dop|0|
- Whether the pipeline engine is enabled: `show variables like '%pipeline%';`
|Variable_name|Value|
|---|---|
|enable_pipeline_engine|true|
|enable_pipeline_query_statistic|true|
|pipeline_dop|0|
|pipeline_profile_level|1|