Kafka Connect with JSON and Schema Registry

Problem description

I am trying to store JSON messages from a Kafka topic in a PostgreSQL table using the JdbcSinkConnector from Kafka Connect. Confluent Platform version 5.5 and later should support this.

On the producer side, I am using the following serializer:

value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

The schema registered in Schema Registry:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "http://example.com/myURI.schema.json",
  "title": "value_greeting",
  "description": "Sample schema to help you get started.",
  "type": "object",
  "properties": {
    "msg": {
      "type": "string",
      "description": "The string type is used for strings of text."
    },
    "name": {
      "type": "string",
      "description": "The string type is used for strings of text."
    }
  }
}

The connector configuration:

"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "greeting",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Unfortunately, the connector fails with the following error:

ERROR WorkerSinkTask{id=GREE-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:111)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

What am I doing wrong?

Many thanks!

Solution

No confirmed solution to this problem has been posted yet.
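
As a diagnostic aid rather than a confirmed fix: the final "Unknown magic byte!" line is raised by the Schema Registry deserializer, which expects every message value to start with a zero byte followed by a 4-byte big-endian schema ID, with the JSON document after that. A common trigger, which is only an assumption for this setup, is that some records on the topic were written without KafkaJsonSchemaSerializer (for example, earlier test messages sent as plain JSON). The Python sketch below shows how raw value bytes could be checked for that framing. The function name describe_payload, the sample message, and the schema ID 1 are all hypothetical and chosen for illustration; fetching the raw bytes from the topic is left out.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte written by Confluent Schema Registry serializers


def describe_payload(raw: bytes) -> str:
    """Report whether raw message bytes carry the Schema Registry framing."""
    if len(raw) < 5:
        return "too short to carry the 5-byte header"
    # Header layout: 1 unsigned byte (magic) + 4-byte big-endian schema ID.
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        return f"no framing: first byte is 0x{magic:02x}, not 0x00"
    return f"framed: schema id {schema_id}, {len(raw) - 5} payload bytes"


# Hypothetical sample values, built by hand for illustration only.
body = json.dumps({"msg": "hello", "name": "world"}).encode("utf-8")
plain = body                                        # as a plain JSON producer would write it
framed = struct.pack(">BI", MAGIC_BYTE, 1) + body   # schema ID 1 is made up

print(describe_payload(plain))
print(describe_payload(framed))
```

If values read from the greeting topic turn out to be unframed, the JsonSchemaConverter cannot deserialize them, which would match the stack trace above. Whether that is what happened here cannot be told from the question alone.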
