使用Flink-cdc 实时同步mysql数据

实时同步mysql数据–Flink-cdc

一.概述:

本文主要描述flink-cdc同步mysql数据到sr中的使用实践以及一些问题的解决,原理部分不详细描述

二.使用flink-cdc+primarykey模型实现数据同步

  1. 下载 Flink, 推荐使用1.13,最低支持版本1.11。
  2. 下载 Flink CDC connector,请注意下载对应Flink版本的Flink-MySQL-CDC。
  3. 下载 Flink StarRocks connector,请注意1.13版本和1.11/1.12版本使用不同的connector.(注意使用的版本) cdc与flink对应版本关系详见:Flink CDC 2.0.0 documentation
  4. 解压flink-sql-connector-mysql-cdc-xxx.jar ,flink-connector-starrocks-xxx.jarflink-xxx/lib/
  5. 下载 smt.tar.gz
  6. 解压并修改配置文件
  • Db 需要修改成MySQL的连接信息。
  • be_num 需要配置成StarRocks集群的节点数(这个能帮助更合理的设置bucket数量)。
  • [table-rule.1] 是匹配规则,可以根据正则表达式匹配数据库和表名生成建表的SQL,也可以配置多个规则。仅支持正则匹配,不支持多表使用逗号分开的形式。
  • flink.starrocks.* 是StarRocks的集群配置信息,参考Flink.

注意: 此处留意ip,端口,库名,表名,正则表达式是否书写正确。另外如果flink设置的是 多并行度 ,由于flink-cdc的机制,需要 开启checkpoint 才能进行数据同步, 不开启checkpoint 只能使用 单并行度 进行同步。开启checkpoint的方式请参考下文第三小节: 失败重试及任务重启

[db]
host = 192.1.1.1
port = 3306
user = root
password =  

