flink-cdc 同步到StarRocks

package com.example.flinkstudy;

import com.alibaba.fastjson.JSONObject;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;

/**

  • @author zhengjc

  • @date 30/12/21
    */
    public class MySqlSourceStarRocks {
    public static void main(String[] args) throws Exception {
    MySqlSource mySqlSource = MySqlSource.builder()
    .hostname(“10.255.8.25”)
    .port(3306)
    .databaseList(“test”) // set captured database
    .tableList(“test.sql_counter”) // set captured table
    .username(“root”)
    .password(“123456”).startupOptions(StartupOptions.latest())
    .deserializer(new MyDeserializationSchema())
    .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(3000);
    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), “MySQL Source”)
    // set 1 parallel source tasks
    .setParallelism(1).addSink(StarRocksSink.sink(
    // the sink options
    StarRocksSinkOptions.builder()
    .withProperty(“jdbc-url”, “jdbc:mysql://10.255.10.163:9030”)
    .withProperty(“load-url”, “10.255.10.163:8030”)
    .withProperty(“username”, “root”)
    .withProperty(“password”, “123456”)
    .withProperty(“table-name”, “sql_counter”)
    .withProperty(“database-name”, “test”)
    .withProperty(“sink.properties.format”, “json”)
    .withProperty(“sink.properties.strip_outer_array”, “true”)
    .build()
    )).setParallelism(1); // use parallelism 1 for sink to keep message ordering

     env.execute("Print MySQL Snapshot + Binlog");
    

    }

    public static class MyDeserializationSchema implements DebeziumDeserializationSchema {

     @Override
     public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
         Struct value = (Struct) sourceRecord.value();
         Struct after = value.getStruct("after");
         Struct source = value.getStruct("source");
    
         String db = source.getString("db");//库名
         String table = source.getString("table");//表名
    
         Envelope.Operation operation = Envelope.operationFor(sourceRecord);
         String opstr = operation.toString().toLowerCase();
         if (opstr.equals("create")) {
             opstr = "insert";
         }
         JSONObject json2 = new JSONObject();
         //加个判空
         if (after != null) {
             List<Field> data = after.schema().fields(); //获取结构体
             for (Field field : data) {
                 String name = field.name(); //结构体的名字
                 Object value2 = after.get(field);//结构体的字段值
                 json2.put(name, value2);
             }
         }
         collector.collect(json2.toJSONString());
     }
    
     @Override
     public TypeInformation<String> getProducedType() {
         return TypeInformation.of(String.class);
     }
    

    }

}

需要怎么写,才能在 MySQL 中的记录被删除后,让 StarRocks 中对应的记录也同步删除?

可以参照下图: