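【Details】Detailed description of the problem
【Import/export method】Loading into a Primary Key table through the starrocks-connector-for-apache-flink Table API fails with an error
【Background】
【Business impact】
【StarRocks version】e.g. 2.24
【Cluster size】e.g. 2 FE (1 follower + 1 observer) + 3 BE (FE and BE co-located)
【Machine info】vCPUs/memory/NIC, e.g. 48C/64G/10 GbE
【Table model】e.g. Primary Key model
【Import or export method】e.g. Flink
【Attachments】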
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Primary keys not defined in the sink TableSchema.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalArgumentException: Primary keys not defined in the sink TableSchema.
at com.starrocks.connector.flink.manager.StarRocksSinkManager.validateTableStructure(StarRocksSinkManager.java:424)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.init(StarRocksSinkManager.java:127)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.<init>(StarRocksSinkManager.java:122)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.<init>(StarRocksDynamicSinkFunction.java:73)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSink.getSinkRuntimeProvider(StarRocksDynamicTableSink.java:45)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:118)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:130)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.jav
Table model:
Table: ods_haha_instance
Create Table: CREATE TABLE ods_haha_instance (
  ID bigint(20) NOT NULL COMMENT "",
  PROD_INST_ID bigint(20) NOT NULL COMMENT "",
  ACC_NBR varchar(50) NOT NULL COMMENT "",
  CC_OBJECT_ID bigint(20) NULL COMMENT "",
  POLICY_COUNTER_ID varchar(50) NULL COMMENT "",
  POLICY_COUNTER_STATUS varchar(50) NULL COMMENT "",
  LATN_ID bigint(20) NULL COMMENT "",
  FLAG varchar(50) NULL COMMENT "",
  STATUS_DATE datetime NULL COMMENT "",
  BATCH_ID varchar(50) NULL COMMENT "",
  DEAL_STATUS varchar(50) NULL COMMENT "",
  DEAL_TIME datetime NULL COMMENT "",
  BILLING_CYCLE_ID varchar(50) NULL COMMENT "",
  EVENT_ID varchar(50) NULL COMMENT "",
  action_data_time datetime NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(ID, PROD_INST_ID, ACC_NBR)
COMMENT "OLAP"
DISTRIBUTED BY HASH(PROD_INST_ID) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false"
);
Sink table configuration:
CREATE TABLE ods_haha_instance (
  ID bigint,
  CC_OBJECT_ID bigint,
  ACC_NBR string,
  PROD_INST_ID bigint,
  POLICY_COUNTER_ID string,
  POLICY_COUNTER_STATUS string,
  LATN_ID bigint,
  FLAG string,
  STATUS_DATE string,
  BATCH_ID string,
  DEAL_STATUS string,
  DEAL_TIME string,
  BILLING_CYCLE_ID string,
  EVENT_ID string,
  action_data_time timestamp
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://xxx',
  'load-url' = ' ',
  'database-name' = 'xxx',
  'table-name' = 'ods_haha_instance',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.buffer-flush.max-rows' = '1000000',
  'sink.buffer-flush.max-bytes' = '300000000',
  'sink.buffer-flush.interval-ms' = '10000',
  'sink.properties.format' = 'JSON',
  'sink.properties.strip_outer_array' = 'true',
  'sink.properties.partial_update' = 'true',
  'sink.properties.columns' = 'ID,PROD_INST_ID,ACC_NBR',
  -- 'sink.properties.column_separator' = '\^',
  -- 'sink.properties.row_delimiter' = '\x02',
  'sink.max-retries' = '3'
);
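
Judging from the exception message, a likely cause is that the Flink sink DDL above declares no primary key, while the target StarRocks table is a Primary Key table. Below is a minimal sketch of the same sink table with a PRIMARY KEY ... NOT ENFORCED clause added. It assumes the connector checks the Flink-side schema for a primary key matching the StarRocks key columns (ID, PROD_INST_ID, ACC_NBR); the explicit NOT NULL on the key columns is an extra assumption made to mirror the StarRocks schema. The xxx values and the blank load-url are placeholders carried over from the post, and this has not been run against the reporter's cluster.

```sql
CREATE TABLE ods_haha_instance (
  ID bigint NOT NULL,            -- key column, NOT NULL as in the StarRocks table
  CC_OBJECT_ID bigint,
  ACC_NBR string NOT NULL,       -- key column
  PROD_INST_ID bigint NOT NULL,  -- key column
  POLICY_COUNTER_ID string,
  POLICY_COUNTER_STATUS string,
  LATN_ID bigint,
  FLAG string,
  STATUS_DATE string,
  BATCH_ID string,
  DEAL_STATUS string,
  DEAL_TIME string,
  BILLING_CYCLE_ID string,
  EVENT_ID string,
  action_data_time timestamp,
  -- Assumed fix: declare the same key columns as the StarRocks PRIMARY KEY.
  -- Flink does not enforce primary keys, so NOT ENFORCED is required.
  PRIMARY KEY (ID, PROD_INST_ID, ACC_NBR) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://xxx',   -- placeholder from the post
  'load-url' = ' ',                  -- left blank in the post
  'database-name' = 'xxx',
  'table-name' = 'ods_haha_instance',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.buffer-flush.max-rows' = '1000000',
  'sink.buffer-flush.max-bytes' = '300000000',
  'sink.buffer-flush.interval-ms' = '10000',
  'sink.properties.format' = 'JSON',
  'sink.properties.strip_outer_array' = 'true',
  'sink.properties.partial_update' = 'true',
  'sink.properties.columns' = 'ID,PROD_INST_ID,ACC_NBR',
  'sink.max-retries' = '3'
);
```

If the error persists with this clause in place, it may be worth checking that the key column names and their order match the StarRocks PRIMARY KEY definition exactly.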