Reference documentation: https://docs.starrocks.io/zh/docs/3.0/loading/Kafka-connector-starrocks/
The worker.properties configuration is as follows:
bootstrap.servers=10.0.96.4:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/ec2-user/kafka/starrocks-kafka-connector-1.0.0
The connect-StarRocks-sink.properties configuration is as follows:
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=example_topic
starrocks.http.url=10.0.3.149:8030
starrocks.username=root
starrocks.password=123456
starrocks.database.name=automq_db
key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
Tip: the same error still occurs even if the following transforms settings are added:
transforms=addfield,unwrap
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite
The error message is:
[2023-11-10 09:58:53,269] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2023-11-10 09:58:53,282] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:376)
[2023-11-10 09:58:53,290] ERROR Failed to create job for connect-StarRocks-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:111)
[2023-11-10 09:58:53,290] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration key.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration value.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration key.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
Invalid value io.confluent.connect.json.JsonSchemaConverter for configuration value.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:741)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:206)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$putConnectorConfig$0(StandaloneHerder.java:192)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
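The validation error above reports that the worker cannot find io.confluent.connect.json.JsonSchemaConverter. As a rough aid, the following Python sketch lists every setting in a connector properties file whose value names a Java class, so each one can be checked against what is actually installed. The helper name class_settings and the key-matching rule (connector.class, keys ending in .converter, and transforms.*.type) are assumptions made for illustration; they are not part of Kafka Connect.

```python
# Hypothetical helper: collect the settings in a .properties text that name a Java class.
SAMPLE = """\
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=example_topic
key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
transforms=addfield,unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
"""


def class_settings(properties_text):
    """Return {key: class_name} for settings that are assumed to name a Java class."""
    result = {}
    for raw in properties_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and lines without a key=value pair.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key, value = key.strip(), value.strip()
        is_class_key = (
            key == "connector.class"
            or key.endswith(".converter")
            or (key.startswith("transforms.") and key.endswith(".type"))
        )
        if is_class_key:
            result[key] = value
    return result


if __name__ == "__main__":
    for key, cls in class_settings(SAMPLE).items():
        print(f"{key} -> {cls}")
```

Each class reported this way has to be loadable by the worker, either from the Kafka Connect distribution itself or from a directory listed in plugin.path.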
If connect-StarRocks-sink.properties is changed to:
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=example_topic
starrocks.http.url=10.0.3.149:8030
starrocks.username=root
starrocks.password=123456
starrocks.database.name=automq_db
then the error changes to:
[2023-11-10 10:15:52,949] ERROR [starrocks-kafka-connector|task-0] Failed to start task starrocks-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker:652)
java.lang.NoClassDefFoundError: com/starrocks/data/load/stream/StreamLoadDataFormat
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:735)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:146)
at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:51)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:607)
at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startTask(StandaloneHerder.java:392)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:385)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:379)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:436)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$putConnectorConfig$2(StandaloneHerder.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: com.starrocks.data.load.stream.StreamLoadDataFormat
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:136)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
... 21 more
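Both failures above are class-loading errors, so one way to narrow them down is to check whether any jar under the plugin.path directory actually contains the missing class. The following is a minimal Python sketch under that assumption; the function name jars_containing is hypothetical, and it only inspects top-level entries of each .jar file (jars nested inside other jars are not opened).

```python
import os
import zipfile


def jars_containing(plugin_dir, class_name):
    """Return sorted paths of .jar files under plugin_dir that contain class_name."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for root, _dirs, files in os.walk(plugin_dir):
        for name in files:
            if not name.endswith(".jar"):
                continue
            path = os.path.join(root, name)
            try:
                with zipfile.ZipFile(path) as jar:
                    if entry in jar.namelist():
                        hits.append(path)
            except zipfile.BadZipFile:
                # Not a readable jar; ignore it rather than abort the scan.
                continue
    return sorted(hits)
```

For the setup in this post, a call such as jars_containing("/home/ec2-user/kafka/starrocks-kafka-connector-1.0.0", "com.starrocks.data.load.stream.StreamLoadDataFormat") would list the jars that provide the class. An empty result would suggest that the dependency supplying that class is not present in the plugin directory, although it does not by itself confirm the root cause.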