Flink reading from Kafka and writing to SR reports "Message": "Cancelled FileScanNode::get_next".

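[Details] Detailed problem description
[Import/export method]
1. Flink reads from Kafka and writes to SR: 700,000 messages, parallelism 1, batches of 20 seconds / 200,000 rows; every attempt reports "Message": "Cancelled FileScanNode::get_next".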

2. Consuming Kafka directly from SR, with batches of 20 s / 200,000 rows, also shows a non-zero abortedTaskNum.
{"receivedBytes":2286034057,"errorRows":0,"committedTaskNum":10500,"loadedRows":1915595,"loadRowsRate":0,"abortedTaskNum":1639,"totalRows":1915595,"unselectedRows":0,"receivedBytesRate":67000,"taskExecuteTimeMs":33816164}
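To get a rough sense of how often tasks are failing, the statistics above can be turned into an abort ratio. This is only a minimal Python sketch: it assumes the JSON is the statistics field reported for the Routine Load job and that committedTaskNum plus abortedTaskNum covers all finished tasks. The function name `aborted_task_ratio` is made up for illustration.

```python
import json

# Statistics JSON copied from the post above (quotes normalized to ASCII).
STATISTIC = (
    '{"receivedBytes":2286034057,"errorRows":0,"committedTaskNum":10500,'
    '"loadedRows":1915595,"loadRowsRate":0,"abortedTaskNum":1639,'
    '"totalRows":1915595,"unselectedRows":0,"receivedBytesRate":67000,'
    '"taskExecuteTimeMs":33816164}'
)


def aborted_task_ratio(statistic_json: str) -> float:
    """Share of finished tasks that were aborted (assumption: finished = committed + aborted)."""
    stats = json.loads(statistic_json)
    committed = stats["committedTaskNum"]
    aborted = stats["abortedTaskNum"]
    total = committed + aborted
    # Avoid dividing by zero for a job that has not finished any task yet.
    return aborted / total if total else 0.0


if __name__ == "__main__":
    print(f"aborted tasks: {aborted_task_ratio(STATISTIC):.1%}")
```

With the numbers posted here, that is 1639 aborted out of 12139 tasks, roughly 13.5%, while errorRows stays at 0.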

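[Background] What operations have been performed?
[Business impact]
[StarRocks version] e.g. 2.0.1
[Cluster size] e.g. 1 FE + 3 BE (FE and BE co-located)
[Machine info] CPU virtual cores / memory / NIC, e.g. 48C/64G/10GbE
[Attachments]

  • fe.warn.log / be.warn.log / relevant screenshots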

FE errors:
WARNING: correlationId:568946 timeout with bound channel =>[id: 0x762dca9b, L:/10.0.1.23:55698 - R:/10.0.2.24:8060]
May 26, 2022 10:53:59 AM com.baidu.jprotobuf.pbrpc.transport.RpcTimerTask run
WARNING: correlationId:568947 timeout with bound channel =>[id: 0x762dca9b, L:/10.0.1.23:55698 - R:/10.0.2.24:8060]
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

BE errors:
The most frequent error: W0609 18:52:35.290482 1614 thread.cpp:221] failed to set thread name: UpdateApplyThreadPool

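None of the logs in the attachments point to the root cause. Please post the Routine Load creation statement with sensitive details removed. And you are sure there is nothing wrong with the data quality, right?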

CREATE ROUTINE LOAD db1.label699 ON table1
COLUMNS(c1, c2, c3, c4, c5, c6,…… …… ,c40)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "10",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.c1\", \"$.c2\", \"$.c3\", \"$.c4\", \"$.c5\", \"$.c6\",…… ……, \"$.c40\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.20.1.1:9092,10.20.1.2:9092,10.20.1.3:9092",
"kafka_topic" = "json_kafka",
"property.group.id" = "starrocks-group-label699",
"property.client.id" = "starrocks-client",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "3954670,3949080,3953600"
);

ALTER ROUTINE LOAD FOR db1.label699
PROPERTIES
(
    "strict_mode" = "true"
);

Set the parameter like this, then run the Routine Load again and check the error message.

You mean it is a data quality problem? I backfilled the data with DataX and it loaded, so that should not be the issue.

The FE also reports an error when receiving packets:

2022-06-10 09:23:56,860 WARN (starrocks-mysql-nio-pool-3232|82703) [ReadListener.lambda$handleEvent$417():63] Exception happened in one session(com.starrocks.mysql.nio.NConnectContext@3c0ff646).
java.io.IOException: Error happened when receiving packet.
at com.starrocks.qe.ConnectProcessor.processOnce(ConnectProcessor.java:642) ~[starrocks-fe.jar:?]
at com.starrocks.mysql.nio.ReadListener.lambda$handleEvent$417(ReadListener.java:55) ~[starrocks-fe.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_291]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_291]

The Flink write to SR failed, and the FE killed the Flink connection:
Jun 9, 2022 @ 18:02:27.967 [ConnectContext.checkTimeout():470] kill wait timeout connection, remote: 10.10.0.10:53486, wait timeout: 28800
Jun 9, 2022 @ 18:02:27.967 [ConnectContext.kill():444] kill timeout query, 10.10.0.10:53486, kill connection: true

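On the BE at 10.0.2.24, please run the following commands:
ss -antpl | grep 8060 (run it several times and post screenshots)
cat /proc/sys/net/core/somaxconn (post a screenshot)
netstat -s | grep overflowed (run it several times and post screenshots)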
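If running the checks by hand several times is tedious, the sampling can be scripted. The following is only a sketch in Python, assuming a Linux BE host. It reads /proc/sys/net/core/somaxconn and the ListenOverflows / ListenDrops counters from /proc/net/netstat, which, as far as I know, are the values behind the "listen queue of a socket overflowed" line in netstat -s. The function names, sample count, and interval are arbitrary choices for illustration.

```python
import time
from pathlib import Path

NETSTAT_PATH = Path("/proc/net/netstat")               # kernel TCP extension counters
SOMAXCONN_PATH = Path("/proc/sys/net/core/somaxconn")  # upper bound for the accept queue


def parse_netstat(text: str) -> dict:
    """Parse /proc/net/netstat text: alternating 'Prefix: names' and 'Prefix: values' lines."""
    lines = [line for line in text.splitlines() if line.strip()]
    counters = {}
    for header, values in zip(lines[0::2], lines[1::2]):
        prefix, names = header.split(":", 1)
        _, numbers = values.split(":", 1)
        for name, number in zip(names.split(), numbers.split()):
            counters[f"{prefix}.{name}"] = int(number)
    return counters


def listen_queue_counters(text: str) -> tuple:
    """Return (ListenOverflows, ListenDrops); 0 when a counter is not present."""
    counters = parse_netstat(text)
    return (counters.get("TcpExt.ListenOverflows", 0),
            counters.get("TcpExt.ListenDrops", 0))


def watch(samples: int = 5, interval_s: float = 2.0) -> None:
    """Print somaxconn once, then sample the two counters a few times."""
    print("somaxconn:", SOMAXCONN_PATH.read_text().strip())
    for _ in range(samples):
        overflows, drops = listen_queue_counters(NETSTAT_PATH.read_text())
        print(f"ListenOverflows={overflows} ListenDrops={drops}")
        time.sleep(interval_s)
```

Calling watch() on the BE host prints the samples. Counters that keep growing between samples would suggest an accept queue is overflowing somewhere on the host. These counters are host-wide, so this does not replace the ss output, which shows the queue state for port 8060 specifically.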

Also check whether the BE nodes have any other error messages, and please attach the surrounding log context.

The value of cat /proc/sys/net/core/somaxconn is 4096.

netstat -s | grep overflowed produces no output.

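ss -antpl | grep 8060 (run it several times and post screenshots)
netstat -s | grep overflowed (run it several times and post screenshots)

Could you run these two commands a few more times? Also, does the problem reproduce consistently, or only occasionally?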