在 Python 中获取 Confluent Kafka 主题的最新消息

问题描述

这是我迄今为止尝试过的:

from confleunt_kafka import Consumer

c = Consumer({... several security/server settings skipped...
              'auto.offset.reset': 'beginning','group.id': 'my-group'})

c.subscribe(['my.topic'])
msg = poll(30.0)  # msg is of None type.

msg 几乎总是以 None 结束。我认为问题可能是 'my-group' 已经消耗了 'my.topic' 的所有消息......但我不在乎消息是否已经被消耗 - 我仍然需要最新的消息。具体来说,我需要来自最新消息的时间戳。

我尝试了更多,从这里看来,该主题中可能有 25 条消息,但我不知道如何获取它们:

a = c.assignment()
print(a)  # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets)  # Outputs: (25,25)

如果由于主题根本没有写入任何内容而没有消息,我如何确定?如果是这种情况,我如何确定该主题已存在多长时间?我希望编写一个脚本,该脚本可以自动删除过去 X 天(最初 14 天 - 可能会随着时间的推移进行调整。)未写入的任何主题。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)