Flink SQL write to StarRocks: extra invisible characters appear in the first column

To help us locate your issue faster, please provide the following information. Thanks.
【Details】Extra invisible characters appear in the first column when writing to StarRocks via Flink SQL
【Background】Business data in canal-json format is imported from Kafka into StarRocks via Flink SQL
【Business impact】
【StarRocks version】e.g. 2.4.4
【Cluster size】3 FE (1 follower + 2 observers) + 5 BE (FE and BE co-located)
【Machine info】CPU (vCores) / memory / NIC: 16C / 64G / 10 GbE
【Table model】Unique Key model

CREATE TABLE cr_order (
order_no varchar(60) NOT NULL COMMENT "order number",  -- business key
create_time datetime NOT NULL COMMENT "creation time",
id bigint(20) NOT NULL COMMENT "primary key",
trade_id varchar(255) NULL COMMENT "trade id"
-- ... remaining columns omitted ...
)
UNIQUE KEY(order_no, create_time)
PARTITION BY RANGE(create_time)
(
START ("2021-01-01") END ("2023-12-01") EVERY (INTERVAL 1 month)
)
DISTRIBUTED BY HASH(order_no) BUCKETS 4
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.end" = "12",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "4",
"dynamic_partition.start_day_of_month" = "1",
"enable_persistent_index" = "false",
"colocate_with" = "group_order_no"
);

【Import/export method】Flink SQL

package com.test.app;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class FlinkSQLTest {

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpointing; the interval is in milliseconds
    senv.enableCheckpointing(5000L);
    senv.setRestartStrategy(RestartStrategies.failureRateRestart(
            5,                            // max failures per measurement interval
            Time.of(5, TimeUnit.MINUTES), // time interval for measuring the failure rate
            Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
    ));

    process_flink_sql_stats(senv);
}

private static void process_flink_sql_stats(StreamExecutionEnvironment senv) throws Exception {

    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .useAnyPlanner()
            .build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, settings);
    process_fsql_cr_order(tableEnv);

}

private static void process_fsql_cr_order(TableEnvironment tableEnv) {
    String createDb = "create database if not exists test";
    tableEnv.executeSql(createDb);
    //        scan.startup.mode specifies where to start reading from Kafka; the options are:
    //            group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
    //            earliest-offset: start from the earliest offset possible.
    //            latest-offset: start from the latest offset.
    //            timestamp: start from user-supplied timestamp for each partition.
    //            specific-offsets: start from user-supplied specific offsets for each partition.

    String sourceSql = String.join("\n",
            "CREATE TABLE IF NOT EXISTS test.canal_source_cr_orders (",
            "  `order_no` varchar(60) NOT NULL,",
            "  `create_time` string NOT NULL,",
            .........................
            "  `dr` int",
            ") with (",
            "'connector' = 'kafka',",
            "'topic' = 'order_cr_order',",
            "'properties.bootstrap.servers' = '<ip>:9092',",
            "'properties.group.id' = 'order_canal_group25',",
            "'scan.startup.mode' = 'group-offsets',",
            "'format' = 'canal-json'",  // use the canal-json format
            ")"
    );

    String sinkSql = String.join("\n",
            "CREATE TABLE IF NOT EXISTS test.canal_sink_cr_orders (",
            "  `order_no` varchar(60) NOT NULL,",
            "  `create_time` string NOT NULL,",
            .........................
            "  `dr` int,",
            " PRIMARY KEY (order_no, create_time) NOT ENFORCED",
            ") with (",
            "'jdbc-url' = 'jdbc:mysql://<ip>:9030',",
            "'load-url' = '<ip>:8030',",
            "'username' = '<user>',",
            "'password' = '<password>',",
            "'database-name' = 'test',",
            "'sink.properties.column_separator' = '\\x01',",
            "'sink.properties.row_delimiter' = '\\x02',",
            "'sink.buffer-flush.interval-ms' = '15000',",
            "'connector' = 'starrocks',",
            "'table-name' = 'cr_order'",
            ")"
    );

    System.out.println(sourceSql);
    System.out.println("----------------");
    System.out.println(sinkSql);

    tableEnv.executeSql(sourceSql);
    tableEnv.executeSql(sinkSql);
    tableEnv.executeSql("INSERT INTO `test`.`canal_sink_cr_orders` SELECT * FROM `test`.`canal_source_cr_orders` ");

    Table table1 = tableEnv.sqlQuery("select * from test.canal_source_cr_orders");
    table1.execute().print();
}

}

Output in Flink (screenshot not captured here)

【Attachment】Flink sink 到 starrocks问题.docx (133.5 KB)

StarRocks is on 2.4.4; which version of the flink-connector are you using?

flink.version = 1.13.6
scala.binary.version = 2.12

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- StarRocks connector dependency -->
    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>flink-connector-starrocks</artifactId>
        <!-- for flink-1.13 -->
        <version>1.2.4_flink-1.13_${scala.binary.version}</version>
    </dependency>

After the data is sunk into StarRocks, some rows with the same order_no contain invisible characters, so the table ends up with duplicate records.
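The duplicates can be confirmed to differ only in invisible bytes with a query along these lines (a sketch only; hex() and length() are standard StarRocks built-ins, and test.cr_order is the sink table from the DDL above):

SELECT order_no,
       hex(order_no)    AS order_no_hex,  -- raw bytes; control characters such as 0x01 show up here
       length(order_no) AS byte_len       -- byte length; larger than expected means hidden bytes
FROM test.cr_order
ORDER BY order_no;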

You can check whether the invisible character is a space; if it is, apply TRIM to that column.
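For example, the job's INSERT could trim the key column instead of using SELECT * (a sketch only, with the remaining columns abbreviated just as in the post above):

INSERT INTO test.canal_sink_cr_orders
SELECT
    TRIM(order_no) AS order_no,  -- strip leading/trailing whitespace from the key column
    create_time,
    -- ... remaining columns ...
    dr
FROM test.canal_source_cr_orders;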

Alternatively, print the data read on the source side and check whether the invisible characters are already present there.
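Besides printing the rows, comparing lengths on the source table can also make the problem visible (a sketch; CHAR_LENGTH and TRIM are standard Flink SQL functions):

SELECT order_no,
       CHAR_LENGTH(order_no)       AS len,          -- length as read from Kafka
       CHAR_LENGTH(TRIM(order_no)) AS trimmed_len   -- a smaller value means leading/trailing whitespace
FROM test.canal_source_cr_orders;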

Upgrade the connector to 1.2.7 and check again; this issue has already been fixed there.