Spark Streaming updateStateByKey fails

Problem description

I am working on an assignment where I need to keep a running total across batches in a Python Spark Streaming job. I am using updateStateByKey (near the end of the code below):

import sys

from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import os

if __name__ == "__main__":
    # Create Spark Context
    sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
    ssc = StreamingContext(sc,1)

    # Checkpoint for backups
    ssc.checkpoint("file:///tmp/spark")

    brokers,topic = sys.argv[1:]

    print(brokers)
    print(topic)

    sc.setLogLevel("WARN")

    #Connect to Kafka
    kafkaParams = {"metadata.broker.list": brokers}
    kafkaStream = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams)


    def parse_log_line(line):
        (uuid,timestamp,url,user,region,browser,platform,cd,ttf) = line.strip().split(",")
        hour = timestamp[0:13]
        return (url,1)

    lines = kafkaStream.map(lambda x: x[1])
    parsed_lines = lines.map(parse_log_line)

    clicks = parsed_lines.reduceByKey(lambda a,b: a + b)
    clicks.pprint()

    def countKeys(newValues, lastSum):
        if lastSum is None:
            lastSum = 0
        return sum(newValues, lastSum)

    # The problem is here
    sum_clicks = clicks.updateStateByKey(countKeys)
    # I tried this but it didn't help
    # sum_clicks = clicks.updateStateByKey(countKeys,numPartitions=2)
    sum_clicks.pprint()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

The relevant part of the error message shows up at the pprint() call, but I think that is only because pprint() is what triggers evaluation. The error is:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 195,num of partitions: 2]; Checkpoint RDD [ID: 267,num of partitions: 0].

It says that the number of partitions differs between the original RDD and the checkpoint RDD, but I tried specifying numPartitions=2 and it made no difference.
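
For clarity, this is how I understand updateStateByKey calling countKeys for a single key, batch by batch (plain Python, no Spark needed; the numbers are made up purely for illustration):

def countKeys(newValues, lastSum):
    # newValues: the counts seen for this key in the current batch
    # lastSum: the running total from the previous batch (None the first time the key is seen)
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)

print(countKeys([1, 1, 1], None))  # first batch the key appears in -> 3
print(countKeys([2], 3))           # a later batch with one new count -> 5
print(countKeys([], 5))            # key absent from a batch -> total carried forward, 5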

Does anyone know what I am doing wrong? Thanks!

Solution

No effective solution to this problem has been found yet.
