问题描述
我在python的pykafka中使用KafkaClient。我正在尝试读取文本文件并产生其到主题的行,然后由使用者读取它。但是,在运行时,它仅读取消息中的单个字母,而不读取文本文件中的单词或行。我在做什么错了?
我的制片人是
from pykafka import KafkaClient
text = open('filename.txt','r').read()
text = text.split()
client = KafkaClient(hosts='localhost:9099')
topic = client.topics['topic']
producer = topic.get_sync_producer()
for i in text:
producer.produce(i.encode('ascii'))
我的消费者是
from pykafka import KafkaClient
client = KafkaClient(hosts='localhost:9099')
topic = client.topics['topic']
consumer = topic.get_simple_consumer()
for message in consumer:
if message is not None:
print(message.offset,message.value.decode())
请多多指教。我想知道这是否是读取文本文件并通过kafka运行它的最佳方法。
解决方法
通过将库切换到kafka库,我得到了所需的输出。
制作人
from kafka import KafkaProducer
producer =
KafkaProducer(bootstrap_servers='localhost:9099')
with open('filename.txt')
as f:
for line in f:
producer.send('topic',line.encode('ascii'))
producer.flush()
producer.close()
消费者
from kafka import KafkaConsumer
consumer =
KafkaConsumer('topic',bootstrap_servers ='localhost:9099')
for msg in consumer:
print(msg.value.decode())