flink cdc写入starrocks报错

【详述】flink cdc写入starrocks报错,看日志无明显报错
【背景】
【StarRocks版本】例如:2.4.1
flink sql:
-- Flink SQL source table: reads MySQL table test.o_order_sub through the
-- mysql-cdc connector. 'hostname' = 'ip' is a placeholder for the real host.
-- String literals in the WITH clause must use ASCII single quotes.
CREATE TABLE IF NOT EXISTS o_order_sub_src (
    id BIGINT NOT NULL,
    order_no_sub STRING NOT NULL,
    order_no_parent STRING NOT NULL,
    origin_order_no_sub STRING NULL,
    shop_id STRING NULL,
    shop_name STRING NULL,
    ship_time TIMESTAMP NULL,
    end_time TIMESTAMP NULL,
    disct_amount DECIMAL(20, 0) NULL,
    pay_amount DECIMAL(20, 0) NULL,
    freight DECIMAL(20, 0) NULL,
    total_amount DECIMAL(20, 0) NULL,
    order_status SMALLINT NULL,
    cancel_type TINYINT NULL,
    cancel_time TIMESTAMP NULL,
    client_addr_id STRING NULL,
    client_addr STRING NULL,
    recipient_name STRING NULL,
    recipient_mobile STRING NULL,
    courier_code STRING NULL,
    courier_number STRING NULL,
    courier_company STRING NULL,
    delivered_time TIMESTAMP NULL,
    coupon_code STRING NULL,
    take_foods_code STRING NULL,
    take_foods_time TIMESTAMP NULL,
    cut_order_time TIMESTAMP NULL,
    merchant_code STRING NULL,
    merchant_type TINYINT NULL,
    merchant_name STRING NULL,
    on_white TINYINT NULL,
    business_property INT NOT NULL,
    device_on_white INT NOT NULL,
    delivery_method INT NULL,
    platform TINYINT NULL,
    platform_version STRING NULL,
    merchant_sub_name STRING NULL,
    merchant_sub_id STRING NULL,
    poi_type TINYINT NULL,
    poi_code STRING NULL,
    poi_name STRING NULL,
    apply_code STRING NULL,
    video_url STRING NULL,
    split_account_status TINYINT NULL,
    poi_id STRING NULL,
    poi_address STRING NULL,
    apply_open_time TIMESTAMP NULL,
    device_open_time TIMESTAMP NULL,
    deduct_flag TINYINT NULL,
    order_type STRING NULL,
    business_type STRING NULL,
    operate_type TINYINT NULL,
    create_id STRING NULL,
    modifier_id STRING NULL,
    gmt_create TIMESTAMP NULL,
    gmt_modify TIMESTAMP NULL,
    version SMALLINT NULL,
    remark STRING NULL,
    is_delete TINYINT NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'o_order_sub'
);

