问题描述
我使用这个配置:
def __init__(self):
self.client = KafkaClient(hosts='localhost:9092')
self.topic = None
self.consumer = None
self.producer = None
def initialize_variables(self,topic_name):
if ((self.topic == None) or (self.topic != topic_name)):
self.topic = self.client.topics[topic_name]
self.consumer = self.topic.get_simple_consumer(
auto_offset_reset=OffsetType.LATEST,reset_offset_on_start=False
)
self.producer = self.topic.get_producer()
还有:
@keyword('Kafka Consume')
def consume_1(self,topic_name,timeout = DEFAULT_TIMEOUT):
self.initialize_variables(topic_name)
loop_time = time.time() + timeout
while time.time() <= loop_time:
msg_tuple = self.consumer.consume(block = False)
if msg_tuple is None:
continue
received_msg = json.loads(msg_tuple.value)
return received_msg
return None
每次我使用这个设置时,我总是得到最后的结果,而不是当前的结果。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)