Writing data with Flink SQL fails with the following errors:
Reason: column count mismatch, expect=6 real=1. src line: [0021-11-11];
Reason: column count mismatch, expect=6 real=1. src line: [x01xiaoding15018323];
This looks like a column-separator problem. Please see the notes at https://docs.starrocks.com/zh-cn/main/loading/Flink-connector-starrocks#%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9 and adjust the separator settings accordingly.
Is this how you have it configured?
Could you share your Flink sink configuration?
Is that separator being written in as a literal string?
Normally it should be '\t'.
Hi, could you confirm whether the column separator of your raw data is '\t' or '\x01'?
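If it is hard to tell by eye, dumping the code point of every character in one sample record makes the separator obvious: a tab prints as 9, the invisible 0x01 separator prints as 1. A minimal standalone sketch (the sample string is just a placeholder, substitute one real line from your data):

public class SeparatorCheck {
    public static void main(String[] args) {
        // Placeholder record; replace with one real line from the source data.
        String sample = "xiaoding\u000115018323";
        for (char c : sample.toCharArray()) {
            // '\t' prints as 9, the invisible 0x01 separator prints as 1
            System.out.println((int) c + "\t" + (Character.isISOControl(c) ? "(control)" : c));
        }
    }
}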
create table t_kafka_source (
log_timestamp BIGINT,
name STRING,
age STRING,
sex STRING,
hometown STRING,
work STRING
) WITH (
'properties.bootstrap.servers' = 'xxxx',
'properties.group.id' = 'xxxx',
'scan.startup.mode' = 'latest-offset',
'scan.topic-partition-discovery.interval' = '30s',
'topic' = 'xxx',
'connector' = 'kafka',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE doris_sink(
log_timestamp STRING,
name STRING,
age INT,
sex INT,
hometown STRING,
work STRING
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'xxx', -- used by the connector to query the information_schema.COLUMNS system table
'load-url' = 'xxxx', -- list all FE addresses here; the connector checks each connection and uses the first address it can reach
'database-name' = 'wuyang_test_db',
'table-name' = 'wuyang_dynamic_table',
'username' = 'root',
'password' = '',
'sink.semantic' = 'exactly-once',
'sink.max-retries' = '3',
'sink.buffer-flush.max-rows' = '1000000', -- ignored once exactly-once is enabled
'sink.buffer-flush.max-bytes' = '300000000', -- ignored once exactly-once is enabled
'sink.buffer-flush.interval-ms' = '5000', -- ignored once exactly-once is enabled
'sink.properties.column_separator' = '\x01',
'sink.properties.row_delimiter' = '\x02'
--'sink.properties.*' = 'xxx' // stream load properties like 'sink.properties.columns' = 'k1, v1' ???
);
insert into doris_sink
select
DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(log_timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')), 'yy-MM-dd') as log_timestamp,
name,
cast(age as INT) as age,
cast(sex as INT) as sex,
hometown,
work
from t_kafka_source;
This is my Flink SQL.
It needs to be two backslashes:
"'sink.properties.column_separator' = '\\x01'," +
"'sink.properties.row_delimiter' = '\\x02'," +