问题描述
我们正在测试卡夫卡的灾难恢复方案。我们在单独的区域中有2个kafka群集。我们正在使用MirrorMaker2复制主题和消息。 主题和消息能够复制。但是我们观察到偏移量不是复制的。
例如 从生产者那里产生了10条指向卡夫卡1区的消息。
从消费者那里获得的5条消息指向了卡夫卡地区1
停止消费者指向region1
初始消费者指向region2
消费信息
这里期望的是区域2的消费者应该从偏移6开始消费
但它从偏移量0开始消耗
clusters = primary,secondary
# primary cluster information
primary.bootstrap.servers = test1-primary.com:9094,test2-primary.com.apttuscloud.io:9094,test3-primary.com:9094
primary.security.protocol= SASL_SSL
primary.ssl.truststore.password= dummypassword
primary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
primary.ssl.keystore.password= dummypassword
primary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
primary.ssl.endpoint.identification.algorithm=
primary.sasl.mechanism= PLAIN
primary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";
# secondary cluster information
secondary.bootstrap.servers = test1-secondary.com:9094,test2-secondary.com.apttuscloud.io:9094,test3-secondary.com:9094
secondary.security.protocol= SASL_SSL
secondary.ssl.truststore.password= dummypassword
secondary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
secondary.ssl.keystore.password= dummypassword
secondary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
secondary.ssl.endpoint.identification.algorithm=
secondary.sasl.mechanism=PLAIN
secondary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";
# Topic Configuration
primary->secondary.enabled = true
primary->secondary.topics = .*
secondary->primary.enabled = true
secondary->primary.topics = .*
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats","B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing,a value greater than 1 is recommended to ensure availability such as 3
checkpoints.topic.replication.factor= 3
heartbeats.topic.replication.factor= 3
offset-syncs.topic.replication.factor= 3
# The replication factor for connect internal topics "mm2-configs.B.internal","mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing,a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
replication.factor = 3
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10
topics.blacklist = .*[\-\.]internal,.*\.replica,__consumer_offsets
groups.blacklist = console-consumer-.*,connect-.*,__.*
primary->secondary.emit.heartbeats.enabled = true
primary->secondary.emit.checkpoints.enabled = true
请注意,一些Confedentilal值与虚拟值一起放置
此致
Narendra Jadhav
解决方法
使用MirrorMaker 2.5,在群集之间移动使用者时,偏移量不会自动转换。
因此,在另一个群集上启动使用者时,使用者需要使用RemoteClusterUtils.translateOffsets()
在此群集中查找其偏移量。
在2.7(预期为2020年11月)中,您可以让MirrorMaker 2自动转换偏移量,请参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0