如何从Apache Kafka使用数据

问题描述

参加挑战,就像:您的第一步-使用来自Apache Kafka的数据示例。 因此,他们给了我主题名称 API_KEY API_SECRET 。哦,引导服务器。 然后他们声称,好像您不熟悉Kafka一样,Confluent提供了全面的文档。好吧,登录到汇合处,建立一个集群,然后..下一步使用数据是什么?

解决方法

这是将来自Kafka的消息放入Python列表中的基本模式。

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'someTopicName',bootstrap_servers=['192.168.1.160:9092'],auto_offset_reset='earliest',enable_auto_commit=True,group_id='my-group',value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)

messageCache = []

for message in consumer:
    messageCache.append(message.value)

在这种情况下,我的Kafka代理使用默认端口位于我的专用LAN上,因此我的引导服务器列表仅为[“ 192.168.1.160:9092”]。

您可以使用标准计数器和if语句将列表保存到文件等中,因为假定Kafka流将永远运行。例如,我有一个使用Kafka消息的过程,每隔1,000,000条消息,将它们作为一个数据帧保存到HDFS中。在这种情况下,我想保存历史消息以开发ML模型。关于Kafka的妙处在于,我可以编写另一个过程来评估并可能实时响应每个消息。