Flink Kafka source: changing the delimiter has no effect

-- This is the Flink DDL statement
CREATE TEMPORARY TABLE ta_table (
appid STRING,
client_ip STRING,
data_object ROW<data ARRAY<ROW<`#account_id` STRING, `#time` TIMESTAMP, `#uuid` STRING, `#event_name` STRING, `#type` STRING, `#ip` STRING, properties MAP<STRING,STRING>>>>,
data_source STRING,
receive_time TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = '',
'properties.bootstrap.servers' = '',
'properties.group.id' = '',
'properties.max.poll.records' = '5000',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);

-- StarRocks result (sink) table
CREATE TEMPORARY TABLE ta_sink(
event_time TIMESTAMP,
gameid VARCHAR,
event_name VARCHAR,
part_date DATE,
record_time TIMESTAMP,
`#uuid` VARCHAR,
`#account_id` VARCHAR,
`#type` VARCHAR,
`#ip` VARCHAR,
user_id varchar,
account_id varchar,
device_id varchar,
channel varchar,
server_id varchar,
idfa varchar,
idfv varchar,
imei varchar,
oa_id varchar,
imsi varchar,
an_id varchar,
country_code varchar,
package_name varchar,
extend_id varchar,
game_version varchar,
test_version varchar,
power_value INT,
vip_level INT,
level INT,
recharge_channel varchar,
money FLOAT,
money_type varchar,
charging_money FLOAT,
charging_money_type varchar,
game_order_id varchar,
sdk_order_id varchar,
pay_order_id varchar,
payway INT,
product_id varchar,
pay_times INT,
totap_pay INT,
receive_time TIMESTAMP
) WITH (
'connector' = 'starrocks',
'jdbc-url' = '',
'load-url' = '',
'database-name' = '',
'table-name' = '',
'username' = '',
'password' = '',
'sink.buffer-flush.max-rows' = '1000000',
'sink.buffer-flush.max-bytes' = '300000000',
'sink.buffer-flush.interval-ms' = '60000',
'sink.semantic' = 'exactly-once'
);

CREATE TEMPORARY TABLE app_gameid_table(
id INT,
app_id VARCHAR,
gameid VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'username' = '',
'password' = '',
'lookup.cache.max-rows' = '3000',
'lookup.cache.ttl' = '2s',
'lookup.max-retries' = '3'
);

INSERT INTO ta_sink
SELECT
`#time`,
p.gameid,
`#event_name`,
CAST(SUBSTRING(`#time`,0,11) AS DATE) AS part_date,
NOW(),
`#uuid`,
`#account_id`,
`#type`,
`#ip`,
properties['user_id'] AS user_id,
properties['account_id'] AS account_id,
properties['device_id'] AS device_id,
properties['channel'] AS channel,
properties['server_id'] AS server_id,
properties['idfa'] AS idfa,
properties['idfv'] AS idfv,
properties['imei'] AS imei,
properties['oa_id'] AS oa_id,
properties['imsi'] AS imsi,
properties['an_id'] AS an_id,
properties['country_code'] AS country_code,
properties['package_name'] AS package_name,
properties['extend_id'] AS extend_id,
properties['game_version'] AS game_version,
properties['test_version'] AS test_version,
CAST(properties['power_value'] AS INT) AS power_value,
CAST(properties['vip_level'] AS INT) AS vip_level,
CAST(properties['level'] AS INT) AS level,
properties['recharge_channel'] AS recharge_channel,
CAST(properties['money'] AS FLOAT) AS money,
properties['money_type'] AS money_type,
CAST(properties['charging_money'] AS FLOAT) AS charging_money,
properties['charging_money_type'] AS charging_money_type,
properties['game_order_id'] AS game_order_id,
properties['sdk_order_id'] AS sdk_order_id,
properties['pay_order_id'] AS pay_order_id,
CAST(properties['payway'] AS INT) AS payway,
properties['product_id'] AS product_id,
CAST(properties['pay_times'] AS INT) AS pay_times,
CAST(properties['totap_pay'] AS INT) AS totap_pay,
o.receive_time
FROM ta_table AS o
INNER JOIN app_gameid_table FOR SYSTEM_TIME AS OF PROCTIME() AS p ON o.appid = p.app_id
LEFT JOIN UNNEST(o.data_object.data) AS t (`#account_id`, `#time`, `#uuid`, `#event_name`, `#type`, `#ip`, properties) ON TRUE
WHERE `#event_name` IN ('register', 'login', 'recharge') AND `#time` > '2022-11-01 00:00:00.000';

-- Sample data

-- Normal record
{"appid":"63a5fd83e69e4c9d87ab405a2837319a","client_ip":"131.49.64.001","data_object":{"#app_id":"63a5fd83e69e4c9d87ab405a2837319a","data":[{"#account_id":"1010_00000","#time":"2022-11-01 12:03:47.234","#distinct_id":"3188124946-1667184108750","#uuid":"67f78cge-0096-447d-b4b0-e28acfdacc31","#type":"track","#event_name":"login","properties":{"#lib_version":"1.8.2","adtype":"RecommendWatchVideo","#mp_platform":"web","#lib":"MG","#zone_offset":8,"#os":"IOS","#device_id":"8090010140-1667184108760","#screen_height":1080,"state":"success","#screen_width":2196,"package_name":"com.hermes.fgam.xiaomi"}}]},"data_source":"Mini_Game","receive_time":"2022-10-31 12:03:47.430","source":"client"}
-- Dirty record: the package_name field contains a \t character
{"appid":"63a5fd83e69e4c9d87ab405a2837319a","client_ip":"141.49.04.002","data_object":{"#app_id":"63a5fd83e69e4c9d87ab405a2837319a","data":[{"#account_id":"1009_00000","#time":"2022-11-01 12:03:47.234","#distinct_id":"3188124936-1667184108750","#uuid":"57f78cfe-7576-487d-b4b0-e18acfdaccb1","#type":"track","#event_name":"login","properties":{"#lib_version":"1.8.2","adtype":"RecommendWatchVideo","#mp_platform":"web","#lib":"MG","#zone_offset":8,"#os":"IOS","#device_id":"8090010140-1667184108760","#screen_height":1080,"state":"success","#screen_width":2196,"package_name":"com.hermes.fgam�� :�Q^��"}}]},"data_source":"Mini_Game","receive_time":"2022-10-31 12:03:47.430","source":"client"}

The error scenario is this: all the data the job consumed previously was in the same format as the "normal record" above. Later it started receiving records in the "dirty record" format above, whose package_name field contains a "\t" character. That makes the StarRocks write fail, because the number of values no longer matches the number of columns of the target table, as in the error screenshot below:
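For context, a rough illustration of the failure mode, assuming the connector serializes each row as CSV for Stream Load with the default "\t" column separator (placeholders only, not the actual serialized output):

-- normal row: one value per sink column
<event_time>\t<gameid>\t<event_name>\t...\tcom.hermes.fgam.xiaomi\t...
-- dirty row: the "\t" inside package_name produces one extra CSV field,
-- so StarRocks receives more values than the target table has columns
<event_time>\t<gameid>\t<event_name>\t...\tcom.hermes.fgam\t<garbage>\t...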

Next I stopped this Flink SQL job and modified the WITH options of the StarRocks sink in the Flink DDL, adding
'sink.properties.column_separator' = '\x01',
'sink.properties.row_delimiter' = '\x02'
and restarted the Flink SQL job from the checkpoint. After the restart it still reports the same error as in the screenshot above.
The flink-starrocks-connector version is 1.2.3-flink-1.15.

Is there a solution for this problem yet?


Those options only take effect when the data is loaded in CSV format.

The default column separator is \t, and your data happens to contain it. The only options are to filter out this dirty data beforehand, or to use a different separator.
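A minimal sketch of the "filter or clean the dirty data upstream" option, applied inside the existing INSERT ... SELECT; the regex and the focus on package_name are only an illustration, and any string column that may carry control characters would need the same treatment:

-- clean the value: strip tab / carriage return / newline before it reaches the sink
REGEXP_REPLACE(properties['package_name'], '[\t\r\n]', '') AS package_name,

-- or drop such records entirely by extending the existing WHERE clause
AND (properties['package_name'] IS NULL OR properties['package_name'] NOT LIKE '%' || CHR(9) || '%')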

I did change the sink options to use other separators; it still fails after restarting.

Let's close this thread. The root cause is that the batch that previously failed to be written is kept in the checkpoint state and already contains the old separator, so even after changing the separator options in the Flink SQL and restarting, the data restored from state is written again and still fails. I don't know whether your company is willing to change this part of the logic.

Could you instead receive the data_object field into StarRocks as a string, and then do the parsing and the dimension-table join inside StarRocks?
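A minimal sketch of that idea on the Flink side, assuming the JSON format maps the nested data_object object onto a STRING column and carries it as raw JSON text (connection values elided as in the original; the table name and the get_json_string path below are only illustrative):

CREATE TEMPORARY TABLE ta_table_raw (
appid STRING,
client_ip STRING,
data_object STRING, -- raw JSON text instead of ROW<...>
data_source STRING,
receive_time TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = '',
'properties.bootstrap.servers' = '',
'properties.group.id' = '',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);

The raw string would then be written to a VARCHAR column in StarRocks and parsed there, e.g. with get_json_string(data_object, '$.data[0].properties.package_name'), while the gameid lookup join could stay in Flink as before. Since the raw JSON can itself contain \t, the non-default separators suggested above would presumably still be needed for the CSV load.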