问题描述
我正在尝试以 SparseVectors 的形式计算某些 id 与它们的属性之间的 Jaccard 距离。
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark import SparkConf,SparkContext
from pyspark.sql import sqlContext
import pyspark.sql.functions as F
from pyspark.mllib.linalg import Vectors,VectorUDT
from pyspark.sql.functions import udf
sqlContext = sqlContext(sc)
df = sqlContext.read.load("path")
par = udf(lambda s: Vectors.parse(s),VectorUDT())
d = df_filtered.select("id",par("vect"))
from pyspark.ml.linalg import VectorUDT as VectorUDTML
as_ml = udf(lambda v: v.asML() if v is not None else None,VectorUDTML())
result = d.withColumn("<lambda>(vect)",as_ml("<lambda>(vect)"))
mh = MinHashLSH(inputCol="<lambda>(vect)",outputCol="hashes",seed=12345,numHashTables=15)
model = mh.fit(df)
a = model.transform(df)
jd = model.approxSimilarityJoin(a,a,1.0,distCol="Jaccarddistance").select(
col("datasetA.id1").alias("idA"),col("datasetB.id1").alias("idB"),col("Jaccarddistance"))
df 有两列,id
和 sparse_vector
。 id
列是一个字母数字 ID,sparse_vector
列包含这样的记录 SparseVector(243775,{0: 1.0,1: 1.0,2: 1.0,3: 1.0,4: 1.0,6: 1.0,7: 1.0,8: 1.0,9: 1.0,10: 1.0,11: 1.0,12: 1.0,13: 1.0,14: 1.0,15: 1.0,16: 1.0,24: 1.0,30: 1.0,31: 1.0,32: 1.0,61: 1.0,88: 1.0,90: 1.0,96: 1.0,104: 1.0,153: 1.0,155: 1.0,159: 1.0,160: 1.0,161: 1.0,162: 1.0,169: 1.0,181: 1.0,194: 1.0,212: 1.0,220: 1.0,222: 1.0,232: 1.0,303: 1.0,390: 1.0,427: 1.0,506: 1.0,508: 1.0,509: 1.0,518: 1.0,554: 1.0,568: 1.0,798: 1.0,1431: 1.0,2103: 1.0,2139: 1.0,3406: 1.0,3411: 1.0,3415: 1.0,3429: 1.0,3431: 1.0,3440: 1.0,3443: 1.0,3449: 1.0}))
当我计算 Jaccard 并记下数据时,我遗漏了很多 id 对。数据中共有 45k 个身份,因此输出应包含大约 45k*45k 对。
此外,当我仅将 1k id 与 45k id 进行比较并以这种方式处理所有 id 时,我得到了所有可能的对,有点像批次。任何输入都会有所帮助。 另外,我可以并行化代码以便更快地拥有批处理系统吗?我正在 emr 集群上运行代码,并且拥有增加集群大小的资源。
以下脚本可用于生成带有 id 和人工生成的稀疏向量的样本数据。
from random import randint
from collections import OrderedDict
with open('/mnt/lsh_data.csv','a') as the_file:
the_file.write("id\vect\n")
for i in range(45000):
a = "id"
b = a + str(i)
num_ent = randint(101,195) + randint(102,200)
lis = []
for j in range(num_ent):
lis.append(randint(0,599999))
lis.sort()
l = list(OrderedDict.fromkeys(lis))
data = []
for j in range(len(l)):
c = randint(0,1)
if c == 0:
data.append(1.0)
else:
data.append(0.0)
b = b + "\t(600000,"+str(l)+","+str(data)+")\n"
the_file.write(b)
解决方法
不是真正的答案,但评论太长了:
我不确定 approxSimilarityJoin
是如何工作的以及预期的输出是什么。但是,我检查了文档 (http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=minhash%20lsh#pyspark.ml.feature.MinHashLSH) 中给出的示例,它只有 3 x 3,即使在那里我们也没有得到完整的叉积(即使我们增加了阈值)。所以也许这不是预期的输出...
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.feature import MinHashLSH
data = [(0,Vectors.sparse(6,[0,1,2],[1.0,1.0,1.0]),),(1,[2,3,4],(2,2,)]
df = spark.createDataFrame(data,["id","features"])
mh = MinHashLSH(inputCol="features",outputCol="hashes",seed=12345)
model = mh.fit(df)
model.transform(df).head()
data2 = [(3,[1,5],(4,(5,)]
df2 = spark.createDataFrame(data2,"features"])
model.approxSimilarityJoin(df,df2,distCol="JaccardDistance").show()
,
检查 approxSimilarityJoin
源代码,您可以看到该函数首先对每个输入向量的 locality sensitive hash (LSH) 执行连接,“以高概率将相似的输入项散列到相同的桶中。 ”然后计算结果的距离。其效果是仅在获取每个向量的 LSH 后最终在同一桶中的向量之间计算距离。这就是为什么您看不到输入数据集中所有对的距离,只看到最终在同一个桶中的向量对。
此外,LSH 的输入是来自数据的输入向量和从初始种子派生的随机系数,这解释了为什么改变种子会改变分桶,从而改变输出。
如果您通过更改 MinHashLSH
seed
参数的值进行实验,您可以看到分桶的变化。