Configuring a Debezium connector for multiple tables in a database

Problem description

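I'm trying to configure a Debezium connector for several tables in a MySQL database (I'm using Debezium 1.4 on MySQL 8.0). My company has a naming pattern that Kafka topics must follow, and it doesn't allow underscores (_), so I have to replace them with hyphens (-).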

So my topic names are:

Topic 1

fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status"
- All changes in that table must go to that topic.

Topic 2

fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table must go to that topic.

Topic 3

fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table must go to that topic.

I'm trying to use the ByLogicalTableRouter transform, but I can't find a regex that solves my problem.

{
  "name": "debezium.connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "myhostname",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1000",
    "database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
    "schema.include.list": "transaction_search",
    "table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
    "database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
    "database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
    "snapshot.mode": "schema_only",
    "transforms": "RerouteName,RerouteUnderscore",
    "transforms.RerouteName.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.RerouteName.topic.regex": "(.*)transaction_search(.*)",
    "transforms.RerouteName.topic.replacement": "$1$2",
    "transforms.RerouteUnderscore.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.RerouteUnderscore.topic.regex": "(.*)_(.*)",
    "transforms.RerouteUnderscore.topic.replacement": "$1-$2"
  }
}
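  • In the first transform, I'm trying to remove the duplicated schema name from the topic route.
  • In the second transform, I'm replacing all remaining underscores (_) with hyphens (-).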

However, I get the error below, which suggests it's trying to send everything to the same topic:

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier
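One way to test the "same topic" hypothesis is to replay the two regexes on the three default topic names. The hypothetical Java sketch below assumes Debezium's default MySQL topic naming (<database.server.name>.<database>.<table>) and assumes the router only rewrites a topic when topic.regex matches the whole name, as Kafka's RegexRouter does with Matcher.matches(). Under those assumptions the three topics stay distinct, although each ends up with a double dot, and proposal skips the second step because it has no underscore. So the collision is probably not in the topic names. A more likely cause is that each ByLogicalTableRouter instance adds the __dbz__physicalTableIdentifier field to the record key (key.enforce.uniqueness, enabled by default), so chaining two routers tries to add that field twice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Replays the two ByLogicalTableRouter regexes from the config above.
// Assumption: a topic is only rewritten when the regex matches the WHOLE name.
final class RouterChainDemo {
    // Returns the rewritten topic, or the original topic when the regex does not fully match.
    static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    // RerouteName followed by RerouteUnderscore, in the order listed in "transforms".
    static String applyBothTransforms(String topic) {
        String step1 = route(topic, "(.*)transaction_search(.*)", "$1$2");
        return route(step1, "(.*)_(.*)", "$1-$2");
    }

    public static void main(String[] args) {
        String prefix = "fjf.db.top-domain.domain.sub-domain.transaction-search";
        for (String table : new String[] {"order_status", "shipping_tracking", "proposal"}) {
            // Assumed default topic name: <server.name>.<database>.<table>
            String original = prefix + ".transaction_search." + table;
            System.out.println(original + " -> " + applyBothTransforms(original));
        }
    }
}
```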

How can I set up the transforms so that each table's events are routed to their own topic?

Solution

  1. Remove the schema name

"In the first transform, I'm trying to remove the duplicated schema name from the topic route."

With your regex, the rerouted topic name ends up with two consecutive dots, so you need to fix that:

"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)","transforms.RerouteName.topic.replacement": "$1.$2" 
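One caveat, stated as an assumption to verify against the Debezium documentation: if ByLogicalTableRouter, like Kafka's RegexRouter, only reroutes when topic.regex matches the entire topic name, then ([^.]+) cannot cover the dotted server-name prefix and the pattern above would not match at all. The hypothetical Java check below compares it with a variant that uses (.*) for the prefix; in the connector config that variant would be written as "(.*)\\.transaction_search\\.(.*)" with the same "$1.$2" replacement.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Compares the suggested regex with a variant whose first group may contain dots,
// assuming the router requires a full match (Matcher.matches()).
final class SchemaStripDemo {
    // Assumed default topic name for the order_status table.
    static final String TOPIC =
        "fjf.db.top-domain.domain.sub-domain.transaction-search.transaction_search.order_status";

    // Returns the rewritten topic, or null when the regex does not match the whole topic name.
    static String fullMatchReplace(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : null;
    }

    public static void main(String[] args) {
        // [^.]+ cannot span the dots in the prefix, so this prints "null".
        System.out.println(fullMatchReplace(TOPIC, "([^.]+)\\.transaction_search\\.([^.]+)", "$1.$2"));
        // (.*) absorbs the whole prefix, leaving a single dot before the table name.
        System.out.println(fullMatchReplace(TOPIC, "(.*)\\.transaction_search\\.(.*)", "$1.$2"));
    }
}
```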
  2. Replace underscores with hyphens

You can try the ChangeCase SMT from Kafka Connect Common Transformations.
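A case-conversion transform fits better here than a second regex router, because one pass of (.*)_(.*) rewrites only the last underscore: the greedy first group swallows everything before it. The sketch below uses a hypothetical table name, order_status_history, and hypothetical helper names to contrast that single pass with converting every underscore in the table segment, which is roughly what a lower_underscore to lower-hyphen conversion would do. It also avoids chaining two ByLogicalTableRouter instances.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Contrasts one regex pass with hyphenating every underscore in the table segment.
final class UnderscoreDemo {
    // One pass of "(.*)_(.*)" -> "$1-$2": the greedy first group means only the LAST underscore changes.
    static String singleRegexPass(String topic) {
        Matcher m = Pattern.compile("(.*)_(.*)").matcher(topic);
        return m.matches() ? m.replaceFirst("$1-$2") : topic;
    }

    // Replaces every underscore in the last dot-separated segment (the table name).
    static String hyphenateTable(String topic) {
        int lastDot = topic.lastIndexOf('.');
        return topic.substring(0, lastDot + 1) + topic.substring(lastDot + 1).replace('_', '-');
    }

    public static void main(String[] args) {
        // Hypothetical table name with two underscores, for illustration only.
        String topic = "fjf.db.top-domain.domain.sub-domain.transaction-search.order_status_history";
        System.out.println(singleRegexPass(topic));
        System.out.println(hyphenateTable(topic));
    }
}
```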