[StarRocks version] 3.3.15-0f56e12
[Flink CDC dependency JARs] flink-sql-connector-postgres-cdc-3.3.0.jar, flink-connector-starrocks-1.2.11_flink-1.18.jar
[Contact] email: 363332953@qq.com, thanks
I want to use Flink SQL so that when a row is physically deleted from the PostgreSQL table, the corresponding StarRocks table only performs a logical delete (is_del = 1). The StarRocks table is a primary key table (a sketch of a possible target DDL follows the first CREATE TABLE below). Right now, no matter what I try, the result table follows the physical delete and I cannot get a logical delete; please advise, thanks! Adding 'sink.properties.ignore.changelog' = 'true' does not seem to take effect either. However I transform the stream, StarRocks physically deletes the row as soon as it receives the delete instruction. The Flink SQL I wrote is attached below:

CREATE TABLE postgres_source (
id BIGINT,
service_area_id INT,
plate_no STRING,
-- metadata columns
database_name STRING METADATA FROM 'database_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '...',
'port' = '5432',
'username' = 'postgres',
'password' = 'root',
'database-name' = 'ds_test',
'schema-name' = 'public',
'table-name' = 'flink_test',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink_slot_lin',
'scan.incremental.snapshot.enabled' = 'true',
-- Key change: force delete events to be kept and converted into specially marked records
'debezium.transforms' = 'unwrap,dropDelete',
'debezium.transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'debezium.transforms.unwrap.drop.tombstones' = 'false',
'debezium.transforms.unwrap.delete.handling.mode' = 'rewrite',
-- New: convert delete events into flagged records
'debezium.transforms.dropDelete.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'debezium.transforms.dropDelete.renames' = '__deleted:__deleted',
'debezium.transforms.dropDelete.predicate' = 'isDelete',
'debezium.transforms.dropDelete.negate' = 'true'
);
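For reference, the question says the StarRocks target is a primary key table but does not include its DDL. A minimal sketch of what ds_lin.flink_test_result might look like (column types, NOT NULL constraints, and the distribution clause are assumptions, not the actual table):

-- Sketch only: assumed DDL for the StarRocks primary key table
CREATE TABLE ds_lin.flink_test_result (
    id BIGINT NOT NULL,
    service_area_id INT,
    plate_no STRING,
    operation_type STRING,
    delete_time DATETIME,
    source_table STRING,
    is_del INT,
    update_time DATETIME
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id);

On a primary key table, a row that arrives via Stream Load marked as a delete is removed, which is why the sink's delete handling matters here.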
CREATE TABLE flink_test_result (
id BIGINT,
service_area_id INT,
plate_no STRING,
operation_type STRING,
delete_time TIMESTAMP_LTZ(3),
source_table STRING,
is_del INT,
update_time TIMESTAMP_LTZ(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://...:9030',
'load-url' = '...:8030',
'database-name' = 'ds_lin',
'table-name' = 'flink_test_result',
'username' = 'root',
'password' = 'root',
-- key configuration
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.buffer-flush.interval-ms' = '5000',
-- force all operations into UPSERT + soft-delete flag
'sink.properties.ignore.changelog' = 'true',
'sink.properties.merge-on-write' = 'true',
-- newly added
'sink.properties.partial_update' = 'true',
'sink.ignore.update-before' = 'false',
'sink.properties.columns' = 'id,service_area_id,plate_no,operation_type,delete_time,source_table,is_del,update_time'
);
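To make the goal concrete on the StarRocks side: what the job currently produces is the effect of a physical DELETE, while the desired outcome is an in-place update of the flag column. A hedged illustration in StarRocks SQL (the id value is only an example):

-- Effect the job has today when PostgreSQL deletes id = 1: the row is removed.
DELETE FROM ds_lin.flink_test_result WHERE id = 1;
-- Effect that is wanted instead: keep the row and mark it as logically deleted.
UPDATE ds_lin.flink_test_result SET is_del = 1 WHERE id = 1;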
-- 1. Create a view that was meant to completely filter out DELETE events (the WHERE filter is commented out below)
CREATE TEMPORARY VIEW no_delete_data AS
SELECT
id,
service_area_id,
plate_no,
-- force all operations into INSERT/UPDATE
CASE
WHEN row_kind = '+I' THEN 'INSERT'
WHEN row_kind = '+U' THEN 'UPDATE'
WHEN row_kind = '-D' THEN 'UPDATE' -- convert deletes into updates
END AS operation_type,
CASE WHEN row_kind = '-D' THEN operation_ts ELSE NULL END AS delete_time,
CONCAT(database_name, '.', schema_name, '.', table_name) AS source_table,
CASE WHEN row_kind = '-D' THEN 1 ELSE 0 END AS is_del,
operation_ts AS update_time
FROM postgres_source;
-- WHERE row_kind <> '-D'; -- key: completely filter out DELETE events at the source
-- 2. Write into StarRocks
INSERT INTO flink_test_result
SELECT * FROM no_delete_data;
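One way to see why the CASE mapping alone does not change the outcome: the view rewrites column values, but the -D changelog row kind still flows through to the StarRocks sink, which turns it into a delete. A debug sketch (assumption: only for local inspection, not part of the pipeline) using Flink's built-in 'print' connector to show the row kinds emitted by the view:

-- Debug sketch: print the changelog produced by the view.
CREATE TABLE debug_print (
    id BIGINT,
    operation_type STRING,
    is_del INT
) WITH (
    'connector' = 'print'
);

INSERT INTO debug_print
SELECT id, operation_type, is_del FROM no_delete_data;

In the TaskManager log each record is prefixed with its change flag (+I, -U, +U, -D); a PostgreSQL delete should still appear as a -D record even though is_del is computed as 1.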