实时数据飞跃:Flink1.20.1 + CDC-3.3.0 打通 MySQL 到 StarRocks
如何利用 Apache Flink 结合 CDC(Change Data Capture,变更数据捕获)技术,将 MySQL 的数据实时导入 StarRocks,打造高效的实时数仓。这不仅是企业数字化转型的利器,也是技术人提升竞争力的绝佳实战场景!
在数据驱动的时代,实时性是企业的核心竞争力。传统的批量 ETL(抽取-转换-加载)方式往往因为延迟高、效率低而无法满足实时分析需求。而 Flink 作为流处理的王者,搭配 CDC 捕获 MySQL 的增量变更,再结合 StarRocks 的高性能分析能力,形成了一个强大的实时数据入湖方案。无论你是数据库工程师、数据分析师,还是对数仓建设感兴趣的初学者,这篇文章都将手把手带你完成从环境搭建到整库同步的实战流程。
本文没有安装 docker 如果需要了解 docker 配置可以看 flink 官网 配置
最后通过这篇博文,你将学会如何真正安装 Flink 和 MySQL CDC 连接器,编写 YAML 文件实现整库同步,并将数据无缝导入 StarRocks。准备好服务器,泡杯咖啡 ,咱们一起“上代码、上步骤、上实战” 一起开启这场实时入湖的硬核之旅!
为什么选择 Flink + CDC + StarRocks?
Flink 是一个强大的流处理框架,擅长处理实时数据流,吞吐量高、延迟低,非常适合实时数仓场景。而 CDC 技术能捕获 MySQL 数据库的增量变更(比如插入、更新、删除操作),让我们无需全量扫描数据库,就能实时获取数据变化。至于 StarRocks,它是一个高性能的分析型数据库,查询速度快,支持实时分析场景。把这三者结合起来,简直是实时数据入湖的“黄金三角”!
环境准备与工具安装
先是vm 上 安装了 4台 linux centos 8 (如下图)
配置了 4台 VM 虚拟机
3台 安装 StarRocks 如果想知道怎么安装看我的这篇文章
还需要格外一台安装 flink+flinkcdc+mysql
1台安装 mysql +flink+flinkcdc
具体配置
组件 | 安装 IP 地址 | 角色/说明 |
---|---|---|
Apache Flink | 192.168.5.131 | Flink 集群(JobManager + TaskManager) |
Flink CDC 连接器 | 192.168.5.131 | 部署于 Flink 的 lib 目录,用于捕获 MySQL 变更数据 |
MySQL | 192.168.5.131 | 源数据库,提供数据并启用 Binlog |
StarRocks FE + BE | 192.168.5.128 | StarRocks 前端(FE)+ 后端(BE) |
StarRocks BE | 192.168.5.129 | StarRocks 后端(BE)节点 2 |
StarRocks BE | 192.168.5.130 | StarRocks 后端(BE)节点 3 |
说明
Flink 和 Flink CDC:均部署在 192.168.5.131,Flink CDC 连接器通常作为 JAR 文件放置在 Flink 的 lib 目录下,与 Flink 共享同一节点。
MySQL:与 Flink 部署在同一服务器(192.168.5.131),需确保 Binlog 已启用
StarRocks:分布式部署,FE 和一个 BE 节点在 192.168.5.128,另外两个 BE 节点分别在 192.168.5.129 和 192.168.5.130,形成一个典型的多节点集群。
网络要求:确保所有 IP 地址之间网络互通,特别是 MySQL(3306 端口)、Flink(8081 等端口)、StarRocks(8030、9030 等端口)需开放相关端口。
CentOS8_cd_Flink 配置情况
配置 mysql (IP 192.168.5.131)
验证是否是 binlog 模式
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE '%binlog%';
如果没有 需要 启动 binlog 模式 、
# /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 启用
binlog-format=ROW
– 1. 检查 binlog 是否启用 SHOW VARIABLES LIKE ‘log_bin’; – 必须是 ON
– 2. 设置格式为 ROW(最重要) SET GLOBAL binlog_format = ‘ROW’;
– 3. 设置合理的过期时间 SET GLOBAL binlog_expire_logs_seconds = 604800; – 7 天
创建数据库和表
我们从mysql 官网下载了一个demo数据库 sakila 做测试
wget https://downloads.mysql.com/docs/sakila-db.zip
下载后解压
unzip sakila-db.zip
mysql> SOURCE C:/temp/sakila-db/sakila-schema.sql;
mysql> SOURCE C:/temp/sakila-db/sakila-data.sql;
java 配置
可以看到 java 已经安装好 版本 11
配置 Flink 1.20.1
- 下载 Flink-1.20.1
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz
- 解压 Flink 1.20.1
tar -xzf flink-1.20.1-bin-scala_2.12.tgz
cd flink-1.20.1
ll
- 配置 Flink-1.20.1/conf/config.yaml
vim flink-1.20.1/conf/config.yaml
改为 rest.address: 0.0.0.0
改为 rest.bind-address: 0.0.0.0
rest.address 和 rest.bind-address 是与 Flink 的 REST API 和 Web UI 相关的配置项,用于控制 Flink JobManager 的 REST 服务监听地址。
这些配置决定了 Flink 的 Web UI 和客户端如何访问 JobManager。
配置 Flink-CDC-3.3.0
- 下载 和解压 Flink-CDC-3.3.0
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz
tar -xzf flink-cdc-3.3.0-bin.tar.gz
cd flink-cdc-3.3.0
- 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
wget https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar?Expires=1753349291&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=jnckjewN8vqzaE3tbupo691o8YY%3D" -O lib/flink-cdc-pipeline-connector-mysql-3.3.0.jar
- 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
wget https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.3.0/flink-cdc-pipeline-connector-starrocks-3.3.0.jar?Expires=1753349371&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=Fd0JxnlDxr1nKkP8wOoIHHhGV2c%3D" -O lib/flink-cdc-pipeline-connector-starrocks-3.3.0.jar
- 下载 MySQL 驱动
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
- 移到 Flink-cdc-3.3.0/lib 目录下
- mysql-connector-j-8.0.33.jar也需要 移到flink-1.20.1/lib 目录下
启动 Flink-1.20.1
./flink-1.20.1/bin/start-cluster.sh
- 打开 网页192.168.5.131 :8081
如果可以打开说明启动成功
网络和数据库连接 测试
这部很重要,如果网络不通,将不能实时同步
ping 192.168.5.131
ping 192.168.5.128
telnet 192.168.5.131 3306
telnet 192.168.5.128 9030
MySQL 读取数据同步到 StarRocks 的 Pipeline
新建 vi sync-mysql-to-starrocks.yaml
source:
type: mysql
hostname: 192.168.5.131
port: 3306
username: root
password: 123456
tables: sakila.\.*
server-id: 5500-5504
server-time-zone: Asia/Shanghai
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://192.168.5.128:9030
load-url: 192.168.5.128:8030
username: root
password: 123456
table.create.properties.replication_num: 1
route:
- source-table: sakila.city
sink-table: ods_sakila.ods_city
- source-table: sakila.country
sink-table: ods_sakila.ods_country
- source-table: sakila.actor
sink-table: ods_sakila.ods_actor
pipeline:
name: Sync MySQL Database to StarRocks
parallelism: 1
在starrocks 新建数据库和表
CREATE DATABASE `ods_sakila`;
CREATE TABLE `ods_actor` (
`actor_id` smallint NOT NULL COMMENT "" ,
`first_name` varchar(45) NOT NULL COMMENT "",
`last_name` varchar(45) NOT NULL COMMENT "",
`last_update` DATETIME NOT NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`actor_id`)
DISTRIBUTED BY HASH(`actor_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "3"
);
CREATE TABLE `ods_city` (
`city_id` smallint NOT NULL COMMENT "" ,
`city` varchar(50) NOT NULL COMMENT "" ,
`country_id` smallint NOT NULL COMMENT "" ,
`last_update` DATETIME NOT NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`city_id`)
DISTRIBUTED BY HASH(`city_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "3"
);
CREATE TABLE `ods_country` (
`country_id` smallint NOT NULL COMMENT "",
`country` varchar(50) NOT NULL COMMENT "",
`last_update` DATETIME NOT NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`country_id`)
DISTRIBUTED BY HASH(`country_id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "3"
);
如果想知道 mysql 和 starrocks 的数据类型转换可以官网 看这篇
执行 yaml 文件 ,并部署同步
./bin/flink-cdc.sh --flink-home /root/flink-1.20.1 sync-mysql-to-starrocks.yaml
查看 flink web
验证同步时间
Flink
09 :28: 20 执行这个 命令
StarRocks
SELECT TABLE_SCHEMA,Table_name,ENGINE , Update_Time
FROM information_schema.tables
WHERE Table_SCHEMA = 'ods_sakila'
可以看到 3表 最后一张表 是在 09:28 :50 同步完成
web UI
总结
通过本篇实战教程,我们成功打通了从 MySQL 到 StarRocks 的实时数据通道,构建了一个高效、可扩展的 实时入湖架构 。借助 Flink 1.20.1 强大的流处理能力与 Flink CDC 3.3.0 捕捉 MySQL 实时变更的能力,结合 StarRocks 的极速分析性能,我们完成了从环境搭建、组件部署、CDC 配置、到整库同步的全流程。
本教程涵盖了:
- 环境部署(4台 CentOS 虚拟机,分布式 StarRocks + Flink)
- Flink + CDC + MySQL 的安装与配置
- 启用 Binlog,确保增量同步的前提
- 使用 YAML 文件编排同步任务,结构清晰,便于管理
- 在 StarRocks 中验证数据同步是否成功,观察更新时间
借助 Flink CDC,数据同步不再依赖传统批量 ETL,真正实现了秒级级别的数据入湖体验。而 StarRocks 则让实时分析成为可能,适合各种 BI 报表、OLAP 分析与指标计算场景。