问题描述
我正在使用 pykafka 库在 Kafka 上发布消息。我的数据集是一个 JSON
{"user": "jpoole","created_at_unixtime": 1440407147.033846,"id": 3600730356622213650,"text": "Techical support for my new computer as A+,thank you @fudgemart","created_at": "Mon Aug 24 05:05:47 +0000 2015"}
]
我的要求是使用 pykafka 为上面的每个 JSON 字符串生成 2 个 kafka 消息,1 个。到目前为止,我已经尝试了以下方法。
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
with open('./tweets.json') as f:
dataItems =json.load(f)
s=json.dumps(dataItems).encode('utf-8')
with topic.get_sync_producer() as producer:
for data in s:
producer.produce(data)
我已将 JSON 加载到文件中(我最初的要求)。上面的代码有效,但它没有将第一个 JSON 字符串作为一个整体,而是将字符串中的每个字符都作为消息。
我的要求是将每个 JSON 字符串作为单独的 Kafka 消息发布。
Message 1
{"user": "jpoole","created_at_unixtime": 1448221456.6646008,"id": 3731785240073317438,"text": "Glad I bought my electronics from @fudgemart","created_at": "Sun Nov 22 14:44:16 +0000 2015"}
Message 2
{"user": "jpoole","created_at": "Mon Aug 24 05:05:47 +0000 2015"}
谢谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)