Flink CDC Oracle to StarRocks not working

Syncing data from Oracle 11 to StarRocks 3, using flink-sql-connector-oracle-cdc-2.3.0.jar and flink-connector-starrocks-1.2.6_flink-1.15.jar.
After submitting the Flink SQL job, the TaskManager log shows:
2023-05-10 15:10:37,363 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request ceaedaaf6e1faa217485b346ef681a9b for job 5242bc16d1029c8c25c4537d18653c6c from resource manager with leader id 00000000000000000000000000000000.

2023-05-10 15:10:37,367 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for ceaedaaf6e1faa217485b346ef681a9b.

2023-05-10 15:10:37,368 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 5242bc16d1029c8c25c4537d18653c6c for job leader monitoring.

2023-05-10 15:10:37,369 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_2 with leader id 00000000-0000-0000-0000-000000000000.

2023-05-10 15:10:37,381 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration

2023-05-10 15:10:37,395 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_2 for job 5242bc16d1029c8c25c4537d18653c6c.

2023-05-10 15:10:37,396 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 5242bc16d1029c8c25c4537d18653c6c.

2023-05-10 15:10:37,397 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 5242bc16d1029c8c25c4537d18653c6c.

2023-05-10 15:10:37,421 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot ceaedaaf6e1faa217485b346ef681a9b.

2023-05-10 15:10:37,432 INFO org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - Creating a changelog storage with name 'memory'.

2023-05-10 15:10:37,445 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: B2CUAT__STU_src[1] -> ConstraintEnforcer[2] -> Sink: B2CUAT__STU_sink[2] (1/1)#0 (c4d1f505862a83f91e5221a4cf929b91), deploy into slot with allocation id ceaedaaf6e1faa217485b346ef681a9b.

2023-05-10 15:10:37,446 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: B2CUAT__STU_src[1] -> ConstraintEnforcer[2] -> Sink: B2CUAT__STU_sink[2] (1/1)#0 (c4d1f505862a83f91e5221a4cf929b91) switched from CREATED to DEPLOYING.

2023-05-10 15:10:37,450 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: B2CUAT__STU_src[1] -> ConstraintEnforcer[2] -> Sink: B2CUAT__STU_sink[2] (1/1)#0 (c4d1f505862a83f91e5221a4cf929b91) [DEPLOYING].

2023-05-10 15:10:37,451 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot ceaedaaf6e1faa217485b346ef681a9b.

2023-05-10 15:10:37,453 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 5242bc16d1029c8c25c4537d18653c6c/p-7ae43ac11b505732642e76168fb8966c68a7a6e0-30003bfecc0a7241f0b519d70474dc35 from jobmanager/172.18.0.3:6124

2023-05-10 15:10:37,465 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 5242bc16d1029c8c25c4537d18653c6c/p-027ab2f212c11e0f1cb7a73827d21ca9d71679a8-011f50ea71264055eb1ae3420f273859 from jobmanager/172.18.0.3:6124

2023-05-10 15:10:37,653 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@281759ed

2023-05-10 15:10:37,654 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend

2023-05-10 15:10:37,655 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'

2023-05-10 15:10:37,665 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: B2CUAT__STU_src[1] -> ConstraintEnforcer[2] -> Sink: B2CUAT__STU_sink[2] (1/1)#0 (c4d1f505862a83f91e5221a4cf929b91) switched from DEPLOYING to INITIALIZING.

2023-05-10 15:10:37,848 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[172.18.0.5, taskmanager, 172.18.0.5:37053-afb73d, insert-into_default_catalog.xe.B2CUAT__STU_sink, Source: B2CUAT__STU_src[1], 0]

2023-05-10 15:10:37,848 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[172.18.0.5, taskmanager, 172.18.0.5:37053-afb73d, insert-into_default_catalog.xe.B2CUAT__STU_sink, Source: B2CUAT__STU_src[1], 0]

2023-05-10 15:10:37,876 INFO com.starrocks.connector.flink.manager.StarRocksSinkManagerV2 [] - Flink-StarRocks-Sink-Manager start, enableAutoCommit: true, streamLoader: com.starrocks.data.load.stream.TransactionStreamLoader

2023-05-10 15:10:37,876 INFO com.starrocks.connector.flink.manager.StarRocksSinkManagerV2 [] - manager running, scanningFrequency : 50

2023-05-10 15:10:38,026 INFO com.starrocks.data.load.stream.DefaultStreamLoader [] - Default Stream Loader start, properties : {"connectTimeout":1000,"defaultTableProperties":{"chunkLimit":3221225472,"dataFormat":{},"database":"xe","enableUpsertDelete":true,"properties":{"db":"xe","table":"B2CUAT__STU"},"table":"B2CUAT__STU","uniqueKey":"xe-B2CUAT__STU"},"enableTransaction":true,"expectDelayTime":15000,"headers":{"format":"json","strip_outer_array":"true"},"ioThreadCount":2,"jdbcUrl":"jdbc:mysql://starrocks:9030","loadUrls":["http://starrocks:8030"],"maxCacheBytes":94371840,"oldThreshold":0.9,"opAutoProjectionInJson":true,"regionBufferRatio":0.6,"scanningFrequency":50,"socketTimeout":0,"starRocksVersion":{"major":3,"minor":0,"patch":0},"tablePropertiesMap":{},"username":"root","version":"3.0.0 48f4d81d7b","writingThreshold":50,"youngThreshold":0.1}, defaultHeaders : [{"elements":[{"name":"json","parameterCount":0,"parameters":[]}],"name":"format","value":"json"},{"elements":[{"name":"Basic cm9vdDo","parameterCount":0,"parameters":[],"value":""}],"name":"Authorization","value":"Basic cm9vdDo="},{"elements":[{"name":"true","parameterCount":0,"parameters":[]}],"name":"strip_outer_array","value":"true"},{"elements":[{"name":"100-continue","parameterCount":0,"parameters":[]}],"name":"Expect","value":"100-continue"},{"elements":[{"name":"600","parameterCount":0,"parameters":[]}],"name":"timeout","value":"600"}]

2023-05-10 15:10:38,088 INFO com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2 [] - Open sink function v2

2023-05-10 15:10:38,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: B2CUAT__STU_src[1] -> ConstraintEnforcer[2] -> Sink: B2CUAT__STU_sink[2] (1/1)#0 (c4d1f505862a83f91e5221a4cf929b91) switched from INITIALIZING to RUNNING.

However, neither the initial snapshot data nor the incremental data from the Oracle table is ever synced to StarRocks; the log stops at this point and nothing further happens.

Using Flink CDC MySQL to sync from a MySQL database to StarRocks, both full and incremental sync work fine. The logs look similar to the above, but they additionally show the full-load StarRocks HTTP calls and the incremental sync activity.

Why does Flink Oracle CDC produce nothing at all?

Both the Flink SQL and the StarRocks SQL were generated by SMT configured against the Oracle database.
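For reference, the SMT-generated Flink SQL should look roughly like the sketch below (hostnames, credentials, and column types are assumptions based on the table and logs above, not the actual generated output). One detail worth double-checking in the generated DDL: the Oracle CDC connector usually expects schema-name and table-name in uppercase, because Oracle stores unquoted identifiers in uppercase.

```sql
-- Hypothetical source table; host and credentials are placeholders.
CREATE TABLE B2CUAT__STU_src (
  s_id STRING,
  s_name STRING,
  PRIMARY KEY (s_id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'oracle',
  'port' = '1521',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'B2CUAT',  -- uppercase: Oracle stores unquoted identifiers in uppercase
  'table-name' = 'STU'
);

CREATE TABLE B2CUAT__STU_sink (
  s_id STRING,
  s_name STRING,
  PRIMARY KEY (s_id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://starrocks:9030',
  'load-url' = 'starrocks:8030',
  'database-name' = 'xe',
  'table-name' = 'B2CUAT__STU',
  'username' = 'root',
  'password' = ''
);

INSERT INTO B2CUAT__STU_sink SELECT * FROM B2CUAT__STU_src;
```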

Below is my Oracle LogMiner configuration:

mkdir /u01/app/oracle/oradata/recovery_area
chmod 777 /u01/app/oracle/oradata/recovery_area

sqlplus /nolog
CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
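After the restart it is worth confirming that archive log mode actually took effect, since LogMiner-based CDC needs it (a quick sanity check, not part of the original setup):

```sql
-- Should report "Archive Mode" / LOG_MODE = ARCHIVELOG
ARCHIVE LOG LIST;
SELECT LOG_MODE FROM V$DATABASE;
```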

CREATE TABLE b2cuat.stu ("s_id" VARCHAR(255) PRIMARY KEY, "s_name" VARCHAR(255));
ALTER TABLE b2cuat.stu ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
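You can also verify that supplemental logging is actually enabled; without it, LogMiner captures nothing useful for CDC. A quick check against V$DATABASE and DBA_LOG_GROUPS:

```sql
-- Database-level supplemental logging: should be YES
SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;

-- The (ALL) COLUMNS log group created above should appear here
SELECT LOG_GROUP_TYPE FROM DBA_LOG_GROUPS
WHERE OWNER = 'B2CUAT' AND TABLE_NAME = 'STU';
```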

sqlplus sys/top_secret@localhost:1521/XE AS SYSDBA;
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;

GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
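To rule out a silent permissions problem, you could log in as flinkuser and confirm that it can actually see the table and the archived logs; if any of these fail, the CDC source may stall without an obvious error:

```sql
-- Connect as the CDC user, not SYS:
--   sqlplus flinkuser/flinkpw@localhost:1521/XE
SELECT COUNT(*) FROM b2cuat.stu;
SELECT COUNT(*) FROM V$ARCHIVED_LOG;
SELECT LOG_MODE FROM V$DATABASE;
```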

I suspect the LogMiner configuration on Oracle XE 11 is incorrect, but after reviewing it at length, this is exactly how it is supposed to be configured.
Could this be a bug in flink-sql-connector-oracle-cdc-2.3.0?

See [Troubleshooting] load failure issues. First verify whether the source can actually be read; then check with the CDC community to confirm that your CDC version is compatible with your Oracle version.