Problem description
I am using Kafka 2.3.1 (server) with spring-kafka 2.2.14 and kafka-clients 2.0.1.
I have a topic with 10 partitions.
Producer
A single producer publishes messages to the Kafka cluster; each message is assigned to a partition based on its key, so per-key ordering is preserved.
acks = all
Consumer
Several instances of the same consumer (2 to 5, scaled horizontally on Kubernetes), each with a single thread, fetch these messages and process them.
Message processing time varies widely.
To avoid frequent group rebalances, I moved message processing to another thread. The consumer keeps calling poll() while the processor is still working.
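The hand-off pattern described above (process on a worker thread while the consumer thread keeps polling) can be sketched in plain Java; here a sleep stands in for both the long-running work and the real poll(), so this is an illustration of the threading, not of the Kafka API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PollWhileProcessing {

    // Simulates long-running processing on a worker thread while the caller
    // keeps "polling" every 500 ms, as the listener in the question does to
    // keep the consumer alive. Returns the number of poll iterations.
    static int run() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<?> task = worker.submit(() -> {
            try {
                Thread.sleep(1_500); // stands in for the long-running processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        int polls = 0;
        while (!task.isDone()) {
            Thread.sleep(500);
            polls++; // stands in for consumer.poll(Duration.ofMillis(3_000))
        }
        task.get();       // surfaces any processing failure
        worker.shutdown();
        return polls;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("polls while processing: " + run());
    }
}
```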
auto.offset.reset = earliest
enable.auto.commit = false
max.poll.interval.ms = 300000
max.poll.records = 1
AckMode.MANUAL_IMMEDIATE
syncCommits = true
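For reference, if the application were configured through Spring Boot, the settings above would map onto properties roughly like the following (the `spring.kafka` property names are an assumption about the setup; `syncCommits` has no direct property and is set on the container properties instead):

```properties
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.properties.max.poll.interval.ms=300000
spring.kafka.listener.ack-mode=manual_immediate
```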
Execution example
The 10 partitions are assigned to consumer-0 (0 to 4) and consumer-1 (5 to 9). New consumers are added while processing is in progress, and each addition triggers a rebalance. I am unable to resume the previously paused partitions.
message-1 is consumed twice. Because processing takes longer than max.poll.interval.ms, the rebalance reassigns the partitions. By the time the commit is triggered, the partition for that offset may already belong to another consumer, yet the commit still goes through. The rebalance can also resume all previously paused partitions, so the new consumer that owns the partition pulls the same message again, because for it the partition was never paused.
consumer-1 [partition-5 offset-12] Received Message [message-1]
consumer-1 [partition-5 offset-12] Paused Partitions [partition-5,partition-8,partition-9,partition-6,partition-7]
consumer-2 Started
consumer-2 [partition-3 offset-80] Received Message [message-2]
consumer-2 [partition-3 offset-80] Paused Partitions [partition-2,partition-3,partition-0,partition-1]
consumer-3 Started
consumer-3 [partition-7 offset-43] Received Message [message-3]
consumer-3 [partition-7 offset-43] Paused Partitions [partition-6,partition-7]
consumer-4 Started
consumer-4 [partition-5 offset-12] Received Message [message-1]
consumer-4 [partition-5 offset-12] Paused Partitions [partition-4,partition-5]
consumer-1 [partition-5 offset-12] Executed Task
consumer-1 [partition-5 offset-12] Committed [message-1]
consumer-1 [partition-5 offset-12] Resumed Partitions []
consumer-2 [partition-3 offset-80] Executed Task
consumer-2 [partition-3 offset-80] Committed [message-2]
consumer-2 [partition-3 offset-80] Resumed Partitions []
consumer-3 [partition-7 offset-43] Executed Task
consumer-3 [partition-7 offset-43] Committed [message-3]
consumer-3 [partition-7 offset-43] Resumed Partitions []
consumer-4 [partition-5 offset-12] Executed Task
consumer-4 [partition-5 offset-12] Committed [message-1]
consumer-4 [partition-5 offset-12] Resumed Partitions [partition-4,partition-5]
Consumer implementation in Groovy
@KafkaListener(topics = '${topic.files}', containerFactory = "fileKafkaListenerContainerFactory")
void receive(@Payload FileEvent event,
             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
             @Header(KafkaHeaders.OFFSET) Long offset,
             Acknowledgment ack,
             Consumer consumer) {
    LOGGER.info("[partition-{} offset-{}] Received Message [{}]", partition, offset, event)
    try {
        LOGGER.debug("[partition-{} offset-{}] Paused Partitions [{}]", partition, offset, consumer.assignment())
        consumer.pause(consumer.assignment())
        ListenableFuture<Void> future = applicationTaskExecutor.submitListenable(callable(event))
        future.addCallback({ }, { ex -> throw ex })
        while (!future.isDone()) {
            Thread.sleep(500)
            consumer.poll(Duration.ofMillis(3_000))
        }
        future.get()
        ack.acknowledge()
        LOGGER.debug("[partition-{} offset-{}] Committed [{}]", partition, offset, event)
    } catch (Exception cause) {
        String message = String.format("Fail to consume partition=%s offset=%s event=%s", partition, offset, event)
        throw new RuntimeException(message, cause)
    } finally {
        LOGGER.debug("[partition-{} offset-{}] Resumed Partitions [{}]", partition, offset, consumer.paused())
        consumer.resume(consumer.paused())
    }
}

Callable<Void> callable(FileEvent event) {
    { ->
        indexService.index(event)
    }
}
How do I pause/resume correctly, given the rebalance that happens when a new consumer is added during long-running processing?
Solution
Do not call pause() / resume() on the consumer itself; pause the listener container instead (and set max.poll.records to 1).
The container has logic to re-pause after a rebalance:
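In spring-kafka the container can be looked up through the KafkaListenerEndpointRegistry and paused/resumed around the hand-off. A minimal sketch, assuming a listener id of "fileListener" and an injected registry (both assumptions, not from the question); offset acknowledgement from the worker thread is deliberately elided:

```java
@Autowired
private KafkaListenerEndpointRegistry registry;

@KafkaListener(id = "fileListener", topics = "${topic.files}",
        containerFactory = "fileKafkaListenerContainerFactory")
public void receive(FileEvent event) {
    MessageListenerContainer container = registry.getListenerContainer("fileListener");
    container.pause(); // unlike Consumer.pause(), the container re-pauses itself after a rebalance
    applicationTaskExecutor.submit(() -> {
        try {
            indexService.index(event); // long-running work off the consumer thread
        } finally {
            container.resume();        // resume delivery once processing is done
        }
    });
}
```

While paused, the container keeps polling on the consumer thread (receiving no records), so max.poll.interval.ms is never exceeded.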
if (ListenerConsumer.this.consumerPaused) {
    ListenerConsumer.this.consumer.pause(partitions);
    ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
            + "consumer paused again, so the initial poll() will never return any records");
}
If you really want to do all this yourself, set max.poll.records to 1 and re-pause the consumer with a rebalance listener.
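The do-it-yourself variant with a plain kafka-clients consumer would look roughly like this (a sketch only; `inFlight` is a hypothetical AtomicBoolean that is true while the worker thread is busy):

```java
consumer.subscribe(topics, new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // paused state is lost along with the revoked partitions; nothing to do here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (inFlight.get()) {
            consumer.pause(partitions); // restore the pause the rebalance cleared
        }
    }
});
```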