Kafka Connect 与 Solace Source Connector 和 JDBC SInk Connector 的问题

问题描述

我想使用 kafka connect solace source connector 和 avro 转换器将消息推送到 kafka 主题。一旦在 kafka 主题上发布消息,我想使用 kafka connect jdbc sink 连接器和 avro 转换器将消息从 kafka 主题推送到 oracle 数据库。我可以将消息从 Solace 推送到 kafka 主题,但是当我运行接收器连接器部分时,它给出了一个错误 ->“org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct”。>

下面是我的配置:

Solace 源连接器属性:

name=solaceSourceConnector
connector.class=com.solace.connector.kafka.connect.source.SolaceSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=testtopic

#Remaining properties are solace connection related
.

JDBC 接收器连接器属性:

name=test-oracle
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.user = orcl
connection.url = jdbc:oracle:thin:@localhost:1521/TESTSERVICE
connection.password = ****
topics=testtopic
auto.create=false
table.name.format=TEST_TABLE1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false

运行 solace 源连接器后。我可以看到在 kafka 主题中推送的消息。但是在运行 JDBC 接收器连接器后,我收到以下错误:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)

请帮助我了解我哪里出错了。还有什么可以做来解决这个问题。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)