Pyspark套接字超时错误返回self._sock.recv_intobsocket.timeout:超时

问题描述

我为协作过滤推荐系统编写了一个Spark程序(Python 3.6和Spark 2.3.2),可在2种情况下工作:

  1. 案例1:基于项目的CF推荐系统
  2. 案例2:具有Min-Hash LSH的基于用户的CF推荐系统

我编写了包含这两种情况的培训和预测程序。我的代码适用于基于用户的推荐,但是当我尝试为基于项目的CF训练模型时,出现以下错误

2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py",line 238,in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py",line 690,in read_int
    length = stream.read(4)
  File "C:\Users\17372\AppData\Local\Programs\Python\python36\lib\socket.py",line 586,in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

我尝试使用此链接上的解决方案来解决此问题: Pyspark socket timeout exception after application running for a while

它不起作用。

我找到了一种在执行中添加“ --spark.worker.timeout = 120”的解决方案,如下所示:

bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120

我仍然看到相同的错误。也尝试了Try Catch块,但我不确定如何正确处理。

我该怎么办?

我的基于项目的CF代码

 if model_type == ITEM_BASED_MODEL:
        # group original data by bidx,and remove those unpopular business (rated time < 3)
        # tuple(bidx,(uidx,score))
        # [(5306,[(3662,5.0),(3218,(300,..]),()
        shrunk_bid_uids_rdd = input_lines \
            .map(lambda kv: (bus_index_dict[kv[1]],(user_index_dict[kv[0]],kv[2]))) \
            .groupByKey().mapValues(lambda uid_score: list(uid_score)) \
            .filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
            .mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
            .mapValues(lambda val: flatMixedList(val))

        candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)

        # convert shrunk_bid_uids_rdd into dict form
        # dict(bidx: dict(uidx: score))
        # => e.g. {5306: defaultdict(<class 'list'>,{3662: 5.0,3218: 5.0,300: 5.0...}),bid_uid_dict = shrunk_bid_uids_rdd \
            .map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
            .flatMap(lambda kv_items: kv_items.items()).collectAsMap()

        # generate all possible pair between candidate bidx
        # and compute the pearson similarity
        candidate_pair = candidate_bids.cartesian(candidate_bids) \
            .filter(lambda id_pair: id_pair[0] < id_pair[1]) \
            .filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],bid_uid_dict[id_pair[1]])) \
            .map(lambda id_pair: (id_pair,computeSimilarity(bid_uid_dict[id_pair[0]],bid_uid_dict[id_pair[1]]))) \
            .filter(lambda kv: kv[1] > 0) \
            .map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],"b2": reversed_index_bus_dict[kv[0][1]],"sim": kv[1]})

解决方法

我在本地运行的Python 3.7和Spark 2.4.4遇到了相同的错误。火花选项组合没有帮助。

我正在从严重倾斜的实木复合地板文件中读取行。它们包含一个二进制列,其值介于几个字节到10MB以上。尽管为spark.default.parallelism设置了较高的数量,但所得数据帧仍包含相对较少的分区数量。分区的数量仍然与我正在读取的镶木地板文件的数量相似,并且不断出现套接字超时

我试图将spark.sql.files.maxPartitionBytes设置为足够小的值,但是错误仍然存​​在。唯一有帮助的是读取数据后的repartition,以增加分区数并更均匀地分布行。请注意,这只是一个观察,我仍然无法解释为什么错误消失了。

如果数据偏斜也是此处的主题,可以通过将代码更改为以下内容来缓解:

input_lines \
    .repartition(n) \
    .map(...)

n取决于您的集群和工作特征,并且有一个最佳选择。如果n太低,您将获得套接字超时。如果n太大,将会对性能产生负面影响。