融合架构注册表:JDBC接收器连接器无法识别更新架构中的新列

问题描述

Confluent Platform (Docker)开始,我进行了以下设置:

MySQL DB ----------> Debezium MySQL Source Connector --> Kafka (topic: X)
Kafka (topic: X) --> Confluent JDBC Sink Connector ----> Postgres DB

(我将docker-compose文件中的键和值转换器更改为Avro。)

我正在Confluent Schema Registry的帮助下测试架构的演变。我正在执行的步骤:

  1. 首先更新接收器数据库:将新列abc添加到Postgres DB表X:alter table X add column abc int;

    测试管道。好吧。

  2. 更新X的架构:在控制中心http:// localhost:9021 /上,添加默认值为abc的{​​{1}}类型的可选字段int。现在,主题X具有2个架构版本。我保留了默认的向后兼容性。

    再次测试管道。好吧。

  3. 更新源数据库:将新列null添加到MySQL数据库表X:abc

    通过更新源表的记录来测试管道。我在Kafka Connect服务日志中看到堆栈跟踪。不好。

alter table X add column abc int;

我已经在Confluent Control Center上再次检查了[2020-08-11 05:58:45,014] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32},name='abc',isPrimaryKey=false}] among column names [... list of columns ...] (io.confluent.connect.jdbc.sink.DbStructure) [2020-08-11 05:58:45,017] ERROR WorkerSinkTask{id=test-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "X" is missing fields ([SinkRecordField{schema=Schema{INT32},isPrimaryKey=false}]) and auto-evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask) 的架构,现在有3个版本。版本2和版本3相同。更新源表后,新消息是否创建了新的相同架构版本?

关于自动进化的错误消息,我的理解是我们应该保持禁用状态,因为我们将依赖Schema Registry来控制进化。

能否请您帮助我理解为何即使在更新架构后Confluent JDBC连接器也无法找到新字段?我在这里想念什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...