Kafka connect 到Starrocks报错

【详述】问题详细描述

当数据量在100条/s (不同topic),会出现事务 label为null的问题导致数据无法继续写入

【背景】做过哪些操作?
手动写入消息都是正常的,但是使用java程序进行写入就会出现这个问题
【StarRocks版本】3.1
【表模型】主键模型
【导入或者导出方式】kafka

  • 完整的报错异常栈
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: maintancetestdb, table: m_netstat_0, label: null,
    responseBody: {
    “Status”: “FAILED”,
    “Message”: “class com.starrocks.common.UserException: empty label.”
    }
    errorLog: null
    at com.starrocks.connector.kafka.StarRocksSinkTask.put(StarRocksSinkTask.java:289)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
    … 11 more
    Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: maintancetestdb, table: m_netstat_0, label: null,
    responseBody: {
    “Status”: “FAILED”,
    “Message”: “class com.starrocks.common.UserException: empty label.”
    }
    errorLog: null
    at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:428)
    at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
    at com.starrocks.connector.kafka.StarRocksSinkTask.preCommit(StarRocksSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:399)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:230)
    … 9 more

自己在写入时,指定一个label值呢

我不知道java往kafka写入消息的时候怎么指定label值,我尝试在发送消息的时候加一个RecordHeader,
record.headers().add(new RecordHeader(“label”, xxx);
然后给label指定一个值,但是并没有解决这个问题,问题还是会存在

不是在kafka的消息中加header,是在http请求中加label的header

那就是我使用java程序使用kafka的客户端直接写入消息不可行是吧?得通过HTTP API的方式写入消息才可以是吗?能麻烦您给我一个样例吗,我陷入泥潭了,因为我看日志显示,同样的数据结构同一个topic,有的会成功有的才会出现这个问题

我是使用 通过 Standalone 模式启动 Kafka Connect的。
2. 1. 启动 Kafka Connect。

CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties

没太明白,kafka-connector直接可以消费kafka到sr,不需要java程序呀

不好意思,这个是我没有解释清楚,是因为我们的项目是java项目,因为设计原因导致写入数据很频繁,直接往StarRocks里写入会出现超出版本(1000)的问题,所以我们寻求其它方式来进行写入,我们看文档可以通过kafka-connector进行数据写入,所以改成了 -java项目----->kafka---->StarRocks,来进行数据写入,所以遇到了这个问题,

kafka-connector的配置发下

这个是connect-starrocks-sink.properties

name=starrocks-kafka-connector-demo
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics.regex=m_.*
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# StarRocks FE 的 HTTP Server 地址,默认端口 8030
starrocks.http.url=10.0.253.37:8130
# 当 Kafka Topic 的名称与 StarRocks 表名不一致时,需要配置两者的映射关系
#starrocks.topic2table.map=
# StarRocks 用户名
starrocks.username=maintancetest
# StarRocks 用户密码。您必须输入用户密码。
starrocks.password=maintancetest@0729
starrocks.database.name=maintancetestdb
sink.properties.strip_outer_array=true
bufferflush.intervalms=30000

这个是 connect-standalone.properties

bootstrap.servers=10.0.20.126:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/starrocks-kafka-connector-1.0.4

您好,这个问题有好的解决方式吗?

写入的时候加上sink.properties.label="xxx"试试