flink-connector-starrocks throws an exception when exporting data

Environment
StarRocks version: StarRocks-2.3.0-rc03
Flink version: 1.14.2, Scala 2.11
flink-connector-starrocks version: 1.2.3_flink-1.14_2.11

We have a table with the following schema:
CREATE TABLE middle_ti5kA8f (
id bigint(20) NOT NULL COMMENT "primary key",
metric_count bigint(20) NULL COMMENT "sales count",
data_date_str varchar(1000) NOT NULL COMMENT "metric date (partition field)",
data_time datetime NOT NULL COMMENT "metric time",
dim_product_id int(11) NULL COMMENT "product ID",
dim_area_province_code int(11) NULL COMMENT "province code",
dim_area_city_code int(11) NULL COMMENT "city code",
dim_area_area_code int(11) NULL COMMENT "district code"
) ENGINE=OLAP
DUPLICATE KEY(id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(data_date_str) BUCKETS 2
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false"
);

Our requirement is to export this table through Flink. The code is as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamStatementSet statementSet = tableEnv.createStatementSet();
tableEnv.executeSql(
"CREATE TABLE middle_ti5kA8f_0 (\n" +
" id BIGINT,\n" +
" metric_count BIGINT,\n" +
" data_date_str VARCHAR,\n" +
" data_time TIMESTAMP,\n" +
" dim_product_id INTEGER,\n" +
" dim_area_province_code INTEGER,\n" +
" dim_area_city_code INTEGER,\n" +
" dim_area_area_code INTEGER\n" +
") WITH (\n" +
" 'connector' = 'starrocks',\n" +
" 'scan-url' = '10.201.2.218:8030',\n" +
" 'jdbc-url' = 'jdbc:mysql://10.201.2.218:9030',\n" +
" 'username' = 'root',\n" +
" 'password' = 'Deepexi!@#2021',\n" +
" 'database-name' = 'datasense_232fae86dd9046938b901be15d920807_default',\n" +
" 'table-name' = 'middle_ti5kA8f',\n" +
" 'scan.connect.timeout-ms' = '20000',\n" +
" 'scan.params.keep-alive-min' = '60',\n" +
" 'scan.params.query-timeout-s' = '3600'\n" +
")");
tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0 where data_date_str = '2022-10-09'")).print();
statementSet.attachAsDataStream();
env.execute("商品销售量-分区域_copy");

The following exception is thrown:
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryStringData cannot be cast to org.apache.flink.table.data.TimestampData
at org.apache.flink.table.data.GenericRowData.getTimestamp(GenericRowData.java:179)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$39385f9c$1(RowData.java:260)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.lambda$run$1(StarRocksDynamicSourceFunction.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.run(StarRocksDynamicSourceFunction.java:156)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

If the code is modified, changing
tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0 where data_date_str = '2022-10-09'")).print();
to
tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0")).print();
i.e. removing the WHERE condition,

the program runs normally.

After investigation, the problem is in this method of the connector code:
com.starrocks.connector.flink.row.source.StarRocksSourceFlinkRows#genFlinkRows

It is caused by the order of the fields returned when the connector fetches data from the BE.
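
The mechanism can be reproduced outside the connector. Below is a minimal, hypothetical sketch (the class name and hard-coded positions are illustrative, not connector code): Flink builds a field getter from the declared schema, where position 3 (data_time) is TIMESTAMP, but if the row is filled in the column order the BE returns, a string value can land at that position, producing exactly the BinaryStringData-to-TimestampData cast error in the stack trace above.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.TimestampType;

public class CastRepro {
    public static void main(String[] args) {
        // The declared schema says field 3 (data_time) is TIMESTAMP,
        // so Flink creates a getter that calls row.getTimestamp(3, ...).
        RowData.FieldGetter getter = RowData.createFieldGetter(new TimestampType(6), 3);

        GenericRowData row = new GenericRowData(8);
        // If the BE returns the columns in a different order, a VARCHAR value
        // (e.g. data_date_str) ends up at index 3 instead of a timestamp.
        row.setField(3, StringData.fromString("2022-10-09"));

        // Throws: BinaryStringData cannot be cast to TimestampData,
        // with the same root frame (GenericRowData.getTimestamp) as above.
        getter.getFieldOrNull(row);
    }
}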

How can this problem be solved?

  1. This is a known issue. A PR has already been submitted to fix it: https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/122 ; the upcoming connector release will include it.
  2. As a temporary workaround, you can build a package from the main branch: https://github.com/StarRocks/starrocks-connector-for-apache-flink

The main branch targets Flink 1.15. After I changed the pom dependencies based on the main branch, some classes cannot be found.

First run mvn install under starrocks-connector-for-apache-flink/starrocks-stream-load-sdk.

I used the code from the main branch and changed it to Flink 1.14.2. The problem still exists.

Follow-up:

This problem has been solved, right?

Do you mean that the size of fieldVectors should be 7?

Not solved; see the screenshot. The temporary table defined in Flink has 7 fields, and the SQL query below it uses select *.

But the number of fieldVectors is 6.

OK, I'll reproduce it locally.

@Loong could you help take a look? It's fairly urgent, thanks a lot.

@U_1660533790308_5109

tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0 where data_date_str = '2022-10-09'")).print();

One more note: when a WHERE condition is used, the BE does not return the field names.

That is, e.getField().getName() on these FieldVector objects returns an empty value.
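
To see this directly, one can dump the Arrow stream the BE returns. The helper below is a hypothetical debugging snippet, not connector code; the dumpArrowFields name and the byte-array input are assumptions. It prints the field-vector count and each field name per batch, which is where the count of 6 and the empty names above were observed.

import java.io.ByteArrayInputStream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class ArrowDebug {
    // Reads an Arrow IPC stream (the format BE scan results arrive in)
    // and prints the field-vector count and each field name per batch.
    static void dumpArrowFields(byte[] arrowBytes) throws Exception {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             ArrowStreamReader reader = new ArrowStreamReader(
                     new ByteArrayInputStream(arrowBytes), allocator)) {
            while (reader.loadNextBatch()) {
                VectorSchemaRoot root = reader.getVectorSchemaRoot();
                System.out.println("fieldVectors size = " + root.getFieldVectors().size());
                for (FieldVector v : root.getFieldVectors()) {
                    // With a pushed-down WHERE condition, this printed an empty name.
                    System.out.println("field name = '" + v.getField().getName() + "'");
                }
            }
        }
    }
}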

You can first try removing StarRocksDynamicTableSource's support for these interfaces: SupportsLimitPushDown, SupportsFilterPushDown, and SupportsProjectionPushDown, and see whether that gets around the issue. Pinning down the root cause will take some more time. A sketch of the shape of that change follows.
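
As a rough illustration only (not the actual connector source, which carries much more state), a table source that keeps scan support but omits the three push-down ability interfaces would look roughly like this; the class name and method bodies are illustrative:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

// Implements ScanTableSource only: SupportsLimitPushDown, SupportsFilterPushDown
// and SupportsProjectionPushDown are gone, so the planner pushes nothing down
// and the scan reads every declared column in schema order.
public class NoPushDownTableSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // The real connector returns a provider wrapping
        // StarRocksDynamicSourceFunction here; omitted in this sketch.
        throw new UnsupportedOperationException("illustrative sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        return new NoPushDownTableSource();
    }

    @Override
    public String asSummaryString() {
        return "StarRocks source without push-down";
    }
}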

Not implementing the SupportsProjectionPushDown interface works, thanks.
We'll go with that workaround for now.

OK, I'll continue locating the root cause of this problem.

Has it been solved? When will a new version be released?

Have you tried the latest connector, version 1.2.4? Running your example locally with it works fine for me:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        StreamStatementSet statementSet = tableEnv.createStatementSet();
        tableEnv.executeSql(
                "CREATE TABLE middle_ti5kA8f_0 (\n" +
                        " id BIGINT,\n" +
                        " metric_count BIGINT,\n" +
                        " data_date_str VARCHAR,\n" +
                        " data_time TIMESTAMP,\n" +
                        " dim_product_id INTEGER,\n" +
                        " dim_area_province_code INTEGER,\n" +
                        " dim_area_city_code INTEGER,\n" +
                        " dim_area_area_code INTEGER\n" +
                        ") WITH (\n" +
                        " 'connector' = 'starrocks',\n" +
                        " 'scan-url' = '10.201.2.218:8030',\n" +
                        " 'jdbc-url' = 'jdbc:mysql://10.201.2.218:9030',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '',\n" +
                        " 'database-name' = 'datasense_232fae86dd9046938b901be15d920807_default',\n" +
                        " 'table-name' = 'middle_ti5kA8f',\n" +
                        " 'scan.connect.timeout-ms' = '20000',\n" +
                        " 'scan.params.keep-alive-min' = '60',\n" +
                        " 'scan.params.query-timeout-s' = '3600'\n" +
                        ")");
        tableEnv.toDataStream(tableEnv.sqlQuery("select * from middle_ti5kA8f_0 where data_date_str = '2022-10-09'")).print();
        statementSet.attachAsDataStream();
        env.execute("商品销售量-分区域_copy");
    }
}

Using

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>1.2.4_flink-1.14_2.11</version>
</dependency>

the problem still exists.