【详述】使用 Flink 将 StarRocks 中一张表的数据写入 StarRocks 的另一张表(执行的都是 INSERT INTO ... SELECT 语句)时,如果两个或多个任务并发执行,会偶发性地出现异常。
【背景】使用flink抽数
【是否存算分离】不是存算分离
【StarRocks版本】2.5.20
【connector版本】flink-connector 1.2.9
【集群规模】3 FE(1 follower + 2 observer)+ 3 BE(FE 与 BE 分开部署)
【联系方式】社区群20-Sanborn
【附件】
如下图,有两个任务:一是从ods库的EAST_0202_NBKMDZB表抽数到out库的EAST_0202_NBKMDZB表,二是从ods库的EAST_0204_YHZHXXB表抽数到out库的EAST_0204_YHZHXXB表,这俩flink任务几乎是同一时间点(并发)加工,出现以下异常(偶发):
请提供一下完整的详细日志信息。
2024-05-29 09:00:17,369 INFO com.my.flink.lancher.BatchLancher [] - SQL:-- 引擎: 脚本由Flink SQL 数据加工引擎[FLINKSQL-DATAPROCESS]合成
CREATE CATALOG ODS_CW
WITH ( 'type'='generic_in_memory' );
CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;
CREATE DATABASE IF NOT EXISTS ODS_CW
.ODS_CW_ODS_STARROCKS_DB
;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_0X01JFK3OOG ( LSH
STRING, BXJGDM
STRING, BXJGMC
STRING, CJRQ
STRING, BXZTBH
STRING, BXZTMC
STRING, FZJGDM
STRING, GSQJ
STRING, JZRQ1
STRING, JZPZH
STRING, ZZKJKMBH
STRING, ZZKJKMMC
STRING, PZLY
STRING, ZY1
STRING, BZ
STRING, JYBJF
DECIMAL(18,2), JYBDF
DECIMAL(18,2), BWBJF
DECIMAL(18,2), BWBDF
DECIMAL(18,2), OP_TYPE
STRING, OP_TIME
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0203_CWPZXXB','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','scan-url' = '127.0.0.1:8030','connector' ='starrocks');
set pipeline.name=DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0203_CWPZXXB_LOG_财务凭证信息轨迹表-[DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0203_CWPZXXB_LOG_CWPZGJB]-[e76041b3b57f44c3baa8422515387d19]-[Flink SQL 数据加工引擎];
set table.dynamic-table-options.enabled=true;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_LOG_0X01JFK3OOG ( MD5
STRING, BWBDF
DECIMAL(18,2), BWBJF
DECIMAL(18,2), BXJGDM
STRING, BXJGMC
STRING, BXZTBH
STRING, BXZTMC
STRING, BZ
STRING, CJRQ
STRING, FZJGDM
STRING, GSQJ
STRING, JYBDF
DECIMAL(18,2), JYBJF
DECIMAL(18,2), JZPZH
STRING, JZRQ1
STRING, LSH
STRING, PZLY
STRING, ZY1
STRING, ZZKJKMBH
STRING, ZZKJKMMC
STRING, OP_TYPE
STRING, OP_TIME
TIMESTAMP, ETLDATE
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0203_CWPZXXB_LOG','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','load-url' = '127.0.0.1:8030','sink.properties.column_separator' = '\x01','sink.properties.row_delimiter' = '\x02','sink.buffer-flush.max-rows' = '1000000','sink.buffer-flush.max-bytes' = '300000000','sink.buffer-flush.interval-ms' = '30000','sink.max-retries' = '3','connector' ='starrocks');
INSERT INTO
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_LOG_0X01JFK3OOG
SELECT
MD5(ConcatObjUDF(BWBDF
,BWBJF
,BXJGDM
,BXJGMC
,BXZTBH
,BXZTMC
,BZ
,CJRQ
,FZJGDM
,GSQJ
,JYBDF
,JYBJF
,JZPZH
,JZRQ1
,LSH
,PZLY
,ZY1
,ZZKJKMBH
,ZZKJKMMC
,'c'
,LOCALTIMESTAMP
)
) AS MD5
, BWBDF , BWBJF , BXJGDM , BXJGMC , BXZTBH , BXZTMC , BZ , CJRQ , FZJGDM , GSQJ , JYBDF , JYBJF , JZPZH , JZRQ1 , LSH , PZLY , ZY1 , ZZKJKMBH , ZZKJKMMC , OP_TYPE
, OP_TIME
, LOCALTIMESTAMP AS ETLDATE
FROM
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_0X01JFK3OOG ;
2024-05-29 09:00:17,372 INFO com.my.flink.lancher.BatchLancher [] - SQL:-- 引擎: 脚本由Flink SQL 数据加工引擎[FLINKSQL-DATAPROCESS]合成
CREATE CATALOG ODS_CW
WITH ( 'type'='generic_in_memory' );
CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;
CREATE DATABASE IF NOT EXISTS ODS_CW
.ODS_CW_ODS_STARROCKS_DB
;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_0X0XRV49CTJ ( LSH
STRING, BXJGDM
STRING, BXJGMC
STRING, CJRQ
STRING, FZJGDM
STRING, YHMC
STRING, YHZH
STRING, YHZHMC
STRING, YHZHLX
STRING, YHZHZT
STRING, BZ
STRING, KHRQ
STRING, XHRQ
STRING, JWBZ
STRING, OP_TYPE
STRING, OP_TIME
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0204_YHZHXXB','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','scan-url' = '127.0.0.1:8030','connector' ='starrocks');
set pipeline.name=DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0204_YHZHXXB_LOG_银行账户信息轨迹表-[DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0204_YHZHXXB_LOG_YHZHGJ]-[c2e3979608a84e3ea888b4d0d331f612]-[Flink SQL 数据加工引擎];
set table.dynamic-table-options.enabled=true;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_LOG_0X0XRV49CTJ ( MD5
STRING, BXJGDM
STRING, BXJGMC
STRING, BZ
STRING, CJRQ
STRING, FZJGDM
STRING, JWBZ
STRING, KHRQ
STRING, LSH
STRING, XHRQ
STRING, YHMC
STRING, YHZH
STRING, YHZHLX
STRING, YHZHMC
STRING, YHZHZT
STRING, OP_TYPE
STRING, OP_TIME
TIMESTAMP, ETLDATE
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0204_YHZHXXB_LOG','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','load-url' = '127.0.0.1:8030','sink.properties.column_separator' = '\x01','sink.properties.row_delimiter' = '\x02','sink.buffer-flush.max-rows' = '1000000','sink.buffer-flush.max-bytes' = '300000000','sink.buffer-flush.interval-ms' = '30000','sink.max-retries' = '3','connector' ='starrocks');
INSERT INTO
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_LOG_0X0XRV49CTJ
SELECT
MD5(ConcatObjUDF(BXJGDM
,BXJGMC
,BZ
,CJRQ
,FZJGDM
,JWBZ
,KHRQ
,LSH
,XHRQ
,YHMC
,YHZH
,YHZHLX
,YHZHMC
,YHZHZT
,'c'
,LOCALTIMESTAMP
)
) AS MD5
, BXJGDM , BXJGMC , BZ , CJRQ , FZJGDM , JWBZ , KHRQ , LSH , XHRQ , YHMC , YHZH , YHZHLX , YHZHMC , YHZHZT , OP_TYPE
, OP_TIME
, LOCALTIMESTAMP AS ETLDATE
FROM
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_0X0XRV49CTJ ;
2024-05-29 09:00:17,408 INFO com.my.flink.MainLancher [] - 执行异常:java.lang.RuntimeException: Request of get query plan failed with code 400 {"exception":"requested database and table must consistent with sql: request [ ods.EAST_0203_CWPZXXB]and sql [ods.EAST_0204_YHZHXXB]","status":400}
at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryPlan(StarRocksQueryPlanVisitor.java:134)
at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryInfo(StarRocksQueryPlanVisitor.java:64)
at com.starrocks.connector.flink.table.source.StarRocksSourceCommonFunc.getQueryInfo(StarRocksSourceCommonFunc.java:206)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.<init>(StarRocksDynamicSourceFunction.java:89)
at com.starrocks.connector.flink.table.source.StarRocksDynamicTableSource.getScanRuntimeProvider(StarRocksDynamicTableSource.java:66)
at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalTableSourceScanRule.matches(BatchPhysicalTableSourceScanRule.scala:44)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:278)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:262)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1090)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1386)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:274)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1270)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:274)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1270)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:498)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:314)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1805)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109)
at com.my.flink.lancher.AbstractLancher.runSqlList(AbstractLancher.java:134)
at com.my.flink.lancher.AbstractLancher.exec(AbstractLancher.java:81)
at com.my.flink.MainLancher.main(MainLancher.java:46)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-29 09:00:17,408 ERROR com.my.flink.MainLancher [] - Request of get query plan failed with code 400 {"exception":"requested database and table must consistent with sql: request [ ods.EAST_0203_CWPZXXB]and sql [ods.EAST_0204_YHZHXXB]","status":400}
日志已粘贴
麻烦发下flink client的完整日志
日志如下啊:
2024-05-29 09:00:17,369 INFO com.my.flink.lancher.BatchLancher [] - SQL:-- 引擎: 脚本由Flink SQL 数据加工引擎[FLINKSQL-DATAPROCESS]合成
CREATE CATALOG ODS_CW
WITH ( 'type'='generic_in_memory' );
CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;
CREATE DATABASE IF NOT EXISTS ODS_CW
. ODS_CW_ODS_STARROCKS_DB
;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_0X01JFK3OOG ( LSH
STRING, BXJGDM
STRING, BXJGMC
STRING, CJRQ
STRING, BXZTBH
STRING, BXZTMC
STRING, FZJGDM
STRING, GSQJ
STRING, JZRQ1
STRING, JZPZH
STRING, ZZKJKMBH
STRING, ZZKJKMMC
STRING, PZLY
STRING, ZY1
STRING, BZ
STRING, JYBJF
DECIMAL(18,2), JYBDF
DECIMAL(18,2), BWBJF
DECIMAL(18,2), BWBDF
DECIMAL(18,2), OP_TYPE
STRING, OP_TIME
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0203_CWPZXXB','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','scan-url' = '127.0.0.1:8030','connector' ='starrocks');
set pipeline.name=DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0203_CWPZXXB_LOG_财务凭证信息轨迹表-[DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0203_CWPZXXB_LOG_CWPZGJB]-[e76041b3b57f44c3baa8422515387d19]-[Flink SQL 数据加工引擎];
set table.dynamic-table-options.enabled=true;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_LOG_0X01JFK3OOG ( MD5
STRING, BWBDF
DECIMAL(18,2), BWBJF
DECIMAL(18,2), BXJGDM
STRING, BXJGMC
STRING, BXZTBH
STRING, BXZTMC
STRING, BZ
STRING, CJRQ
STRING, FZJGDM
STRING, GSQJ
STRING, JYBDF
DECIMAL(18,2), JYBJF
DECIMAL(18,2), JZPZH
STRING, JZRQ1
STRING, LSH
STRING, PZLY
STRING, ZY1
STRING, ZZKJKMBH
STRING, ZZKJKMMC
STRING, OP_TYPE
STRING, OP_TIME
TIMESTAMP, ETLDATE
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0203_CWPZXXB_LOG','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','load-url' = '127.0.0.1:8030','sink.properties.column_separator' = '\x01','sink.properties.row_delimiter' = '\x02','sink.buffer-flush.max-rows' = '1000000','sink.buffer-flush.max-bytes' = '300000000','sink.buffer-flush.interval-ms' = '30000','sink.max-retries' = '3','connector' ='starrocks');
INSERT INTO
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_LOG_0X01JFK3OOG
SELECT
MD5(ConcatObjUDF(BWBDF
,BWBJF
,BXJGDM
,BXJGMC
,BXZTBH
,BXZTMC
,BZ
,CJRQ
,FZJGDM
,GSQJ
,JYBDF
,JYBJF
,JZPZH
,JZRQ1
,LSH
,PZLY
,ZY1
,ZZKJKMBH
,ZZKJKMMC
,'c'
,LOCALTIMESTAMP
)
) AS MD5
, BWBDF , BWBJF , BXJGDM , BXJGMC , BXZTBH , BXZTMC , BZ , CJRQ , FZJGDM , GSQJ , JYBDF , JYBJF , JZPZH , JZRQ1 , LSH , PZLY , ZY1 , ZZKJKMBH , ZZKJKMMC , OP_TYPE
, OP_TIME
, LOCALTIMESTAMP AS ETLDATE
FROM
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0203_CWPZXXB_0X01JFK3OOG ;
2024-05-29 09:00:17,372 INFO com.my.flink.lancher.BatchLancher [] - SQL:-- 引擎: 脚本由Flink SQL 数据加工引擎[FLINKSQL-DATAPROCESS]合成
CREATE CATALOG ODS_CW
WITH ( 'type'='generic_in_memory' );
CREATE DATABASE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB;
CREATE DATABASE IF NOT EXISTS ODS_CW
. ODS_CW_ODS_STARROCKS_DB
;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_0X0XRV49CTJ ( LSH
STRING, BXJGDM
STRING, BXJGMC
STRING, CJRQ
STRING, FZJGDM
STRING, YHMC
STRING, YHZH
STRING, YHZHMC
STRING, YHZHLX
STRING, YHZHZT
STRING, BZ
STRING, KHRQ
STRING, XHRQ
STRING, JWBZ
STRING, OP_TYPE
STRING, OP_TIME
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0204_YHZHXXB','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','scan-url' = '127.0.0.1:8030','connector' ='starrocks');
set pipeline.name=DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0204_YHZHXXB_LOG_银行账户信息轨迹表-[DATAPROCESS_ODS_CW_ODS_STARROCKS_DB_EAST_0204_YHZHXXB_LOG_YHZHGJ]-[c2e3979608a84e3ea888b4d0d331f612]-[Flink SQL 数据加工引擎];
set table.dynamic-table-options.enabled=true;
CREATE TABLE IF NOT EXISTS ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_LOG_0X0XRV49CTJ ( MD5
STRING, BXJGDM
STRING, BXJGMC
STRING, BZ
STRING, CJRQ
STRING, FZJGDM
STRING, JWBZ
STRING, KHRQ
STRING, LSH
STRING, XHRQ
STRING, YHMC
STRING, YHZH
STRING, YHZHLX
STRING, YHZHMC
STRING, YHZHZT
STRING, OP_TYPE
STRING, OP_TIME
TIMESTAMP, ETLDATE
TIMESTAMP) WITH ('jdbc-url'='jdbc:mysql:loadbalance://127.0.0.1:9030/ods?useTimezone=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull','table-name' = 'EAST_0204_YHZHXXB_LOG','username' = 'sjptuser','password' = 'xxxxxxxxxx','database-name' = 'ods','load-url' = '127.0.0.1:8030','sink.properties.column_separator' = '\x01','sink.properties.row_delimiter' = '\x02','sink.buffer-flush.max-rows' = '1000000','sink.buffer-flush.max-bytes' = '300000000','sink.buffer-flush.interval-ms' = '30000','sink.max-retries' = '3','connector' ='starrocks');
INSERT INTO
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_LOG_0X0XRV49CTJ
SELECT
MD5(ConcatObjUDF(BXJGDM
,BXJGMC
,BZ
,CJRQ
,FZJGDM
,JWBZ
,KHRQ
,LSH
,XHRQ
,YHMC
,YHZH
,YHZHLX
,YHZHMC
,YHZHZT
,'c'
,LOCALTIMESTAMP
)
) AS MD5
, BXJGDM , BXJGMC , BZ , CJRQ , FZJGDM , JWBZ , KHRQ , LSH , XHRQ , YHMC , YHZH , YHZHLX , YHZHMC , YHZHZT , OP_TYPE
, OP_TIME
, LOCALTIMESTAMP AS ETLDATE
FROM
ODS_CW.ODS_CW_ODS_STARROCKS_DB.EAST_0204_YHZHXXB_0X0XRV49CTJ ;
2024-05-29 09:00:17,408 INFO com.my.flink.MainLancher [] - 执行异常:java.lang.RuntimeException: Request of get query plan failed with code 400 {"exception":"requested database and table must consistent with sql: request [ ods.EAST_0203_CWPZXXB]and sql [ods.EAST_0204_YHZHXXB]","status":400}
at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryPlan(StarRocksQueryPlanVisitor.java:134)
at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryInfo(StarRocksQueryPlanVisitor.java:64)
at com.starrocks.connector.flink.table.source.StarRocksSourceCommonFunc.getQueryInfo(StarRocksSourceCommonFunc.java:206)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.<init>(StarRocksDynamicSourceFunction.java:89)
at com.starrocks.connector.flink.table.source.StarRocksDynamicTableSource.getScanRuntimeProvider(StarRocksDynamicTableSource.java:66)
at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalTableSourceScanRule.matches(BatchPhysicalTableSourceScanRule.scala:44)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:278)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:262)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1090)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1386)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:274)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1270)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:274)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1270)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:498)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:314)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1805)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109)
at com.my.flink.lancher.AbstractLancher.runSqlList(AbstractLancher.java:134)
at com.my.flink.lancher.AbstractLancher.exec(AbstractLancher.java:81)
at com.my.flink.MainLancher.main(MainLancher.java:46)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-29 09:00:17,408 ERROR com.my.flink.MainLancher [] - Request of get query plan failed with code 400 {"exception":"requested database and table must consistent with sql: request [ ods.EAST_0203_CWPZXXB]and sql [ods.EAST_0204_YHZHXXB]","status":400}