Import fails with errors when following the StarRocks Kafka connector documentation

The documentation I followed: https://docs.starrocks.io/zh/docs/3.0/loading/Kafka-connector-starrocks/

worker.properties is configured as follows:

bootstrap.servers=10.0.96.4:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/ec2-user/kafka/starrocks-kafka-connector-1.0.0

connect-StarRocks-sink.properties is configured as follows:

name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=example_topic
starrocks.http.url=10.0.3.149:8030
starrocks.username=root
starrocks.password=123456
starrocks.database.name=automq_db
key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter

Note: the same error still occurs if I additionally add the following transforms settings:
transforms=addfield,unwrap
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite

The error message is:

[2023-11-10 09:58:53,269] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2023-11-10 09:58:53,282] INFO AbstractConfig values:
 (org.apache.kafka.common.config.AbstractConfig:376)
[2023-11-10 09:58:53,290] ERROR Failed to create job for connect-StarRocks-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:111)
[2023-11-10 09:58:53,290] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration key.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration value.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
	at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration key.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration value.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
	at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:741)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:206)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$putConnectorConfig$0(StandaloneHerder.java:192)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

If I change connect-StarRocks-sink.properties to the following (removing the two converter lines):

name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=example_topic
starrocks.http.url=10.0.3.149:8030
starrocks.username=root
starrocks.password=123456
starrocks.database.name=automq_db

then the error changes to:

[2023-11-10 10:15:52,949] ERROR [starrocks-kafka-connector|task-0] Failed to start task starrocks-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker:652)
java.lang.NoClassDefFoundError: com/starrocks/data/load/stream/StreamLoadDataFormat
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:735)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:146)
	at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:51)
	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:607)
	at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startTask(StandaloneHerder.java:392)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:385)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:379)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:436)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$putConnectorConfig$2(StandaloneHerder.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: com.starrocks.data.load.stream.StreamLoadDataFormat
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:136)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	... 21 more

Judging from the error, the converter io.confluent.connect.json.JsonSchemaConverter cannot be found. Are you running the community (Apache) edition of Kafka Connect or the Confluent edition?
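
Both errors above are "class not found" failures, so one quick check is whether any jar under the directory given in plugin.path actually contains the missing class. The following is only a minimal sketch: the helper name find_class_in_jars is made up for illustration, and it assumes the plugin jars are ordinary zip archives somewhere under that directory. It does not reproduce how Kafka Connect isolates plugin class loaders; it only tells you whether the class file is physically present.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(plugin_dir, class_name):
    """Return the jar files under plugin_dir that contain class_name.

    Hypothetical helper for illustration; not part of Kafka Connect or StarRocks.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(plugin_dir).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(str(jar))
        except zipfile.BadZipFile:
            # Skip files that are not valid jar/zip archives
            continue
    return hits


if __name__ == "__main__":
    # Path and class names are taken from the post above
    plugin_dir = "/home/ec2-user/kafka/starrocks-kafka-connector-1.0.0"
    for cls in (
        "io.confluent.connect.json.JsonSchemaConverter",
        "com.starrocks.data.load.stream.StreamLoadDataFormat",
    ):
        print(cls, "->", find_class_in_jars(plugin_dir, cls))
```

An empty list for a class means no jar under that directory ships it, so the jar that provides it would have to be added before Kafka Connect can load it.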

Has this been solved? I've run into the same problem.