问题描述
使用kafka-python消费kafka,seek方法重置的本地偏移量会提交给kafka吗? 我正在研究获取双中心机房中Kafka集群的rpo索引的解决方案。使用kafka-python获取Kafka集群的最大时间戳,取两个机房中Kafka集群的最大时间戳之差。
使用seek()将offset重置为分区最大offset-1,然后poll()获取最新消息,但是循环中获取不到消息,查看当前的offset消费者组,发现消息被堆叠为0
#reset offset to (max_offset-1)
for tp,offset in offsets_dict.items():
offset = offset - 1
if (offset)<0:
effective_partition = effective_partition-1
continue
consumer.seek(tp,offset)
kafkaoffset = consumer.position(tp)
if effective_partition==0:
consumer.close()
return max_timestamp
try:
Counter=0
while(True):
message = consumer.poll(max_records=1)
if not message:
continue
for partition,msgs in six.iteritems(message):
for msg in msgs:
max_timestamp = max(max_timestamp,int(msg.timestamp))
self.logger.debug(f"{max_timestamp}")
Counter = Counter +1
if Counter == effective_partition:
break
except Exception as ex:
raise ex
finally:
consumer.close()
return max_timestamp
解决方法
如果您在查找后提交,那么这将是该组的新偏移量,是的。
enable_auto_commit
默认为 True,如果您将其设置为 False,则可以使用 KafkaConsumer.commit(offsets)
函数手动控制此行为
调用 .close()
也默认执行提交,除非您真的只想使用正常的线性消费者进程在进程外查找和读取少量消息,否则您可能应该不理会它