Kafka Mirror Maker 2偏移复制不起作用

问题描述

我们正在测试卡夫卡的灾难恢复方案。我们在单独的区域中有2个kafka群集。我们正在使用MirrorMaker2复制主题和消息。 主题和消息能够复制。但是我们观察到偏移量不是复制的。

例如 从生产者那里产生了10条指向卡夫卡1区的消息。

从消费者那里获得的5条消息指向了卡夫卡地区1

停止消费者指向region1

初始消费者指向region2

消费信息

这里期望的是区域2的消费者应该从偏移6开始消费

但它从偏移量0开始消耗

下面是属性文件

 clusters = primary,secondary
# primary cluster information
 primary.bootstrap.servers = test1-primary.com:9094,test2-primary.com.apttuscloud.io:9094,test3-primary.com:9094
 primary.security.protocol= SASL_SSL
 primary.ssl.truststore.password= dummypassword
 primary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
 primary.ssl.keystore.password= dummypassword
 primary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
 primary.ssl.endpoint.identification.algorithm=
 primary.sasl.mechanism= PLAIN
 primary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";

# secondary cluster information
 secondary.bootstrap.servers = test1-secondary.com:9094,test2-secondary.com.apttuscloud.io:9094,test3-secondary.com:9094
 secondary.security.protocol= SASL_SSL
 secondary.ssl.truststore.password= dummypassword
 secondary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
 secondary.ssl.keystore.password= dummypassword
 secondary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
 secondary.ssl.endpoint.identification.algorithm=
 secondary.sasl.mechanism=PLAIN
 secondary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";
# Topic Configuration
 primary->secondary.enabled = true
 primary->secondary.topics = .*

 secondary->primary.enabled = true
 secondary->primary.topics = .*

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats","B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing,a value greater than 1 is recommended to ensure availability such as 3
 checkpoints.topic.replication.factor= 3
 heartbeats.topic.replication.factor= 3
 offset-syncs.topic.replication.factor= 3

# The replication factor for connect internal topics "mm2-configs.B.internal","mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing,a value greater than 1 is recommended to ensure availability such as 3.
 offset.storage.replication.factor=3
 status.storage.replication.factor=3
 config.storage.replication.factor=3

 replication.factor = 3
 refresh.topics.enabled = true
 sync.topic.configs.enabled = true
 refresh.topics.interval.seconds = 10
 topics.blacklist = .*[\-\.]internal,.*\.replica,__consumer_offsets
 groups.blacklist = console-consumer-.*,connect-.*,__.*
 primary->secondary.emit.heartbeats.enabled = true
 primary->secondary.emit.checkpoints.enabled = true

请注意,一些Confedentilal值与虚拟值一起放置

此致

Narendra Jadhav

解决方法

使用MirrorMaker 2.5,在群集之间移动使用者时,偏移量不会自动转换。

因此,在另一个群集上启动使用者时,使用者需要使用RemoteClusterUtils.translateOffsets()在此群集中查找其偏移量。

在2.7(预期为2020年11月)中,您可以让MirrorMaker 2自动转换偏移量,请参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0