【Details】FE, BE, and Broker are deployed on a single Docker VM.
Data is migrated with Flink CDC + Flink + flink-connector-starrocks.
【Import/export method】Flink CDC source, flink-connector-starrocks sink
【Background】Relevant code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);
String sourceDDL =
"CREATE TABLE pg_user (\n" +
" pk string,\n" +
" id string,\n" +
" tx_count int,\n" +
" t DECIMAL(20,10)," +
" l string," +
" `timestamp` int," +
" c string," +
"`version` string" +
") WITH (\n" +
" 'connector' = 'postgres-cdc',\n" +
" 'hostname' = '127.0.0.1',\n" +
" 'port' = '5432',\n" +
" 'username' = 'postgres',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'data-store',\n" +
" 'schema-name' = 'public',\n" +
" 'table-name' = 'user',\n" +
" 'slot.name' = 'flink_cdc_to_starrocks',\n" +
" 'decoding.plugin.name' = 'pgoutput'\n" +
")";
// Table resultTable = streamTableEnv.sqlQuery("SELECT * FROM user");
streamTableEnv.executeSql(sourceDDL);
String sinkDDL =
"CREATE TABLE sr_user (\n" +
" pk string,\n" +
" id string,\n" +
" tx_count int,\n" +
" t DECIMAL(20,10)," +
" l string," +
" `timestamp` int," +
" c string," +
"`version` string" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='jdbc:mysql://127.0.0.1:9030'," +
"'load-url'='127.0.0.1:8030'," +
"'database-name' = 'data-store'," +
"'table-name' = 'user'," +
"'username' = 'root'," +
"'password' = '123456'," +
"'sink.buffer-flush.max-rows' = '1000000'," +
"'sink.buffer-flush.max-bytes' = '300000000'," +
"'sink.buffer-flush.interval-ms' = '5000'," +
"'sink.properties.column_separator' = '\\x01'," +
"'sink.properties.row_delimiter' = '\\x02'," +
"'sink.max-retries' = '3'" +
// "'sink.properties.*' = 'xxx'" + // stream load properties like 'sink.properties.columns' = 'k1, v1'
")";
streamTableEnv.executeSql(sinkDDL);
streamTableEnv.executeSql("insert into sr_user select * from pg_user");
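The sink options above are concatenated by hand, which makes a missing comma or quote easy to overlook. A minimal sketch of an alternative, assuming the options are kept in an ordered map and rendered into the WITH clause; `SinkOptions` and `withClause` are hypothetical names introduced here for illustration and are not part of Flink or flink-connector-starrocks:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SinkOptions {

    /** Renders connector options as a Flink SQL WITH (...) clause (hypothetical helper). */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                // Single quotes inside SQL string literals are escaped by doubling them.
                .map(e -> "'" + e.getKey().replace("'", "''") + "' = '"
                        + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n  ", "WITH (\n  ", "\n)"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "starrocks");
        options.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
        options.put("load-url", "127.0.0.1:8030");
        options.put("database-name", "data-store");
        options.put("table-name", "user");
        options.put("username", "root");
        options.put("password", "123456");
        System.out.println(withClause(options));
    }
}
```

The rendered clause can be appended to the column list of the `CREATE TABLE sr_user` statement. Printing it before calling `executeSql` also makes it easy to compare `database-name` and `table-name` against what actually exists in StarRocks.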
【Business impact】
【StarRocks version】2.1
【Cluster size】e.g. 1 FE + 1 BE
【Machine info】vCPUs / memory / NIC, e.g. 12C/16G/10 GbE
【Attachments】Error message
Exception in thread "main" java.lang.IllegalArgumentException: Couldn't get the sink table's column info.
at com.starrocks.connector.flink.manager.StarRocksSinkManager.validateTableStructure(StarRocksSinkManager.java:411)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.<init>(StarRocksSinkManager.java:129)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.<init>(StarRocksDynamicSinkFunction.java:67)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSink.getSinkRuntimeProvider(StarRocksDynamicTableSink.java:45)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:121)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at flink.FlinkStarRocksCDC.main(FlinkStarRocksCDC.java:65)
Disconnected from the target VM, address: '127.0.0.1:59729', transport: 'socket'
Process finished with exit code 1
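The trace shows the exception being raised in `StarRocksSinkManager.validateTableStructure` while the sink is constructed, before any rows are written. One way to narrow this down is to ask the FE directly which columns it reports for the sink table. The sketch below assumes that a MySQL-compatible JDBC driver is on the classpath and that the FE answers queries against `information_schema.COLUMNS`; whether the connector performs the same lookup internally is an assumption, not something confirmed here. `SinkTablePreflight`, `fetchColumns`, and `missingColumns` are hypothetical names used only for illustration.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkTablePreflight {

    /** Columns declared in the Flink DDL that the StarRocks table does not report. */
    static List<String> missingColumns(List<String> declared, List<String> actual) {
        List<String> missing = new ArrayList<>();
        for (String column : declared) {
            // Compare case-insensitively so that "PK" and "pk" count as the same column.
            boolean found = actual.stream().anyMatch(column::equalsIgnoreCase);
            if (!found) {
                missing.add(column);
            }
        }
        return missing;
    }

    /** Reads the column names the FE reports for database.table. */
    static List<String> fetchColumns(String jdbcUrl, String user, String password,
                                     String database, String table) throws SQLException {
        String sql = "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
                + "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";
        List<String> columns = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, database);
            stmt.setString(2, table);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    columns.add(rs.getString("COLUMN_NAME"));
                }
            }
        }
        return columns;
    }

    public static void main(String[] args) throws SQLException {
        // Column names and connection settings are copied from the sink DDL in this post.
        List<String> declared = Arrays.asList(
                "pk", "id", "tx_count", "t", "l", "timestamp", "c", "version");
        List<String> actual = fetchColumns(
                "jdbc:mysql://127.0.0.1:9030", "root", "123456", "data-store", "user");
        if (actual.isEmpty()) {
            System.out.println("No columns returned: check that the database and table "
                    + "exist in StarRocks and are visible to this user.");
        } else {
            System.out.println("Declared in Flink but missing in StarRocks: "
                    + missingColumns(declared, actual));
        }
    }
}
```

If the query returns no rows, the most likely things to check are whether the `data-store` database and the `user` table have actually been created on the StarRocks side, and whether the `jdbc-url` reaches the FE from inside the Docker network.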
- fe.warn.log / be.warn.log / relevant screenshots