-- Flink DDL statements.
--
-- Kafka source table. Each message is a JSON envelope whose `data_object.data`
-- field is an array of event records; each record carries a free-form
-- string-to-string `properties` map.
-- Field names starting with `#` are not valid regular identifiers, so they are
-- backtick-quoted everywhere they appear.
CREATE TEMPORARY TABLE ta_table (
    appid STRING,
    client_ip STRING,
    data_object ROW<
        `data` ARRAY<ROW<
            `#account_id` STRING,
            `#time` TIMESTAMP,
            `#uuid` STRING,
            `#event_name` STRING,
            `#type` STRING,
            `#ip` STRING,
            properties MAP<STRING, STRING>
        >>
    >,
    data_source STRING,
    receive_time TIMESTAMP
) WITH (
    'connector' = 'kafka',
    -- Empty values below are placeholders to be filled in per environment.
    'topic' = '',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = '',
    'properties.max.poll.records' = '5000',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    -- Records that fail JSON parsing are skipped instead of failing the job.
    'json.ignore-parse-errors' = 'true'
);
-- StarRocks result (sink) table.
--
-- Column order matters: the INSERT job writes into this table by position.
-- String columns use STRING rather than bare VARCHAR because an unsized
-- VARCHAR in Flink SQL means VARCHAR(1); STRING also matches the source table.
-- Column names starting with `#` are backtick-quoted because they are not
-- valid regular identifiers.
--
-- NOTE(review): the report accompanying this DDL says rows whose string values
-- contain a tab character fail to load with a column-count mismatch. That
-- suggests the load payload is tab-delimited -- confirm against the connector
-- version in use, and consider setting 'sink.properties.column_separator' /
-- 'sink.properties.row_delimiter' to non-printable characters or switching the
-- load format to JSON.
CREATE TEMPORARY TABLE ta_sink (
    event_time TIMESTAMP,
    gameid STRING,
    event_name STRING,
    part_date DATE,
    record_time TIMESTAMP,
    `#uuid` STRING,
    `#account_id` STRING,
    `#type` STRING,
    `#ip` STRING,
    user_id STRING,
    account_id STRING,
    device_id STRING,
    channel STRING,
    server_id STRING,
    idfa STRING,
    idfv STRING,
    imei STRING,
    oa_id STRING,
    imsi STRING,
    an_id STRING,
    country_code STRING,
    package_name STRING,
    extend_id STRING,
    game_version STRING,
    test_version STRING,
    power_value INT,
    vip_level INT,
    `level` INT,
    recharge_channel STRING,
    money FLOAT,
    money_type STRING,
    charging_money FLOAT,
    charging_money_type STRING,
    game_order_id STRING,
    sdk_order_id STRING,
    pay_order_id STRING,
    payway INT,
    product_id STRING,
    pay_times INT,
    totap_pay INT,
    receive_time TIMESTAMP
) WITH (
    'connector' = 'starrocks',
    -- Empty values below are placeholders to be filled in per environment.
    'jdbc-url' = '',
    'load-url' = '',
    'database-name' = '',
    'table-name' = '',
    'username' = '',
    'password' = '',
    'sink.buffer-flush.max-rows' = '1000000',
    'sink.buffer-flush.max-bytes' = '300000000',
    'sink.buffer-flush.interval-ms' = '60000',
    'sink.semantic' = 'exactly-once'
);
-- JDBC dimension table mapping an application id to its game id.
-- Used as the lookup side of the join in the INSERT job.
-- String columns use STRING rather than bare VARCHAR because an unsized
-- VARCHAR in Flink SQL means VARCHAR(1).
CREATE TEMPORARY TABLE app_gameid_table (
    id INT,
    app_id STRING,
    gameid STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    -- Empty values below are placeholders to be filled in per environment.
    'url' = '',
    'table-name' = '',
    'username' = '',
    'password' = '',
    -- Lookup results are cached: at most 3000 rows, each for 2 seconds.
    'lookup.cache.max-rows' = '3000',
    'lookup.cache.ttl' = '2s',
    'lookup.max-retries' = '3'
);
-- Flattens every event in ta_table.data_object.data into one sink row,
-- enriches it with the game id looked up by appid, and keeps only
-- register/login/recharge events that happened after 2022-11-01 00:00:00.
--
-- The SELECT list is matched to ta_sink by position, so its order must stay
-- identical to the column order of the sink DDL.
INSERT INTO ta_sink
SELECT
    t.`#time` AS event_time,
    p.gameid,
    t.`#event_name` AS event_name,
    -- Take the calendar date directly from the timestamp instead of slicing
    -- its string form.
    CAST(t.`#time` AS DATE) AS part_date,
    NOW() AS record_time,
    t.`#uuid`,
    t.`#account_id`,
    t.`#type`,
    t.`#ip`,
    -- A key missing from the properties map yields NULL for that column.
    t.properties['user_id'] AS user_id,
    t.properties['account_id'] AS account_id,
    t.properties['device_id'] AS device_id,
    t.properties['channel'] AS channel,
    t.properties['server_id'] AS server_id,
    t.properties['idfa'] AS idfa,
    t.properties['idfv'] AS idfv,
    t.properties['imei'] AS imei,
    t.properties['oa_id'] AS oa_id,
    t.properties['imsi'] AS imsi,
    t.properties['an_id'] AS an_id,
    t.properties['country_code'] AS country_code,
    t.properties['package_name'] AS package_name,
    t.properties['extend_id'] AS extend_id,
    t.properties['game_version'] AS game_version,
    t.properties['test_version'] AS test_version,
    CAST(t.properties['power_value'] AS INT) AS power_value,
    CAST(t.properties['vip_level'] AS INT) AS vip_level,
    CAST(t.properties['level'] AS INT) AS `level`,
    t.properties['recharge_channel'] AS recharge_channel,
    CAST(t.properties['money'] AS FLOAT) AS money,
    t.properties['money_type'] AS money_type,
    CAST(t.properties['charging_money'] AS FLOAT) AS charging_money,
    t.properties['charging_money_type'] AS charging_money_type,
    t.properties['game_order_id'] AS game_order_id,
    t.properties['sdk_order_id'] AS sdk_order_id,
    t.properties['pay_order_id'] AS pay_order_id,
    CAST(t.properties['payway'] AS INT) AS payway,
    t.properties['product_id'] AS product_id,
    CAST(t.properties['pay_times'] AS INT) AS pay_times,
    CAST(t.properties['totap_pay'] AS INT) AS totap_pay,
    o.receive_time
