考虑到在为长时间运行的处理添加新使用者时发生的重新平衡,我如何正确地暂停/恢复操作?

问题描述

我正在将kafka 2.3.1(server)与spring-kafka 2.2.14和kafka-clients 2.0.1一起使用。

我有一个包含10个分区的主题。

制作人

一个生产者将消息发布到Kafka集群,根据消息的密钥顺序将消息根据密钥分配给分区。

acks = all

消费者

同一使用者的一些实例(从2到5,以k8s为水平缩放),每个实例只有一个线程,提取这些消息并执行处理。

消息处理时间变化不定。

为避免频繁进行组重新平衡,我将消息处理移至另一个线程。消费者在处理器仍在工作时继续拨打轮询。

auto.offset.reset = earliest
enable.auto.commit = false
max.poll.interval.ms = 300000
max.poll.records = 1
AckMode.MANUAL_IMMEDIATE    
syncCommits = true

执行示例

将10个分区分配给使用者0(0到4)和使用者1(5到9)。在处理过程中会添加新的使用者,并且每次添加都会导致重新平衡。我无法恢复以前暂停的分区。

消息1被消耗了两次。由于处理花费的时间比max.poll.interval.ms长,因此重新平衡会重新排列分区。触发提交后,偏移分区可能与另一个使用者相关联。此提交生效。重新平衡可能会恢复所有已暂停的分区。与此分区关联的新使用者可以再次拉出消息,因为它没有暂停。

consumer-1 [partition-5 offset-12] Received Message [message-1]
consumer-1 [partition-5 offset-12] Paused Partitions [partition-5,partition-8,partition-9,partition-6,partition-7]
consumer-2 Started
consumer-2 [partition-3 offset-80] Received Message [message-2]
consumer-2 [partition-3 offset-80] Paused Partitions [partition-2,partition-3,partition-0,partition-1]
consumer-3 Started
consumer-3 [partition-7 offset-43] Received Message [message-3]
consumer-3 [partition-7 offset-43] Paused Partitions [partition-6,partition-7]
consumer-4 Started
consumer-4 [partition-5 offset-12] Received Message [message-1]
consumer-4 [partition-5 offset-12] Paused Partitions [partition-4,partition-5]
consumer-1 [partition-5 offset-12] Executed Task 
consumer-1 [partition-5 offset-12] Committed [message-1]
consumer-1 [partition-5 offset-12] Resumed Partitions []
consumer-2 [partition-3 offset-80] Executed Task
consumer-2 [partition-3 offset-80] Committed [message-2]
consumer-2 [partition-3 offset-80] Resumed Partitions []
consumer-3 [partition-7 offset-43] Executed Task
consumer-3 [partition-7 offset-43] Committed [message-3]
consumer-3 [partition-7 offset-43] Resumed Partitions []
consumer-4 [partition-5 offset-12] Executed Task
consumer-4 [partition-5 offset-12] Committed [message-1]
consumer-4 [partition-5 offset-12] Resumed Partitions [partition-4,partition-5]

Groovy中的消费实施

    @KafkaListener(topics = '${topic.files}',containerFactory = "fileKafkaListenerContainerFactory")
    void receive(@Payload FileEvent event,             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,             @Header(KafkaHeaders.OFFSET) Long offset,             Acknowledgment ack,             Consumer consumer) {
        LOGGER.info("[partition-{} offset-{}] Received Message [{}]",partition,offset,event)
        try {
            LOGGER.debug("[partition-{} offset-{}] Paused Partitions [{}]",consumer.assignment())
            consumer.pause(consumer.assignment())
            ListenableFuture<Void> future = applicationTaskExecutor.submitListenable(callable(event))
            future.addCallback({ },{ ex -> throw ex }  )
            while (!future.isDone()) {
                Thread.sleep(500)
                consumer.poll(Duration.ofMillis(3_000))
            }
            future.get()
            ack.acknowledge()
            LOGGER.debug("[partition-{} offset-{}] Committed [{}]",event)
        } catch (Exception cause) {
            String message = String.format("Fail to consume partition=%s offset=%s %s",event)
            throw new RuntimeException(message,cause)
        } finally {
            LOGGER.debug("[partition-{} offset-{}] Resumed Partitions [{}]",consumer.paused())
            consumer.resume(consumer.paused())
        }
    }
    
    Callable<Void> callable(FileEvent event) {
        { ->
            indexService.index(event)
        }
    }

考虑到为长时间运行的处理添加新使用者时发生的重新平衡,我如何正确地暂停/恢复操作?

解决方法

请勿在消费者本身上致电pause() / resume();而是暂停监听器容器(并将max.poll.records设置为1)。

容器具有重新平衡后重新暂停的逻辑。

if (ListenerConsumer.this.consumerPaused) {
    ListenerConsumer.this.consumer.pause(partitions);
    ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
            + "consumer paused again,so the initial poll() will never return any records");
}

如果您真的想自己做所有这些事情,请将max.poll.records设置为1,然后使用重新平衡监听器重新暂停使用者。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...