PySpark's pprint is not printing in Jupyter

Problem description

I am running a Spark Streaming program in Jupyter to consume tweets from a Kafka cluster, and I am having trouble getting the following code to print the tweet counts correctly.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json
import logging

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("INFO")

ssc = StreamingContext(sc,60)

kafkaStream = KafkaUtils.createStream(ssc,'cdh57-01-node-01.moffatt.me:2181','spark-streaming',{'kafkaspark':1})

parsed = kafkaStream.map(lambda v: json.loads(v[1]))

parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

ssc.start()
ssc.awaitTermination(timeout=180)


This is the output I get after running ssc.start() and ssc.awaitTermination(timeout=180):
-------------------------------------------
Time: 2020-10-22 19:48:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-22 19:49:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-22 19:50:00
-------------------------------------------

Only the batch time is printed, never the tweet count. What is causing this? I can see the fetched tweets in my Kafka consumer running in the terminal.
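As a sanity check, the per-batch logic that the map/count pipeline performs can be reproduced in plain Python. This is only a sketch of what Spark does with each (key, value) record it receives from Kafka; the sample records below are made up:

```python
import json

# Hypothetical (key, value) records as KafkaUtils.createStream delivers them;
# the value is the JSON payload produced by the Twitter listener.
records = [
    (None, json.dumps({"text": "first tweet"})),
    (None, json.dumps({"text": "second tweet"})),
]

# Equivalent of kafkaStream.map(lambda v: json.loads(v[1])) followed by count()
parsed = [json.loads(v) for _, v in records]
message = 'Tweets in this batch: %s' % len(parsed)
print(message)  # Tweets in this batch: 2
```

The parsing and counting work fine on well-formed JSON, which suggests the empty batches mean no records are reaching the stream during the batch interval rather than a bug in the transformation itself.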

Here is my Kafka producer code:

import pykafka
import json
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

#TWITTER API CONFIGURATION
consumer_key = '--key--'
consumer_secret = '--key--'
access_token = '--key--'
access_secret = '--key--'

#TWITTER API AUTH
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)

api = tweepy.API(auth)

#Twitter Stream Listener
class KafkaPushListener(StreamListener):          
    def __init__(self):
        #localhost:9092 = default Kafka broker host and port address
        self.client = pykafka.KafkaClient("localhost:9092")

        #Get a producer for the kafkaspark topic
        self.producer = self.client.topics[bytes("kafkaspark","ascii")].get_producer()
  
    def on_data(self,data):
        #Producer produces data for consumer
        #Data comes from Twitter
        self.producer.produce(bytes(data,"ascii"))
        return True
                                                                                                                                           
    def on_error(self,status):
        print(status)
        return True

#Twitter Stream Config
twitter_stream = Stream(auth,KafkaPushListener())

#Produce Data that has hashtag (Tweets)
twitter_stream.filter(track=['#Coldplay'])
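One detail worth checking (an assumption about a possible failure mode, not a confirmed cause): bytes(data, "ascii") in on_data raises UnicodeEncodeError whenever a tweet contains non-ASCII characters, which real tweets frequently do, so those messages would never be produced to the topic. Encoding as UTF-8 avoids this:

```python
# A tweet payload containing non-ASCII characters, as is common on Twitter
tweet = '{"text": "caf\u00e9 \u2615"}'

# This is what the listener currently does, and it fails for such tweets
try:
    bytes(tweet, "ascii")
except UnicodeEncodeError:
    print("ascii encoding failed")  # ascii encoding failed

# Encoding with utf-8 handles the full range of characters losslessly
encoded = tweet.encode("utf-8")
assert encoded.decode("utf-8") == tweet
```

If this is the issue, replacing self.producer.produce(bytes(data,"ascii")) with self.producer.produce(data.encode("utf-8")) would let all tweets through.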

I am a beginner with Kafka and Spark and don't know what is causing this. Can someone help me?
