问题描述
我为协作过滤推荐系统编写了一个Spark程序(Python 3.6和Spark 2.3.2),可在2种情况下工作:
- 案例1:基于项目的CF推荐系统
- 案例2:具有Min-Hash LSH的基于用户的CF推荐系统
我编写了包含这两种情况的培训和预测程序。我的代码适用于基于用户的推荐,但是当我尝试为基于项目的CF训练模型时,出现以下错误:
2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py",line 238,in main
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py",line 690,in read_int
length = stream.read(4)
File "C:\Users\17372\AppData\Local\Programs\Python\python36\lib\socket.py",line 586,in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
我尝试使用此链接上的解决方案来解决此问题: Pyspark socket timeout exception after application running for a while
它不起作用。
我找到了一种在执行中添加“ --spark.worker.timeout = 120”的解决方案,如下所示:
bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120
我仍然看到相同的错误。也尝试了Try Catch块,但我不确定如何正确处理。
我该怎么办?
我的基于项目的CF代码:
if model_type == ITEM_BASED_MODEL:
# group original data by bidx,and remove those unpopular business (rated time < 3)
# tuple(bidx,(uidx,score))
# [(5306,[(3662,5.0),(3218,(300,..]),()
shrunk_bid_uids_rdd = input_lines \
.map(lambda kv: (bus_index_dict[kv[1]],(user_index_dict[kv[0]],kv[2]))) \
.groupByKey().mapValues(lambda uid_score: list(uid_score)) \
.filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
.mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
.mapValues(lambda val: flatMixedList(val))
candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)
# convert shrunk_bid_uids_rdd into dict form
# dict(bidx: dict(uidx: score))
# => e.g. {5306: defaultdict(<class 'list'>,{3662: 5.0,3218: 5.0,300: 5.0...}),bid_uid_dict = shrunk_bid_uids_rdd \
.map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
.flatMap(lambda kv_items: kv_items.items()).collectAsMap()
# generate all possible pair between candidate bidx
# and compute the pearson similarity
candidate_pair = candidate_bids.cartesian(candidate_bids) \
.filter(lambda id_pair: id_pair[0] < id_pair[1]) \
.filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],bid_uid_dict[id_pair[1]])) \
.map(lambda id_pair: (id_pair,computeSimilarity(bid_uid_dict[id_pair[0]],bid_uid_dict[id_pair[1]]))) \
.filter(lambda kv: kv[1] > 0) \
.map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],"b2": reversed_index_bus_dict[kv[0][1]],"sim": kv[1]})
解决方法
我在本地运行的Python 3.7和Spark 2.4.4遇到了相同的错误。火花选项组合没有帮助。
我正在从严重倾斜的实木复合地板文件中读取行。它们包含一个二进制列,其值介于几个字节到10MB以上。尽管为spark.default.parallelism
设置了较高的数量,但所得数据帧仍包含相对较少的分区数量。分区的数量仍然与我正在读取的镶木地板文件的数量相似,并且不断出现套接字超时。
我试图将spark.sql.files.maxPartitionBytes
设置为足够小的值,但是错误仍然存在。唯一有帮助的是读取数据后的repartition
,以增加分区数并更均匀地分布行。请注意,这只是一个观察,我仍然无法解释为什么错误消失了。
如果数据偏斜也是此处的主题,可以通过将代码更改为以下内容来缓解:
input_lines \
.repartition(n) \
.map(...)
n
取决于您的集群和工作特征,并且有一个最佳选择。如果n
太低,您将获得套接字超时。如果n
太大,将会对性能产生负面影响。