PySpark error: py4j.protocol.Py4JNetworkError: Answer from Java side is empty

Problem description

I am using Python 3.6.7, PySpark 2.3.0, and Spark 2.3.0 in a Jupyter notebook to pull tweets from Kafka and process them with Spark Streaming. When I run the code below

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json
import logging

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("INFO")

ssc = StreamingContext(sc, 60)

logging.getLogger("py4j").setLevel(logging.ERROR)

kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'kafkaspark': 1})

I get the following error:

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-7-a7a877501187> in <module>
----> 1 kafkaStream = KafkaUtils.createStream(ssc, {'kafkaspark':1})

C:\Sentiment_Analysis\spark\python\pyspark\streaming\kafka.py in createStream(ssc, zkQuorum, groupId, topics, kafkaParams, storageLevel, keyDecoder, valueDecoder)
     77         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
     78         helper = KafkaUtils._get_helper(ssc._sc)
---> 79         jstream = helper.createStream(ssc._jssc, jlevel)
     80         ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
     81         stream = DStream(jstream, ssc, ser)

C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326             raise Py4JError(
    327                 "An error occurred while calling {0}{1}{2}".
--> 328                 format(target_id, ".", name))
    329     else:
    330         type = answer[1]

Py4JError: An error occurred while calling o32.createStream


I don't know how to fix this error. I am a beginner with Spark and Kafka, so could someone explain in simple terms how to get past this? What should I do?

Solution

This is a compatibility issue; see the Spark Streaming guide on advanced sources: http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources To fix the problem, use a version of the Kafka integration library that matches your Spark version. In the question, Spark 2.3.0 is being run against the spark-streaming-kafka connector built for Spark 2.0.2, which is what triggers the failed call into the JVM.
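As a minimal sketch of the fix, the --packages coordinate from the question can be pointed at the connector version that matches Spark 2.3.0 instead of 2.0.2. This assumes a Spark build against Scala 2.11 and that the org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 artifact can be resolved by your environment; the ZooKeeper address, consumer group, and topic are taken from the question.

import os
# Match the Kafka connector version to the installed Spark version (2.3.0 here).
# Assumption: Spark is built against Scala 2.11, hence the _2.11 suffix.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
ssc = StreamingContext(sc, 60)

# Same ZooKeeper quorum, consumer group, and topic as in the question.
kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'kafkaspark': 1})

Note that PYSPARK_SUBMIT_ARGS is only read when the SparkContext is first created, so restart the notebook kernel (or stop any existing SparkContext) before re-running with the corrected package coordinate.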
