Problem description
I'm working on an assignment in which I need to keep a running total of data across batches in a Python Spark Streaming job. I'm using updateStateByKey (near the end of the code below):
import sys
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    # Create Spark context
    sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
    ssc = StreamingContext(sc, 1)
    # Checkpoint for backups
    ssc.checkpoint("file:///tmp/spark")
    brokers, topic = sys.argv[1:]
    print(brokers)
    print(topic)
    sc.setLogLevel("WARN")

    # Connect to Kafka (the parameter key is lowercase "metadata.broker.list")
    kafkaParams = {"metadata.broker.list": brokers}
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

    def parse_log_line(line):
        (uuid, timestamp, url, user, region,
         browser, platform, cd, ttf) = line.strip().split(",")
        hour = timestamp[0:13]
        return (url, 1)

    lines = kafkaStream.map(lambda x: x[1])
    parsed_lines = lines.map(parse_log_line)
    clicks = parsed_lines.reduceByKey(lambda a, b: a + b)
    clicks.pprint()

    def countKeys(newValues, lastSum):
        if lastSum is None:
            lastSum = 0
        return sum(newValues, lastSum)

    # The problem is here
    sum_clicks = clicks.updateStateByKey(countKeys)
    # I tried this but it didn't help
    # sum_clicks = clicks.updateStateByKey(countKeys, numPartitions=2)
    sum_clicks.pprint()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
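The parsing step above can be exercised in plain Python, without Spark. The sample line below is hypothetical, constructed only to match the nine comma-separated fields `parse_log_line` expects:

```python
def parse_log_line(line):
    # Unpack the nine comma-separated fields of a log line.
    (uuid, timestamp, url, user, region,
     browser, platform, cd, ttf) = line.strip().split(",")
    hour = timestamp[0:13]  # computed but unused in the original
    return (url, 1)

# Hypothetical sample line: uuid,timestamp,url,user,region,browser,platform,cd,ttf
sample = "id1,2016-01-01T10:15:00,/home,alice,eu,firefox,linux,0.1,0.2"
print(parse_log_line(sample))  # → ('/home', 1)
```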
The relevant part of the error message appears when pprint() is called, but I believe that is only because the call triggers evaluation. The error is:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 195,num of partitions: 2]; Checkpoint RDD [ID: 267,num of partitions: 0].
It indicates that the number of partitions differs between the original RDD and the checkpoint RDD - but specifying numPartitions=2 made no difference.
Does anyone know what I'm doing wrong? Thanks!
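For reference, the contract of the update function passed to updateStateByKey can be checked in plain Python, independent of Spark: it receives the list of new values seen for a key in the current batch plus the previous state (None the first time a key appears), and returns the new state. A minimal sketch of the question's countKeys:

```python
def countKeys(newValues, lastSum):
    # lastSum is None the first time a key is seen.
    if lastSum is None:
        lastSum = 0
    # sum(iterable, start) adds this batch's values onto the running total.
    return sum(newValues, lastSum)

# First batch for a key: no previous state.
print(countKeys([1, 1, 1], None))  # → 3
# Later batch: the previous total is carried forward.
print(countKeys([2], 3))           # → 5
```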
Solution
No working solution to this problem has been found yet.
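One commonly reported trigger for "Checkpoint RDD has a different number of partitions from original RDD ... num of partitions: 0" is checkpoint data that cannot actually be read back: stale files left over from a previous run, or a file:/// path that is local to each executor rather than on shared storage. As a diagnostic step (not a confirmed fix for this question), the local checkpoint directory can be cleared before each test run; CHECKPOINT_DIR below mirrors the path used in the question:

```python
import os
import shutil

CHECKPOINT_DIR = "/tmp/spark"  # same path as ssc.checkpoint("file:///tmp/spark")

def reset_checkpoint_dir(path):
    """Remove stale checkpoint data from previous runs and recreate the directory."""
    if os.path.isdir(path):
        shutil.rmtree(path)
    os.makedirs(path)

reset_checkpoint_dir(CHECKPOINT_DIR)
```

On a real cluster, a path on shared storage (e.g. HDFS) rather than a node-local file:/// path would also be worth trying, since every executor and the driver must see the same checkpoint files.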