使用 PySpark 计算 Jaccard 距离时的对数少于应有的数量

问题描述

我正在尝试以 SparseVectors 的形式计算某些 id 与它们的属性间的 Jaccard 距离。

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark import SparkConf,SparkContext
from pyspark.sql import sqlContext
import pyspark.sql.functions as F
from pyspark.mllib.linalg import Vectors,VectorUDT
from pyspark.sql.functions import udf
sqlContext = sqlContext(sc)
df = sqlContext.read.load("path")
par = udf(lambda s: Vectors.parse(s),VectorUDT())
d = df_filtered.select("id",par("vect"))
from pyspark.ml.linalg import VectorUDT as VectorUDTML
as_ml = udf(lambda v: v.asML() if v is not None else None,VectorUDTML())
result = d.withColumn("<lambda>(vect)",as_ml("<lambda>(vect)"))
mh = MinHashLSH(inputCol="<lambda>(vect)",outputCol="hashes",seed=12345,numHashTables=15)
model = mh.fit(df)
a = model.transform(df)

jd = model.approxSimilarityJoin(a,a,1.0,distCol="Jaccarddistance").select(
     col("datasetA.id1").alias("idA"),col("datasetB.id1").alias("idB"),col("Jaccarddistance"))

df 有两列,idsparse_vectorid 列是一个字母数字 ID,sparse_vector 列包含这样的记录 SparseVector(243775,{0: 1.0,1: 1.0,2: 1.0,3: 1.0,4: 1.0,6: 1.0,7: 1.0,8: 1.0,9: 1.0,10: 1.0,11: 1.0,12: 1.0,13: 1.0,14: 1.0,15: 1.0,16: 1.0,24: 1.0,30: 1.0,31: 1.0,32: 1.0,61: 1.0,88: 1.0,90: 1.0,96: 1.0,104: 1.0,153: 1.0,155: 1.0,159: 1.0,160: 1.0,161: 1.0,162: 1.0,169: 1.0,181: 1.0,194: 1.0,212: 1.0,220: 1.0,222: 1.0,232: 1.0,303: 1.0,390: 1.0,427: 1.0,506: 1.0,508: 1.0,509: 1.0,518: 1.0,554: 1.0,568: 1.0,798: 1.0,1431: 1.0,2103: 1.0,2139: 1.0,3406: 1.0,3411: 1.0,3415: 1.0,3429: 1.0,3431: 1.0,3440: 1.0,3443: 1.0,3449: 1.0}))

当我计算 Jaccard 并记下数据时,我遗漏了很多 id 对。数据中共有 45k 个身份,因此输出应包含大约 45k*45k 对。

此外,当我仅将 1k id 与 45k id 进行比较并以这种方式处理所有 id 时,我得到了所有可能的对,有点像批次。任何输入都会有所帮助。 另外,我可以并行化代码以便更快地拥有批处理系统吗?我正在 emr 集群上运行代码,并且拥有增加集群大小的资源。

以下脚本可用于生成带有 id 和人工生成的稀疏向量的样本数据。

from random import randint
from collections import OrderedDict
with open('/mnt/lsh_data.csv','a') as the_file:
    the_file.write("id\vect\n")
    for i in range(45000):
        a = "id"
        b = a + str(i)
        num_ent = randint(101,195) + randint(102,200)
        lis = []
        for j in range(num_ent):
            lis.append(randint(0,599999))
        lis.sort()
        l = list(OrderedDict.fromkeys(lis))
        data = []
        for j in range(len(l)):
            c = randint(0,1)
            if c == 0:
                data.append(1.0)
            else:
                data.append(0.0)
        b = b + "\t(600000,"+str(l)+","+str(data)+")\n"
        the_file.write(b)

解决方法

不是真正的答案,但评论太长了:

我不确定 approxSimilarityJoin 是如何工作的以及预期的输出是什么。但是,我检查了文档 (http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=minhash%20lsh#pyspark.ml.feature.MinHashLSH) 中给出的示例,它只有 3 x 3,即使在那里我们也没有得到完整的叉积(即使我们增加了阈值)。所以也许这不是预期的输出...

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.feature import MinHashLSH

data = [(0,Vectors.sparse(6,[0,1,2],[1.0,1.0,1.0]),),(1,[2,3,4],(2,2,)]

df = spark.createDataFrame(data,["id","features"])

mh = MinHashLSH(inputCol="features",outputCol="hashes",seed=12345)

model = mh.fit(df)
model.transform(df).head()

data2 = [(3,[1,5],(4,(5,)]

df2 = spark.createDataFrame(data2,"features"])

model.approxSimilarityJoin(df,df2,distCol="JaccardDistance").show()

,

检查 approxSimilarityJoin 源代码,您可以看到该函数首先对每个输入向量的 locality sensitive hash (LSH) 执行连接,“以高概率将相似的输入项散列到相同的桶中。 ”然后计算结果的距离。其效果是仅在获取每个向量的 LSH 后最终在同一桶中的向量之间计算距离。这就是为什么您看不到输入数据集中所有对的距离,只看到最终在同一个桶中的向量对。

此外,LSH 的输入是来自数据的输入向量和从初始种子派生的随机系数,这解释了为什么改变种子会改变分桶,从而改变输出。

如果您通过更改 MinHashLSH seed 参数的值进行实验,您可以看到分桶的变化。