背景
在支持客户中,我们发现有一些客户公司已经存在一些比较完善数据通道,不允许业务直接消费MySQL Binlog,所有的数据消费都是从Kafka中获取,所以写这篇文档分享下如何消费Kafka中canal格式的数据写到到starrocks,实现CDC。
数据流向
Mysql Binlog–>Canal–>kafka–>Flink SQL–>StarRocks
环境准备
-
Canal
关于canal相关的配置这里就不赘述了,建议大家可以参考使用canal从mysql同步binlog导入StarRocks安装和配置下canal以及依赖环境
-
Flink
这里我们需要利用flink sql完成数据的读取和写入,所以需要大家安装flink服务。以下介绍单机版Flink安装教程。(如果公司已有flink集群服务,可跳过这一部分)
- 下载 Flink, 推荐使用1.13,最低支持版本1.11。
- 下载 Flink CDC connector,请注意下载对应Flink版本的Flink-MySQL-CDC。
- 下载 Flink StarRocks connector,请注意1.13版本和1.11/1.12版本使用不同的connector.
- 下载Flink SQL Kafka connector,请注意下载Flink对应版本的connector,我这里下载的1.13.3版本flink-sql-connector-kafka_2.12-1.13.3.jar
- 复制 flink-sql-connector-kafka_2.12-1.13.3.jar,flink-sql-connector-mysql-cdc-xxx.jar,flink-connector-starrocks-xxx.jar 到 flink-xxx/lib/
启动Flink服务:
cd flink-xxx
./bin/start-cluster.sh
DDL
Kafka中数据样例
{
"data":[
{
"id":"2f2192e9-f8b5-4332-a96f-192b05c9e6bc",
"agent_id":"16",
"http_port":"8031",
"rpc_port":"9020",
"query_port":"8306",
"edit_log_port":"9010",
"meta_dir":"",
"absolute_meta_dir":"/home/disk1/sr/data/sr/meta",
"log_dir":"",
"absolute_log_dir":"/home/disk1/sr/starrocks-manager-20211008/fe-2f2192e9-f8b5-4332-a96f-192b05c9e6bc/log",
"role":"FOLLOWER",
"install_path":"/home/disk1/sr/starrocks-manager-20211008",
"absolute_migrate_path":"/home/disk1/sr/app/StarRocks/SE/StarRocks-1.18.3/fe",
"deleted":"0",
"deleted_at":"0",
"created_at":"1633759183484",
"updated_at":"1634240355691"
}
],
"database":"test",
"es":1634240355000,
"id":1076,
"isDdl":false,
"mysqlType":{
"id":"varchar(48)",
"agent_id":"int(11)",
"http_port":"int(11)",
"rpc_port":"int(11)",
"query_port":"int(11)",
"edit_log_port":"int(11)",
"meta_dir":"text",
"absolute_meta_dir":"text",
"log_dir":"text",
"absolute_log_dir":"text",
"role":"varchar(32)",
"install_path":"text",
"absolute_migrate_path":"text",
"deleted":"tinyint(1)",
"deleted_at":"bigint(20)",
"created_at":"bigint(20)",
"updated_at":"bigint(20)"
},
"old":[
{
"updated_at":"1634240295633"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":12,
"agent_id":4,
"http_port":4,
"rpc_port":4,
"query_port":4,
"edit_log_port":4,
"meta_dir":2005,
"absolute_meta_dir":2005,
"log_dir":2005,
"absolute_log_dir":2005,
"role":12,
"install_path":2005,
"absolute_migrate_path":2005,
"deleted":-7,
"deleted_at":-5,
"created_at":-5,
"updated_at":-5
},
"table":"fe_instances",
"ts":1634240355886,
"type":"UPDATE"
}
StarRocks
create database canaltest;
CREATE TABLE IF NOT EXISTS `canaltest`.`canal_test_sink` (
`id` STRING NOT NULL,
`agent_id` int(11) NULL,
`http_port` int(11) NULL,
`rpc_port` int(11) NULL,
`query_port` int(11),
`edit_log_port` int(11),
`meta_dir` STRING,
`absolute_meta_dir` STRING,
`log_dir` STRING,
`absolute_log_dir` STRING,
`role` varchar(32),
`install_path` STRING,
`absolute_migrate_path` STRING,
`deleted` tinyint(1),
`deleted_at` bigint(20),
`created_at` bigint(20),
`updated_at` bigint(20)
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
Flink SQL
可以写到文件flink-create.sql中
CREATE DATABASE IF NOT EXISTS `testdb`;
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
`id` STRING NOT NULL,
`agent_id` int NULL,
`http_port` int NULL,
`rpc_port` int NULL,
`query_port` int,
`edit_log_port` int,
`meta_dir` STRING,
`absolute_meta_dir` STRING,
`log_dir` STRING,
`absolute_log_dir` STRING,
`role` varchar,
`install_path` STRING,
`absolute_migrate_path` STRING,
`deleted` tinyint,
`deleted_at` bigint,
`created_at` bigint,
`updated_at` bigint
) with (
'connector' = 'kafka',
'topic' = 'canal_test', #kafka topic名字
'properties.bootstrap.servers' = '$kafka_host:9092', #kafka主机名
'properties.group.id' = 'canal_group', #kafka消费组
'format' = 'canal-json' -- 使用 canal-json 格式
);
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
`id` STRING NOT NULL,
`agent_id` int NULL,
`http_port` int NULL,
`rpc_port` int NULL,
`query_port` int NULL,
`edit_log_port` int,
`meta_dir` STRING,
`absolute_meta_dir` STRING,
`log_dir` STRING,
`absolute_log_dir` STRING,
`role` STRING,
`install_path` STRING,
`absolute_migrate_path` STRING,
`deleted` tinyint,
`deleted_at` bigint,
`created_at` bigint,
`updated_at` bigint,
PRIMARY KEY(`id`)
NOT ENFORCED
) with (
'load-url' = '$fe_host:8030',
'sink.properties.row_delimiter' = '\x02',
'username' = 'root',
'database-name' = 'canaltest',
'sink.properties.column_separator' = '\x01',
'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
'password' = '',
'sink.buffer-flush.interval-ms' = '15000',
'connector' = 'starrocks',
'table-name' = 'canal_test_sink' #starrocks中的表名
);
INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
启动任务测试
cd flink-xxx
./bin/sql-client.sh -f flink-create.sql
#查看任务状态
./bin/flink list
#输出如下图表示正常启动
Waiting for response...
------------------ Running/Restarting Jobs -------------------
18.03.2022 09:48:34 : 4a2c5035ca292fef9691524c731122c2 : insert-into_default_catalog.test.canal_test_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
确认数据是否已经导入starrocks中
select * from canaltest.canal_test_sink;
常见问题排查
1.Flink任务没有报错的时候
第一步:确认binlog是否开启,可以通过 SHOW VARIABLES LIKE 'log_bin’查看;
第二步:确认flink、flink-cdc、flink-starrocks-connector和mysql版本(MySQL版本为5.7和8.0.X)是否满足要求,flink、flink-cdc和flink-starrocks-connector的大版本需要一致,例如都是1.13版本
第三步:逐步判断是查源表还是写starrocks的问题,这里利用下面的sql文件演示一下,该文件是上面生成步骤生成的flink-create.sql
安装的Flink目录下执行下面语句进入flink-sql
bin/sql-client.sh
首先验证读取source表是否正常
#分别把上面的sql粘贴进来判断是查询源表的问题还是写入到starrocks的问题
CREATE DATABASE IF NOT EXISTS `testdb`;
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
`id` STRING NOT NULL,
`agent_id` int NULL,
`http_port` int NULL,
`rpc_port` int NULL,
`query_port` int,
`edit_log_port` int,
`meta_dir` STRING,
`absolute_meta_dir` STRING,
`log_dir` STRING,
`absolute_log_dir` STRING,
`role` varchar,
`install_path` STRING,
`absolute_migrate_path` STRING,
`deleted` tinyint,
`deleted_at` bigint,
`created_at` bigint,
`updated_at` bigint
) with (
'connector' = 'kafka',
'topic' = 'canal_test', #kafka topic名字
'properties.bootstrap.servers' = '$kafka_host:9092',
'properties.group.id' = 'canal_group',
'format' = 'canal-json' -- 使用 canal-json 格式
);
#验证source是否正常
select * from `testdb`.`canal_test_source`
再验证写入starrocks是否正常
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
`id` STRING NOT NULL,
`agent_id` int NULL,
`http_port` int NULL,
`rpc_port` int NULL,
`query_port` int NULL,
`edit_log_port` int,
`meta_dir` STRING,
`absolute_meta_dir` STRING,
`log_dir` STRING,
`absolute_log_dir` STRING,
`role` STRING,
`install_path` STRING,
`absolute_migrate_path` STRING,
`deleted` tinyint,
`deleted_at` bigint,
`created_at` bigint,
`updated_at` bigint,
PRIMARY KEY(`id`)
NOT ENFORCED
) with (
'load-url' = '$fe_host:8030',
'sink.properties.row_delimiter' = '\x02',
'username' = 'root',
'database-name' = 'canaltest',
'sink.properties.column_separator' = '\x01',
'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
'password' = '',
'sink.buffer-flush.interval-ms' = '15000',
'connector' = 'starrocks',
'table-name' = 'canal_test_sink'
);
INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
2.Flink任务出错
第一步:确认flink集群是否有启动,可能有的同学本地下载的flink没有启动,需要./bin/start-cluster.sh启动下flink
第二步:根据具体的报错再具体分析