运行kafka生产者时为什么会出现错误?

问题描述

您好,我正在尝试运行此代码以使用kafka提取实时推文,但显示 kafka生产者中收到的错误。我不知道问题出在哪里。

import json
from kafka import KafkaProducer
import tweepy
import configparser

class TweeterStreamListener(tweepy.StreamListener):

    def __init__(self,api):
        self.api = api
        super(tweepy.StreamListener,self).__init__()
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    def on_status(self,status):
        # This method is called whenever new data arrives from live stream.
        # We asynchronously push this data to kafka queue
        msg =  status.text.encode('utf-8')
        try:
            self.producer.send_messages(b'twitterstream',msg)
        except Exception as e:
            print(e)
            return False
        return True

    def on_error(self,status_code):
        print("Error received in kafka producer")
        return True # Don't kill the stream

    def on_timeout(self):
        return True # Don't kill the stream

谢谢。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)