问题描述
您好,我正在尝试运行此代码以使用kafka提取实时推文,但显示 kafka生产者中收到的错误。我不知道问题出在哪里。
import json
from kafka import KafkaProducer
import tweepy
import configparser
class TweeterStreamListener(tweepy.StreamListener):
def __init__(self,api):
self.api = api
super(tweepy.StreamListener,self).__init__()
self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
def on_status(self,status):
# This method is called whenever new data arrives from live stream.
# We asynchronously push this data to kafka queue
msg = status.text.encode('utf-8')
try:
self.producer.send_messages(b'twitterstream',msg)
except Exception as e:
print(e)
return False
return True
def on_error(self,status_code):
print("Error received in kafka producer")
return True # Don't kill the stream
def on_timeout(self):
return True # Don't kill the stream
谢谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)