Source returns columns out of order: data does not follow the scan.columns order set in StarRocksSourceOptions, and setting TableSchema has no effect

When I read StarRocks data with the connector source, the returned data comes back with its columns out of order.

Version information
starrocks-v2.3 flink-1.14.2 connector-1.2.5_flink-1.14_2.11

Properties passed to StarRocksSourceOptions:

scan.columns=c_sbm_a457bc598b214148a9e74d212a8b3847,uuid_318bec51528a459483bdbb19b593126b,c_zzyj_f6fa7c9a546140189e0c866cfc6363b9,c_psjssj_f880d4d094014fc9b4319416dab1a38c,c_qdzt_c936ceee39234cd292d763452aa82dd5,c_dwbm_d59ba3be586d40259524320c2aecee87,c_psjd_df485556c82e4298bc6713270e46e933,c_psrwlx_f5217e02a20d4a00a0381b60f9481ec0,c_psrwmc_e64b8bde2af4473da4c85c1601f988c7,c_pskssj_fddc0c49eb8246f49a588e1dd2452e0a,c_pszt_d89e639fb64f488fa1393c1210a57268,c_bhsz_b702504e391d4e93b8373ee35e0ee867,c_gh_f24c13864ce04d7fa2567e85f00cd9a1,c_ljscbz_b6162004a7714eb2a1162dadc4bade8e,c_wjghrwid_fbe9b92fd2db4730b1f95f5e2e3bdb37,c_psfs_c8c49283e11c48ef8616ef1ae8c866d1,c_zdmj_e046f49aa5da404981cc0b3206fd9257,c_dlxlzd_b0f17329f82548e5b06039588132f17e,c_cjrmc_c1ca861d89f949c3a8f0e73618bdf8e2,c_scbs_cd3533fd72b7435a9b13fd1da0848a67,c_sfbc_aa9ce0c4933348bfa0e2cd622b59c7db,c_xldjh_be8a75e5ac794df7b9f894ab8e2bad78,c_jkxlzd_edd46a8a6ab0402ca7d0430806730176,c_xmid_af419689630f44cd8571452321bdbe7b,c_famc_c609e2f7d6d0417d8ad1308fc32a6ea8,c_bdrl_ef7b6694534449f2be3f1ff53ca4e335,c_xgsj_d40fd2fa45ce4049b36ce96d1d98cb9c,c_xgrmc_eeb2d63900a94a8d91cfdd06da724827,c_dydj_ea55ae82d2e14c46b1ba6a77502fa36a

The TableSchema is:

root
 |-- c_sbm_a457bc598b214148a9e74d212a8b3847: STRING
 |-- uuid_318bec51528a459483bdbb19b593126b: STRING
 |-- c_zzyj_f6fa7c9a546140189e0c866cfc6363b9: STRING
 |-- c_psjssj_f880d4d094014fc9b4319416dab1a38c: STRING
 |-- c_qdzt_c936ceee39234cd292d763452aa82dd5: STRING
 |-- c_dwbm_d59ba3be586d40259524320c2aecee87: STRING
 |-- c_psjd_df485556c82e4298bc6713270e46e933: STRING
 |-- c_psrwlx_f5217e02a20d4a00a0381b60f9481ec0: STRING
 |-- c_psrwmc_e64b8bde2af4473da4c85c1601f988c7: STRING
 |-- c_pskssj_fddc0c49eb8246f49a588e1dd2452e0a: STRING
 |-- c_pszt_d89e639fb64f488fa1393c1210a57268: STRING
 |-- c_bhsz_b702504e391d4e93b8373ee35e0ee867: STRING
 |-- c_gh_f24c13864ce04d7fa2567e85f00cd9a1: STRING
 |-- c_ljscbz_b6162004a7714eb2a1162dadc4bade8e: STRING
 |-- c_wjghrwid_fbe9b92fd2db4730b1f95f5e2e3bdb37: STRING
 |-- c_psfs_c8c49283e11c48ef8616ef1ae8c866d1: STRING
 |-- c_zdmj_e046f49aa5da404981cc0b3206fd9257: STRING
 |-- c_dlxlzd_b0f17329f82548e5b06039588132f17e: STRING
 |-- c_cjrmc_c1ca861d89f949c3a8f0e73618bdf8e2: STRING
 |-- c_scbs_cd3533fd72b7435a9b13fd1da0848a67: STRING
 |-- c_sfbc_aa9ce0c4933348bfa0e2cd622b59c7db: STRING
 |-- c_xldjh_be8a75e5ac794df7b9f894ab8e2bad78: STRING
 |-- c_jkxlzd_edd46a8a6ab0402ca7d0430806730176: STRING
 |-- c_xmid_af419689630f44cd8571452321bdbe7b: STRING
 |-- c_famc_c609e2f7d6d0417d8ad1308fc32a6ea8: STRING
 |-- c_bdrl_ef7b6694534449f2be3f1ff53ca4e335: STRING
 |-- c_xgsj_d40fd2fa45ce4049b36ce96d1d98cb9c: STRING
 |-- c_xgrmc_eeb2d63900a94a8d91cfdd06da724827: STRING
 |-- c_dydj_ea55ae82d2e14c46b1ba6a77502fa36a: STRING

CREATE TABLE statement:

CREATE TABLE `collective_20144` (
  `c_sbm_a457bc598b214148a9e74d212a8b3847` varchar(65533) NULL COMMENT "",
  `uuid_318bec51528a459483bdbb19b593126b` varchar(65533) NULL COMMENT "",
  `c_zzyj_f6fa7c9a546140189e0c866cfc6363b9` varchar(65533) NULL COMMENT "",
  `c_psjssj_f880d4d094014fc9b4319416dab1a38c` varchar(65533) NULL COMMENT "",
  `c_qdzt_c936ceee39234cd292d763452aa82dd5` varchar(65533) NULL COMMENT "",
  `c_dwbm_d59ba3be586d40259524320c2aecee87` varchar(65533) NULL COMMENT "",
  `c_sjly_dcb220e987b54c01a43956629a67fe23` varchar(65533) NULL COMMENT "",
  `c_psjd_df485556c82e4298bc6713270e46e933` varchar(65533) NULL COMMENT "",
  `c_psrwlx_f5217e02a20d4a00a0381b60f9481ec0` varchar(65533) NULL COMMENT "",
  `c_psrwmc_e64b8bde2af4473da4c85c1601f988c7` varchar(65533) NULL COMMENT "",
  `c_cjsj_e6527c6fc2f4498389d25192bea190d8` varchar(65533) NULL COMMENT "",
  `c_pskssj_fddc0c49eb8246f49a588e1dd2452e0a` varchar(65533) NULL COMMENT "",
  `c_cjrid_f421b54a6fe543c4a3e53061c78717a5` varchar(65533) NULL COMMENT "",
  `c_lgsbb_d73cc28bc78f4eb591b24c2087dc1dd1` varchar(65533) NULL COMMENT "",
  `c_pszt_d89e639fb64f488fa1393c1210a57268` varchar(65533) NULL COMMENT "",
  `c_xgsj_d201e5e614c2468fa6d5c3cf7a0ddd30` varchar(65533) NULL COMMENT "",
  `c_bhsz_b702504e391d4e93b8373ee35e0ee867` varchar(65533) NULL COMMENT "",
  `c_gh_f24c13864ce04d7fa2567e85f00cd9a1` varchar(65533) NULL COMMENT "",
  `c_jbm_e1394ec9dc044c2f86ac912f8ef7c979` varchar(65533) NULL COMMENT "",
  `c_ljscbz_b6162004a7714eb2a1162dadc4bade8e` varchar(65533) NULL COMMENT "",
  `c_zj_e9dfa249e00642eb9f37d7bdba3806ef` varchar(65533) NULL COMMENT "",
  `c_wjghrwid_fbe9b92fd2db4730b1f95f5e2e3bdb37` varchar(65533) NULL COMMENT "",
  `c_xgrid_d304c4f9323e436a8fd69a1aa6fcaad6` varchar(65533) NULL COMMENT "",
  `c_psfs_c8c49283e11c48ef8616ef1ae8c866d1` varchar(65533) NULL COMMENT "",
  `c_cjsj_f326995b57c14afab0d82a5ab90e612e` varchar(65533) NULL COMMENT "",
  `c_zdmj_e046f49aa5da404981cc0b3206fd9257` varchar(65533) NULL COMMENT "",
  `c_jbm_db4b4b2caf0e4d95b2481e4b7fd74106` varchar(65533) NULL COMMENT "",
  `c_dlxlzd_b0f17329f82548e5b06039588132f17e` varchar(65533) NULL COMMENT "",
  `c_cjrid_e0e7877a88654e9ebe7a2fb232f8eb29` varchar(65533) NULL COMMENT "",
  `c_cjrmc_c1ca861d89f949c3a8f0e73618bdf8e2` varchar(65533) NULL COMMENT "",
  `c_sjly_f68669e0c685486cb9f9b921d3b6314d` varchar(65533) NULL COMMENT "",
  `c_scbs_cd3533fd72b7435a9b13fd1da0848a67` varchar(65533) NULL COMMENT "",
  `c_zj_e0a9f0da68584411a5242b9c4765a1d7` varchar(65533) NULL COMMENT "",
  `c_sfbc_aa9ce0c4933348bfa0e2cd622b59c7db` varchar(65533) NULL COMMENT "",
  `c_xldjh_be8a75e5ac794df7b9f894ab8e2bad78` varchar(65533) NULL COMMENT "",
  `c_lgsbb_f82463c04b4246c1b3eda36e8c2d278d` varchar(65533) NULL COMMENT "",
  `c_jkxlzd_edd46a8a6ab0402ca7d0430806730176` varchar(65533) NULL COMMENT "",
  `c_rwid_be95655dcae4456fbfce48eaf33ddcf6` varchar(65533) NULL COMMENT "",
  `c_xmid_af419689630f44cd8571452321bdbe7b` varchar(65533) NULL COMMENT "",
  `c_sbm_a005d17cfbc54b5e84e4b7df55496ba8` varchar(65533) NULL COMMENT "",
  `c_famc_c609e2f7d6d0417d8ad1308fc32a6ea8` varchar(65533) NULL COMMENT "",
  `c_bdrl_ef7b6694534449f2be3f1ff53ca4e335` varchar(65533) NULL COMMENT "",
  `c_xgsj_d40fd2fa45ce4049b36ce96d1d98cb9c` varchar(65533) NULL COMMENT "",
  `c_xgrid_fc36bb876de54b85a264ea01d1bc8a50` varchar(65533) NULL COMMENT "",
  `c_xgrmc_eeb2d63900a94a8d91cfdd06da724827` varchar(65533) NULL COMMENT "",
  `c_dydj_ea55ae82d2e14c46b1ba6a77502fa36a` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`c_sbm_a457bc598b214148a9e74d212a8b3847`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c_sbm_a457bc598b214148a9e74d212a8b3847`, `uuid_318bec51528a459483bdbb19b593126b`) BUCKETS 10 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

The returned data follows neither the scan.columns order nor the TableSchema order. How should I adjust this?

What do you mean by "setting TableSchema"? Does "table" here refer to a table in Flink SQL?

This is the example given on GitHub; the TableSchema is built as follows:

StarRocksSourceOptions options = StarRocksSourceOptions.builder()
    .withProperty("scan-url", "fe_ip1:8030,fe_ip2:8030,fe_ip3:8030")
    .withProperty("jdbc-url", "jdbc:mysql://fe_ip:9030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("table-name", "flink_test")
    .withProperty("database-name", "test")
    .build();
TableSchema tableSchema = TableSchema.builder()
    .field("date_1", DataTypes.DATE())
    .field("datetime_1", DataTypes.TIMESTAMP(6))
    .field("char_1", DataTypes.CHAR(20))
    .field("varchar_1", DataTypes.STRING())
    .field("boolean_1", DataTypes.BOOLEAN())
    .field("tinyint_1", DataTypes.TINYINT())
    .field("smallint_1", DataTypes.SMALLINT())
    .field("int_1", DataTypes.INT())
    .field("bigint_1", DataTypes.BIGINT())
    .field("largeint_1", DataTypes.STRING())
    .field("float_1", DataTypes.FLOAT())
    .field("double_1", DataTypes.DOUBLE())
    .field("decimal_1", DataTypes.DECIMAL(27, 9))
    .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print();
env.execute("StarRocks flink source");

I tried debugging to see where the order of the selected columns changes.
When StarRocksDynamicSourceFunction calls open(), it instantiates StarRocksSourceBeReader, which calls openScanner. I stepped through to this line:

result = client.open_scanner(params);

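At this point the returned result contains a select-column field whose column order differs from the scan.columns I passed in, and that is what causes the data to come back out of order. When I tried to debug further into TStarrocksExternalService in the Thrift code, I ran out of ideas, so I am posting here for help and hope to get some pointers on where to look next.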
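To make the mismatch described above easy to spot while debugging, the two column lists can be compared position by position. The following is only a minimal sketch: the class `ColumnOrderCheck` and the method `firstMismatch` are hypothetical names, and it assumes the column names reported by the scanner have already been copied into a plain `List<String>` (how to read them from the Thrift result is not shown here).

```java
import java.util.Arrays;
import java.util.List;

public class ColumnOrderCheck {

    /**
     * Returns the index of the first position where the two lists differ,
     * or -1 if they contain the same names in the same order.
     */
    static int firstMismatch(List<String> requested, List<String> returned) {
        int common = Math.min(requested.size(), returned.size());
        for (int i = 0; i < common; i++) {
            if (!requested.get(i).equals(returned.get(i))) {
                return i;
            }
        }
        // Same prefix: equal lists give -1, otherwise the shorter length is the mismatch point.
        return requested.size() == returned.size() ? -1 : common;
    }

    public static void main(String[] args) {
        // Shortened, made-up column names stand in for the real ones.
        List<String> requested = Arrays.asList("c_sbm", "uuid", "c_zzyj");
        List<String> returned = Arrays.asList("c_sbm", "c_zzyj", "uuid");
        System.out.println(firstMismatch(requested, returned)); // prints 1
    }
}
```

A result other than -1 means the rows cannot safely be read positionally in the requested order.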

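When I commented out the scan.columns setting and set only the TableSchema, the scan returned all columns of the table, but assembling the RowData then failed with an index-out-of-bounds error. Presumably this is because the TableSchema I passed in defines 30 columns while the table itself has more than 30. After I put every column of the table into the TableSchema definition, the returned data was no longer out of order. I then mapped my 30 columns against the columns in the CREATE TABLE DDL and got the data I wanted. That works around the ordering problem, but I would still like to know what causes it. If you have time, please advise.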
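The workaround described above (read every column in DDL order, then pick out the wanted ones by name) can be sketched roughly as follows. This is an illustration under assumptions, not the connector's own code: `ColumnProjection`, `buildIndexMapping`, and `project` are hypothetical names, rows are modelled as plain `Object[]` rather than Flink `RowData`, and the column names are shortened placeholders.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnProjection {

    /** For each wanted column, looks up its position in the full DDL column list. */
    static int[] buildIndexMapping(List<String> ddlColumns, List<String> wantedColumns) {
        Map<String, Integer> positionByName = new HashMap<>();
        for (int i = 0; i < ddlColumns.size(); i++) {
            positionByName.put(ddlColumns.get(i), i);
        }
        int[] mapping = new int[wantedColumns.size()];
        for (int i = 0; i < wantedColumns.size(); i++) {
            Integer position = positionByName.get(wantedColumns.get(i));
            if (position == null) {
                throw new IllegalArgumentException("Unknown column: " + wantedColumns.get(i));
            }
            mapping[i] = position;
        }
        return mapping;
    }

    /** Copies the wanted fields out of a full-width row, in the wanted order. */
    static Object[] project(Object[] fullRow, int[] mapping) {
        Object[] projected = new Object[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            projected[i] = fullRow[mapping[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        // Placeholder names; the real table uses the long generated column names.
        List<String> ddlColumns = Arrays.asList("c_sbm", "uuid", "c_sjly", "c_psjd");
        List<String> wantedColumns = Arrays.asList("c_psjd", "c_sbm");
        int[] mapping = buildIndexMapping(ddlColumns, wantedColumns);

        Object[] fullRow = {"sbm-1", "uuid-1", "sjly-1", "psjd-1"};
        System.out.println(Arrays.toString(project(fullRow, mapping))); // prints [psjd-1, sbm-1]
    }
}
```

Building the mapping once by name and reusing it for every row keeps the result independent of whatever order the scan returns, as long as the full-width row order matches the DDL.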

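I am trying to reproduce this. Could you confirm exactly which 2.3 release you are running? There was a similar issue before, and I want to check whether this is the same one. Also, would you mind adding me on WeChat so we can communicate more easily?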

MTUzOTE0MzMxOTE= base64

I checked with a command, and the version is 2.2.9-9e559c3.

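As discussed offline, the StarRocks version that actually had the problem was 2.2.0. The issue has since been fixed, so you can upgrade to the latest 2.2 release.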