Kafka: the size of ISR Set(3) is insufficient for min ISR 2

Problem description

While mirroring data with MirrorMaker 1 on Apache Kafka 2.6, I get a strange error from the Kafka server:

org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(3) is insufficient to satisfy the min.isr requirement of 2 for partition FooBar-0

The strange part is that min.isr is set to 2 and the ISR set has 3 nodes, yet I still get a NotEnoughReplicasException.

Digging deeper into the topic doesn't reveal anything unusual:

[root@LoremIpsum kafka]# /usr/lib/kafka/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic FooBar
Topic: FooBar       PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,max.message.bytes=5242880,min.compaction.lag.ms=604800000,message.timestamp.type=LogAppendTime,unclean.leader.election.enable=false
        Topic: FooBar       Partition: 0    leader: 3       Replicas: 2,3,1 Isr: 3

The logs of all 3 nodes look normal (as far as I can judge). What else could cause this message, and what else can I check?

Many thanks for any advice!


ConsumerConfig

exclude.internal.topics=true
auto.offset.reset=earliest
enable.auto.commit=false
isolation.level=read_committed
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
max.partition.fetch.bytes=5242880

ProducerConfig

acks=all
enable.idempotence=true
max.in.flight.requests.per.connection=1
#retries=
#delivery.timeout.ms=
#request.timeout.ms=
#linger.ms=
batch.size=1000
max.request.size=5242880

Solution

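The term "ISR Set(3)" does not mean the ISR has three members: it is the set of in-sync broker IDs, and it contains only broker #3. The kafka-topics output shows the same thing (Isr: 3, while Replicas: 2,3,1). The ISR size is therefore 1, which is below min.insync.replicas=2. Evidently, data replication between the brokers is not working.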
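To make the arithmetic concrete, here is a minimal Java sketch assuming the broker rule as I understand it: a write sent with acks=all is rejected with NotEnoughReplicasException when the ISR is smaller than min.insync.replicas (with acks=0 or acks=1 the check does not apply). It parses the Isr field from the describe output above and applies that rule. The names IsrCheck, parseIsr and rejectsWrite are hypothetical and for illustration only; this is not Kafka's own code.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IsrCheck {

    // Extracts the broker IDs listed after "Isr:" in a kafka-topics.sh --describe partition line.
    static int[] parseIsr(String describeLine) {
        Matcher m = Pattern.compile("Isr:\\s*([0-9,]+)").matcher(describeLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no Isr field in: " + describeLine);
        }
        return Arrays.stream(m.group(1).split(",")).mapToInt(Integer::parseInt).toArray();
    }

    // Simplified broker-side rule: only acks=all (or -1) waits for the ISR, and such a
    // write is rejected when the ISR has fewer members than min.insync.replicas.
    static boolean rejectsWrite(String acks, int isrSize, int minInsyncReplicas) {
        boolean waitsForIsr = acks.equals("all") || acks.equals("-1");
        return waitsForIsr && isrSize < minInsyncReplicas;
    }

    public static void main(String[] args) {
        String line = "Topic: FooBar       Partition: 0    leader: 3       Replicas: 2,3,1 Isr: 3";
        int[] isr = parseIsr(line);
        System.out.println("ISR broker IDs: " + Arrays.toString(isr)); // [3]
        System.out.println("ISR size: " + isr.length);                 // 1
        System.out.println("Rejected (acks=all, min.insync.replicas=2): "
                + rejectsWrite("all", isr.length, 2));                 // true
    }
}
```

In other words, Set(3) in the error message and "Isr: 3" in the describe output describe the same one-member ISR.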

Under the hood, MirrorMaker 1 is just a KafkaConsumer and a KafkaProducer doing the work. According to the producer's Callback JavaDocs, NotEnoughReplicasException is a retriable exception.
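Because the exception is retriable, the producer keeps retrying the failed request, pausing retry.backoff.ms between attempts, until delivery.timeout.ms runs out. The hypothetical RetryModel below is a deliberately simplified model of that behavior, not KafkaProducer's real logic: it assumes each attempt fails instantly while the ISR is too small and ignores request.timeout.ms, batching and in-flight limits. It only shows how the two settings bound the retry window.

```java
/**
 * Hypothetical, simplified model of producer retries for a retriable error such as
 * NotEnoughReplicasException. NOT KafkaProducer's implementation.
 */
public class RetryModel {

    /** Result of one simulated send. */
    static final class Outcome {
        final boolean delivered;
        final int attempts;
        final long deliveredAtMs; // -1 if never delivered

        Outcome(boolean delivered, int attempts, long deliveredAtMs) {
            this.delivered = delivered;
            this.attempts = attempts;
            this.deliveredAtMs = deliveredAtMs;
        }
    }

    /**
     * Attempts a send at t = 0 and then every retryBackoffMs, until the ISR has
     * recovered (at isrRecoversAtMs) or the next attempt would start after deliveryTimeoutMs.
     */
    static Outcome simulate(long isrRecoversAtMs, long retryBackoffMs, long deliveryTimeoutMs) {
        if (retryBackoffMs <= 0) {
            throw new IllegalArgumentException("retryBackoffMs must be positive");
        }
        int attempts = 0;
        for (long t = 0; t <= deliveryTimeoutMs; t += retryBackoffMs) {
            attempts++;
            if (t >= isrRecoversAtMs) {
                return new Outcome(true, attempts, t); // ISR is back to >= min.insync.replicas
            }
            // Otherwise the broker answers NotEnoughReplicasException: back off and retry.
        }
        return new Outcome(false, attempts, -1); // delivery.timeout.ms expired
    }

    public static void main(String[] args) {
        Outcome recovered = simulate(4_500, 1_000, 300_000);
        System.out.println(recovered.delivered + " after " + recovered.attempts + " attempts"); // true after 6 attempts
        Outcome stuck = simulate(400_000, 1_000, 300_000);
        System.out.println(stuck.delivered + " after " + stuck.attempts + " attempts");         // false after 301 attempts
    }
}
```

With a 1 s backoff and a 5 min timeout, a record gets up to about 300 attempts in this model. If the followers do not rejoin the ISR within that window, the send still fails, so retrying only covers a transient ISR shrink.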

So you can get rid of this error by setting the following producer configurations:

acks=all: The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
retry.backoff.ms=1000: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
delivery.timeout.ms=300000: An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures.
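A minimal sketch, assuming the settings reach MirrorMaker 1 through the producer properties file passed with --producer.config: it builds the three overrides with java.util.Properties and checks the constraint that, as far as I know, KafkaProducer enforces on an explicitly set delivery.timeout.ms (it must be at least linger.ms + request.timeout.ms). The names ProducerOverrides, retryFriendlyProducerProps and deliveryTimeoutIsValid are hypothetical; keys are plain strings so the sketch compiles without kafka-clients.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.Properties;

public class ProducerOverrides {

    // The three producer settings suggested above, as plain string keys.
    static Properties retryFriendlyProducerProps() {
        Properties p = new Properties();
        p.setProperty("acks", "all");                   // wait for the full ISR, so min.insync.replicas applies
        p.setProperty("retry.backoff.ms", "1000");      // pause 1 s between retries
        p.setProperty("delivery.timeout.ms", "300000"); // give up on a record after 5 minutes
        return p;
    }

    // Assumed rule: delivery.timeout.ms >= linger.ms + request.timeout.ms.
    // Missing keys fall back to the Kafka 2.x producer defaults (120000, 0, 30000).
    static boolean deliveryTimeoutIsValid(Properties p) {
        long delivery = Long.parseLong(p.getProperty("delivery.timeout.ms", "120000"));
        long linger = Long.parseLong(p.getProperty("linger.ms", "0"));
        long request = Long.parseLong(p.getProperty("request.timeout.ms", "30000"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) throws IOException {
        Properties props = retryFriendlyProducerProps();
        StringWriter out = new StringWriter();
        props.store(out, "MirrorMaker 1 producer overrides"); // .properties text for the config file
        System.out.print(out);
        System.out.println("delivery.timeout.ms valid: " + deliveryTimeoutIsValid(props));
    }
}
```

The producer default for delivery.timeout.ms is 120000 ms, so this override widens the retry window from two to five minutes.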

All details of the KafkaProducer configuration are given in the Kafka documentation (producer configs).