使用kafka-python消费kafka,seek()重置的本地偏移量会提交给kafka吗?

问题描述

使用kafka-python消费kafka,seek方法重置的本地偏移量会提交给kafka吗? 我正在研究获取双中心机房中Kafka集群的rpo索引的解决方案。使用kafka-python获取Kafka集群的最大时间戳,取两个机房中Kafka集群的最大时间戳之差。

使用seek()将offset重置为分区最大offset-1,然后poll()获取最新消息,但是循环中获取不到消息,查看当前的offset消费者组,发现消息被堆叠为0

#reset offset to (max_offset-1)
for tp,offset in offsets_dict.items():
            offset = offset - 1
            if (offset)<0:
                effective_partition = effective_partition-1
                continue
            consumer.seek(tp,offset)
            kafkaoffset = consumer.position(tp)
           
        if effective_partition==0:
            consumer.close()
            return max_timestamp

        try:
            Counter=0
            while(True):
                message = consumer.poll(max_records=1)
                if not message:
                    continue
                for partition,msgs in six.iteritems(message):
                    for msg in msgs:
                        max_timestamp = max(max_timestamp,int(msg.timestamp))
                        self.logger.debug(f"{max_timestamp}")
                Counter = Counter +1
                if Counter == effective_partition:
                    break
        except Exception as ex:
            raise ex
        finally:
            consumer.close()
            return max_timestamp

解决方法

如果您在查找后提交,那么这将是该组的新偏移量,是的。

enable_auto_commit 默认为 True,如果您将其设置为 False,则可以使用 KafkaConsumer.commit(offsets) 函数手动控制此行为

调用 .close() 也默认执行提交,除非您真的只想使用正常的线性消费者进程在进程外查找和读取少量消息,否则您可能应该不理会它