Flink SQL throws an error when writing data:
Reason: column count mismatch, expect=6 real=1. src line: [0021-11-11];
Reason: column count mismatch, expect=6 real=1. src line: [x01xiaoding15018323];

This looks like a column separator issue. See the notes section of https://docs.starrocks.com/zh-cn/main/loading/Flink-connector-starrocks#%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9 for how to set it; a minimal sketch follows.
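
A minimal sketch of the separator settings that note describes (the table name, columns, and connection values below are placeholders, not from the original post):

CREATE TABLE starrocks_sink (
    c1 STRING,
    c2 STRING
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe_host:9030',
    'load-url' = 'fe_host:8030',
    'database-name' = 'example_db',
    'table-name' = 'example_table',
    'username' = 'root',
    'password' = '',
    -- the connector parses the literal strings \x01 / \x02 into the 0x01 / 0x02 bytes
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02'
);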

As shown in the attached screenshot of the printed data, I have already configured it per the official docs:

columns separated with \x01, rows with \x02.

Is this the right way to configure it?
[screenshot]

Could you please share your Flink sink configuration?

Is this separator passed in as a literal string?

Normally it should be '\t'.

That shouldn't depend on the type I pass in, since the connector handles it as a String internally.

Hi, just to confirm: is the column separator of your raw data '\t' or '\x01'?

As shown in the image, Flink SQL consumes JSON-format data from Kafka, selects the needed fields, and prints them through the print connector. Does the raw data still need to be split? Doesn't Flink SQL extract the corresponding fields and convert them into RowData?

create table t_kafka_source (
    log_timestamp BIGINT,
    name STRING,
    age STRING,
    sex STRING,
    hometown STRING,
    work STRING
) WITH (
    'properties.bootstrap.servers' = 'xxxx',
    'properties.group.id' = 'xxxx',
    'scan.startup.mode' = 'latest-offset',
    'scan.topic-partition-discovery.interval' = '30s',
    'topic' = 'xxx',
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

CREATE TABLE doris_sink (
    log_timestamp STRING,
    name STRING,
    age INT,
    sex INT,
    hometown STRING,
    work STRING
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'xxx',   -- used by the connector to query the system table information_schema.COLUMNS
    'load-url' = 'xxxx',  -- all FE IPs are listed here because the connector checks the connection to each one and uses the first IP it can reach
    'database-name' = 'wuyang_test_db',
    'table-name' = 'wuyang_dynamic_table',
    'username' = 'root',
    'password' = '',
    'sink.semantic' = 'exactly-once',
    'sink.max-retries' = '3',
    'sink.buffer-flush.max-rows' = '1000000',    -- ineffective when exactly-once is enabled
    'sink.buffer-flush.max-bytes' = '300000000', -- ineffective when exactly-once is enabled
    'sink.buffer-flush.interval-ms' = '5000',    -- ineffective when exactly-once is enabled
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02'
    -- 'sink.properties.*' = 'xxx'  -- Stream Load properties, like 'sink.properties.columns' = 'k1, v1' ???
);
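
On the commented-out 'sink.properties.*' line above: the connector passes any property with the sink.properties. prefix straight through to Stream Load unchanged. A hedged example for this table (the column list is an assumption matching the sink schema, not something from the original post):

    'sink.properties.columns' = 'log_timestamp,name,age,sex,hometown,work'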

insert into doris_sink
select
    DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(log_timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')), 'yy-MM-dd') as log_timestamp,
    name,
    cast(age as INT) as age,
    cast(sex as INT) as sex,
    hometown,
    work
from t_kafka_source;

This is my Flink SQL.

The error above is what I saw after opening the error link returned in the failure response.

It should be two backslashes: in Java source code, "\\x01" compiles down to the string \x01, which the connector then parses into the actual 0x01 byte:

        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +