PySpark:删除源自其他行的行

问题描述

我确实具有以下数据框,该数据框包含经过所有节点后的树中的所有路径。对于节点之间的每次跳转,都会创建一行,其中“ dist”是到目前为止的节点数,“ node”是当前节点,“ path”是到目前为止的路径。

dist   |  node     |  path
0      |     1     |    [1]   
1      |     2     |    [1,2] 
1      |     5     |    [1,5] 
2      |     3     |    [1,2,3] 
2      |     4     |    [1,4] 

最后,我只想有一个包含完整路径的数据框,而无需中间步骤:

dist   |  node     |  path
1      |     5     |    [1,4]

我也尝试通过将path列作为字符串(“ 1; 2; 3”),并比较哪一行是彼此的子字符串,但是我找不到解决方法。

解决方法

我找到了我的旧代码并为您的问题创建了一个改编示例。为此,我使用了火花图库 Graphframes。路径可以由类似消息聚合循环的 Pregel 确定。

这里是代码。 首先导入所有模块

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

import pyspark.sql.functions as f
from graphframes import GraphFrame
from pyspark.sql.types import *

from graphframes.lib import *
# shortcut for the aggregate message object from the graphframes.lib
AM=AggregateMessages


# to plot the graph
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt


spark = (SparkSession
         .builder
         .appName("PathReduction")
         .getOrCreate()
        )

sc=spark.sparkContext

然后创建一个示例数据集

# create dataframe
raw_data = [
  ("0","1"),("1","2"),"5"),("2","3"),"4"),("a","b"),("b","c"),("c","d")]

schema = ["src","dst"]
data = spark.createDataFrame(data=raw_data,schema = schema) 
data.show()

+---+---+
|src|dst|
+---+---+
|  0|  1|
|  1|  2|
|  1|  5|
|  2|  3|
|  2|  4|
|  a|  b|
|  b|  c|
|  c|  d|
+---+---+

用于可视化运行

plotData_1 = data.select("src","dst").rdd.collect()
plotData_2 = np.array(plotData_1)

plotData_3=[]
for row in plotData_2:
  plotData_3.append((row[0],row[1]))

G=nx.DiGraph(directed=True)
G.add_edges_from(plotData_3)

options = {
    'node_color': 'orange','node_size': 500,'width': 2,'arrowstyle': '-|>','arrowsize': 20,}

nx.draw(G,arrows=True,**options,with_labels=True)

plot of graph

使用此消息聚合算法,您可以在搜索路径时找到路径。如果您将标志 show_steps 设置为 True,则会显示每一步的结果,这有助于理解。

# if flag is true print results within the loop for debuging
show_steps=False
# max itertions of the loop,should be larger then the longest expected path
max_iter=10

# create vertices from edge data set
vertices=(data.select("src").union(data.select("dst")).distinct().withColumnRenamed('src','id'))
edges=data

# create graph to get in and out degrees
gx = GraphFrame(vertices,edges)
# calclulate in and out degrees of each node
inDegrees=gx.inDegrees
outDegrees=gx.outDegrees

if(show_steps==True):
  print("in and out degrees")
  inDegrees.show()
  outDegrees.show()

# create intial vertices
init_vertices=(vertices
               # join out degrees on vertices
               .join(outDegrees,on="id",how="left")
               # join in degree on vertices
               .join(inDegrees,how="left")
               # define root,childs in the middle and leafs of the path in order to distinguish full paths later on
               .withColumn("nodeType",f.when(f.col("inDegree").isNull(),"root").otherwise(f.when(f.col("outDegree").isNull(),"leaf").otherwise("child")))
               # define message with all information [array(id) and array(nodeType)] to be send to the next noe
               .withColumn("message",f.array_union(f.array(f.array(f.col("id"))),f.array(f.array(f.col("nodeType")))))
               # remove columns that are not used anymore
               .drop("inDegree","outDegree")
              )

if(show_steps==True):
  print("init vertices")
  init_vertices.show()