-- Flink SQL sink table: writes to StarRocks table ods.order_sub_s through the
-- starrocks connector, sending rows as JSON via Stream Load.
-- 'ip' is a placeholder for the real FE host.
-- String literals in the WITH clause must use ASCII single quotes.
CREATE TABLE IF NOT EXISTS o_order_sub_sink (
    id BIGINT NOT NULL,
    order_no_sub STRING NOT NULL,
    order_no_parent STRING NOT NULL,
    origin_order_no_sub STRING NULL,
    shop_id STRING NULL,
    shop_name STRING NULL,
    ship_time TIMESTAMP NULL,
    end_time TIMESTAMP NULL,
    disct_amount DECIMAL(20, 0) NULL,
    pay_amount DECIMAL(20, 0) NULL,
    freight DECIMAL(20, 0) NULL,
    total_amount DECIMAL(20, 0) NULL,
    order_status SMALLINT NULL,
    cancel_type TINYINT NULL,
    cancel_time TIMESTAMP NULL,
    client_addr_id STRING NULL,
    client_addr STRING NULL,
    recipient_name STRING NULL,
    recipient_mobile STRING NULL,
    courier_code STRING NULL,
    courier_number STRING NULL,
    courier_company STRING NULL,
    delivered_time TIMESTAMP NULL,
    coupon_code STRING NULL,
    take_foods_code STRING NULL,
    take_foods_time TIMESTAMP NULL,
    cut_order_time TIMESTAMP NULL,
    merchant_code STRING NULL,
    merchant_type TINYINT NULL,
    merchant_name STRING NULL,
    on_white TINYINT NULL,
    business_property INT NOT NULL,
    device_on_white INT NOT NULL,
    delivery_method INT NULL,
    platform TINYINT NULL,
    platform_version STRING NULL,
    merchant_sub_name STRING NULL,
    merchant_sub_id STRING NULL,
    poi_type TINYINT NULL,
    poi_code STRING NULL,
    poi_name STRING NULL,
    apply_code STRING NULL,
    video_url STRING NULL,
    split_account_status TINYINT NULL,
    poi_id STRING NULL,
    poi_address STRING NULL,
    apply_open_time TIMESTAMP NULL,
    device_open_time TIMESTAMP NULL,
    deduct_flag TINYINT NULL,
    order_type STRING NULL,
    business_type STRING NULL,
    operate_type TINYINT NULL,
    create_id STRING NULL,
    modifier_id STRING NULL,
    gmt_create TIMESTAMP NULL,
    gmt_modify TIMESTAMP NULL,
    version SMALLINT NULL,
    remark STRING NULL,
    is_delete TINYINT NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://ip:9030',
    -- load-url must point at the FE http_port (default 8030), not the query port.
    'load-url' = 'ip:8030',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'ods',
    'table-name' = 'order_sub_s',
    -- Flush buffered rows every 15 seconds.
    'sink.buffer-flush.interval-ms' = '15000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true'
);

-- Continuously copy every change from the MySQL CDC source into the StarRocks sink.
-- Columns are listed explicitly, in the sink table's declared order, so the
-- mapping does not silently shift if either table definition is edited later.
INSERT INTO o_order_sub_sink
SELECT
    id,
    order_no_sub,
    order_no_parent,
    origin_order_no_sub,
    shop_id,
    shop_name,
    ship_time,
    end_time,
    disct_amount,
    pay_amount,
    freight,
    total_amount,
    order_status,
    cancel_type,
    cancel_time,
    client_addr_id,
    client_addr,
    recipient_name,
    recipient_mobile,
    courier_code,
    courier_number,
    courier_company,
    delivered_time,
    coupon_code,
    take_foods_code,
    take_foods_time,
    cut_order_time,
    merchant_code,
    merchant_type,
    merchant_name,
    on_white,
    business_property,
    device_on_white,
    delivery_method,
    platform,
    platform_version,
    merchant_sub_name,
    merchant_sub_id,
    poi_type,
    poi_code,
    poi_name,
    apply_code,
    video_url,
    split_account_status,
    poi_id,
    poi_address,
    apply_open_time,
    device_open_time,
    deduct_flag,
    order_type,
    business_type,
    operate_type,
    create_id,
    modifier_id,
    gmt_create,
    gmt_modify,
    version,
    remark,
    is_delete
FROM o_order_sub_src;

