Py4JJavaError: Job aborted due to stage failure: Task 0 in stage 460.0 failed 4 times

Problem description

I am hitting this strange error in Spark Streaming code written with pyspark. I have tried to debug it, but cannot find any cause.

Below is my code. The file is named Script.py.

Error log:

https://www.example.com

import os
import json
import time as t
import shutil
from uuid import uuid1
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

pkey = "userid"

def getsqlContextInstance(sparkContext):
    # Lazily create a single SQLContext and cache it in module globals.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def processRDDs(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():
        print("*****************************EMPTY RDD ********************************")
    else:
        sqlContext = getsqlContextInstance(rdd.context)
        myRDD = rdd.map(lambda y: json.dumps(y))
        # Pull all records to the driver, then redistribute over 40 partitions.
        newRDD = sc.parallelize(myRDD.collect(), 40)
        df0 = sqlContext.read.json(newRDD)
        df0.createOrReplaceTempView("mytable")
        newDF = spark.sql('''select userid, userdevicetype,
                                    date_format(cast(timestamp as timestamp), 'yyyy-MM-dd HH:mm:ss.SSS') timestamp,
                                    reminderemails, referredname, paymentmade,
                                    joined, inviteid, emailaddress
                             from mytable''')
        # Cast every column to string before writing out.
        newDF = newDF.select([col(c).cast("string") for c in newDF.columns])
        newDF.coalesce(3).write.mode("append").parquet("s3://bucket/file")

if __name__ == "__main__":
    sc = SparkContext(appName="app_test")
    ssc = StreamingContext(sc, 120)
    print("spark context set")
    spark = SparkSession(sc)
    sqlContext = SQLContext(sc)
    zkQuorum, topic = 'ip_Add', 'topic'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "grp1", {topic: 1}, {"auto.offset.reset": "smallest"})
    print("********************************************connection set****************************************************************")
    dstream = kvs.map(lambda x: json.loads(x[1]))
    dstream.foreachRDD(processRDDs)
    ssc.start()
    ssc.awaitTermination()
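The getsqlContextInstance helper above caches a single SQLContext on the driver by stashing it in globals(). The same lazy-singleton idiom can be sketched in plain Python, with a stand-in Context class instead of SQLContext (no Spark required):

```python
# Plain-Python sketch of the lazy-singleton idiom used by
# getsqlContextInstance; Context is a stand-in for SQLContext.
class Context:
    def __init__(self, name):
        self.name = name

def get_context_instance(name):
    # Build the instance on the first call and cache it in module globals;
    # every later call returns the cached object unchanged.
    if 'contextSingletonInstance' not in globals():
        globals()['contextSingletonInstance'] = Context(name)
    return globals()['contextSingletonInstance']

first = get_context_instance("driver")
second = get_context_instance("ignored")  # cached instance is returned
print(first is second)  # True
```

Because the cache lives in module globals, this only guarantees one instance per Python process, which is exactly what the script needs on the driver.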

I cannot find the root cause. Every time I rerun the code it works fine, but the error keeps coming back intermittently.
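When chasing an intermittent failure like this, it helps to rule out the per-record serialization step first. The stream map parses each Kafka message with json.loads, and processRDDs re-serializes each record with json.dumps before handing it to read.json; that round-trip should be lossless for JSON-compatible records, which can be checked in plain Python (the sample record below is made up, shaped like the fields the SQL query selects):

```python
import json

# The DStream map does json.loads(x[1]); processRDDs then feeds
# json.dumps(record) back to read.json. Check that the round-trip
# is lossless on a made-up record.
record = '{"userid": "u1", "paymentmade": true, "timestamp": 1500000000}'
parsed = json.loads(record)            # dict, as produced on the stream
reserialized = json.dumps(parsed)      # string, as handed to read.json
print(json.loads(reserialized) == parsed)  # True
```

If the round-trip holds, the serialization logic is not the culprit and attention can shift to cluster-side causes such as the collect()/parallelize round-trip through the driver.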

Solution

No effective fix for this problem has been found yet.