# update graph object with init vertices
gx = GraphFrame(init_vertices,edges)


# define empty dataframe to append found paths on
results = sqlContext.createDataFrame(
        sc.emptyRDD(),StructType([StructField("paths",ArrayType(StringType()),True)])
    )


# start loopp for mesage aggregation. Set a max_iter value which has to be larger as the longest path expected

for iter_ in range(max_iter):

    if(show_steps==True):
        print("iteration step=" + str(iter_))
        print("##################################################")
    # define the message that should be send. Here we send a message to the source node and we take the column message from the destination source we send backward
    msgToSrc = AM.dst["message"]
    agg = gx.aggregateMessages(
      f.collect_set(AM.msg).alias("aggMess"),# aggregation function is a collect into an array (attention!! this can be an expensive operation in terms of shuffel)
      sendToSrc=msgToSrc,sendToDst=None
    )
    
    if(show_steps==True):
      print("aggregated message")
      agg.show(truncate=False)
    
    # stop loop if no more agg messages collected
    if(len(agg.take(1))==0):
      print("All paths found in " + str(iter_) + " iterations")
      break
    
    # get new vertices to send into next round. Here we have to prepare the next message columns all _column_names are temporary columns for calculation purpose only
    vertices_update=(agg
                     # join initial data to aggregation in order to have to nodeType of the vertice
                     .join(init_vertices,how="left")
                     # exploe the nested array with the path and the nodeType
                     .withColumn("_explode_to_flatten_array",f.explode(f.col("aggMess")))
                     # put the path aray into a seperate column 
                     .withColumn("_dataMsg",f.col("_explode_to_flatten_array")[0])
                     # put the node type into a seperate column
                     .withColumn("_typeMsg",f.col("_explode_to_flatten_array")[1][0])
                     # deside if a path is complete. A path is complete if the vertices type is a root and the message type is a leaf
                     .withColumn("pathComplete",f.when(((f.col("nodeType")=="root") & (f.col("_typeMsg")=="leaf")),True).otherwise(False))
                     # append the curent vertice id to the path array that is send forward
                     .withColumn("_message",f.array_union(f.array(f.col("id")),f.col("_dataMsg")))
                     # merge together the path array and the nodeType array for the new message object
                     .withColumn("message",f.array_union(f.array(f.col("_message")),f.array(f.array(f.col("_typeMsg")))))
                    )
                    
    if(show_steps==True):
      print("new vertices with all temp columns") 
      vertices_update.show()
    
    # add complete paths to the result dataframe
    results=(
            results
            .union(
                vertices_update
                .where(f.col("pathComplete")==True)
                .select(f.col("_message"))
                )
        )

    
    # chache the vertices for next iteration and only push forward the two relevant columns in order to reduce data shuffeling between spark executors
    cachedNewVertices = AM.getCachedDataFrame(vertices_update.select("id","message"))
    # create new updated graph object for next iteration
    gx = GraphFrame(cachedNewVertices,gx.edges)
    
    
print("##################################################")   
print("Collecting result set")    
results.show()  

然后显示正确的结果

All paths found in 3 iterations
##################################################
Collecting result set
+------------+
|       paths|
+------------+
|   [0,1,5]|
|[0,2,3]|
|[0,4]|
|[a,b,c,d]|
+------------+

要获得最终的数据框,您可以将其重新连接或将数组的第一个和最后一个元素放入单独的列中

result2=(results
         .withColumn("dist",f.element_at(f.col("paths"),1))
         .withColumn("node",-1))
        )
result2.show()

+------------+----+----+
|       paths|dist|node|
+------------+----+----+
|   [0,5]|   0|   5|
|[0,3]|   0|   3|
|[0,4]|   0|   4|
|[a,d]|   a|   d|
+------------+----+----+

我想您可以使用 Graphframes Pregel API 编写相同的算法。

P.S:如果图有 lops 或向后有向边,这种形式的算法可能会导致问题。我有另一种算法来首先清理循环和循环

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...