求一个flink 写入SR的demo

求一个flink 写入SR的demo

demo/FlinkDemo/src/main/java/com/starrocks/flink at master · StarRocks/demo · GitHub

1赞

谢谢,现在有个问题求助一下


这个地址为什么不能通过我代码的地址访问,而是下图的ip,我访问数据的ip,都想通过跳板机
image
如果这个配置我改成跳板机的ip,fe注册的ip就是127.0.0.1

sunny, [2023-05-14 17:30]
2023-05-14 17:11:08
org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=300000,backoffTimeMS=10000,maxFailuresPerInterval=3)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:912)
at org.apache.flink.runtime.executiongraph.ExecutionVertex.markFailed(ExecutionVertex.java:474)
at org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations.markFailed(DefaultExecutionVertexOperations.java:41)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskDeploymentFailure(DefaultScheduler.java:609)
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignAllResourcesAndRegisterProducedPartitions$6(DefaultScheduler.java:481)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:545)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:127)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:355)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:344)
at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:809)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$0(AkkaRpcActor.java:308)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)

sunny, [2023-05-14 17:30]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResource$8(DefaultScheduler.java:539)
… 39 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
… 37 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

一样的案例,我的demo怎么不行啊

fe.conf不用修改,你的程序需要可以访问到be节点的8040端口,跳板机需要增加到内网8040的映射

跳板机可以访问8040,关键现在是BE的配置是内网ip,demo里面请求的是跳板机的IP,最终代码访问是BE的内网IP,所有不通,现在放到flink里面任务是可以提交上去了,不过job启动不起来,又FLINK安装部署的资料吗?

flink安装部署可以参考这个链接中的“前提条件”部分 https://docs.starrocks.io/zh-cn/latest/unloading/Flink_connector

这个demo本地可以执行,放到flink就不行呢


这个demo本地可以了,但是放到flink就跑不通了

有什么报错信息么

TIS有针对 mysql-> StarRocks开箱即用的方案 https://tis.pub/docs/example/mysql-sync-starrocks

演示案例:
https://www.bilibili.com/video/BV1Ah411x7J8/?spm_id_from=888.80997.embed_other.whitelist&vd_source=fb348e27d4839f46057045d150b06dce