集成Spark Streaming和Twitter API时,为什么总是将空RDD发送给Spark?

问题描述

我是Spark Streaming的新手,我正在做一个小型个人项目来研究这项技术。我想使用Twitter API获取实时推文,然后使用Spark Streaming转换流数据以可视化流行标签。

我编写了一个Python脚本twitter_api.py,以从Twitter API获取推文,然后通过TCP连接将数据发送到Spark。我认为这一步没有问题,因为我可以打印出获得的推文。 但是,在另一个脚本spark.py中,我在处理RDD时总是得到'ValueError'。它说我的RDD是空的。

我认为pyspark在笔记本电脑上配置良好,因为我可以运行静态示例。

spark.py脚本如下所示:

#!/usr/bin/env python3
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession,Row
from twitter_app import TCP_IP,TCP_PORT,KEYWORD
import seaborn as sns
import matplotlib.pyplot as plt
import time
import sys


def spark(TCP_IP,KEYWORD):
   sc = SparkContext(appName='TwitterStreamingApp')
   sc.setLogLevel('ERROR')

   ssc = StreamingContext(sc,5)


   ssc.checkpoint("checkpoint_TwitterApp")

   
   data_stream = ssc.socketTextStream(TCP_IP,TCP_PORT)
   lines = data_stream.window(20)

   words = lines.flatMap(lambda x: x.split(' '))
   hashtags = words.filter(lambda x: '#' in x)  

   def process_rdd(rdd):
       try:
           # initialization of SparkSession
           spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()

           # map each rdd to each row
           rdd_row = rdd.map(lambda x: Row(tag=x))

           # create the dataframe
           hashtags_df = spark.createDataFrame(rdd_row)

           hashtags_df.createOrReplaceTempView('tags')
           hashtags_count_df = spark.sql(
            'SELECT tag,count(tag) FROM tags GROUP BY tag ORDER BY COUNT(tag) DESC LIMIT 10')
          

           pd_df = hashtags_count_df.toPandas()

           plt.figure(figsize=(10,8))
           sns.barplot(x="total",y="word",data=pd_df.head(20))
           plt.show()

       except:
           e = sys.exc_info()[0]
           print("Error: %s" % e)

   hashtags.foreachRDD(process_rdd)
   ssc.start()

   # wait for the streaming to finish
   ssc.awaitTermination()


if __name__ == "__main__":
    spark(TCP_IP,KEYWORD)

The errors I got

我真的不知道出什么问题了!

p.s。我正在将MacBook Pro 2018与Catalina 10.15.5、2.3 GHz四核,Intel Core i5配合使用

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...