开源地址
https://github.com/apecloud/ape-dts
ape-dts简介
- ape-dts 是一款旨在实现 any-to-any 的数据迁移工具,并具有数据订阅和数据加工能力。
- 简单、轻量、高效,不依赖第三方组件和额外存储。
- 使用 Rust。
主要特性
- 支持多种数据库间的同构、异构数据迁移和同步。
- 支持全量、增量任务的断点续传。
- 支持数据校验、订正。
- 支持库、表、列级别的过滤和路由。
- 针对不同源、目标、任务类型,实现不同的并发算法,提高性能。
- 可加载用户 lua 脚本,编辑正在迁移/同步的数据。
- 支持以 HTTP Server 的方式启动 ape-dts 并拉取源端数据,用户可使用任何语言的 HTTP Client 获取数据并自主消费。
从 MySQL 导入教程
设置镜像版本
- MYSQL 支持 5.7 -> 8.0
- Postgres 支持 11 -> latest
- StarRocks 支持 2.5.4 -> latest
export APE_DTS_IMAGE="apecloud-registry.cn-zhangjiakou.cr.aliyuncs.com/apecloud/ape-dts:2.0.11"
export MYSQL_IMAGE="mysql:5.7.40"
export POSTGRES_IMAGE="postgis/postgis:15-3.4"
export STARROCKS_IMAGE="starrocks/allin1-ubuntu:3.2.11"
准备源库,目标库
- MySQL
docker run -d --name some-mysql-1 \
--platform linux/x86_64 \
-it \
-p 3307:3306 -e MYSQL_ROOT_PASSWORD="123456" \
"$MYSQL_IMAGE" --lower_case_table_names=1 --character-set-server=utf8 --collation-server=utf8_general_ci \
--datadir=/var/lib/mysql \
--user=mysql \
--server_id=1 \
--log_bin=/var/lib/mysql/mysql-bin.log \
--max_binlog_size=100M \
--gtid_mode=ON \
--enforce_gtid_consistency=ON \
--binlog_format=ROW \
--default_time_zone=+08:00
- StarRocks
docker run -itd --name some-starrocks \
-p 9030:9030 \
-p 8030:8030 \
-p 8040:8040 \
"$STARROCKS_IMAGE"
迁移库表结构
准备库表
mysql -h127.0.0.1 -uroot -p123456 -P3307
CREATE DATABASE test_db;
CREATE TABLE test_db.tb_1(id int, value int, primary key(id));
启动任务
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
extract_type=struct
db_type=mysql
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[sinker]
url=mysql://root:@127.0.0.1:9030
sink_type=struct
db_type=starrocks
[filter]
do_dbs=test_db
[parallelizer]
parallel_type=serial
[pipeline]
buffer_size=100
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
检查结果
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SHOW CREATE TABLE test_db.tb_1;
CREATE TABLE `tb_1` (
`id` int(11) NOT NULL COMMENT "",
`value` int(11) NULL COMMENT "",
`_ape_dts_is_deleted` boolean NULL COMMENT "",
`_ape_dts_timestamp` bigint(20) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "LZ4"
);
迁移全量数据
准备源数据
mysql -h127.0.0.1 -uroot -p123456 -P3307
INSERT INTO test_db.tb_1 VALUES(1,1),(2,2),(3,3),(4,4);
启动任务
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
batch_size=5000
[filter]
do_dbs=test_db
do_events=insert
[parallelizer]
parallel_type=snapshot
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
检查结果
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SELECT * FROM test_db.tb_1;
+------+-------+---------------------+--------------------+
| id | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+-------+---------------------+--------------------+
| 1 | 1 | NULL | 1731665154965 |
| 2 | 2 | NULL | 1731665159858 |
| 3 | 3 | NULL | 1731665159880 |
| 4 | 4 | NULL | 1731665159880 |
+------+-------+---------------------+--------------------+
增量数据,硬删除(不推荐)
启动任务
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[filter]
do_dbs=test_db
do_events=insert,update,delete
[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
hard_delete=true
batch_size=5000
[parallelizer]
parallel_type=rdb_merge
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
修改源数据
mysql -h127.0.0.1 -uroot -p123456 -uroot -P3307
DELETE FROM test_db.tb_1 WHERE id=1;
UPDATE test_db.tb_1 SET value=2000000 WHERE id=2;
INSERT INTO test_db.tb_1 VALUES(5,5);
检查结果
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SELECT * FROM test_db.tb_1;
+------+---------+---------------------+--------------------+
| id | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+---------+---------------------+--------------------+
| 2 | 2000000 | NULL | 1731665679461 |
| 3 | 3 | NULL | 1731665609225 |
| 4 | 4 | NULL | 1731665609236 |
| 5 | 5 | NULL | 1731665679572 |
+------+---------+---------------------+--------------------+
增量任务,软删除(推荐)
启动任务
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[filter]
do_dbs=test_db
do_events=insert,update,delete
[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
batch_size=5000
[parallelizer]
parallel_type=table
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
修改源数据
mysql -h127.0.0.1 -uroot -p123456 -uroot -P3307
DELETE FROM test_db.tb_1 WHERE id=3;
检查结果
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SELECT * FROM test_db.tb_1;
+------+---------+---------------------+--------------------+
| id | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+---------+---------------------+--------------------+
| 2 | 2000000 | NULL | 1731665679461 |
| 3 | 3 | 1 | 1731665747381 |
| 4 | 4 | NULL | 1731665609236 |
| 5 | 5 | NULL | 1731665679572 |
+------+---------+---------------------+--------------------+
工作原理
我们使用 Stream Load 从 MySQL/Postgres 导入数据。需要配置 url(查询元数据)和 stream_load_url(指定 Stream Load 端口和用户信息)。
通过 Stream Load 将数据导入 StarRocks 时,需要避免频繁的小批量导入,因为这可能会导致 StarRocks 中的限流错误。你可以通过配置 batch_sink_interval_secs 来解决这个问题,具体参考 任务模板。通常,只有增量任务需要配置 batch_sink_interval_secs。
Stream Load 允许一次导入最多 10GB 数据。你可以更改以下配置以调整批量数据大小来提高性能。
[pipeline]
buffer_size=100000
buffer_memory_mb=200
[sinker]
batch_size=5000
数据类型映射
MySQL | StarRocks |
---|---|
tinyint | TINYINT |
tinyint unsigned | SMALLINT |
smallint | SMALLINT |
smallint unsigned | INT |
mediumint | INT |
mediumint unsigned | BIGINT |
int | INT |
int unsigned | BIGINT |
bigint | BIGINT |
bigint unsigned | LARGEINT |
decimal | DECIMAL |
float | FLOAT |
double | DOUBLE |
bit | BIGINT |
datetime | DATETIME |
time | VARCHAR |
date | DATE |
year | INT |
timestamp | DATETIME |
char | CHAR |
varchar | VARCHAR |
binary | VARBINARY |
varbinary | VARBINARY |
tinytext | STRING |
text | STRING |
mediumtext | STRING |
longtext | STRING |
tinyblob | VARBINARY |
blob | VARBINARY |
mediumblob | VARBINARY |
longblob | VARBINARY |
enum | VARCHAR |
set | VARCHAR |
json | JSON/STRING |
举例
- 在 MySQL 中建表:
CREATE TABLE test_db.one_pk_no_uk (
f_0 tinyint,
f_0_1 tinyint unsigned,
f_1 smallint,
f_1_1 smallint unsigned,
f_2 mediumint,
f_2_1 mediumint unsigned,
f_3 int,
f_3_1 int unsigned,
f_4 bigint,
f_4_1 bigint unsigned,
f_5 decimal(10,4),
f_6 float(6,2),
f_7 double(8,3),
f_8 bit(64),
f_9 datetime(6),
f_10 time(6),
f_11 date,
f_12 year,
f_13 timestamp(6) NULL,
f_14 char(255),
f_15 varchar(255),
f_16 binary(255),
f_17 varbinary(255),
f_18 tinytext,
f_19 text,
f_20 mediumtext,
f_21 longtext,
f_22 tinyblob,
f_23 blob,
f_24 mediumblob,
f_25 longblob,
f_26 enum('x-small','small','medium','large','x-large'),
f_27 set('a','b','c','d','e'),
f_28 json,
PRIMARY KEY (f_0) );
- ape-dts 将会在 StarRocks 中执行:
CREATE TABLE IF NOT EXISTS `test_db`.`one_pk_no_uk` (
`f_0` TINYINT NOT NULL,
`f_0_1` SMALLINT,
`f_1` SMALLINT,
`f_1_1` INT,
`f_2` INT,
`f_2_1` BIGINT,
`f_3` INT,
`f_3_1` BIGINT,
`f_4` BIGINT,
`f_4_1` LARGEINT,
`f_5` DECIMAL(10, 4),
`f_6` FLOAT,
`f_7` DOUBLE,
`f_8` BIGINT,
`f_9` DATETIME,
`f_10` VARCHAR(255),
`f_11` DATE,
`f_12` INT,
`f_13` DATETIME,
`f_14` CHAR(255),
`f_15` VARCHAR(255),
`f_16` VARBINARY,
`f_17` VARBINARY,
`f_18` STRING,
`f_19` STRING,
`f_20` STRING,
`f_21` STRING,
`f_22` VARBINARY,
`f_23` VARBINARY,
`f_24` VARBINARY,
`f_25` VARBINARY,
`f_26` VARCHAR(255),
`f_27` VARCHAR(255),
`f_28` JSON,
`_ape_dts_is_deleted` BOOLEAN,
`_ape_dts_timestamp` BIGINT
) PRIMARY KEY (`f_0`) DISTRIBUTED BY HASH(`f_0`);
硬删除 & 软删除
-
由于 StarRocks 在处理删除操作时性能较差,因此不推荐使用硬删除。
-
软删除的前提条件:
- 目标表必须有 _ape_dts_is_deleted 列。
-
硬删除的前提条件:
- [parallelizer] parallel_type 必须为 rdb_merge。
支持的版本
我们在 StarRocks 2.5.4 和 3.2.11 上进行了测试。
对于 2.5.4,任务配置中的 [sinker] stream_load_url 应该使用 be_http_port 而不是 fe_http_port。
不支持在增量任务中的 DDL 同步
- 当前还不支持此功能,DDL 事件会被忽略,我们可能在未来支持此功能。