问题描述
在 Kafka 2.7.0 中,我使用 Mirromaker 2.0 作为 Kafka 连接器将所有主题从主 Kafka 集群复制到备份集群。
除了__consumer_offsets
之外,所有主题都被完美复制。以下是连接配置:
{
"name": "test-connector","config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector","topics.blacklist": "some-random-topic","replication.policy.separator": "","source.cluster.alias": "","target.cluster.alias": "","exclude.internal.topics":"false","tasks.max": "10","key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter","value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter","source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094","target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094","topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
}
}
在类似的问题 here 中,接受的答案如下:
exclude.internal.topics=false
client.id=__admin_client
我应该将这些添加到我的配置中的什么位置?
此处的 Connector Configuration Properties 没有名为 client.id
的属性,但我已将 exclude.internal.topics
的值设置为 false
。
这里有我遗漏的东西吗?
更新
我了解到 Kafka 2.7 及更高版本支持使用 here 中提到的 MirrorCheckpointTask
自动消费者偏移同步。
我为此创建了一个具有以下配置的连接器:
{
"name": "mirror-checkpoint-connector","config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector","sync.group.offsets.enabled": "true","topics": "__consumer_offsets"
}
}
仍然没有帮助。 这是正确的方法吗?有什么需要吗?
解决方法
您不想复制 connsumer_offsets。由于各种原因,从 src 到目标集群的偏移量不会相同。
MirrorMaker2 提供了进行偏移平移的能力。它将使用从 src 集群生成的转换偏移量填充目标集群。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0