OGG + Flink: Real-Time Synchronization of Oracle Data to StarRocks

Background

Implement real-time synchronization of Oracle data.

OGG architecture

环境准备

StarRocks:

FE: 172.26.92.139:/home/disk2/jingdan/starrocks-1.18.2-fe

BE: 172.26.92.139/172.26.92.154/172.26.92.155:/home/disk2/jingdan/starrocks-1.18.2-be

Oracle: 172.26.92.139 (enter the container with docker exec -it oracle bash)

OGG: Oracle GoldenGate 11.1.1.1.2

OGG for Big Data: Oracle GoldenGate for Big Data 12.2.0.1.0

        Version            OGG version                                                IP
Source  Oracle 11.2.0.1.0  Oracle GoldenGate 11.1.1.1.2 for Oracle on Linux x86-64    172.26.92.139 (docker exec -it oracle bash)
Target  kafka_2.12-2.5.0   Oracle GoldenGate for Big Data 12.2.0.1.0 on Linux x86-64  172.26.92.141

Simulated test

OGG installation and configuration

1 Source-side configuration (172.26.92.139)

1.1 Unpack
docker exec -it oracle bash
su - (password: helowin)
mkdir -p /opt/ogg
unzip V28941-01.zip
tar -xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
chown -R oracle:oinstall /opt/ogg
1.2 Configure environment variables
vi /etc/profile

export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH

source /etc/profile

ggsci # quick check that the OGG command line works
1.3 Enable archive-log mode in Oracle
docker exec -it oracle bash
sqlplus / as sysdba
archive log list # check whether the database is in archive mode
"""
Database log mode               No Archive Mode
Automatic archival               Disabled
"""
# if the output above shows No Archive Mode / Disabled, enable archiving manually:
conn /as sysdba
shutdown immediate
startup mount
alter database archivelog; (switch the database to archive-log mode)
alter database open; 
alter system archive log start; (enable automatic archiving)
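
Running archive log list again should now report:
"""
Database log mode               Archive Mode
Automatic archival              Enabled
"""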

1.4 Enable supplemental logging in Oracle

OGG relies on supplemental redo logging to capture full transaction content in real time, so the relevant logging must be switched on. Check the current state with the commands below.

select force_logging, supplemental_log_data_min from v$database; # check logging status, both default to NO
"""
FORCE_ SUPPLEMENTAL_LOG
------ ----------------
NO     NO
"""
alter database force logging;
alter database add supplemental log data;

1.5 Create the replication user
create tablespace oggtbs datafile 'oggtbs01.dbf' size 1000M autoextend on;
create user ogg identified by ogg default tablespace oggtbs;
grant dba to ogg;
1.6 Initialize OGG
cd /opt/ogg
ggsci
create subdirs
"""
Creating subdirectories under current directory /opt/ogg

Parameter files                /opt/ogg/dirprm: created
Report files                   /opt/ogg/dirrpt: created
Checkpoint files               /opt/ogg/dirchk: created
Process status files           /opt/ogg/dirpcs: created
SQL script files               /opt/ogg/dirsql: created
Database definitions files     /opt/ogg/dirdef: created
Extract data files             /opt/ogg/dirdat: created
Temporary files                /opt/ogg/dirtmp: created
Stdout files                   /opt/ogg/dirout: created
"""

1.7 Create a test table
create user test_ogg identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int, name varchar(20), primary key(id));
1.8 Configure OGG
1.8.1 Configure global variables
GGSCI (877a2c5b9435) 1> dblogin userid ogg password ogg
GGSCI (877a2c5b9435) 2> edit param ./globals
oggschema ogg
1.8.2 Configure the manager (mgr)

Notes: PORT is the manager's default listening port. DYNAMICPORTLIST is a list of fallback ports; when the configured mgr port is unavailable, one is chosen from this list, which may specify at most 256 ports. AUTORESTART here restarts all EXTRACT processes, at most 5 times, at 3-minute intervals. PURGEOLDEXTRACTS controls periodic cleanup of trail files.

GGSCI (877a2c5b9435) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
1.8.3 Register the table to synchronize
GGSCI (877a2c5b9435) 4> add trandata test_ogg.test_ogg

GGSCI (877a2c5b9435) 5> info trandata test_ogg.test_ogg
Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG
1.8.4 Configure the extract process

Notes: the first line names the extract process. dynamicresolution enables dynamic name resolution. SETENV sets environment variables, here the Oracle SID and the character set. userid ogg, password ogg are the credentials OGG uses to connect to Oracle, i.e. the replication account created in 1.5. exttrail defines where the trail files live and their name; note that the file-name prefix can only be two characters, OGG pads the rest. table names the table to replicate; it supports the * wildcard and must end with a semicolon.

GGSCI (877a2c5b9435) 6> edit param extkafka
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

Add the extract process:

GGSCI (877a2c5b9435) 7> add extract extkafka,tranlog,begin now
EXTRACT added.

Define the trail file and bind it to the extract process:

GGSCI (877a2c5b9435) 8> add exttrail /opt/ogg/dirdat/to,extract extkafka
1.8.5 Configure the pump process

The pump ships the trail files to the target side.

Notes: the first line names the pump process. passthru stops OGG from interacting with Oracle; since we use the pump purely for transport, that is what we want. dynamicresolution enables dynamic name resolution. userid ogg, password ogg are the Oracle credentials. rmthost and mgrport are the address and listening port of the target-side (Kafka) OGG manager. rmttrail is the location and name of the trail files on the target.

GGSCI (877a2c5b9435) 9> edit param pukafka
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 172.26.92.141 mgrport 7809
rmttrail /home/disk1/starrocks/opt/ogg/dirdat/to
table test_ogg.test_ogg;

Bind the local trail file and the remote trail file to the pump process:

GGSCI (877a2c5b9435) 10> add extract pukafka,exttrailsource /opt/ogg/dirdat/to
EXTRACT added.
GGSCI (877a2c5b9435) 11> add rmttrail /home/disk1/starrocks/opt/ogg/dirdat/to,extract pukafka
RMTTRAIL added.
1.8.6 Configure the definitions file

Transfer between Oracle and MySQL or a Hadoop stack (HDFS, Hive, Kafka, etc.) is a heterogeneous transfer, so the mapping between source and target tables must be defined. In the OGG command line run:

GGSCI (877a2c5b9435) 12> edit param test_ogg
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

From the OGG home directory, run as the oracle user:

./defgen paramfile dirprm/test_ogg.prm
# ship the generated /opt/ogg/dirdef/test_ogg.test_ogg to the target
scp -r /opt/ogg/dirdef/test_ogg.test_ogg starrocks@172.26.92.141:/home/disk1/starrocks/opt/ogg/dirdef/

2 Target-side configuration (172.26.92.141)

2.1 Unpack and initialize OGG
mkdir -p /home/disk1/starrocks/opt/ogg
unzip V100447-01.zip
tar -xf ggs_Adapters_Linux_x64.tar -C /home/disk1/starrocks/opt/ogg

Configure environment variables:

vim ~/.bashrc

export OGG_HOME=/home/disk1/starrocks/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$LD_LIBRARY_PATH:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH

source ~/.bashrc

Initialize OGG:

cd /home/disk1/starrocks/opt/ogg
ggsci
GGSCI (daily01) 1> create subdirs
2.2 Configure the manager (mgr)
GGSCI (daily01) 2> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
2.3 Configure the checkpoint table

The checkpoint records a traceable offset for the replication; just add a checkpoint table in the global configuration:

GGSCI (daily01) 3> edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
2.4 Configure the replicat process

Notes: REPLICAT rekafka names the replicat process. sourcedefs is the table-mapping file generated on the source server in 1.8.6. TARGETDB LIBFILE loads the Kafka adapter library together with its configuration file, which lives at dirprm/kafka.props under the OGG home. REPORTCOUNT sets how often the replication report is generated. GROUPTRANSOPS groups operations per transaction to cut down I/O. MAP is the mapping between source and target tables.

GGSCI (daily01) 4> edit param rekafka
REPLICAT rekafka
sourcedefs /home/disk1/starrocks/opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
2.5 Configure kafka.props

The trailing // comments below are explanatory only and must be removed from the actual files.

cd /home/disk1/starrocks/opt/ogg/dirprm/
vim kafka.props
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicName=test_ogg // Kafka topic name; no need to create it by hand
gg.handler.kafkahandler.format=json // message format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op // delivery mode in OGG for Big Data: op sends one message per SQL operation, tx one per transaction
gg.classpath=dirprm/:/home/disk1/starrocks/thirdparty/kafka_2.12-2.5.0/libs/*:/home/disk1/starrocks/opt/ogg/:/home/disk1/starrocks/opt/ogg/lib/*
vim custom_kafka_producer.properties
bootstrap.servers=172.26.92.141:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000 // reconnect backoff
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
2.6 Bind the trail file to the replicat process
GGSCI (daily01) 1> add replicat rekafka exttrail /home/disk1/starrocks/opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

Testing

1. Start all processes

Start order: source mgr → target mgr → source extract → source pump → target replicat

Source:

ggsci>start mgr
ggsci>start extkafka
ggsci>start pukafka

Target:

ggsci>start mgr
ggsci>start rekafka
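
On either side, info all should now show every process as RUNNING:

ggsci>info all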

2. Verify synchronization

Run SQL on the source:

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete from test_ogg where id=1;
commit;

Check the Kafka messages on the target side.
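
For example, with the console consumer that ships with kafka_2.12-2.5.0, pointed at the broker and topic configured in 2.5:

bin/kafka-console-consumer.sh --bootstrap-server 172.26.92.141:9092 --topic test_ogg --from-beginning

Each DML statement above should arrive as one JSON message. The exact fields depend on the handler version, but in op mode with json format they look roughly like this (timestamps and positions elided):

{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"...","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"...","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"...","before":{"ID":1,"NAME":"zhangsan"}}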

Troubleshooting

1. TNS:permission denied

OGG-00664 Oracle GoldenGate Capture for Oracle, extkafka.prm: OCI Error during OCIServerAttach (status = 12546-ORA-12546: TNS:permission denied).

Cause:

The OGG release was suspected to be newer than the Oracle release.

Fix:

Install an older OGG release (was 11.2.1.0.3, now 11.1.1.1.2).

2. Unable to get Kafka messages

Failed to set property: topicMappingTemplate:="test_ogg"

2021-08-18 11:11:36  ERROR   OGG-15051  Oracle GoldenGate Delivery, rekafka.prm:  Java or JNI exception:
oracle.goldengate.util.GGException:  Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: topicMappingTemplate:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
 Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: topicMappingTemplate:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
        at oracle.goldengate.datasource.DataSourceLauncher.<init>(DataSourceLauncher.java:161)
        at oracle.goldengate.datasource.UserExitMain.main(UserExitMain.java:108)
Caused by: org.springframework.beans.factory.BeanDefinitionStoreException: Factory method [public final oracle.goldengate.datasource.GGDataSource oracle.goldengate.datasource.factory.DataSourceFactory.getDataSource()] threw exception; nested exception is oracle.goldengate.util.ConfigException: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: topicMappingTemplate:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
        at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:169)
        at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:570)
        ... 11 more
Caused by: oracle.goldengate.util.ConfigException: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: topicMappingTemplate:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
        at oracle.goldengate.datasource.conf.DsHandler.initListener(DsHandler.java:246)
        at oracle.goldengate.datasource.conf.DsHandler.getDataSourceListener(DsHandler.java:212)
        at oracle.goldengate.datasource.factory.DataSourceFactory.getHandlers(DataSourceFactory.java:255)
        at oracle.goldengate.datasource.factory.DataSourceFactory.getDataSource(DataSourceFactory.java:148)

Cause:

Newer OGG for Big Data releases changed the Kafka handler's property names.

Fix:

Before OGG for Big Data 12.3.0.1.0 the Kafka topic is set like this:

gg.handler.kafkahandler.topicName=test_ogg

From 12.3.0.1.0 onward it is set like this:

gg.handler.kafkahandler.topicMappingTemplate=test_ogg

3. Error loading Java VM runtime library

ERROR OGG-15050 Oracle GoldenGate Delivery, rekafka.prm: Error loading Java VM runtime library: (2 No such file or directory)

Cause:

JAVA_HOME was not set.

Fix:

vim ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-1.8.0
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$LD_LIBRARY_PATH

4. Character-set problem

The source database character set is unknown, and the SOURCECHARSET parameter is not specified

Cause:

The source character set was not specified.

Fix:

Add the following to ogg/dirprm/rekafka.prm:

SOURCECHARSET AL32UTF8

This covers synchronizing Oracle data into Kafka; syncing from Kafka to StarRocks via Flink will be added in a follow-up.

Does flink-connector-starrocks now support loading OGG-format data directly?

Hi, the OGG format is not supported yet; some ETL is needed.

Take a look at Flink CDC's Oracle connector; combined with flink-connector-starrocks it can do real-time synchronization.

So Flink CDC supports an Oracle connector now? Which version is that in?

Try https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html; there is a snapshot version of the connector there.
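
For reference, a minimal Flink SQL sketch of that combination (connector option names follow the flink-cdc-connectors and flink-connector-starrocks docs; hostnames, ports, credentials, and database/table names below are placeholders, not values from this setup):

CREATE TABLE ora_source (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '172.26.92.139',
  'port' = '1521',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'ORCL',
  'schema-name' = 'TEST_OGG',
  'table-name' = 'TEST_OGG'
);

CREATE TABLE sr_sink (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://172.26.92.139:9030',
  'load-url' = '172.26.92.139:8030',
  'database-name' = 'test_db',
  'table-name' = 'test_ogg',
  'username' = 'root',
  'password' = ''
);

INSERT INTO sr_sink SELECT * FROM ora_source;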

How can OGG JSON data be loaded into StarRocks through Flink?

We are working on Flink CDC support that syncs directly from Oracle to StarRocks, without going through Kafka.

[quote="jingdan, post:9, topic:765"]
to StarRocks, without going through Kafka
[/quote]
Our data is already in Kafka, in OGG format, and we defined a custom OGG format ourselves. Can it be loaded directly with Flink SQL?

You have to implement the StarRocks write semantics yourself. Suppose a record in Kafka looks like:
{
before: {k1, k2},
after: {k1, k2}
}

then in Flink you write code to turn it into two records, using StarRocks' __op flag (1 marks a delete, 0 an upsert):

stream.collect({k1,k2,__op:1})

stream.collect({k1,k2,__op:0})
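
A minimal DataStream sketch of that transformation, assuming the Kafka records are JSON strings with before/after objects (class and field names here are illustrative, not part of any official API):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Splits one OGG-style change record into StarRocks rows tagged with __op.
// An update emits the old image as a delete (__op=1) and the new image as an
// upsert (__op=0); an insert has no "before", a delete has no "after".
public class OggToStarRocksFlatMap implements FlatMapFunction<String, String> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        JsonNode record = MAPPER.readTree(value);
        JsonNode before = record.get("before");
        JsonNode after = record.get("after");
        if (before != null && before.isObject()) {
            out.collect(withOp((ObjectNode) before, 1)); // delete old image
        }
        if (after != null && after.isObject()) {
            out.collect(withOp((ObjectNode) after, 0)); // upsert new image
        }
    }

    private static String withOp(ObjectNode row, int op) {
        ObjectNode copy = row.deepCopy();
        copy.put("__op", op);
        return copy.toString();
    }
}

The resulting stream of JSON rows can then be handed to the flink-connector-starrocks sink, with the target table using a model that honors __op (primary-key/unique semantics).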

How is data ordering guaranteed along Kafka -> Flink -> StarRocks?

OGG updates do not push all columns; with a StarRocks unique-key model that leaves the other columns NULL. How do we get around that?