使用 PySpark GraphFrames 的 Pregel API 计算组织中的员工层级结构及层级深度

问题描述

我有一个基于 Spark/Scala GraphX 的解决方案,用于处理员工层级结构问题,它可以计算出每位员工相对于最高管理者的层级深度。该方案在内部使用了 Pregel API。请问能否用 PySpark GraphFrames 实现相同的功能?如果可以,是否也能使用其 Pregel API?

    import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/** An employee's job title, kept as plain text. */
type Role = String

/**
 * Vertex attribute of the raw employee graph.
 *
 * @param name the employee's name
 * @param role the employee's job title
 */
case class Employee(
  name: String,
  role: Role
)

// Sample reporting hierarchy, one tuple per employee:
// (employeeId, firstName, lastName, role, supervisorId). `None` marks an employee without a supervisor.
// NOTE(review): the entries for ids 3, 4, 8 and 9 were truncated in the original listing; the
// fields restored here (missing names, roles and supervisor ids) are a reconstruction — confirm
// them against the intended data set.
val employeeRawData: Array[(Long, String, String, String, Option[Long])] = Array(
  (1L, "Steve", "Jobs", "CEO", None),
  (2L, "Leslie", "Lamport", "CTO", Some(1L)),
  (3L, "Jason", "Fried", "Manager", Some(1L)),
  (4L, "Joel", "Spolsky", "Manager", Some(2L)),
  (5L, "Jeff", "Dean", "Lead", Some(4L)),
  (6L, "Martin", "Odersky", "Sr.Dev", Some(5L)),
  (7L, "Linus", "Trovalds", "Dev", Some(6L)),
  (8L, "Steve", "Wozniak", "Dev", Some(6L)),
  (9L, "Matei", "Zaharia", "Dev", Some(6L)),
  (10L, "James", "Faeldon", "Intern", Some(7L))
)

// Distribute the sample rows over 4 partitions, name the columns and cache the resulting DataFrame.
val employeeDf = {
  val employeeRows = sc.parallelize(employeeRawData, numSlices = 4)
  employeeRows
    .toDF("employeeId", "firstName", "lastName", "role", "supervisorId")
    .cache()
}

// One vertex per employee: the id is employeeId, the attribute holds "firstName lastName" and the role.
val verticesRdd: RDD[(VertexId, Employee)] = employeeDf
  .select(
    $"employeeId",
    concat($"firstName", lit(" "), $"lastName"),
    $"role"
  )
  .rdd
  .map { row =>
    val employeeId = row.getLong(0)
    val fullName = row.getString(1)
    val role = row.getString(2)
    (employeeId, Employee(fullName, role))
  }

// One edge per reporting relationship, directed top-down from supervisor to employee.
// The edge attribute is the employee's role.
val edgesRdd: RDD[Edge[String]] = employeeDf
  .filter($"supervisorId".isNotNull) // Drop employees without a supervisor (a Scala None is stored as null)
  .select($"supervisorId", $"employeeId", $"role") // supervisorId comes first because it is the edge source
  .rdd
  .map(emp => Edge(emp.getLong(0), emp.getLong(1), emp.getString(2)))

// Default vertex attribute, used when an edge references an employee id that has no vertex
val missingEmployee = Employee("John Doe", "Unknown")

// Build the graph model from the employee vertices and the supervisor -> employee edges
val employeeGraph: Graph[Employee, String] = Graph(verticesRdd, edgesRdd, missingEmployee)

/**
 * The structure of the message passed to vertices.
 *
 * @param currentId tracks the most recent vertex appended to the path; used for flagging isCyclic
 * @param level     the number of up-line supervisors (level in the reporting hierarchy)
 * @param head      the top-most supervisor
 * @param path      the reporting path to the top-most supervisor
 * @param isCyclic  whether the reporting structure of the employee is cyclic
 * @param isLeaf    whether the employee is rank and file (no down-line reporting employee)
 */
case class EmployeeMessage(
  currentId: Long,
  level: Int,
  head: String,
  path: List[String],
  isCyclic: Boolean,
  isLeaf: Boolean
)

/**
 * The structure of the vertex values of the graph.
 *
 * @param name      the employee name
 * @param currentId initial value is the employeeId
 * @param level     initial value is zero
 * @param head      initial value is this employee's name
 * @param path      initial value contains this employee's name only
 * @param isCyclic  initial value is false
 * @param isLeaf    initial value is true
 */
case class EmployeeValue(
  name: String,
  currentId: Long,
  level: Int,
  head: String,
  path: List[String],
  isCyclic: Boolean,
  isLeaf: Boolean
)

// Initialize the employee vertices: each employee starts at level 0 as its own head, with a
// path that contains only its own name.
val employeeValueGraph: Graph[EmployeeValue, String] = employeeGraph.mapVertices { (id, v) =>
  EmployeeValue(
    name = v.name,
    currentId = id,
    level = 0,
    head = v.name,
    path = List(v.name),
    isCyclic = false,
    // Every vertex starts out as a leaf; sendMsg is what clears the flag on vertices that act
    // as the source of an edge. Starting from false would leave every employee a non-leaf.
    isLeaf = true
  )
}