错误信息:
2022-12-12 17:42:52,057 ERROR com.starrocks.data.load.stream.DefaultStreamLoader [] - Stream load failed unknown, label : cbb2699d-1567-4c89-a5ff-839f505214e9
java.lang.NullPointerException: null
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257) ~[flink-connector-starrocks-1.2.4_flink-1.13_2.12.jar:?]
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113) ~[flink-connector-starrocks-1.2.4_flink-1.13_2.12.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2022-12-12 17:42:52,061 ERROR com.starrocks.data.load.stream.DefaultStreamLoader [] - Stream load failed, thread : I/O client dispatch - 9697b8a2-0a16-43c3-8d82-cc1b00dbefdd
java.lang.NullPointerException: null
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257) ~[flink-connector-starrocks-1.2.4_flink-1.13_2.12.jar:?]
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113) ~[flink-connector-starrocks-1.2.4_flink-1.13_2.12.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2022-12-12 17:42:52,061 ERROR com.starrocks.data.load.stream.DefaultStreamLoadManager [] - Stream load failed
java.lang.NullPointerException: null
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257) ~[flink-connector-starrocks-1.2.4_flink-1.13_2.12.jar:?]
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113) ~[flink-connector-starrocks-1.2.4_flink-1.13_2.12.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2022-12-12 17:42:53,475 INFO com.starrocks.data.load.stream.DefaultStreamLoadManager [] - manager report, current Bytes : 2943, waitQ : 0, prepareQ : 1, commitQ : 0
2022-12-12 17:42:58,483 INFO com.starrocks.data.load.stream.DefaultStreamLoadManager [] - manager report, current Bytes : 2943, waitQ : 0, prepareQ : 1, commitQ : 0
2022-12-12 17:43:03,490 INFO com.starrocks.data.load.stream.DefaultStreamLoadManager [] - manager report, current Bytes : 2943, waitQ : 0, prepareQ : 1, commitQ : 0
2022-12-12 17:43:08,497 INFO com.starrocks.data.load.stream.DefaultStreamLoadManager [] - manager report, current Bytes : 2943, waitQ : 0, prepareQ : 1, commitQ : 0
2022-12-12 17:43:13,504 INFO com.starrocks.data.load.stream.DefaultStreamLoadManager [] - manager report, current Bytes : 2943, waitQ : 0, prepareQ : 1, commitQ : 0
2022-12-12 17:43:18,511 INFO com.starrocks.data.load.stream.DefaultStreamLoadManager [] - manager report, current Bytes : 2943, waitQ : 0, prepareQ : 1, commitQ : 0

麻烦到 Flink 任务的日志里搜一下 http://$fe:${http_port}/api/$db/$tbl/_stream_load 关键字,看是否触发了 Stream Load 任务的启动;任务结果也在这块,可以提供下这里的截图

我不确定是我代码写的不对,还是什么问题,我在本地测试的时候也遇到了这个问题



兄弟我看你导入格式是json
不需要指定导入的映射关系吗?
image

端口写错了,load-url里面的端口是http_port,默认8030

我重新写了个demo试了一下,还是会报错


Exception in thread “main” org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
… 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 183920,
“Label”: “2bf642b7-a994-4594-9df5-04162a34c2b8”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 15,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 0,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:168)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 183920,
“Label”: “2bf642b7-a994-4594-9df5-04162a34c2b8”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 15,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 0,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}
at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:267)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:180)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
… 3 more
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 183920,
“Label”: “2bf642b7-a994-4594-9df5-04162a34c2b8”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 15,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 0,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:270)
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
… 1 more
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 183920,
“Label”: “2bf642b7-a994-4594-9df5-04162a34c2b8”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 15,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 0,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}
at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:267)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.finish(StarRocksDynamicSinkFunctionV2.java:175)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.finish(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162)
… 12 more
[CIRCULAR REFERENCE:com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 183920,
“Label”: “2bf642b7-a994-4594-9df5-04162a34c2b8”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 15,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 0,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}]

Process finished with exit code 1

sr的建表语句发下

-- StarRocks Duplicate Key table used as the target of the demo sink job.
-- replication_num is 1 because the poster's environment is a single node.
-- Property keys and values must be quoted with ASCII quotes.
CREATE TABLE xasj_24.sinktest (
    id varchar(32) NOT NULL,
    value varchar(32) NOT NULL,
    create_time varchar(32) NOT NULL
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id, value) BUCKETS 10
PROPERTIES ('replication_num' = '1');
我的环境是单机的,所以这个replication_num 设置的是 1

sink之前打印下写入的数据,应该是写入的数据有问题

是顺序不对吗

加下下面的sink参数看看

"sink.properties.strict_mode", "true"

加上了还是和之前一样的

代码发下?另外具体的报错还是跟之前一样吗?

另外使用的flink-connector-starrocks版本是哪个?