[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# file to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.1.1.1:9030
flink.starrocks.load-url= 192.1.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
  1. 执行starrocks-migrate-tool,所有建表语句都生成在result目录下,此处可将flink-create.1.sql复制一份到flink目录下方便第9步执行
$./starrocks-migrate-tool
$ls result
flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
flink-create.all.sql  starrocks-create.1.sql
  1. 生成StarRocks的表结构,留意命令中端口和脚本是否指定正确
Mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.1.sql
  1. 启动flink-client, 并生成Flink table并开始同步
bin/sql-client.sh -f flink-create.1.sql

这个执行以后同步任务会持续执行

如果是Flink 1.13之前的版本可能无法直接执行脚本,需要逐行提交 注意 记得打开MySQL binlog

  1. 观察任务状况
bin/flink list 

如果有任务请查看log日志,或者调整conf中的系统配置中内存和slot。

三.失败重试及任务重启

checkpoints savepoints 简单配置如下

# unit: ms
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory
  1. 如果是任务运行中的exception导致的任务失败,那么flink系统会根据 flink-conf.yml中的checkpoint配置来进行自动恢复。

  2. 如果是用户需要手动停止任务,再恢复任务的话,需要先在 flink-conf.yml 配置 state.savepoints.dir: [file://或hdfs://]/home/user/savepoints_dir 后按以下两种场景来操作:
    a.用户现场具备 停止mysql增删改 的条件(即会丢失 停止 ~ 再次重启 之间的数据):

    1. 使用如上描述的sql-client方式创建任务后获得jobid
    2. 停止 mysql 的增删改操作
    3. flink中停止对应的jobid
    4. 对mysql或starrocks进行变更操作
    5. 修改 flink-cdc 的src table配置增加 ‘scan.startup.mode’=‘latest-offset’
    6. 重复第1步的步骤提交任务
    7. 恢复 mysql 的正常使用即可。

    b.用户现场不具备控制mysql侧的条件,并且希望再次重启后的任务可以不丢失任何数据:

    1. 使用 ./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL flink-connector-starrocks-xxxx.jar -f flink-create.all.sql 提交任务后获取jobid
    2. 停止任务则执行: ./flink stop jobid 此时会提示 savepoints 保存的目录
    3. 对mysql或starrocks进行变更操作
    4. 如果修改了表结构则需要使用smt工具重新生成 flink-create.all.sql 文件
    5. 再次启动任务 ./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql 即可从任务停止时的offset继续消费binlog的数据

四.注意事项

  1. 如果有多组规则,需要给每一组规则匹配database,table和 flink-connector的配置。
[table-rule.1]
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000

[table-rule.2]
# pattern to match databases for setting properties
database = ^database2.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
# 如果导入数据不方便选出合适的分隔符可以考虑使用Json格式,但是会有一定的性能损失,使用方法:用以下参数替换flink.starrocks.sink.properties.column_separator和flink.starrocks.sink.properties.row_delimiter参数
flink.starrocks.sink.properties.strip_outer_array=true
flink.starrocks.sink.properties.format=json
~~~
  1. Flink.starrocks.sink 的参数可以参考上文,比如可以给不同的规则配置不同的导入频率等参数。
  2. 针对分库分表的大表可以单独配置一个规则,比如:有两个数据库 edu_db_1,edu_db_2,每个数据库下面分别有course_1,course_2 两张表,并且所有表的数据结构都是相同的,通过如下配置把他们导入StarRocks的一张表中进行分析。
[table-rule.3]
# pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# pattern to match tables for setting properties
table = ^course_[0-9]*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=5000

这样会自动生成一个多对一的导入关系,在StarRocks默认生成的表名是 course__auto_shard,也可以自行在生成的配置文件中修改。

  1. 如果在sql-client中命令行执行建表和同步任务,需要做对’'字符进行转义
'sink.properties.column_separator' = '\\x01'
'sink.properties.row_delimiter' = '\\x02'  
  1. 如何开启MySQL binlog

    修改/etc/my.cnf

#开启binlog日志
log-bin=/var/lib/mysql/mysql-bin

#log_bin=ON
##binlog日志的基本文件名
#log_bin_basename=/var/lib/mysql/mysql-bin
##binlog文件的索引文件,管理所有binlog文件
#log_bin_index=/var/lib/mysql/mysql-bin.index
#配置serverid
server-id=1
binlog_format = row

重启mysqld,然后可以通过 SHOW VARIABLES LIKE ‘log_bin’; 确认是否已经打开。

我本地数据库一百多个表,发现用这个smt 工作,每个表会生成一个任务,能否按照db 合并为同一个任务呢? 我的业务是 多db 多数据表的形式,如果按照表生成任务的话,将会太多了,不利于维护。

可以参考注意事项中的内容,将库表的名称配置正则,可以实现可以任务处理多个库表

可能我没表述清楚,通过正则表达式 可以实现多表同步,但是每个表生成了一个flink 任务,不利于维护,所以能不能多个表 (这个db中的所有表)生成一个flink 任务?

1赞

多表sink只能通过 stream job 的形式,需要的话可以加下微信群,私信获取相关demo

哪儿有demo~~ 请求获取…

demo/FlinkDemo/src/main/java/com/starrocks/flink at master · StarRocks/demo (github.com)

3Q :love_you_gesture:

这块需要给每个表定义实体属性?

使用smt工具可以实现多表的同步

:smiley:,这不又回到了之前问的问题么,smt 工具 是可以实现多表同步,但是会生成很多个flink job ,不利于维护,然后你建议用flink demo 中的形式处理

有没有mongoDB实时同步的例子

请问这种多个任务对应多个表的方式在读取binlog时是所有表读一次还是一个表读一次昵?如果每张表都需要读一次binlog的话是不是会有io瓶颈?

可以使用mongoDB-》flink->flink-connector

我开启checkpoints之后,不停产生checkpoint文件,请问这个可以配置只保留最新几个吗?

flink cdc时多表sink的demo有吗,数据量比较大,要用streamload的。还有怎么加微信群,谢谢了!

我有600张表,但是用flink同步的时候,无论slot调多大,都只有一个job运行,好奇怪

会不会可能是因为Operator Chain优化导致你看起来就是一个job