What is the PySpark implementation of the np.linalg.inv() function?

Problem description

I am a PySpark beginner trying to port a Pandas function to PySpark. The function contains a matrix inversion, and I am finding it hard to compute an inverse matrix with PySpark. The matrix I have lives in a PySpark DataFrame. How can I implement this inversion in PySpark? Here is the link to the NumPy method:

np.linalg.inv()

https://numpy.org/doc/stable/reference/generated/numpy.linalg.inv.html
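
For reference, this is the NumPy call being ported; a minimal sketch on the same 2x2 values used in the example below:

# NumPy reference: invert a small 2x2 matrix on the driver
import numpy as np

A = np.array([[1.0, 2.0],
              [3.0, 4.0]])
A_inv = np.linalg.inv(A)
print(A_inv)
# [[-2.   1. ]
#  [ 1.5 -0.5]]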

Solution

There is no direct way to do this in PySpark. What I can think of is to use a Scala operation in between to perform the conversion.
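
A quick baseline for comparison before the Scala route: if the matrix is small enough to collect onto the driver (an assumption that may not hold for your data), plain NumPy can do the inversion after a collect(). This is only a sketch; the walkthrough below keeps the inversion in Scala.

# Baseline sketch (assumes the matrix fits in driver memory): collect and invert with NumPy
import numpy as np

small_df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["A", "B"])  # same toy matrix as below
inv_np = np.linalg.inv(np.array(small_df.collect()))  # Row objects behave like tuples, so this forms a 2x2 array
print(inv_np)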

# Creating a random dataframe to check the pipeline

from pyspark.sql.types import StructType,StructField,DoubleType
data2 = [(1.0,2.0),(3.0,4.0),]

schema = StructType([
    StructField("A", DoubleType(), True),
    StructField("B", DoubleType(), True)
])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
df.createOrReplaceTempView("temp_table")

>>
df:pyspark.sql.dataframe.DataFrame = [A: double,B: double]
root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)

+---+---+
|A  |B  |
+---+---+
|1.0|2.0|
|3.0|4.0|
+---+---+
>>

# Using a Scala Breeze operation to get the inverse
%scala
import scala.util.Random
import breeze.linalg.DenseMatrix
import breeze.linalg.inv

val featuresDF = table("temp_table")

// Collect each column of the DataFrame into a driver-side array (one inner array per column)
var FeatureArray: Array[Array[Double]] = Array.empty
val features = featuresDF.columns

for (i <- features.indices) {
    FeatureArray = FeatureArray :+ featuresDF.select(features(i)).collect.map(_(0).toString).map(_.toDouble)
}

// DenseMatrix builds row-wise from the column arrays, so transpose to restore the original orientation
val denseMat = DenseMatrix(FeatureArray: _*).t
val inverse = inv(denseMat)
println(inverse)

// Breeze's toArray flattens the matrix column by column (column-major order)
val c = inverse.toArray.toSeq
val matrix = c.toDF("mat")
matrix.createOrReplaceTempView("matrix_df")

>>
featuresDF:org.apache.spark.sql.DataFrame = [A: double,B: double]
matrix:org.apache.spark.sql.DataFrame = [mat: double]
-1.9999999999999996  0.9999999999999998   
1.4999999999999998   -0.4999999999999999  
import scala.util.Random
import breeze.linalg.DenseMatrix
import breeze.linalg.inv
featuresDF: org.apache.spark.sql.DataFrame = [A: double,B: double]
FeatureArray: Array[Array[Double]] = Array(Array(1.0,3.0),Array(2.0,4.0))
features: Array[String] = Array(A,B)
denseMat: breeze.linalg.DenseMatrix[Double] =
1.0  2.0
3.0  4.0
inverse: breeze.linalg.DenseMatrix[Double] =
-1.9999999999999996  0.9999999999999998
1.4999999999999998   -0.4999999999999999
c: Seq[Double] = WrappedArray(-1.9999999999999996,1.4999999999999998,0.9999999999999998,-0.4999999999999999)
matrix: org.apache.spark.sql.DataFrame = [mat: double]
>>


# Collecting the flattened inverse matrix from the Scala operation back into PySpark
import numpy as np
matrix_df = spark.sql("SELECT * FROM matrix_df")
df = matrix_df
df.show()
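
If all that is needed on the Python side is the matrix itself, a driver-side reshape is enough at this point. Breeze flattened the inverse column by column (visible in the WrappedArray output above), so a column-major reshape recovers the square shape. A sketch, assuming matrix_df preserves the row order of the flattened values:

# Driver-side reshape sketch: rebuild the square matrix from the column-major flattened values
flat = [row["mat"] for row in df.collect()]          # [-2.0, 1.5, 1.0, -0.5] for the toy example
n = int(len(flat) ** 0.5)                            # side length of the square matrix
inv_mat = np.array(flat).reshape((n, n), order="F")  # order="F" = column-major, matching Breeze
print(inv_mat)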


# Converting the long (flattened) form back to wide (matrix) form

from pyspark.sql import functions as F, Window
from math import sqrt

c = int(sqrt(df.count()))  # side length of the square matrix; this gives 2 for the 2x2 example
# Note: row_number over a constant literal relies on the incoming row order, which Spark does not guarantee
rnum = F.row_number().over(Window.orderBy(F.lit(1)))

out = (df.withColumn("Rnum", ((rnum - 1) % c).cast("Integer"))  # Rnum: row index (values were flattened column by column)
         .withColumn("idx", F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))  # idx: column index within each row
         .groupby("Rnum").pivot("idx").agg(F.first("mat")))
out.show()

>>
out:pyspark.sql.dataframe.DataFrame = [Rnum: integer,1: double ... 1 more fields]
+----+-------------------+------------------+
|Rnum|                  1|                 2|
+----+-------------------+------------------+
|   0|-1.9999999999999996|1.4999999999999998|
|   1|-0.4999999999999999|0.9999999999999998|
+----+-------------------+------------------+
>>
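
As a quick sanity check (a sketch, not part of the original answer), multiply the reconstructed matrix by the original and compare against the identity. Because row_number over a constant ordering does not guarantee that the flattened values come back in their original order, a failed check most likely means the rows or columns of out just need reordering.

# Sanity-check sketch: the pivoted result (sorted by Rnum) should be the inverse of the original matrix
import numpy as np

recovered = np.array(out.orderBy("Rnum").select("1", "2").collect())
original = np.array([[1.0, 2.0], [3.0, 4.0]])
print(np.allclose(original @ recovered, np.eye(2)))  # True when the pivot preserved the ordering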