FROM ta_table AS o
-- NOTE(review): open-source Flink expects a processing-time attribute of the
-- left table here (e.g. a computed column `proc_time AS PROCTIME()`); the
-- PROCTIME() form is kept as written -- confirm it is accepted by the target
-- runtime.
INNER JOIN app_gameid_table FOR SYSTEM_TIME AS OF PROCTIME() AS p
    ON o.appid = p.app_id
-- The event array is nested inside data_object, so it must be referenced by
-- its full path. CROSS JOIN is used because the WHERE clause below filters on
-- the unnested columns, which would discard NULL-extended rows anyway.
CROSS JOIN UNNEST(o.data_object.`data`) AS t (
    `#account_id`,
    `#time`,
    `#uuid`,
    `#event_name`,
    `#type`,
    `#ip`,
    properties
)
WHERE t.`#event_name` IN ('register', 'login', 'recharge')
    AND t.`#time` > TIMESTAMP '2022-11-01 00:00:00.000';
-- 样例数据
-- 正常数据
{"appid":"63a5fd83e69e4c9d87ab405a2837319a","client_ip":"131.49.64.001","data_object":{"#app_id":"63a5fd83e69e4c9d87ab405a2837319a","data":[{"#account_id":"1010_00000","#time":"2022-11-01 12:03:47.234","#distinct_id":"3188124946-1667184108750","#uuid":"67f78cge-0096-447d-b4b0-e28acfdacc31","#type":"track","#event_name":"login","properties":{"#lib_version":"1.8.2","adtype":"RecommendWatchVideo","#mp_platform":"web","#lib":"MG","#zone_offset":8,"#os":"IOS","#device_id":"8090010140-1667184108760","#screen_height":1080,"state":"success","#screen_width":2196,"package_name":"com.hermes.fgam.xiaomi"}}]},"data_source":"Mini_Game","receive_time":"2022-10-31 12:03:47.430","source":"client"}
-- package_name 字段中含有 \t 的数据
{“appid”:“63a5fd83e69e4c9d87ab405a2837319a”,“client_ip”:“141.49.04.002”,“data_object”:{"#app_id":“63a5fd83e69e4c9d87ab405a2837319a”,“data”:[{"#account_id":“1009_00000”,"#time":“2022-11-01 12:03:47.234”,"#distinct_id":“3188124936-1667184108750”,"#uuid":“57f78cfe-7576-487d-b4b0-e18acfdaccb1”,"#type":“track”,"#event_name":“login”,“properties”:{"#lib_version":“1.8.2”,“adtype”:“RecommendWatchVideo”,"#mp_platform":“web”,"#lib":“MG”,"#zone_offset":8,"#os":“IOS”,"#device_id":“8090010140-1667184108760”,"#screen_height":1080,“state”:“success”,"#screen_width":2196,“package_name”:“com.hermes.fgam�� :�Q^��”}}]},“data_source”:“Mini_Game”,“receive_time”:“2022-10-31 12:03:47.430”,“source”:“client”}
错误场景如下:任务之前消费的数据都与上面的“正常数据”格式一致;之后接收到了上述含有 \t 的“错误数据”,即字段 package_name 的值中包含转义字符“\t”。这导致 StarRocks 写入失败,因为解析出的值个数与数据表的列数不一致,如下图所示:
接下来停止该 Flink SQL 任务,修改 Flink DDL 中 StarRocks sink 的 WITH 参数,增加:
'sink.properties.column_separator' = '\x01',
'sink.properties.row_delimiter' = '\x02'
然后从 checkpoint 重启该 Flink SQL 作业。重启后依旧会报与上图相同的错误。
目前flink-starrocks-connector 为1.2.3-flink-1.15 版本