使用 ape-dts 从 MySQL / Postgres 导入 全量/增量 数据

开源地址

https://github.com/apecloud/ape-dts

ape-dts简介

  • ape-dts 是一款旨在实现 any-to-any 的数据迁移工具,并具有数据订阅和数据加工能力。
  • 简单、轻量、高效,不依赖第三方组件和额外存储。
  • 使用 Rust。

主要特性

  • 支持多种数据库间的同构、异构数据迁移和同步。
  • 支持全量、增量任务的断点续传。
  • 支持数据校验、订正。
  • 支持库、表、列级别的过滤和路由。
  • 针对不同源、目标、任务类型,实现不同的并发算法,提高性能。
  • 可加载用户 lua 脚本,编辑正在迁移/同步的数据。
  • 支持以 HTTP Server 的方式启动 ape-dts 并拉取源端数据,用户可使用任何语言的 HTTP Client 获取数据并自主消费。

从 MySQL 导入教程

设置镜像版本

  • MYSQL 支持 5.7 -> 8.0
  • Postgres 支持 11 -> latest
  • StarRocks 支持 2.5.4 -> latest
export APE_DTS_IMAGE="apecloud-registry.cn-zhangjiakou.cr.aliyuncs.com/apecloud/ape-dts:2.0.11"
export MYSQL_IMAGE="mysql:5.7.40"
export POSTGRES_IMAGE="postgis/postgis:15-3.4"
export STARROCKS_IMAGE="starrocks/allin1-ubuntu:3.2.11"

准备源库,目标库

  • MySQL
docker run -d --name some-mysql-1 \
--platform linux/x86_64 \
-it \
-p 3307:3306 -e MYSQL_ROOT_PASSWORD="123456" \
 "$MYSQL_IMAGE" --lower_case_table_names=1 --character-set-server=utf8 --collation-server=utf8_general_ci \
 --datadir=/var/lib/mysql \
 --user=mysql \
 --server_id=1 \
 --log_bin=/var/lib/mysql/mysql-bin.log \
 --max_binlog_size=100M \
 --gtid_mode=ON \
 --enforce_gtid_consistency=ON \
 --binlog_format=ROW \
 --default_time_zone=+08:00
  • StarRocks
docker run -itd --name some-starrocks \
-p 9030:9030 \
-p 8030:8030 \
-p 8040:8040 \
"$STARROCKS_IMAGE"

迁移库表结构

准备库表

mysql -h127.0.0.1 -uroot -p123456 -P3307

CREATE DATABASE test_db;
CREATE TABLE test_db.tb_1(id int, value int, primary key(id));

启动任务

cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
extract_type=struct
db_type=mysql
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled

[sinker]
url=mysql://root:@127.0.0.1:9030
sink_type=struct
db_type=starrocks

[filter]
do_dbs=test_db

[parallelizer]
parallel_type=serial

[pipeline]
buffer_size=100
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 

检查结果

mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "

SHOW CREATE TABLE test_db.tb_1;
CREATE TABLE `tb_1` (
  `id` int(11) NOT NULL COMMENT "",
  `value` int(11) NULL COMMENT "",
  `_ape_dts_is_deleted` boolean NULL COMMENT "",
  `_ape_dts_timestamp` bigint(20) NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "LZ4"
);

迁移全量数据

准备源数据

mysql -h127.0.0.1 -uroot -p123456 -P3307

INSERT INTO test_db.tb_1 VALUES(1,1),(2,2),(3,3),(4,4);

启动任务

cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled

[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
batch_size=5000

[filter]
do_dbs=test_db
do_events=insert

[parallelizer]
parallel_type=snapshot
parallel_size=8

[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 

检查结果

mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "

SELECT * FROM test_db.tb_1;
+------+-------+---------------------+--------------------+
| id   | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+-------+---------------------+--------------------+
|    1 |     1 |                NULL |    1731665154965   |
|    2 |     2 |                NULL |    1731665159858   |
|    3 |     3 |                NULL |    1731665159880   |
|    4 |     4 |                NULL |    1731665159880   |
+------+-------+---------------------+--------------------+

增量数据,硬删除(不推荐)

启动任务

cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled

[filter]
do_dbs=test_db
do_events=insert,update,delete

[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
hard_delete=true
batch_size=5000

[parallelizer]
parallel_type=rdb_merge
parallel_size=8

[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 

修改源数据

mysql -h127.0.0.1 -uroot -p123456 -uroot -P3307

DELETE FROM test_db.tb_1 WHERE id=1;
UPDATE test_db.tb_1 SET value=2000000 WHERE id=2;
INSERT INTO test_db.tb_1 VALUES(5,5);

检查结果

mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "

SELECT * FROM test_db.tb_1;
+------+---------+---------------------+--------------------+
| id   | value   | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+---------+---------------------+--------------------+
|    2 | 2000000 |                NULL |    1731665679461   |
|    3 |       3 |                NULL |    1731665609225   |
|    4 |       4 |                NULL |    1731665609236   |
|    5 |       5 |                NULL |    1731665679572   |
+------+---------+---------------------+--------------------+

增量任务,软删除(推荐)

启动任务

cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled

[filter]
do_dbs=test_db
do_events=insert,update,delete

[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
batch_size=5000

[parallelizer]
parallel_type=table
parallel_size=8

[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini 

修改源数据

mysql -h127.0.0.1 -uroot -p123456 -uroot -P3307

DELETE FROM test_db.tb_1 WHERE id=3;

检查结果

mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "

SELECT * FROM test_db.tb_1;
+------+---------+---------------------+--------------------+
| id   | value   | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+---------+---------------------+--------------------+
|    2 | 2000000 |                NULL |    1731665679461   |
|    3 |       3 |                   1 |    1731665747381   |
|    4 |       4 |                NULL |    1731665609236   |
|    5 |       5 |                NULL |    1731665679572   |
+------+---------+---------------------+--------------------+

工作原理

我们使用 Stream Load 从 MySQL/Postgres 导入数据。需要配置 url(查询元数据)和 stream_load_url(指定 Stream Load 端口和用户信息)。

通过 Stream Load 将数据导入 StarRocks 时,需要避免频繁的小批量导入,因为这可能会导致 StarRocks 中的限流错误。你可以通过配置 batch_sink_interval_secs 来解决这个问题,具体参考 任务模板。通常,只有增量任务需要配置 batch_sink_interval_secs。

Stream Load 允许一次导入最多 10GB 数据。你可以更改以下配置以调整批量数据大小来提高性能。

[pipeline]
buffer_size=100000
buffer_memory_mb=200

[sinker]
batch_size=5000

数据类型映射

MySQL StarRocks
tinyint TINYINT
tinyint unsigned SMALLINT
smallint SMALLINT
smallint unsigned INT
mediumint INT
mediumint unsigned BIGINT
int INT
int unsigned BIGINT
bigint BIGINT
bigint unsigned LARGEINT
decimal DECIMAL
float FLOAT
double DOUBLE
bit BIGINT
datetime DATETIME
time VARCHAR
date DATE
year INT
timestamp DATETIME
char CHAR
varchar VARCHAR
binary VARBINARY
varbinary VARBINARY
tinytext STRING
text STRING
mediumtext STRING
longtext STRING
tinyblob VARBINARY
blob VARBINARY
mediumblob VARBINARY
longblob VARBINARY
enum VARCHAR
set VARCHAR
json JSON/STRING

举例

  • 在 MySQL 中建表:
CREATE TABLE test_db.one_pk_no_uk ( 
    f_0 tinyint, 
    f_0_1 tinyint unsigned, 
    f_1 smallint, 
    f_1_1 smallint unsigned, 
    f_2 mediumint,
    f_2_1 mediumint unsigned, 
    f_3 int, 
    f_3_1 int unsigned, 
    f_4 bigint, 
    f_4_1 bigint unsigned, 
    f_5 decimal(10,4), 
    f_6 float(6,2), 
    f_7 double(8,3), 
    f_8 bit(64),
    f_9 datetime(6), 
    f_10 time(6), 
    f_11 date, 
    f_12 year, 
    f_13 timestamp(6) NULL, 
    f_14 char(255), 
    f_15 varchar(255), 
    f_16 binary(255), 
    f_17 varbinary(255), 
    f_18 tinytext, 
    f_19 text, 
    f_20 mediumtext, 
    f_21 longtext, 
    f_22 tinyblob, 
    f_23 blob, 
    f_24 mediumblob, 
    f_25 longblob, 
    f_26 enum('x-small','small','medium','large','x-large'), 
    f_27 set('a','b','c','d','e'), 
    f_28 json,
    PRIMARY KEY (f_0) );
  • ape-dts 将会在 StarRocks 中执行:
CREATE TABLE IF NOT EXISTS `test_db`.`one_pk_no_uk` (
    `f_0` TINYINT NOT NULL,
    `f_0_1` SMALLINT,
    `f_1` SMALLINT,
    `f_1_1` INT,
    `f_2` INT,
    `f_2_1` BIGINT,
    `f_3` INT,
    `f_3_1` BIGINT,
    `f_4` BIGINT,
    `f_4_1` LARGEINT,
    `f_5` DECIMAL(10, 4),
    `f_6` FLOAT,
    `f_7` DOUBLE,
    `f_8` BIGINT,
    `f_9` DATETIME,
    `f_10` VARCHAR(255),
    `f_11` DATE,
    `f_12` INT,
    `f_13` DATETIME,
    `f_14` CHAR(255),
    `f_15` VARCHAR(255),
    `f_16` VARBINARY,
    `f_17` VARBINARY,
    `f_18` STRING,
    `f_19` STRING,
    `f_20` STRING,
    `f_21` STRING,
    `f_22` VARBINARY,
    `f_23` VARBINARY,
    `f_24` VARBINARY,
    `f_25` VARBINARY,
    `f_26` VARCHAR(255),
    `f_27` VARCHAR(255),
    `f_28` JSON,
    `_ape_dts_is_deleted` BOOLEAN,
    `_ape_dts_timestamp` BIGINT
) PRIMARY KEY (`f_0`) DISTRIBUTED BY HASH(`f_0`);

硬删除 & 软删除

  • 由于 StarRocks 在处理删除操作时性能较差,因此不推荐使用硬删除。

  • 软删除的前提条件:

    • 目标表必须有 _ape_dts_is_deleted 列。
  • 硬删除的前提条件:

    • [parallelizer] parallel_type 必须为 rdb_merge。

支持的版本

我们在 StarRocks 2.5.4 和 3.2.11 上进行了测试。

对于 2.5.4,任务配置中的 [sinker] stream_load_url 应该使用 be_http_port 而不是 fe_http_port。

不支持在增量任务中的 DDL 同步

  • 当前还不支持此功能,DDL 事件会被忽略,我们可能在未来支持此功能。

从 Postgres 导入教程

1赞

老师~给你换了一个分类,有个实用工具类的在这里以后比较好找~

1赞