flink cdc3.0 同步mysql->sr出现报错

【详述】flink cdc3.0 同步mysql数据到sr
配置信息
source:
type: mysql
hostname: xxx
port: 3306
username: admin
password: xxxx
tables: flink_cdc_test..*
server-id: 360121348
server-time-zone: Asia/Shanghai

sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://xxx:9030?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&failOverReadOnly=false&serverTimezone=GMT%2B8
load-url: xxx:80
username: root
password: xxx
table.create.properties.replication_num: 1

pipeline:
name: Sync MySQL Database to StarRocks
parallelism: 2
【是否存算分离】是
【StarRocks版本】3.2.4
【集群规模】3fe(1 follower+2observer)+ 1cn
【机器信息】万兆
【导入或者导出方式】Flink
【附件】
2024-04-09 03:42:54,177 ERROR com.starrocks.connector.flink.catalog.StarRocksCatalog [] - Failed to open StarRocks catalog

java.lang.RuntimeException: Failed to connect StarRocks via JDBC: jdbc:mysql://xxx?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&failOverReadOnly=false&serverTimezone=GMT%2B8.

at com.starrocks.connector.flink.catalog.StarRocksCatalog.open(StarRocksCatalog.java:72) ~[flink-cdc-pipeline-connector-starrocks-3.0.0.jar:3.0.0]

at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:67) ~[?:?]

at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:82) ~[?:?]

at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:149) ~[?:?]

at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:123) ~[?:?]

at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:204) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:121) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1079) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:689) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]

at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source) ~[?:?]

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_402]

at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_402]

at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.0-amzn-0.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkac7338d41-f490-4168-8915-b93738e6d904.jar:1.18.0-amzn-0]

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_402]

at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_402]

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_402]

at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_402]

Caused by: java.sql.SQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:899) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:824) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[flink-cdc-pipeline-connector-mysql-3.0.0.jar:3.0.0]

at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_402]

at java.sql.DriverManager.getConnection(DriverManager.java:247) ~[?:1.8.0_402]

at com.starrocks.connector.flink.catalog.StarRocksCatalog.getConnection(StarRocksCatalog.java:475) ~[flink-cdc-pipeline-connector-starrocks-3.0.0.jar:3.0.0]

at com.starrocks.connector.flink.catalog.StarRocksCatalog.open(StarRocksCatalog.java:69) ~[flink-cdc-pipeline-connector-starrocks-3.0.0.jar:3.0.0]

... 37 more

Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.


请问大佬解决了吗?碰到了类似的问题

有点忘了,好像是需要搞一个fe的proxy,我是k8s

我也是on k8s,不过是偶现了,经常就断链了

可能跟你的LB有关系,之前我用外网的LB,好像会出现这个,然后我搞了内网的LB,就没事了

明白,感谢大佬!