Problem description
I am running into a strange error in my Spark Streaming code written in PySpark. I tried debugging the code but could not find any cause.
https://www.example.com
Error log:
import json

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col
from pyspark.streaming import StreamingContext
# Note: the pyspark.streaming.kafka module is only available in Spark 2.x.
from pyspark.streaming.kafka import KafkaUtils

pkey = "userid"


def getsqlContextInstance(sparkContext):
    # Lazily create a single SQLContext on the driver and reuse it for every batch.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def processRDDs(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():  # isEmpty() already returns a bool
        print("*****************************EMPTY RDD ********************************")
    else:
        sqlContext = getsqlContextInstance(rdd.context)
        # Turn each parsed record back into a JSON string so read.json can infer the schema.
        myRDD = rdd.map(lambda y: json.dumps(y))
        # Repartition on the cluster instead of collect() + parallelize(),
        # which pulls every record through the driver.
        newRDD = myRDD.repartition(40)
        df0 = sqlContext.read.json(newRDD)
        df0.createOrReplaceTempView("mytable")
        # Query through the same SQLContext that registered the temp view.
        newDF = sqlContext.sql('''
            select userid, userdevicetype,
                   date_format(cast(timestamp as timestamp), 'yyyy-MM-dd HH:mm:ss.SSS') timestamp,
                   reminderemails, referredname, paymentmade, joined, inviteid, emailaddress
            from mytable
        ''')
        # Cast every column to string before writing.
        newDF = newDF.select([col(c).cast("string") for c in newDF.columns])
        newDF.coalesce(3).write.mode("append").parquet("s3://bucket/file")


if __name__ == "__main__":
    sc = SparkContext(appName="app_test")
    ssc = StreamingContext(sc, 120)  # 120-second batch interval
    print("spark context set")
    spark = SparkSession(sc)
    # The SQLContext is created lazily in getsqlContextInstance; the original
    # "sqlContext = sqlContext(sc)" rebound the class name to an instance.
    zkQuorum, topic = 'ip_Add', 'topic'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "grp1", {topic: 1}, {"auto.offset.reset": "smallest"})
    print("********************************************connection set****************************************************************")
    # Each Kafka message is a (key, value) pair; parse the value as JSON.
    dstream = kvs.map(lambda x: json.loads(x[1]))
    dstream.foreachRDD(processRDDs)
    ssc.start()
    ssc.awaitTermination()
I cannot find the root cause. Whenever I rerun the code it works fine, but this error still comes up frequently.
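Since the failure is intermittent, one way to narrow it down might be to log the batch time and full traceback whenever the per-batch callback raises. The following is only a minimal sketch of that idea, not a fix for the error. The names `log_batch_failures` and `process_batch` are hypothetical and do not appear in the original code, and the example uses a plain list in place of an RDD so that it runs without Spark.

```python
import functools
import logging
import traceback

logger = logging.getLogger("batch_debug")


def log_batch_failures(func):
    """Wrap a (time, rdd) callback so failures are logged and then re-raised."""
    @functools.wraps(func)
    def wrapper(batch_time, rdd):
        try:
            return func(batch_time, rdd)
        except Exception:
            # Record which batch failed and the full traceback, then re-raise
            # so the job still fails exactly as it did before.
            logger.error("batch %s failed:\n%s", batch_time, traceback.format_exc())
            raise
    return wrapper


@log_batch_failures
def process_batch(batch_time, rdd):
    # Hypothetical stand-in for processRDDs: fails on a malformed record.
    return [int(x) for x in rdd]
```

Under these assumptions, the same wrapper could be applied to the real callback as `dstream.foreachRDD(log_batch_failures(processRDDs))`, so that the log shows which batch triggered the error.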
Solution
No working solution to this problem has been found yet.