kafka sink connector task throws a NullPointerException after running for a while


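【Details】The kafka sink connector task throws a NullPointerException after running for a while
【Background】Data is consumed in real time into StarRocks (sr) through the kafka sink connector
【Business impact】
【Shared-data (storage-compute separation) deployment?】
【StarRocks version】2.5
【Cluster size】e.g. 3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-located)
【Table model】e.g. Primary Key table
【Import or export method】kafka sink connector
【Contact】So that we can reach you promptly for log details while troubleshooting, please add your contact information, e.g. community group 4 - Xiao Li, or an email address. Thank you.
【Attachments】

  • fe.log/be.INFO/relevant screenshots
  • The complete exception stack trace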

This looks like a parsing exception. Could you provide the data type of the messages in Kafka and your sink.properties.* configuration?

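The data in Kafka is strings. I also suspected a data problem at first, but after deleting the task and resubmitting it, consumption works normally, so I no longer think the data is the cause. The sink.properties.* settings are optional (not required).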
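To make the sink.properties.* question easier to answer, here is a minimal sketch that pulls those keys out of a connector definition so they can be pasted into the thread. Everything in the config is a placeholder assumption rather than the poster's actual setup: the topic, host, database, credentials, converter choice, and the two sink.properties.* entries are illustrative only. The helper `sink_properties` is a hypothetical name, and it assumes the part after the `sink.properties.` prefix is the Stream Load parameter name.

```python
import json

# Hypothetical connector definition. The connector name and topic appear in
# the logs of this thread; every other value is a placeholder assumption.
connector = {
    "name": "sink-connector3",
    "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics": "czmk.nation.base_user",
        # Converter choice is an assumption for string-typed messages.
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "starrocks.http.url": "fe_host:8030",
        "starrocks.database.name": "example_db",
        "starrocks.username": "example_user",
        "starrocks.password": "example_password",
        # Assumed Stream Load parameters, passed through with the prefix.
        "sink.properties.format": "json",
        "sink.properties.strip_outer_array": "true",
    },
}


def sink_properties(config):
    """Return only the sink.properties.* entries, with the prefix removed."""
    prefix = "sink.properties."
    return {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix)
    }


# Print the subset that the reply above asks for.
print(json.dumps(sink_properties(connector["config"]), indent=2))
```

If the result is an empty dict, no sink.properties.* keys are set and the load runs with whatever defaults the connector applies, which is worth stating explicitly when reporting the problem.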

The relevant logs are as follows:
[2024-04-24 15:10:13,609] ERROR [sink-connector3|task-0] WorkerSinkTask{id=sink-connector3-0} Commit of offsets threw an unexpected exception for sequence number 11: null (org.apache.kafka.connect.runtime.WorkerSinkTask:277)
java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Failed to prepare transaction, please check taskmanager log for details, Transaction{database='', table='', label='edcbfe19-c6d8-48c4-9046-208bbbfecd7b', finish=false}
at com.starrocks.connector.kafka.StarRocksSinkTask.preCommit(StarRocksSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:417)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2024-04-24 15:10:14,408] ERROR [sink-connector3|task-0] Stream load failure 4 times, which bigger than maxRetryTimes 3 (com.starrocks.connector.kafka.StarRocksSinkTask:230)
[2024-04-24 15:10:14,409] ERROR [sink-connector3|task-0] WorkerSinkTask{id=sink-connector3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null (org.apache.kafka.connect.runtime.WorkerSinkTask:622)
java.lang.NullPointerException
at com.starrocks.connector.kafka.StarRocksSinkTask.put(StarRocksSinkTask.java:231)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2024-04-24 15:10:14,409] INFO [sink-connector3|task-0] Stream load manager flush (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:377)
[2024-04-24 15:10:14,410] INFO [sink-connector3|task-0] All regions committed for savepoint, number of regions: 0 (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:191)
[2024-04-24 15:10:14,410] INFO [sink-connector3|task-0] Stream load manager flush (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:377)
[2024-04-24 15:10:14,410] INFO [sink-connector3|task-0] All regions committed for savepoint, number of regions: 0 (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:191)
[2024-04-24 15:10:14,410] ERROR [sink-connector3|task-0] WorkerSinkTask{id=sink-connector3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:624)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at com.starrocks.connector.kafka.StarRocksSinkTask.put(StarRocksSinkTask.java:231)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
... 11 more
[2024-04-24 15:10:14,411] INFO [sink-connector3|task-0] Starrocks sink task stopped. version is 1.0.0 (com.starrocks.connector.kafka.StarRocksSinkTask:317)
[2024-04-24 15:10:14,411] INFO [sink-connector3|task-0] [Consumer clientId=connector-consumer-sink-connector3-0, groupId=connect-sink-connector3] Revoke previously assigned partitions czmk.nation.base_user-0, czmk.nation.battery_retire-0, czmk.nation.product_pack-0, czmk.nation.recover_sto-0, czmk.nation.replace_cell-0, czmk.nation.replace_module-0, czmk.nation.replace_pack-0, czmk.nation.replace_record-0, czmk.nation.sys_manufactory-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:343)
[2024-04-24 15:10:14,411] INFO [sink-connector3|task-0] [Consumer clientId=connector-consumer-sink-connector3-0, groupId=connect-sink-connector3] Member connector-consumer-sink-connector3-0-a0d47f25-985f-4994-9cb4-61b179d638a7 sending LeaveGroup request to coordinator kafka.evmam-tbrat.com:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1133)
[2024-04-24 15:10:14,412] INFO [sink-connector3|task-0] [Consumer clientId=connector-consumer-sink-connector3-0, groupId=connect-sink-connector3] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)
[2024-04-24 15:10:14,412] INFO [sink-connector3|task-0] [Consumer clientId=connector-consumer-sink-connector3-0, groupId=connect-sink-connector3] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-04-24 15:10:14,527] INFO [sink-connector3|task-0] [Consumer clientId=connector-consumer-sink-connector3-0, groupId=connect-sink-connector3] Node 0 sent an invalid full fetch response with extraIds=(jl-oKnCjTsGTUy9HBcnTbA), response=() (org.apache.kafka.clients.FetchSessionHandler:555)
[2024-04-24 15:10:14,528] INFO [sink-connector3|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
[2024-04-24 15:10:14,528] INFO [sink-connector3|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:697)
[2024-04-24 15:10:14,528] INFO [sink-connector3|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)
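The log states that the task "will not recover until manually restarted". As a stopgap while the root cause is investigated, the Kafka Connect REST API can report task state and restart a failed task without deleting and resubmitting the connector. The sketch below assumes the Connect worker's REST endpoint is reachable at `http://localhost:8083` (a placeholder). The function names are hypothetical, and the sample status payload is hand-written for illustration, not captured from the poster's cluster.

```python
import json
import urllib.request


def failed_task_ids(status):
    """Return ids of tasks marked FAILED in a /connectors/<name>/status payload."""
    return [
        task["id"]
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def restart_url(base_url, connector, task_id):
    """Build the REST URL that restarts one task of a connector."""
    return f"{base_url.rstrip('/')}/connectors/{connector}/tasks/{task_id}/restart"


def restart_failed_tasks(base_url, connector):
    """Fetch connector status and POST a restart for every FAILED task."""
    status_url = f"{base_url.rstrip('/')}/connectors/{connector}/status"
    with urllib.request.urlopen(status_url) as resp:
        status = json.load(resp)
    restarted = []
    for task_id in failed_task_ids(status):
        req = urllib.request.Request(
            restart_url(base_url, connector, task_id), data=b"", method="POST"
        )
        with urllib.request.urlopen(req):
            pass  # the response body is not needed here
        restarted.append(task_id)
    return restarted


# Hand-written example of a status payload with one failed task.
sample_status = {
    "name": "sink-connector3",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "FAILED", "trace": "java.lang.NullPointerException"}
    ],
}
print(failed_task_ids(sample_status))

# Against a live worker (placeholder address):
# restart_failed_tasks("http://localhost:8083", "sink-connector3")
```

A restart only brings the task back. If the underlying Stream Load failure ("Failed to prepare transaction") recurs, the task will most likely exceed maxRetryTimes and stop again, so the FE/BE logs for the failing label are still needed.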