我在使用 PyKafka 在 Kafka 上发布 JSON 消息时遇到问题

问题描述

我正在使用 pykafka 库在 Kafka 上发布消息。我的数据集是一个 JSON

{"user": "jpoole","created_at_unixtime": 1440407147.033846,"id": 3600730356622213650,"text": "Techical support for my new computer as A+,thank you @fudgemart","created_at": "Mon Aug 24 05:05:47 +0000 2015"} 
]

我的要求是使用 pykafka 为上面的每个 JSON 字符串生成 2 个 kafka 消息,1 个。到目前为止,我已经尝试了以下方法

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']

with open('./tweets.json') as f:
    dataItems =json.load(f)

s=json.dumps(dataItems).encode('utf-8')


with topic.get_sync_producer() as producer:
    for data in s:
        producer.produce(data)

我已将 JSON 加载到文件中(我最初的要求)。上面的代码有效,但它没有将第一个 JSON 字符串作为一个整体,而是将字符串中的每个字符都作为消息。

我的要求是将每个 JSON 字符串作为单独的 Kafka 消息发布。

Message 1
{"user": "jpoole","created_at_unixtime": 1448221456.6646008,"id": 3731785240073317438,"text": "Glad I bought my electronics from @fudgemart","created_at": "Sun Nov 22 14:44:16 +0000 2015"}

Message 2
{"user": "jpoole","created_at": "Mon Aug 24 05:05:47 +0000 2015"}

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)