Environment
StarRocks version: StarRocks-2.3.0-rc03
Flink version: 1.14.2, Scala 2.11
flink-connector-starrocks 1.2.3_flink-1.14_2.11
We have a table with the following structure:
CREATE TABLE middle_ti5kA8f
(
    id                     bigint(20) NOT NULL COMMENT "primary key",
    metric_count           bigint(20) NULL COMMENT "sales volume",
    data_date_str          varchar(1000) NOT NULL COMMENT "metric date (partition column)",
    data_time              datetime NOT NULL COMMENT "metric time",
    dim_product_id         int(11) NULL COMMENT "product ID",
    dim_area_province_code int(11) NULL COMMENT "province code",
    dim_area_city_code     int(11) NULL COMMENT "city code",
    dim_area_area_code     int(11) NULL COMMENT "district code"
) ENGINE=OLAP
DUPLICATE KEY(id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(data_date_str) BUCKETS 2
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT",
    "enable_persistent_index" = "false"
);
Our requirement is to export this table with Flink. The code is as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// create a StreamTableEnvironment on top of the streaming environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamStatementSet statementSet = tableEnv.createStatementSet();
tableEnv.executeSql(
        "CREATE TABLE middle_ti5kA8f_0 (\n" +
        "  id BIGINT,\n" +
        "  metric_count BIGINT,\n" +
        "  data_date_str VARCHAR,\n" +
        "  data_time TIMESTAMP,\n" +
        "  dim_product_id INTEGER,\n" +
        "  dim_area_province_code INTEGER,\n" +
        "  dim_area_city_code INTEGER,\n" +
        "  dim_area_area_code INTEGER\n" +
        ") WITH (\n" +
        "  'connector' = 'starrocks',\n" +
        "  'scan-url' = '10.201.2.218:8030',\n" +
        "  'jdbc-url' = 'jdbc:mysql://10.201.2.218:9030',\n" +
        "  'username' = 'root',\n" +
        "  'password' = 'Deepexi!@#2021',\n" +
        "  'database-name' = 'datasense_232fae86dd9046938b901be15d920807_default',\n" +
        "  'table-name' = 'middle_ti5kA8f',\n" +
        "  'scan.connect.timeout-ms' = '20000',\n" +
        "  'scan.params.keep-alive-min' = '60',\n" +
        "  'scan.params.query-timeout-s' = '3600'\n" +
        ")");
tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0 where data_date_str = '2022-10-09'")).print();
statementSet.attachAsDataStream();
env.execute("product-sales-by-region_copy");
Running this throws the following exception:
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryStringData cannot be cast to org.apache.flink.table.data.TimestampData
at org.apache.flink.table.data.GenericRowData.getTimestamp(GenericRowData.java:179)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$39385f9c$1(RowData.java:260)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.lambda$run$1(StarRocksDynamicSourceFunction.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.run(StarRocksDynamicSourceFunction.java:156)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
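For context, the cast in the stack trace is purely positional: GenericRowData stores plain Object fields, and getTimestamp simply casts whatever sits at the given index. A minimal sketch using only Flink's public table-data API (not connector code) reproduces the same failure when a VARCHAR value lands in a slot that is read as TIMESTAMP:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

public class CastRepro {
    public static void main(String[] args) {
        GenericRowData row = new GenericRowData(2);
        // Simulate columns arriving in the wrong order: a VARCHAR value is
        // written into position 0, which the reader expects to hold a TIMESTAMP.
        row.setField(0, StringData.fromString("2022-10-09"));
        row.setField(1, TimestampData.fromEpochMillis(0L));
        // Throws: BinaryStringData cannot be cast to TimestampData
        row.getTimestamp(0, 3);
    }
}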
If this line:
tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0 where data_date_str = '2022-10-09'")).print();
is changed to
tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0")).print();
i.e., the WHERE condition is removed, the program runs normally.
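Until the connector is fixed, one workaround is to scan the table without the pushed-down predicate and filter on the DataStream side instead. A sketch, assuming named field access on the produced rows (available since Flink 1.13); note this reads the whole table, so it trades scan cost for correctness:

tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0"))
        .filter(row -> "2022-10-09".equals(row.getField("data_date_str")))
        .print();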
After investigation, the problem is in this method in the connector code:
com.starrocks.connector.flink.row.source.StarRocksSourceFlinkRows#genFlinkRows
It is caused by the order of the fields the BE returns when the connector fetches data: the returned column order does not match the Flink schema order, so values are written into the wrong row positions. How can this problem be solved?
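One plausible direction for a fix (a sketch of the idea only, not the connector's actual patch): in genFlinkRows, resolve each Flink field by name against the schema the BE actually returned, instead of assuming both sides use the same column order. The field names and values below are hypothetical stand-ins for what the connector would receive from the BE:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

public class ReorderSketch {
    public static void main(String[] args) {
        // Hypothetical example: the Flink schema order vs. the order the BE returned.
        List<String> flinkFields = Arrays.asList("id", "data_date_str", "data_time");
        List<String> beFields = Arrays.asList("data_date_str", "data_time", "id");
        Object[] beValues = {
                StringData.fromString("2022-10-09"),           // data_date_str
                TimestampData.fromEpochMillis(1665273600000L), // data_time
                42L                                            // id
        };

        // Build a name -> position map over the BE result schema.
        Map<String, Integer> beIndex = new HashMap<>();
        for (int i = 0; i < beFields.size(); i++) {
            beIndex.put(beFields.get(i), i);
        }

        // Populate the row in Flink-schema order, reading each value from the
        // BE column with the same name rather than the same position.
        GenericRowData row = new GenericRowData(flinkFields.size());
        for (int pos = 0; pos < flinkFields.size(); pos++) {
            row.setField(pos, beValues[beIndex.get(flinkFields.get(pos))]);
        }

        System.out.println(row.getString(1));       // 2022-10-09
        System.out.println(row.getTimestamp(2, 0)); // no ClassCastException now
    }
}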