KakaConsumer.polltimeoutMs和KafkaConsumer.pollDuration.ZERO之间的差异

问题描述

我从一开始就读卡夫卡主题。为了使用seekToBeginning(),我首先需要对poll()进行虚拟调用。以下是我的代码段:

    // Subscribe
    consumer.subscribe(Collections.singleton(TOPIC_NAME));
    // Seek to beginning
    // consumer.poll(Duration.ZERO);
    consumer.poll(0);
    consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME,0)));

使用consumer.poll(0)可以正常工作。当我使用consumer.poll(Duration.ZERO)时会导致以下异常:

[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group2-1,groupId=group2] Seeking to EARLIEST offset of partition test-lc-1-0
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-lc-1-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:615)
    at java.base/java.util.Collections$SingletonSet.forEach(Collections.java:4797)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:613)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1659)
    at com.ahmed.ConsumeProtobuf.main(ConsumeProtobuf.java:49)

我研究了两个API的实现。最后,两个api最终都以0作为参数调用相同的方法。知道为什么poll(Duration.ZERO)会失败吗?

谢谢你, 艾哈迈德。

解决方法

启动消费者时寻找的正确方法是使用ConsumerRebalanceListener

例如,类似:

try (KafkaConsumer<String,String> consumer = new KafkaConsumer<>(configs);) {
    consumer.subscribe(Collections.singleton(TOPIC_NAME),new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions);
        }
    });
    while (true) {
        consumer.poll(Duration.ofSeconds(1L));
        ...
    }
}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...