To help us locate your issue faster, please provide the following information. Thanks.
【Details】Flink SQL writing to StarRocks: extra invisible characters appear in the first column
【Background】Importing canal-json-formatted business data from Kafka into StarRocks via Flink SQL
【Business impact】
【StarRocks version】2.4.4
【Cluster size】3 FE (1 follower + 2 observers) + 5 BE (FE and BE co-located)
【Machine info】CPU virtual cores / memory / NIC: 16C / 64G / 10 GbE
【Table model】Unique Key model
CREATE TABLE cr_order
(
    `order_no` varchar(60) NOT NULL COMMENT "order number",  -- business primary key
    `create_time` datetime NOT NULL COMMENT "creation time",
    `id` bigint(20) NOT NULL COMMENT "primary key",
    `trade_id` varchar(255) NULL COMMENT "transaction id",
    ...
)
UNIQUE KEY(`order_no`, `create_time`)
PARTITION BY RANGE(`create_time`)
(
    START ("2021-01-01") END ("2023-12-01") EVERY (INTERVAL 1 month)
)
DISTRIBUTED BY HASH(`order_no`) BUCKETS 4
PROPERTIES (
    "replication_num" = "3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.end" = "12",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "4",
    "dynamic_partition.start_day_of_month" = "1",
    "enable_persistent_index" = "false",
    "colocate_with" = "group_order_no"
);
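A quick way to confirm what the invisible bytes actually are is to dump the affected column in hex on the StarRocks side. The query below is a minimal diagnostic sketch (the LIMIT and the choice of columns are illustrative, not from the original report):

-- If length() (bytes) exceeds what the visible order number accounts for,
-- the extra invisible characters were loaded into the column itself.
SELECT hex(order_no) AS raw_bytes,
       length(order_no) AS byte_len,
       char_length(order_no) AS char_len
FROM cr_order
LIMIT 10;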
【Import/export method】Flink SQL
package com.test.app;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class FlinkSQLTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing; the interval is given in milliseconds (5 s here)
senv.enableCheckpointing(5000L);
senv.setRestartStrategy(RestartStrategies.failureRateRestart(
5, // max failures within the measurement interval
Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
));
process_flink_sql_stats(senv);
}
private static void process_flink_sql_stats(StreamExecutionEnvironment senv) throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.useAnyPlanner()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, settings);
process_fsql_cr_order(tableEnv);
}
private static void process_fsql_cr_order(TableEnvironment tableEnv) {
String createDb = "CREATE DATABASE IF NOT EXISTS test";
tableEnv.executeSql(createDb);
// scan.startup.mode specifies where to start reading from Kafka; the options are:
// group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
// earliest-offset: start from the earliest offset possible.
// latest-offset: start from the latest offset.
// timestamp: start from user-supplied timestamp for each partition.
// specific-offsets: start from user-supplied specific offsets for each partition.
String sourceSql = String.join("\n",
"CREATE TABLE IF NOT EXISTS test.canal_source_cr_orders ( ",
" `order_no` varchar(60) NOT NULL , ",
" `create_time` string NOT NULL , ",
// ... remaining source columns elided ...
" `dr` int ",
") with ( ",
"'connector' = 'kafka', ",
"'topic' = 'order_cr_order', ",
"'properties.bootstrap.servers' = '<ip>:9092', ",
"'properties.group.id' = 'order_canal_group25', ",
"'scan.startup.mode' = 'group-offsets', ",
"'format' = 'canal-json' ", // -- 使用 canal-json 格式
") "
);
String sinkSql = String.join("\n",
"CREATE TABLE IF NOT EXISTS test.canal_sink_cr_orders ( ",
" `order_no` varchar(60) NOT NULL , ",
" `create_time` string NOT NULL , ",
// ... remaining sink columns elided ...
" `dr` int ,",
" PRIMARY KEY (order_no,create_time) NOT ENFORCED ",
") with ( ",
"'jdbc-url' = 'jdbc:mysql://<ip>:9030', ",
"'load-url' = '<ip>:8030', ",
"'username' = '<user>', ",
"'password' = '<password>', ",
"'database-name' = 'test', ",
"'sink.properties.column_separator' = '\\x01', ",
"'sink.properties.row_delimiter' = '\\x02', ",
"'sink.buffer-flush.interval-ms' = '15000', ",
"'connector' = 'starrocks', ",
"'table-name' = 'cr_order' ",
") "
);
System.out.println(sourceSql);
System.out.println("----------------");
System.out.println(sinkSql);
tableEnv.executeSql(sourceSql);
tableEnv.executeSql(sinkSql);
tableEnv.executeSql("INSERT INTO `test`.`canal_sink_cr_orders` SELECT * FROM `test`.`canal_source_cr_orders` ");
Table table1 = tableEnv.sqlQuery("select * from test.canal_source_cr_orders");
table1.execute().print();
}
}
Output in Flink: [screenshot not reproduced in this text]
【Attachment】Flink sink 到 starrocks问题.docx (133.5 KB)
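Since the sink writes rows to Stream Load as CSV by default, the \x01/\x02 separators are a natural suspect for the stray bytes in the first column. A hedged workaround sketch, assuming a flink-connector-starrocks version that supports the documented sink.properties.format option: switch the load format to JSON so no separator characters are involved at all. Only the sink.properties.* lines differ from the sink table above.

CREATE TABLE IF NOT EXISTS test.canal_sink_cr_orders (
    `order_no` varchar(60) NOT NULL,
    `create_time` string NOT NULL,
    -- ... remaining columns elided, same as above ...
    `dr` int,
    PRIMARY KEY (order_no, create_time) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://<ip>:9030',
    'load-url' = '<ip>:8030',
    'username' = '<user>',
    'password' = '<password>',
    'database-name' = 'test',
    'table-name' = 'cr_order',
    -- load as JSON instead of CSV, so no column/row separator bytes appear in the payload
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.buffer-flush.interval-ms' = '15000'
);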