报错信息和之前的是一样的
StarRocks版本:2.2.0
flink-connector-starrocks版本:1.2.5_flink-1.15
flink版本:1.14.2
代码:
/**
 * Demo job: emits three JSON-encoded {@code Test} records from an in-memory
 * collection, prints them, and writes them to the StarRocks table
 * {@code xasj_24.sinktest} through the StarRocks Flink connector.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the Flink job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> starRocksSource = env.fromCollection(Arrays.asList(
            JSONObject.toJSONString(new Test("1", "test1", "2023-01-01 13:13:13")),
            JSONObject.toJSONString(new Test("2", "test2", "2023-01-01 13:14:13")),
            JSONObject.toJSONString(new Test("3", "test3", "2023-01-01 13:15:13"))));

    // Print each record before it reaches the sink so the payload can be inspected.
    starRocksSource.print("--->SR");

    starRocksSource.addSink(StarRocksSink.sink(
            StarRocksSinkOptions.builder()
                    .withProperty("jdbc-url", "jdbc:mysql://192.168.3.35:9030")
                    // load-url must use the FE http_port (default 8030).
                    .withProperty("load-url", "192.168.3.35:8030")
                    .withProperty("username", "root")
                    .withProperty("password", "xasj@2020")
                    .withProperty("table-name", "sinktest")
                    .withProperty("database-name", "xasj_24")
                    // Since version 2.4, partial column updates are supported on the
                    // primary key model. Use the two properties below to name the columns.
                    // .withProperty("sink.properties.partial_update", "true")
                    // .withProperty("sink.properties.columns", "k1,k2,k3")
                    .withProperty("sink.properties.format", "json")
                    .withProperty("sink.properties.strip_outer_array", "true")
                    .withProperty("sink.properties.strict_mode", "true")
                    // Workaround recommended in this thread for the
                    // "all partitions have no load data" failure seen with connector 1.2.5.
                    .withProperty("sink.version", "V1")
                    // Sink parallelism; with more than one, consider how to keep data ordered.
                    .withProperty("sink.parallelism", "1")
                    .build()));

    env.execute();
}

报错信息:
--->SR:2> {"id":"1","value":"test1","create_time":"2023-01-01 13:13:13"}
--->SR:3> {"id":"2","value":"test2","create_time":"2023-01-01 13:14:13"}
--->SR:4> {"id":"3","value":"test3","create_time":"2023-01-01 13:15:13"}
Exception in thread “main” org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
… 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 184364,
“Label”: “f1cf4efc-ae18-40e1-b789-897fed794d51”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 9,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 1,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:168)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: {
“TxnId”: 184364,
“Label”: “f1cf4efc-ae18-40e1-b789-897fed794d51”,
“Status”: “Fail”,
“Message”: “all partitions have no load data”,
“NumberTotalRows”: 0,
“NumberLoadedRows”: 0,
“NumberFilteredRows”: 0,
“NumberUnselectedRows”: 0,
“LoadBytes”: 64,
“LoadTimeMs”: 9,
“BeginTxnTimeMs”: 0,
“StreamLoadPutTimeMs”: 1,
“ReadDataTimeMs”: 0,
“WriteDataTimeMs”: 3,
“CommitAndPublishTimeMs”: 0
}

可以暂时使用1.2.3_flink-1.14_2.12(2.12是scala的版本,如果是2.11的scala,这块换成1.2.3_flink-1.14_2.11),这个问题我们跟进排查修复下

ok,换成1.2.3_flink-1.14_2.11可以正常使用

你好,我又遇到一个问题,就是1.2.3_flink-1.14_2.11使用springboot项目启动的时候会报一个依赖冲突的问题,我一开始以为是和项目的其他jar包冲突,但是将其他依赖中的冲突内容排除后还是无法启动,我通过新建一个springboot项目测试,发现确实启动不了,冲突的内容是jackson-databind里面的SimpleModule.class找不到


然后我看到一个博客(博客地址:https://blog.csdn.net/hzymarine/article/details/126527067)和我的问题是差不多的,它是自己手动进行了打包,我也试了自己打包,但是我打包出来的就只有com.starrocks.connection包,其他内容没有,
image
下面是一个干净的springboot测试项目引用的依赖,springboot版本是2.3.5.release
<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Spring Boot test project used to reproduce the jackson-databind
     conflict with flink-connector-starrocks. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <dependencies>

        <!-- The thread's last reply suggests trying 1.2.5, where jackson is shaded. -->
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.3_flink-1.14_2.11</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

在最新的connector里shade了jackson,你可以换成1.2.5试下,同时为了解决你之前遇到的"all partitions have no load data"问题,可以在sink上加一个配置"sink.version" = "V1"