Debezium Kafka connector throws an error whenever a change event (INSERT, UPDATE, DELETE) is performed in PostgreSQL

Problem description

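I am using Docker and I am new to Kafka Connect. My use case: I have a Postgres database and need to capture every change event (INSERT, UPDATE, DELETE) from it into a Kafka topic for further processing. However, I am stuck at capturing the change events. I am following this article: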

https://medium.com/high-alpha/data-stream-processing-for-newbies-with-kafka-ksql-and-postgres-c30309cfaaf8

I created the connector with the following configuration:

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "students",
    "database.server.name": "dbserver15",
    "database.whitelist": "students",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.students",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
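
For reference, a configuration like this is normally registered by POSTing it to the Kafka Connect REST API. The snippet below is only a minimal sketch of that step, not the setup from the linked article. The URL http://localhost:8083/connectors assumes the default REST port of a locally reachable Connect worker, the names build_register_request and register are hypothetical, and the config is shortened for illustration.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API is reachable on the default port 8083.
CONNECT_URL = "http://localhost:8083/connectors"


def build_register_request(name, config, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(request):
    """Send the request; this needs a running Connect worker."""
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")


# Shortened for illustration; the real config has more keys.
example_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.dbname": "students",
    "database.server.name": "dbserver15",
}

request = build_register_request("postgres-source", example_config)
print(request.get_method(), request.full_url)
```

Calling register(request) would submit the connector. Building the request separately makes it possible to inspect the payload before anything is sent.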

I am using the following command to read the snapshot and change events from the database:

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic dbserver15.public.admission

It displays the data in the following format: Struct{student_id=1,gre=337,toefl=118} ...

However, as soon as any INSERT, UPDATE, or DELETE is performed on this table, the Kafka connector throws the following error:

org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid identifier: 
    at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:68)
    at io.debezium.text.TokenStream.start(TokenStream.java:445)
    at io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)
    at io.debezium.relational.TableId.parse(TableId.java:39)
    at io.debezium.connector.postgresql.PostgresSchema.parse(PostgresSchema.java:218)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:238)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)
    at io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder.processMessage(PgProtoMessageDecoder.java:48)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:265)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:250)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:131)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:117)
    ... 5 more
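
Kafka Connect also reports a failed task's stack trace in the trace field of the response to GET /connectors/<name>/status. The snippet below is a rough sketch of pulling the "Caused by:" lines out of such a response. The sample payload is hand-written and abbreviated for illustration, the worker_id values are made up, and the function name root_causes is hypothetical.

```python
import json


def root_causes(status):
    """Return the 'Caused by:' lines from every failed task in a status payload."""
    causes = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        for line in task.get("trace", "").splitlines():
            line = line.strip()
            if line.startswith("Caused by:"):
                causes.append(line)
    return causes


# Hand-written, abbreviated sample shaped like a Connect status response.
# The raw string keeps \n and \t as JSON escapes, which json.loads decodes.
sample_status = json.loads(r'''
{
  "name": "postgres-source",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.\n\tat io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)\nCaused by: java.lang.IllegalArgumentException: Invalid identifier: \n\tat io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)\n"
    }
  ]
}
''')

print(root_causes(sample_status))
```

In this trace nothing follows "Invalid identifier:", which may indicate that the table identifier passed to TableId.parse was blank.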

Here are the solutions I have looked into:

org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped

https://gitter.im/debezium/user?at=5e1f2846be66165ecbd4e0fe

https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-when-things-go-wrong
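says that setting snapshot.mode to exported allows the connector to perform a lock-free snapshot.
But when I add "snapshot.mode": "exported", I get this error:
The 'snapshot.mode' value 'exported' is invalid: Value must be one of always, never, initial_only, initial, custom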
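
The list in that message reflects what the installed connector build accepts, which may differ from the modes described in the current online documentation. The snippet below is a small sketch of checking a config against that list before submitting it. The accepted values are copied from the error message, not from the Debezium source, and the function name check_snapshot_mode is hypothetical.

```python
# Values copied from the connector's error message, not from the Debezium source.
ACCEPTED_SNAPSHOT_MODES = ("always", "never", "initial_only", "initial", "custom")


def check_snapshot_mode(config):
    """Return None if snapshot.mode is acceptable, else a description of the problem."""
    mode = config.get("snapshot.mode")
    if mode is None:
        # Not set, so the connector would fall back to its own default.
        return None
    if mode not in ACCEPTED_SNAPSHOT_MODES:
        return "'snapshot.mode' value %r is not one of: %s" % (
            mode,
            ", ".join(ACCEPTED_SNAPSHOT_MODES),
        )
    return None


print(check_snapshot_mode({"snapshot.mode": "exported"}))
print(check_snapshot_mode({"snapshot.mode": "initial"}))
```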

Can someone elaborate and explain what I am missing? I suspect it is related to the configuration.
