package com.example.flinkstudy;
import com.alibaba.fastjson.JSONObject;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
-
@author zhengjc
-
@date 30/12/21
*/
public class MySqlSourceStarRocks {
public static void main(String[] args) throws Exception {
MySqlSource mySqlSource = MySqlSource.builder()
.hostname(“10.255.8.25”)
.port(3306)
.databaseList(“test”) // set captured database
.tableList(“test.sql_counter”) // set captured table
.username(“root”)
.password(“123456”).startupOptions(StartupOptions.latest())
.deserializer(new MyDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), “MySQL Source”)
// set 1 parallel source tasks
.setParallelism(1).addSink(StarRocksSink.sink(
// the sink options
StarRocksSinkOptions.builder()
.withProperty(“jdbc-url”, “jdbc:mysql://10.255.10.163:9030”)
.withProperty(“load-url”, “10.255.10.163:8030”)
.withProperty(“username”, “root”)
.withProperty(“password”, “123456”)
.withProperty(“table-name”, “sql_counter”)
.withProperty(“database-name”, “test”)
.withProperty(“sink.properties.format”, “json”)
.withProperty(“sink.properties.strip_outer_array”, “true”)
.build()
)).setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");
}
public static class MyDeserializationSchema implements DebeziumDeserializationSchema {
@Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { Struct value = (Struct) sourceRecord.value(); Struct after = value.getStruct("after"); Struct source = value.getStruct("source"); String db = source.getString("db");//库名 String table = source.getString("table");//表名 Envelope.Operation operation = Envelope.operationFor(sourceRecord); String opstr = operation.toString().toLowerCase(); if (opstr.equals("create")) { opstr = "insert"; } JSONObject json2 = new JSONObject(); //加个判空 if (after != null) { List<Field> data = after.schema().fields(); //获取结构体 for (Field field : data) { String name = field.name(); //结构体的名字 Object value2 = after.get(field);//结构体的字段值 json2.put(name, value2); } } collector.collect(json2.toJSONString()); } @Override public TypeInformation<String> getProducedType() { return TypeInformation.of(String.class); }
}
}
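// 问题：需要怎么写，才能让 MySQL 中被删除的记录在同步后，StarRocks 中对应的记录也同样被删除？
// (Question: how should this be written so that rows deleted in MySQL are also deleted in StarRocks after synchronization?)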