Question about the Kafka connector consuming Debezium-parsed Oracle incremental data

StarRocks version: 3.1.0
Kafka version: 3.5.1
Debezium Oracle connector: 2.4

Is there a concrete code example of the StarRocks Kafka connector consuming the Oracle incremental data that Debezium parses into Kafka?
I tried the steps from the official docs and hit this error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value io.confluent.connect.json.JsonSchemaConverter for configuration key.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.\nInvalid value io.confluent.connect.json.JsonSchemaConverter for configuration value.converter: Class io.confluent.connect.json.JsonSchemaConverter could not be found.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

Kafka connector configuration:

curl -i http://10.133.1.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "starrocks-kafka-connector",
  "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "test1.t1",
    "starrocks.topic2table.map": "test1.t1:t1",
    "starrocks.http.url": "10.133.1.1:8030",
    "starrocks.username": "root",
    "starrocks.password": "xxxx",
    "starrocks.database.name": "ems",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'
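
If the worker runs on Apache Kafka without the Confluent converter jars, a minimal fallback sketch is the built-in JsonConverter; this assumes Debezium is producing JSON, and schemas.enable has to match what the source side actually writes:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"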

Comparing with the docs, the config doesn't look quite right: https://docs.starrocks.io/zh-cn/main/loading/Kafka-connector-starrocks

The earlier settings were based on that page. Or is it enough to just use the latter part?

You need to add the transforms settings.

The command above already includes them, but it still errors out :joy:

{
	"name": "starrocks-kafka-connector",
	"connector": {
		"state": "RUNNING",
		"worker_id": "10.133.1.1:8083"
	},
	"tasks": [
		{
			"id": 0,
			"state": "FAILED",
			"worker_id": "10.133.1.1:8083",
			"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: java.lang.NullPointerException: Cannot invoke \"java.lang.Throwable.getMessage()\" because the return value of \"com.starrocks.data.load.stream.v2.StreamLoadManagerV2.getException()\" is null\n\tat com.starrocks.connector.kafka.StarRocksSinkTask.put(StarRocksSinkTask.java:231)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)\n\t... 11 more\n"
		}
	],
	"type": "sink"
}
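
The NullPointerException in StarRocksSinkTask.put masks the underlying load failure, so it can help to take Kafka Connect out of the loop and test Stream Load against the FE directly. A minimal sketch, reusing the host and credentials from this thread; the single id column is hypothetical, adjust it to t1's real schema:

# Send one JSON row straight to StarRocks via Stream Load;
# if this fails too, the problem is on the StarRocks side, not the connector
curl --location-trusted -u root:xxxx \
  -H "Expect:100-continue" \
  -H "format: json" -H "strip_outer_array: true" \
  -XPUT -d '[{"id": 1}]' \
  http://10.133.1.1:8030/api/ems/t1/_stream_load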

Could you post the complete Kafka connector config?

The complete connector config is the curl command above; it was submitted with curl.

Kafka Connect is running in standalone mode with the default configuration.

@U_1677637349988_8967 Remove the key.converter and value.converter settings and try again.
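
With those two keys removed, the connector falls back to the worker-level converters. In a stock Apache Kafka distribution these are the defaults shipped in config/connect-standalone.properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true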

:joy: I gave up; in the end I implemented it a different way.

With the transforms configured as in the docs, the submission hangs until it times out. With the transforms removed it submits fine, but no data gets written to the database. What could be the cause?
StarRocks 2.5
Kafka Connect 2.13-3.6.0

The port is wrong: the HTTP port is 8030.

After fixing the port, the submission still hangs. Can connectors be submitted via POST in standalone mode?
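
Standalone mode does expose the same REST API on port 8083, but it can also load connector configs from properties files at startup, which avoids the REST call entirely. A sketch, assuming a starrocks-sink.properties that holds the same key=value pairs as the JSON config above:

# Worker properties first, then one or more connector properties files
bin/connect-standalone.sh config/connect-standalone.properties \
    config/starrocks-sink.properties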

But after changing to the configuration below, the request goes through and logs are produced, yet there's no data in the database. The converter and transform settings from the official docs make the request hang.

How did you implement it in the end? Thanks!

Is the source Apache Kafka or Confluent?

Apache Kafka

Also tried Apache Kafka Connect in distributed mode: the HTTP request returns an error, but the connector starts up normally:
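
To confirm whether the connector and its task really came up despite the HTTP error, the standard Connect REST status endpoint can be polled (connector name and host taken from earlier in the thread):

# Shows the connector state plus each task's state and failure trace
curl -s http://10.133.1.1:8083/connectors/starrocks-kafka-connector/status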


After submitting the task, is registering a schema registry service mandatory?


After changing the config and resubmitting, the transaction fails.


CloudCanal :joy: (recently also testing Flink CDC 3.0)

From here it looks like the write task has already been triggered, but the data format seems wrong. Are you writing the data as JSON or CSV?

Add these settings:
"sink.properties.format": "json",
"sink.properties.strip_outer_array": "true"
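
Those two keys are passed through to Stream Load and tell StarRocks that each batch arrives as a JSON array. Pulling the whole thread together, a working config would look roughly like this; it is a sketch, with the host, credentials, and table names being the examples from above:

curl -i http://10.133.1.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "starrocks-kafka-connector",
  "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "test1.t1",
    "starrocks.topic2table.map": "test1.t1:t1",
    "starrocks.http.url": "10.133.1.1:8030",
    "starrocks.username": "root",
    "starrocks.password": "xxxx",
    "starrocks.database.name": "ems",
    "sink.properties.format": "json",
    "sink.properties.strip_outer_array": "true",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'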

It works after adding these settings. Thanks a lot!