Flink SQL: intermittent errors when concurrently writing from one StarRocks table to another

【Details】When using Flink to write data from one StarRocks table into another StarRocks table, an intermittent exception occurs when two or more jobs run concurrently; all of the jobs execute INSERT INTO ... SELECT statements.
【Background】Using Flink for data extraction.
【Shared-data cluster】No, this is not a storage-compute separated (shared-data) deployment.
【StarRocks version】2.5.20
【Connector version】flink-connector 1.2.9
【Cluster size】3 FE (1 follower + 2 observers) + 3 BE (FE and BE deployed on separate machines)
【Contact】Community group 20 - Sanborn
【Attachments】
As shown in the figure below, there are two jobs: one extracts data from the EAST_0202_NBKMDZB table in the ods database into the EAST_0202_NBKMDZB table in the out database, and the other extracts data from the EAST_0204_YHZHXXB table in the ods database into the EAST_0204_YHZHXXB table in the out database. The two Flink jobs run at almost the same time (concurrently), and the following exception occurs intermittently:

Please provide the complete, detailed log.

2024-05-29 09:00:17,369 INFO com.my.flink.lancher.BatchLancher [] - SQL:-- 引擎: 脚本由Flink SQL 数据加工引擎[FLINKSQL-DATAPROCESS]合成

CREATE CATALOG ODS_CW WITH ( 'type'='generic_in_memory' );

CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;

CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;

CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_0X01JFK3OOG ( LSH STRING, BXJGDM STRING, BXJGMC STRING, CJRQ STRING, BXZTBH STRING, BXZTMC STRING, FZJGDM STRING, GSQJ STRING, JZRQ1 STRING, JZPZH STRING, ZZKJKMBH STRING, ZZKJKMMC STRING, PZLY STRING, ZY1 STRING, BZ STRING, JYBJF DECIMAL(18,2), JYBDF DECIMAL(18,2), BWBJF DECIMAL(18,2), BWBDF DECIMAL(18,2), OP_TYPE STRING, OP_TIME TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0203_CWPZXXB','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','scan-url' = '127.0.0.1:8030','connector' ='starrocks');

set pipeline.name=DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0203_CWPZXXB_LOG_财务凭证信息轨迹表-[DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0203_CWPZXXB_LOG_CWPZGJB]-[e76041b3b57f44c3baa8422515387d19]-[Flink SQL 数据加工引擎];

set table.dynamic-table-options.enabled=true;

CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_LOG_0X01JFK3OOG ( MD5 STRING, BWBDF DECIMAL(18,2), BWBJF DECIMAL(18,2), BXJGDM STRING, BXJGMC STRING, BXZTBH STRING, BXZTMC STRING, BZ STRING, CJRQ STRING, FZJGDM STRING, GSQJ STRING, JYBDF DECIMAL(18,2), JYBJF DECIMAL(18,2), JZPZH STRING, JZRQ1 STRING, LSH STRING, PZLY STRING, ZY1 STRING, ZZKJKMBH STRING, ZZKJKMMC STRING, OP_TYPE STRING, OP_TIME TIMESTAMP, ETLDATE TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0203_CWPZXXB_LOG','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','load-url' = '127.0.0.1:8030','sink.properties.column_separator' = '\x01','sink.properties.row_delimiter' = '\x02','sink.buffer-flush.max-rows' = '1000000','sink.buffer-flush.max-bytes' = '300000000','sink.buffer-flush.interval-ms' = '30000','sink.max-retries' = '3','connector' ='starrocks');

INSERT INTO
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_LOG_0X01JFK3OOG
SELECT
MD5(ConcatObjUDF(BWBDF
,BWBJF
,BXJGDM
,BXJGMC
,BXZTBH
,BXZTMC
,BZ
,CJRQ
,FZJGDM
,GSQJ
,JYBDF
,JYBJF
,JZPZH
,JZRQ1
,LSH
,PZLY
,ZY1
,ZZKJKMBH
,ZZKJKMMC
,'c'
,LOCALTIMESTAMP
)
) AS MD5
, BWBDF , BWBJF , BXJGDM , BXJGMC , BXZTBH , BXZTMC , BZ , CJRQ , FZJGDM , GSQJ , JYBDF , JYBJF , JZPZH , JZRQ1 , LSH , PZLY , ZY1 , ZZKJKMBH , ZZKJKMMC , OP_TYPE
, OP_TIME
, LOCALTIMESTAMP AS ETLDATE
FROM
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_0X01JFK3OOG ;

2024-05-29 09:00:17,372 INFO com.my.flink.lancher.BatchLancher [] - SQL:-- 引擎: 脚本由Flink SQL 数据加工引擎[FLINKSQL-DATAPROCESS]合成

CREATE CATALOG ODS_CW WITH ( 'type'='generic_in_memory' );

CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;

CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;

CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_0X0XRV49CTJ ( LSH STRING, BXJGDM STRING, BXJGMC STRING, CJRQ STRING, FZJGDM STRING, YHMC STRING, YHZH STRING, YHZHMC STRING, YHZHLX STRING, YHZHZT STRING, BZ STRING, KHRQ STRING, XHRQ STRING, JWBZ STRING, OP_TYPE STRING, OP_TIME TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0204_YHZHXXB','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','scan-url' = '127.0.0.1:8030','connector' ='starrocks');

set pipeline.name=DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0204_YHZHXXB_LOG_银行账户信息轨迹表-[DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0204_YHZHXXB_LOG_YHZHGJ]-[c2e3979608a84e3ea888b4d0d331f612]-[Flink SQL 数据加工引擎];

set table.dynamic-table-options.enabled=true;

CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_LOG_0X0XRV49CTJ ( MD5 STRING, BXJGDM STRING, BXJGMC STRING, BZ STRING, CJRQ STRING, FZJGDM STRING, JWBZ STRING, KHRQ STRING, LSH STRING, XHRQ STRING, YHMC STRING, YHZH STRING, YHZHLX STRING, YHZHMC STRING, YHZHZT STRING, OP_TYPE STRING, OP_TIME TIMESTAMP, ETLDATE TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0204_YHZHXXB_LOG','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','load-url' = '127.0.0.1:8030','sink.properties.column_separator' = '\x01','sink.properties.row_delimiter' = '\x02','sink.buffer-flush.max-rows' = '1000000','sink.buffer-flush.max-bytes' = '300000000','sink.buffer-flush.interval-ms' = '30000','sink.max-retries' = '3','connector' ='starrocks');

INSERT INTO
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_LOG_0X0XRV49CTJ
SELECT
MD5(ConcatObjUDF(BXJGDM
,BXJGMC
,BZ
,CJRQ
,FZJGDM
,JWBZ
,KHRQ
,LSH
,XHRQ
,YHMC
,YHZH
,YHZHLX
,YHZHMC
,YHZHZT
,'c'
,LOCALTIMESTAMP
)
) AS MD5
, BXJGDM , BXJGMC , BZ , CJRQ , FZJGDM , JWBZ , KHRQ , LSH , XHRQ , YHMC , YHZH , YHZHLX , YHZHMC , YHZHZT , OP_TYPE
, OP_TIME
, LOCALTIMESTAMP AS ETLDATE
FROM
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_0X0XRV49CTJ ;

2024-05-29 09:00:17,408 INFO com.my.flink.MainLancher [] - 执行异常:java.lang.RuntimeException: Request of get query plan failed with code 400 {"exception":"requested database and table must consistent with sql: request [ ods.EAST_0203_CWPZXXB]and sql [ods.EAST_0204_YHZHXXB]","status":400}
at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryPlan(StarRocksQueryPlanVisitor.java:134)
at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryInfo(StarRocksQueryPlanVisitor.java:64)
at com.starrocks.connector.flink.table.source.StarRocksSourceCommonFunc.getQueryInfo(StarRocksSourceCommonFunc.java:206)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.&lt;init&gt;(StarRocksDynamicSourceFunction.java:89)
at com.starrocks.connector.flink.table.source.StarRocksDynamicTableSource.getScanRuntimeProvider(StarRocksDynamicTableSource.java:66)
at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalTableSourceScanRule.matches(BatchPhysicalTableSourceScanRule.scala:44)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:278)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:262)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1090)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1386)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:274)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1270)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:274)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1270)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:498)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:314)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1805)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109)
at com.my.flink.lancher.AbstractLancher.runSqlList(AbstractLancher.java:134)
at com.my.flink.lancher.AbstractLancher.exec(AbstractLancher.java:81)
at com.my.flink.MainLancher.main(MainLancher.java:46)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.lang.Thread.run(Thread.java:829)

2024-05-29 09:00:17,408 ERROR com.my.flink.MainLancher [] - Request of get query plan failed with code 400 {"exception":"requested database and table must consistent with sql: request [ ods.EAST_0203_CWPZXXB]and sql [ods.EAST_0204_YHZHXXB]","status":400}
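
For reference on what this 400 means: before scanning, the connector's source path (see StarRocksQueryPlanVisitor.getQueryPlan in the stack trace) asks the FE at scan-url for a query plan over HTTP. Below is a minimal sketch of that request, assuming the standard /api/{db}/{table}/_query_plan FE REST endpoint and using placeholder host and credentials taken from the DDL above; the FE answers with exactly this kind of 400 when the db.table in the URL path does not match the db.table referenced in the submitted SQL, which is what "request [ods.EAST_0203_CWPZXXB] and sql [ods.EAST_0204_YHZHXXB]" shows, even though each job on its own only reads a single source table.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

// Minimal sketch of the FE query-plan request issued for a connector source scan.
// Host, user and password are placeholders from the DDL above; the endpoint path
// /api/{db}/{table}/_query_plan is an assumption about the REST API served at scan-url.
public class QueryPlanRequestSketch {
    public static void main(String[] args) throws Exception {
        String scanUrl = "http://127.0.0.1:8030";
        String db = "ods";
        String table = "EAST_0203_CWPZXXB";                 // db.table named in the URL path
        String sql = "SELECT * FROM ods.EAST_0203_CWPZXXB"; // must reference the same db.table

        String body = "{\"sql\": \"" + sql.replace("\"", "\\\"") + "\"}";
        String auth = Base64.getEncoder()
                .encodeToString("sjptuser:xxxxxxxxxx".getBytes()); // placeholder credentials

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(scanUrl + "/api/" + db + "/" + table + "/_query_plan"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // When the db.table in the path and the db.table inside "sql" disagree
        // (request [ods.EAST_0203_CWPZXXB] vs sql [ods.EAST_0204_YHZHXXB] in the log above),
        // the FE rejects the request with HTTP 400 "requested database and table must
        // consistent with sql" instead of returning a query plan.
        System.out.println(response.statusCode() + " " + response.body());
    }
}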

The log has been pasted.

Could you please send the complete flink client log?

The complete flink client log is the same as the log already pasted above.