如何从'py-kafka消费者'获取最后一条消息?

问题描述

我使用这个配置:

def __init__(self):
    self.client = KafkaClient(hosts='localhost:9092')
    self.topic = None
    self.consumer = None
    self.producer = None



def initialize_variables(self,topic_name):
    if ((self.topic == None) or (self.topic != topic_name)):
        self.topic = self.client.topics[topic_name]
        self.consumer = self.topic.get_simple_consumer(
                        auto_offset_reset=OffsetType.LATEST,reset_offset_on_start=False
        )
        self.producer = self.topic.get_producer()

还有:

@keyword('Kafka Consume')
    def consume_1(self,topic_name,timeout = DEFAULT_TIMEOUT):
        self.initialize_variables(topic_name)
        loop_time = time.time() + timeout
        while time.time() <= loop_time:
              msg_tuple = self.consumer.consume(block = False)
              if msg_tuple is None:
                  continue
              received_msg = json.loads(msg_tuple.value)
              return received_msg
        return None

每次我使用这个设置时,我总是得到最后的结果,而不是当前的结果。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)