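Problem description
I want to use the Kafka Connect Solace source connector with the Avro converter to push messages to a Kafka topic. Once the messages are published to the Kafka topic, I want to use the Kafka Connect JDBC sink connector, also with the Avro converter, to push them from the topic into an Oracle database. I can push messages from Solace to the Kafka topic, but when I run the sink connector it fails with the error "org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct".
Here is my configuration:
Solace source connector properties: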
name=solaceSourceConnector
connector.class=com.solace.connector.kafka.connect.source.SolaceSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=testtopic
#Remaining properties are solace connection related
.
JDBC sink connector properties:
name=test-oracle
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.user = orcl
connection.url = jdbc:oracle:thin:@localhost:1521/TESTSERVICE
connection.password = ****
topics=testtopic
auto.create=false
table.name.format=TEST_TABLE1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
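After running the Solace source connector, I can see the messages being pushed to the Kafka topic. But after running the JDBC sink connector, I get the following error: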
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
Please help me understand where I went wrong and what can be done to resolve this issue.
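
One way to narrow this down, offered as a minimal diagnostic sketch rather than a confirmed fix: the JDBC sink throws this exception when the Connect schema of the record value is not a Struct. With the AvroConverter, a top-level Avro record is the usual case that converts to a Struct, whereas a primitive such as bytes or string does not. The Python snippet below reads the top-level type of the Avro schema registered for the topic's values. It assumes the Schema Registry at http://localhost:8081 from the configuration above and the subject name testtopic-value, which is what the default subject naming would give for this topic. The function names are hypothetical helpers, not part of any library.

```python
import json
import urllib.request


def top_level_avro_type(schema_json: str) -> str:
    """Return the top-level type name of an Avro schema given as JSON text."""
    schema = json.loads(schema_json)
    if isinstance(schema, str):   # primitive shorthand, e.g. "bytes"
        return schema
    if isinstance(schema, list):  # a union of several schemas
        return "union"
    return schema["type"]         # e.g. {"type": "record", ...}


def fetch_latest_value_schema(registry_url: str, topic: str) -> str:
    """Fetch the latest schema text for the subject <topic>-value.

    The subject name is an assumption (default naming for value schemas).
    """
    url = f"{registry_url}/subjects/{topic}-value/versions/latest"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)["schema"]


# Example usage against the registry from the configs above (needs network):
#   schema_text = fetch_latest_value_schema("http://localhost:8081", "testtopic")
#   print(top_level_avro_type(schema_text))
```

If this reports something other than record for testtopic-value, the values written by the source connector are probably not structured records, which would be consistent with the sink rejecting them.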