Error while writing to StarRocks from Flink

java.lang.RuntimeException: Writing records to StarRocks failed.
at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:291)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.writeRecord(StarRocksSinkManager.java:164)
at com.starrocks.connector.flink.table.StarRocksDynamicSinkFunction.invoke(StarRocksDynamicSinkFunction.java:105)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.io.IOException: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"current running txns on db 12003 is 100, larger than limit 100","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"bfca24b7-e4b8-48cb-9571-ba77ecfd3948","LoadBytes":0,"StreamLoadPutTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":-1,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}

at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:273)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.access$000(StarRocksSinkManager.java:52)
at com.starrocks.connector.flink.manager.StarRocksSinkManager$1.run(StarRocksSinkManager.java:121)
at java.lang.Thread.run(Thread.java:748)

Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"current running txns on db 12003 is 100, larger than limit 100","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"bfca24b7-e4b8-48cb-9571-ba77ecfd3948","LoadBytes":0,"StreamLoadPutTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":-1,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}

at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:87)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:259)

Which parameter should I increase to fix this error?

Increase the flink connector sink.buffer-flush.* thresholds.
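For reference, a minimal Flink SQL sketch of where these options live; the schema, connection values, and credentials below are placeholders, not taken from this thread:

  CREATE TABLE starrocks_sink (
    id BIGINT,
    name STRING
  ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe-host:9030',
    'load-url' = 'fe-host:8030',
    'database-name' = 'example_db',
    'table-name' = 'example_table',
    'username' = 'root',
    'password' = '',
    -- whichever of these three thresholds is reached first triggers a flush
    'sink.buffer-flush.max-rows' = '1000000',
    'sink.buffer-flush.max-bytes' = '300000000',
    'sink.buffer-flush.interval-ms' = '15000'
  );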

OK, thanks!

I'm running into the same problem now. Should I be adjusting the sink.buffer-flush.interval-ms parameter?

sink.buffer-flush.* covers three parameters, and whichever threshold is reached first triggers a flush. Check the logs to see which condition fired first, then adjust that parameter.


How do I tell from the logs which condition triggered the flush?

Search the logs for _stream_load to see what triggered it. The log above shows that the number of running import transactions on a single db exceeded the limit, which defaults to 100.
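You can also check the running transaction count directly on the FE instead of digging through logs; a sketch, assuming the db id 12003 reported in the error (PROC paths may differ across versions):

  SHOW PROC '/transactions';
  -- drill into the db id from the error message
  SHOW PROC '/transactions/12003/running';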

I couldn't find what triggered it, only the errors. Attachment: be.WARNING.log.20220720-112721 (50.4 MB)

Your import frequency is too high. We recommend keeping each batch to at least 10s.

It still fails with sink.buffer-flush.interval-ms set to 15s. Is one commit every 15s really too frequent, or is the commit frequency also controlled by other parameters?

Which load semantics are you using? Under at-least-once semantics, increase the flink connector sink.buffer-flush.* thresholds. Of the three parameters below, the two other than interval-ms should also be increased; whichever of the three is reached first triggers a flush:

  "'sink.buffer-flush.max-rows' = '1000000'," 
  "'sink.buffer-flush.max-bytes' = '300000000'," 
  "'sink.buffer-flush.interval-ms' = '5000'," 

Message":“current running txns on db 14003 is 100, larger than limit 100”

This error is about the import frequency of a single table, right? Will frequent imports into one table in a database affect the other tables?

It's global; it will affect other tables.

You can raise the limit, or reduce the import frequency.

Can the default limit of 100 be modified, and how? The BE CPU Idle metric shows the nodes still have plenty of headroom.
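It can. The limit should correspond to the FE configuration item max_running_txn_num_per_db (default 100); a minimal sketch of raising it at runtime, with 200 as an arbitrary example value:

  ADMIN SHOW FRONTEND CONFIG LIKE 'max_running_txn_num_per_db';
  -- takes effect immediately, but is lost on FE restart unless also added to fe.conf
  ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "200");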