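Problem description
I'm using Docker and I'm new to Kafka Connect. My use case: I have a Postgres database and need to capture every change event (INSERT, UPDATE, DELETE) from it into a Kafka topic for further processing. However, I'm stuck at capturing the change events. I'm following this link:
Just after creating the connector with the following configuration: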
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "students",
    "database.server.name": "dbserver15",
    "database.whitelist": "students",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.students",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
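For reference, a configuration like this is normally submitted to the Kafka Connect REST API with `POST /connectors`. The sketch below only builds that request. It assumes the worker is reachable at `http://localhost:8083` (the question does not state the address), uses a shortened copy of the config, and `build_create_request` is a hypothetical helper name.

```python
import json
import urllib.request

# Assumption: address of the Connect worker's REST API; not given in the question.
CONNECT_URL = "http://localhost:8083"


def build_create_request(name, config, base_url=CONNECT_URL):
    """Build the POST /connectors request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Shortened copy of the configuration shown above (illustration only).
config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.dbname": "students",
    "database.server.name": "dbserver15",
}

request = build_create_request("postgres-source", config)
print(request.get_method(), request.full_url)

# Sending needs a running Connect worker, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Keeping the config as a Python dict makes it easy to change one property at a time and resubmit while debugging.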
I'm using the following command to read the snapshot/change events for the table:
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic dbserver15.public.admission
It shows the data in the following format: Struct{student_id=1,gre=337,toefl=118} ...
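The `Struct{...}` form is most likely the result of `StringConverter`, which writes the `toString()` of the record value instead of a structured encoding. As a rough illustration of what a consumer then has to deal with, here is a naive parser for that text. `parse_struct_string` is a hypothetical helper, and it assumes flat fields with no `,` or `=` inside values.

```python
def parse_struct_string(text):
    """Parse 'Struct{a=1,b=2}' into a dict of strings.

    Naive on purpose: assumes flat fields and no ',' or '=' inside values.
    """
    prefix, suffix = "Struct{", "}"
    if not (text.startswith(prefix) and text.endswith(suffix)):
        raise ValueError(f"not a Struct string: {text!r}")
    inner = text[len(prefix):-len(suffix)]
    fields = {}
    for pair in inner.split(","):
        if not pair:
            continue  # tolerate an empty Struct{}
        key, _, value = pair.partition("=")
        fields[key.strip()] = value.strip()
    return fields


row = parse_struct_string("Struct{student_id=1,gre=337,toefl=118}")
print(row)  # {'student_id': '1', 'gre': '337', 'toefl': '118'}
```

All values come back as strings and type information is lost, which is one reason a structured converter such as `org.apache.kafka.connect.json.JsonConverter` is usually easier to process downstream than this text form.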
However, as soon as any INSERT, UPDATE, or DELETE is performed on this table, the Kafka connector throws the following error:
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
	at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
	at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
	at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:156)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid identifier:
	at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:68)
	at io.debezium.text.TokenStream.start(TokenStream.java:445)
	at io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)
	at io.debezium.relational.TableId.parse(TableId.java:39)
	at io.debezium.connector.postgresql.PostgresSchema.parse(PostgresSchema.java:218)
	at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:238)
	at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)
	at io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder.processMessage(PgProtoMessageDecoder.java:48)
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:265)
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:250)
	at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:131)
	at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:117)
	... 5 more
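Traces like this one are also returned by the Connect REST API under `GET /connectors/<name>/status`, where the interesting part is usually the last `Caused by:` line. The sketch below pulls that line out of a status document. The sample document is hand-written in the shape of that response, with an abbreviated copy of the trace above and a made-up `worker_id`. The helper names are hypothetical.

```python
def root_cause(trace):
    """Return the last 'Caused by:' entry of a Java stack trace, else its first line."""
    lines = [line.strip() for line in trace.splitlines() if line.strip()]
    causes = [line for line in lines if line.startswith("Caused by:")]
    if causes:
        return causes[-1][len("Caused by:"):].strip()
    return lines[0] if lines else ""


def failed_task_causes(status):
    """Map task id -> root cause for every FAILED task in a status document."""
    return {
        task["id"]: root_cause(task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    }


# Hand-written sample; worker_id is a placeholder, trace is abbreviated.
sample_status = {
    "name": "postgres-source",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "connect:8083",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: "
                "An exception ocurred in the change event producer. "
                "This connector will be stopped.\n"
                "\tat io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)\n"
                "Caused by: java.lang.IllegalArgumentException: Invalid identifier: \n"
                "\tat io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)\n"
            ),
        }
    ],
}

print(failed_task_causes(sample_status))
# {0: 'java.lang.IllegalArgumentException: Invalid identifier:'}
```

In the trace above, the cause is raised in `TableIdParser` while the connector parses a table identifier from a replication message, and nothing follows `Invalid identifier:`, which may mean the identifier it received was empty.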
These are the possible solutions I have looked into:
https://gitter.im/debezium/user?at=5e1f2846be66165ecbd4e0fe
https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-when-things-go-wrong
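They suggest setting snapshot.mode to exported, which lets the connector perform a lock-free snapshot.
But when I add "snapshot.mode": "exported", I get this error:
The 'snapshot.mode' value 'exported' is invalid: Value must be one of always, never, initial_only, initial, custom
Can someone elaborate and explain what I'm missing? I suspect it is related to the configuration.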
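Errors of this kind can be surfaced before creating the connector by using the Connect validation endpoint, `PUT /connector-plugins/<class>/config/validate`. The sketch below builds such a request and extracts per-property errors from a response. The worker address `http://localhost:8083` is an assumption, the helper names are hypothetical, and the sample response is hand-written (trimmed to the relevant fields) using the message quoted above.

```python
import json
import urllib.request
from urllib.parse import quote

# Assumption: address of the Connect worker's REST API; not given in the question.
CONNECT_URL = "http://localhost:8083"


def build_validate_request(config, base_url=CONNECT_URL):
    """Build the PUT request that asks Connect to validate a connector config."""
    plugin = quote(config["connector.class"], safe="")
    return urllib.request.Request(
        url=f"{base_url}/connector-plugins/{plugin}/config/validate",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def config_errors(validation):
    """Map property name -> list of error messages from a validation response."""
    errors = {}
    for entry in validation.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = value["errors"]
    return errors


config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "snapshot.mode": "exported",
}
validate_request = build_validate_request(config)

# Hand-written sample response, trimmed to the fields used here.
sample_response = {
    "error_count": 1,
    "configs": [
        {"value": {"name": "database.dbname", "value": "students", "errors": []}},
        {
            "value": {
                "name": "snapshot.mode",
                "value": "exported",
                "errors": [
                    "Value must be one of always, never, initial_only, initial, custom"
                ],
            }
        },
    ],
}

print(config_errors(sample_response))
```

Since `exported` is missing from the allowed list, one possible reading is that the installed connector is older than the version described in the linked documentation. Comparing the installed Debezium version with the documentation version would confirm or rule that out.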