问题描述
我正在jupyter上运行一个Spark流传输程序以使用来自Kafka集群的推文,而我遇到了麻烦,无法获取以下代码来正确打印推文计数。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
import logging
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("INFO")
ssc = StreamingContext(sc,60)
kafkaStream = KafkaUtils.createStream(ssc,'cdh57-01-node-01.moffatt.me:2181','spark-streaming',{'kafkaspark':1})
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
ssc.start()
ssc.awaitTermination(timeout=180)
这是我运行 ssc.start() and ssc.awaitTermination(timeout=180)
-------------------------------------------
Time: 2020-10-22 19:48:00
-------------------------------------------
-------------------------------------------
Time: 2020-10-22 19:49:00
-------------------------------------------
-------------------------------------------
Time: 2020-10-22 19:50:00
-------------------------------------------
只打印时间,而不打印时间。是什么引起了这个问题?在终端中运行的我的kafka用户中可以看到提取的推文。
这是我的Kafka集群代码:
import pykafka
import json
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
#TWITTER API CONfigURATIONS
consumer_key = '--key--'
consumer_secret = '--key--'
access_token = '--key--'
access_secret = '--key--'
#TWITTER API AUTH
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)
api = tweepy.API(auth)
#Twitter Stream Listener
class KafkaPushListener(StreamListener):
def __init__(self):
#localhost:9092 = Default Zookeeper Producer Host and Port Adresses
self.client = pykafka.KafkaClient("localhost:9092")
#Get Producer that has topic name is Twitter
self.producer = self.client.topics[bytes("kafkaspark","ascii")].get_producer()
def on_data(self,data):
#Producer produces data for consumer
#Data comes from Twitter
self.producer.produce(bytes(data,"ascii"))
return True
def on_error(self,status):
print(status)
return True
#Twitter Stream Config
twitter_stream = Stream(auth,KafkaPushListener())
#Produce Data that has hashtag (Tweets)
twitter_stream.filter(track=['#Coldplay'])
我是Kafka和Spark的初学者,不知道是什么原因造成的。有人可以帮我吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)