flink-cdc import into StarRocks: data lost during updates

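[Details] 1. First, I joined several tables in the business database to produce a result table: 307126 rows in total.
2. I started flink-cdc and used flink-connector-starrocks-1.1.10_flink-1.11.jar to load the data into StarRocks.
The result was perfect: 307126 rows in total.
3. What happened? Some days later I recounted the StarRocks table
and found only about 250000 rows, more than 50,000 fewer??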

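Troubleshooting so far:
1. I counted the first (main) table in my SQL logic and it still has 307126 rows, so no business data was deleted.
2. My logic creates a view in Flink that flattens several tables into one wide table. I ran a count on
that view in the Flink SQL client and got 307126 rows. What? The data in Flink itself is also correct.

3. So it is the flink-connector-starrocks import that ends up 50,000 rows short?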
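Comparing totals only shows that rows are missing, not which ones. A minimal sketch of the next step, assuming the primary keys can be exported from both the Flink view and the StarRocks table into plain lists; the function name and the sample ids are hypothetical and not taken from this thread:

```python
def missing_keys(source_keys, sink_keys):
    """Return the primary keys present in the source but absent from the sink, sorted."""
    return sorted(set(source_keys) - set(sink_keys))


# Hypothetical sample ids, standing in for keys exported from each side.
flink_view_ids = [1, 2, 3, 4, 5]
starrocks_ids = [1, 2, 4]

print(missing_keys(flink_view_ids, starrocks_ids))
```

Knowing the exact missing keys makes it possible to check whether those rows were recently updated in the source, which helps narrow down whether updates are involved in the loss.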

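[Import/export method] flink-connector-starrocks
[Background] flink-cdc builds a logical view, then: insert into StarRocks select from the Flink view.
[Business impact]
[StarRocks version] e.g. 1.19.1
[Cluster size] 3 machines
[Machine info] CPU 8g 32 cores
[Attachments]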

  • fe.warn.log/be.warn.log/related screenshots
    They all show a normal import process; no error logs were produced.

Could you pull the operations on this table from fe.audit?

2021-11-01 18:06:04,586 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=|State=OK|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=936|QueryId=530791db-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SET NAMES utf8
2021-11-01 18:06:04,591 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=937|QueryId=5308552c-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:04,594 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=938|QueryId=5308ca5d-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:04,617 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=15|ScanBytes=512050|ScanRows=306730|ReturnRows=1|StmtId=939|QueryId=530a29ee-3afb-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:06:04,626 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=940|QueryId=530dac60-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:49,278 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=941|QueryId=6dab0724-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:49,279 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=942|QueryId=6dab5545-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:49,296 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=10|ScanBytes=493204|ScanRows=306733|ReturnRows=1|StmtId=943|QueryId=6dac66b6-3afb-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:06:49,303 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=944|QueryId=6daefec8-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:55,722 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=945|QueryId=718274f9-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:55,724 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=946|QueryId=7182c31a-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:55,739 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=8|ScanBytes=493204|ScanRows=306733|ReturnRows=1|StmtId=947|QueryId=7183d48b-3afb-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:06:55,747 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=948|QueryId=71861e7d-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:58,083 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=949|QueryId=72eab78e-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:58,085 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=950|QueryId=72eade9f-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:58,099 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=7|ScanBytes=493204|ScanRows=306733|ReturnRows=1|StmtId=951|QueryId=72ec1720-3afb-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:06:58,107 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=952|QueryId=72ee6112-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:59,706 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=953|QueryId=73e25e03-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:59,707 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=954|QueryId=73e28514-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:06:59,721 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=6|ScanBytes=493204|ScanRows=306733|ReturnRows=1|StmtId=955|QueryId=73e3bd95-3afb-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:06:59,729 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=956|QueryId=73e5e077-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:07:01,180 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=957|QueryId=74c32118-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:07:01,181 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=958|QueryId=74c36f39-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:07:01,195 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=7|ScanBytes=493204|ScanRows=306733|ReturnRows=1|StmtId=959|QueryId=74c480aa-3afb-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:07:01,202 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=960|QueryId=74c67c7c-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:07:23,570 [query] |Client=10.51.40.16:53902|User=default_cluster:test|Db=|State=OK|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=961|QueryId=821bb98d-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SET NAMES utf8
2021-11-01 18:07:23,575 [query] |Client=10.51.40.16:53902|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=1|StmtId=962|QueryId=821c55ce-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW CREATE TABLE dwb_trade_truckbroker_order
2021-11-01 18:07:23,579 [query] |Client=10.51.40.16:53902|User=default_cluster:test|Db=default_cluster:test|State=ERR|Time=2|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=962|QueryId=821ccaff-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema=‘test’ AND BINARY event_object_table=‘dwb_trade_truckbroker_order’
2021-11-01 18:07:23,718 [query] |Client=10.51.40.16:53902|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=1|StmtId=963|QueryId=823227c0-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW CREATE TABLE dwb_trade_truckbroker_order
2021-11-01 18:07:23,720 [query] |Client=10.51.40.16:53902|User=default_cluster:test|Db=default_cluster:test|State=ERR|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=963|QueryId=823275e1-3afb-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema=‘test’ AND BINARY event_object_table=‘dwb_trade_truckbroker_order’
2021-11-01 18:28:01,422 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=1|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=964|QueryId=63eccc5c-3afe-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:28:01,424 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=965|QueryId=63ed418d-3afe-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS
2021-11-01 18:28:01,447 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=15|ScanBytes=429499|ScanRows=257850|ReturnRows=1|StmtId=966|QueryId=63ee7a0e-3afe-11ec-8fd2-1418774d18ee|IsQuery=true|feIp=xx.xx.xx.xx|Stmt=select count(1) from dwb_trade_truckbroker_order
2021-11-01 18:28:01,454 [query] |Client=10.51.40.16:64212|User=default_cluster:test|Db=default_cluster:test|State=EOF|Time=0|ScanBytes=0|ScanRows=0|ReturnRows=0|StmtId=967|QueryId=63f1d570-3afe-11ec-8fd2-1418774d18ee|IsQuery=false|feIp=xx.xx.xx.xx|Stmt=SHOW STATUS

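On Monday I happened to catch a window in which the data shrank. During that window nobody could have deleted my StarRocks data,
because nobody else even knows about my table, and the audit log also shows nothing but my own count queries.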
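To make the drop in the audit log easier to see, the count queries can be pulled out with a small script. This is a minimal sketch, assuming the fe.audit line layout shown above (pipe-separated `Key=Value` fields ending in `Stmt=`); the regex, the function name, and the shortened sample lines are illustrative only:

```python
import re

# Captures the timestamp, the ScanRows value and the statement of one audit line.
AUDIT_RE = re.compile(r"^(\S+ \S+) \[query\] .*?\|ScanRows=(\d+)\|.*?\|Stmt=(.*)$")


def count_scan_rows(lines, table):
    """Yield (timestamp, ScanRows) for every count query against `table`."""
    for line in lines:
        m = AUDIT_RE.match(line)
        if m and "count(" in m.group(3) and table in m.group(3):
            yield m.group(1), int(m.group(2))


# Sample lines shortened from the log above (some fields dropped for brevity).
sample = [
    "2021-11-01 18:07:01,195 [query] |Client=10.51.40.16:64212|State=EOF|ScanRows=306733|ReturnRows=1|Stmt=select count(1) from dwb_trade_truckbroker_order",
    "2021-11-01 18:07:01,202 [query] |Client=10.51.40.16:64212|State=EOF|ScanRows=0|ReturnRows=0|Stmt=SHOW STATUS",
    "2021-11-01 18:28:01,447 [query] |Client=10.51.40.16:64212|State=EOF|ScanRows=257850|ReturnRows=1|Stmt=select count(1) from dwb_trade_truckbroker_order",
]

for ts, rows in count_scan_rows(sample, "dwb_trade_truckbroker_order"):
    print(ts, rows)
```

In the log above, ScanRows for the count query falls from 306733 at 18:07:01 to 257850 at 18:28:01, a difference of 48883 rows in about 21 minutes.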

Which table model did you use when creating the table?

primary key

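  1. Use flink-cdc-connector version 1.4
  2. When submitting multiple jobs, make sure that taskmanager.numberOfTaskSlots in flink/conf/flink-conf.yaml >= the number of insert jobs
  3. Run grep -ir "ErrorURL" flink/log/* and check whether there are any related log entries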

@U_1635932342230_0969
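Is the primary key of the source data the same as the primary key in StarRocks?
If it is, please follow @fariel's steps to check whether any data is being filtered out during synchronization.
If nothing is filtered, we may need our developers to step in and take a look.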

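Exactly the same. The primary keys are identical, so rows cannot be overwriting each other through duplicate keys. It happened again this morning.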

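1. Flink runs in YARN mode; the local Flink logs contain no errors.
2. The logs of the 3 TaskManagers on the YARN Flink cluster all show normal imports with no errors, yet the problem reproduced again this morning.
3. The slot-related Flink SQL configuration is as follows: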
taskmanager.numberOfTaskSlots: 2
parallelism.default: 6

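  • Open questions:

(1) I do not understand "taskmanager.numberOfTaskSlots >= the number of insert jobs". What does "number of insert jobs" mean? Inserts into the same table?
(2) Because the Flink version we use is 1.12.1, the CDC version we picked at the time was flink-sql-connector-mysql-cdc-1.2.0.jar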

Search the Flink logs for the keyword "stream load" and look at the related import logs

Please provide the CREATE TABLE statement used in StarRocks and all of be.INFO; if that is too much, just grep tablet_updates

taskmanager日志.txt (335.5 KB)

The logs of the three TaskManagers are all much the same, so I am providing the import log from just one container

CREATE TABLE dwb_order_sink (
truck_broker_order_id INT
, *
,PRIMARY KEY (truck_broker_order_id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://xx.xx.xx.xx:9030?useUnicode=true&characterEncoding=utf-8',
'load-url'='xx.xx.xx.xx:8030',
'database-name' = 'test',
'table-name' = 'dwb_order',
'username' = 'test',
'password' = 'xxxx',
'sink.buffer-flush.max-rows' = '1000000',
'sink.buffer-flush.max-bytes' = '134217728',
'sink.buffer-flush.interval-ms' = '60000',
'sink.max-retries' = '3',
'sink.properties.column_separator' = '\x01',
'sink.properties.row_delimiter' = '\x02'
);

What we need is the StarRocks CREATE TABLE statement and the StarRocks BE logs

Change the parallelism to 1 and observe

CREATE TABLE dwb_trade_truckbroker_order (
order_id INT
,owner_party_id INT
,driver_party_id INT
,owner_customer_code varchar(128)
,driver_customer_code varchar(128)
,truck_broker_company_code varchar(128)
,truck_broker_order_code varchar(128)
,invoice_online_code varchar(128)
,invoice_apply_code varchar(128)
,apply_status INT
,outer_order_no varchar(128)
,order_type varchar(128)
,goods_type varchar(128)
,cancel_type varchar(128)
,audit_type varchar(128)
,shipping_status varchar(128)
,audit_status varchar(128)
,pay_status varchar(128)
,finance_invoice_status varchar(128)
,pay_driver_status varchar(128)
,order_fee_change_status varchar(128)
,tax_rate double
,total_shipping_fee double
,tax_fee double
,driver_shipping_fee double
,driver_total_fee double
,online_fee double
,adjust_fee double
,adjust_tax_fee double
,transport_fee double
,present_oil_fee double
,agent_fee double
,from_province varchar(128)
,from_city varchar(128)
,from_region varchar(128)
,from_town varchar(128)
,from_address varchar(128)
,to_province varchar(128)
,to_city varchar(128)
,to_region varchar(128)
,to_town varchar(128)
,to_address varchar(128)
,take_order_time datetime
,start_load_date datetime
,end_load_date datetime
,start_unload_date datetime
,end_unload_date datetime
,receipt_time datetime
,audit_time datetime
,finish_time datetime
,upload_time datetime
,cancel_time datetime
,create_time datetime
,update_time datetime
,stamp_time datetime
,first_primary_audit_time datetime
,wait_review_time datetime
,review_time datetime
,final_audit_time datetime
,apply_invoice_time datetime
,open_invoice_time date
,initial_quantity double
,initial_price double
,quantity double
,unit varchar(128)
,price double
,primary_audit_success_time datetime
,review_success_time datetime
,final_audit_success_time datetime
,truck_broker_company_name varchar(128)
,owner_name varchar(128)
,owner_mobile varchar(128)
,owner_address varchar(128)
,pay_dead_line INT
,driver_name varchar(128)
,driver_mobile varchar(128)
,driver_vehicle_plate_number varchar(128)
,trailer_plate_number varchar(128)
,driver_vehicle_type varchar(128)
,driver_vehicle_length double
,vehicle_type varchar(128)
,bank_user_name varchar(128)
,bank_card_no varchar(128)
,bank_card_status INT
,is_receipt_pay INT
,exception_type INT
,exception_reason varchar(128)
,driver_recv_order_time datetime
,driver_already_arrived_time datetime
,transport_date_start datetime
,transport_date_end datetime
,abnormal_cancel_time datetime
,status_code INT
,refund_owner_status INT
,pay_type INT
,close_order_time datetime
,first_deal_time datetime
,distance double
,kilometer_rate double
,goods_value double
,platform_name varchar(128)
)
ENGINE=olap
PRIMARY KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 32
PROPERTIES ("storage_type"="column","replication_num" = "1");

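Changing the parallelism of the Flink sink to 1 should resolve this problem.
Our current guess at the cause:
when the sink runs with a parallelism greater than 1,
update_before and update_after can be split across different sink operators, so the upsert for the after record may run first and the delete for the before record may run later. The update then effectively becomes a delete.

We will look into how to fix this later.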
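The suspected reordering can be illustrated with a toy model. This is only a sketch of the hypothesis above, not the connector's actual code: a dict stands in for the primary-key table, the op codes mirror Flink's changelog row kinds, and the assumption that update_before is applied as a delete by key comes from the explanation above. The sample row and the `apply_changelog` helper are hypothetical.

```python
def apply_changelog(table, events):
    """Apply (op, key, row) events to a dict standing in for a primary-key table.

    "+I" and "+U" upsert the row. "-U" and "-D" delete by key, which is the
    behaviour the hypothesis assumes for update_before records.
    """
    for op, key, row in events:
        if op in ("+I", "+U"):
            table[key] = row
        elif op in ("-U", "-D"):
            table.pop(key, None)
    return table


update_before = ("-U", 1, {"pay_status": "unpaid"})
update_after = ("+U", 1, {"pay_status": "paid"})

# Same update, applied in the intended order and in the swapped order.
in_order = apply_changelog({1: {"pay_status": "unpaid"}}, [update_before, update_after])
reordered = apply_changelog({1: {"pay_status": "unpaid"}}, [update_after, update_before])

print(in_order)   # the row survives with the new value
print(reordered)  # the row has disappeared
```

Under this model every update whose two halves arrive in the wrong order removes one row, so the table shrinks while the source and the Flink view keep their full count.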

With the parallelism set to 1 the problem still occurs. To be continued.

I talked with people from flink-cdc and Flink; this may be a bug in Flink 1.12.x
https://issues.apache.org/jira/browse/FLINK-20374
It is only fixed as of Flink 1.13.3