Spark LSH 管道,增加文本长度时的性能问题

问题描述

从这个 example 开始,我在 Pyspark 上使用了局部敏感哈希 (LSH) 来查找重复的文档。
关于我的数据库的一些说明:我有 400 万个文本文件。每个文件平均有 20K 个字符。目前,我只考虑每个文档的前 500 个字符。
当我将字符数从 500 增加到 1000 时,出现内存错误
我已经尝试处理管道的参数。我知道我可以避免增加 Ngram 中的 n 和减少 MinHashLSH 中的 NumHashTables 的内存错误。然而,这会增加过多的假阴性。
流程中是否还有其他可以提高性能的步骤?
我的目标是将字符数从 500 增加到 2000,而不会出现内存错误或很长的计算时间(理想情况下,时间计算 这是我的带有假数据的代码

# Prameters
# NGram
n_gram = 2 #really,i use n_gram=8 because i have 500char per each document 
# MinHashLSH
hash_tables = 10 #really,i use hash_tables=3 to avoid memory error and too long computational time 
# jaccard treshold
treshold_test = 0.5
#Fake dataframe
df = spark.createDataFrame([
  (0,"Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),(1,"I wish Java Could use case classes I wish Java Could use case classes!!"),(2,"Logistic,regression,models,are,neat,etc,etc"),(3,"Hi I heard about Spork Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),(4,"Hi I heard about Java Hi I heard about Java Hi I heard about Java Hi     I heard about Java")
],["id","text"])
# cleaning puntuactions and double spaces
df = df.withColumn("text",regexp_replace('text',r'\p{Punct}',''))
df = df.withColumn("text",r' (?= |$)',''))
#trim whitespaces and filtering out text too short
df = df.withColumn("text",trim(col("text")))\
.filter((col('text') != "") & (length(col('text')) > n_gram*3))
df.show(5,False)
# LSH pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer,NGram,HashingTF,MinHashLSH

db = df
query = df

model = Pipeline(stages=[
        RegexTokenizer(
        pattern="",inputCol="text",outputCol="tokens",minTokenLength=1
    ),NGram(n=n_gram,inputCol="tokens",outputCol="ngrams"),HashingTF(inputCol="ngrams",outputCol="vectors"),MinHashLSH(inputCol="vectors",outputCol="lsh",numHashTables=hash_tables)]).fit(db)

db_hashed = model.transform(db)
query_hashed = model.transform(query)
output = model.stages[-1].approxSimilarityJoin(db_hashed,query_hashed,treshold_test)

# similar pairs of documents:
output.filter(col('datasetA.id') != col('datasetB.id'))\
.select(col("datasetA.id").alias("idA"),col("datasetB.id").alias("idB"),col("datasetA.text").alias("textA"),col("datasetB.text").alias("textB"),col("distCol")).sort(col("distCol"))\
.withColumn('comb',sort_array(array(*('idA','idB')))).dropDuplicates(['comb']).show()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)