/**
 * Vertex program: applies the (merged) incoming message to a vertex value.
 *
 * Exactly one of four updates is applied, checked in this order:
 *  - `message.level == 0` (treated as superstep 0): increment the vertex level;
 *  - `message.isCyclic`: flag the vertex as cyclic;
 *  - `!message.isLeaf`: clear the vertex's leaf flag;
 *  - otherwise: take currentId and head from the message, increment the level and
 *    prepend this vertex's name to the message path.
 *
 * @param vertexId id of the vertex being updated (not used by the update itself)
 * @param value    current value of the vertex
 * @param message  message delivered to the vertex
 * @return the updated vertex value
 */
def vprog(
  vertexId: VertexId,
  value: EmployeeValue,
  message: EmployeeMessage
): EmployeeValue = {

  if (message.level == 0) { // superstep 0 - initialize
    value.copy(level = value.level + 1)
  } else if (message.isCyclic) { // set isCyclic
    value.copy(isCyclic = true)
  } else if (!message.isLeaf) { // set isLeaf
    value.copy(isLeaf = false)
  } else { // set new values
    value.copy(
      currentId = message.currentId,
      level = value.level + 1,
      head = message.head,
      path = value.name :: message.path
    )
  }
}


/**
 * Message generator: decides, per edge triplet, which vertex (if any) gets a message.
 *
 *  - Cyclic structure detected and the source is not yet flagged: send an `isCyclic = true`
 *    message to the destination; if the source is already flagged, send nothing.
 *  - Source still marked as leaf: send an `isLeaf = false` message back to the source.
 *  - Otherwise: propagate the source's values to the destination.
 *
 * NOTE(review): the two non-cyclic messages were truncated in the original listing. They are
 * rebuilt here with the same source-derived fields as the cyclic message — confirm.
 *
 * @param triplet the edge together with its source and destination vertex values
 * @return zero or one (target vertex id, message) pair
 */
def sendMsg(
  triplet: EdgeTriplet[EmployeeValue, String]
): Iterator[(VertexId, EmployeeMessage)] = {

  val src = triplet.srcAttr
  val dst = triplet.dstAttr

  // Every message carries the source vertex's current state; only the two flags differ per case.
  def messageFromSrc(isCyclic: Boolean, isLeaf: Boolean): EmployeeMessage =
    EmployeeMessage(
      currentId = src.currentId,
      level = src.level,
      head = src.head,
      path = src.path,
      isCyclic = isCyclic,
      isLeaf = isLeaf
    )

  // Handle cyclic reporting structure
  if (src.currentId == triplet.dstId || src.currentId == dst.currentId) {
    if (!src.isCyclic) { // Set isCyclic
      Iterator((triplet.dstId, messageFromSrc(isCyclic = true, isLeaf = src.isLeaf)))
    } else { // Already marked as isCyclic (possibly from a previous superstep), so ignore
      Iterator.empty
    }
  } else { // Regular reporting structure
    if (src.isLeaf) {
      // Initially every vertex is a leaf. Since this vertex is a source it should NOT be a leaf:
      // tell it so. isLeaf = false is the only important value in this message.
      Iterator((triplet.srcId, messageFromSrc(isCyclic = false, isLeaf = false)))
    } else {
      // Set new values by propagating source values to the destination.
      // isCyclic = false so that cyclic updating is ignored in vprog;
      // isLeaf = true so that leaf updating is ignored in vprog.
      Iterator((triplet.dstId, messageFromSrc(isCyclic = false, isLeaf = true)))
    }
  }
}


/**
 * Combines two messages addressed to the same vertex by keeping the second one and
 * discarding the first.
 */
def mergeMsg(msg1: EmployeeMessage, msg2: EmployeeMessage): EmployeeMessage = {
  msg2
}

// Message handed to the Pregel run as its initial message. vprog recognises it by level == 0,
// so the remaining fields are placeholders.
val initialMsg = EmployeeMessage(
  currentId = 0L,
  level = 0,
  head = "",
  path = Nil,
  isCyclic = false,
  isLeaf = true
)

// Run the Pregel computation with no iteration cap (Int.MaxValue) and EdgeDirection.Out as the
// active direction, using the vertex program, message generator and message combiner above.
val results = employeeValueGraph.pregel(
  initialMsg,
  maxIterations = Int.MaxValue,
  activeDirection = EdgeDirection.Out
)(vprog, sendMsg, mergeMsg)

// Flatten the vertex values into rows; the path is reversed and joined with ">".
val resultDf = results.vertices
  .map { case (id, v) =>
    val reportingPath = v.path.reverse.mkString(">")
    (id, v.name, v.level, v.head, reportingPath, v.isCyclic, v.isLeaf)
  }
  .toDF("id", "employee", "level", "head", "path", "cyclic", "leaf")

// Split the ">"-joined path back into an array column named "letters"
val df = resultDf.withColumn("letters", split($"path", ">"))

// Size of the longest "letters" array, i.e. how many level columns are needed
val numCols = df
  .select(size($"letters").as("letters_size"))
  .agg(max($"letters_size"))
  .head()
  .getInt(0)

// Drop the helper columns, expand "letters" into level0 .. level(numCols - 1), and print the
// rows ordered by level.
df.drop("path", "leaf", "cyclic")
  .select(
    col("*") +: (0 until numCols).map { i =>
      $"letters".getItem(i).as(s"level$i")
    }: _*
  )
  .drop("letters")
  .orderBy($"level")
  .show()

这将生成类似这样的输出。

Output

有人可以指导我如何将其转换为 PySpark GraphFrames 的实现吗?

解决方法

暂未找到可以解决该程序问题的有效方法,小编正在努力寻找和整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...