使用者启动后,Python Kafka使用者未收到从其他线程发送的消息

问题描述

所以我有一个集成测试,我正在尝试在我想查看的是否发送给消费者的消息中运行。逻辑是我启动一个新线程,该线程将在5秒钟后发送消息,然后启动使用者。由于消息是非阻塞的,因此应该在使用者启动后发送该消息。但是使用者没有收到任何消息,它只是继续运行就可以了,但是它应该打印出我发送的消息。

这是我的代码

import time
import threading
from kafka import KafkaConsumer
from kafka import KafkaProducer

def send_message(message,topic="facts",sleep=0.5):
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092']
    )

    time.sleep(sleep)
    message = bytes(message,encoding="utf8")
    producer.send(topic,key=b'PayloadKey',value=message)


def consume():
    consumer = KafkaConsumer(
        "facts",enable_auto_commit=False,group_id='lelu',bootstrap_servers=['localhost:9092']
    )

    for message in consumer:
        consumer.commit()
        yield message

messages = consume()

thread = threading.Thread(target=send_message,args=('hello',5))
thread.daemon = True
thread.start()

for msg in messages:
    print(msg)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)