使用 PySpark 从 Lat/Lon 列创建 LineString

问题描述

我有一个 PySpark 数据框,其中包含由“trajectories_id”列标识的不同轨迹的纬度/经度点。

trajectory_id 纬度 经度
1 45 5
1 45 6
1 45 7
2 46 5
2 46 6
2 46 7

我想要做的是为每个trajectory_id 提取一个Linestring 并将其存储在另一个数据帧中,其中每一行代表一个带有“id”和“geometry”列的轨迹。在这个例子中,输出应该是:

trajectory_id 几何
1 LInesTRING (5 45,6 45,7 45)
2 LInesTRING (5 46,6 46,7 46)

这与 this question 中的要求类似,但就我而言,我需要使用 PySpark。

我尝试了以下方法

import pandas as pd
from shapely.geometry import Point,Linestring
df = pd.DataFrame([[1,45,5],[1,6],7],[2,46,7]],columns=['trajectory_id','latitude','longitude'])
df1 = spark.createDataFrame(df)
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()
geo_df = pd.DataFrame(index=range(len(idx_)),columns=['geometry','trajectory_id'])
k=0
for i in idx_:
    df2=df1.filter(F.col("trajectory_id").isin(i)).toPandas()
    df2['points']=df2[["longitude","latitude"]].apply(Point,axis=1)
    geo_df.geometry.iloc[k]=str(Linestring(df2['points']))
    geo_df['trajectory_id'].iloc[k]=i
    k=k+1

代码有效,但在我的任务中,我正在处理更多轨迹(> 200 万条),这需要永远,因为我在每次迭代中都转换为 Pandas。 有没有办法以更有效的方式获得相同的输出? 如前所述,我知道使用 toPandas() (和/或 collect() )是我应该避免的,尤其是在 for 循环中

解决方法

您可以使用 pyspark SQL 的本机函数来做到这一点。

import pyspark.sql.functions as func

long_lat_df = df.withColumn('joined_long_lat',func.concat(func.col("longitude"),func.lit(" "),func.col("latitude")));

grouped_df = long_lat_df .groupby('trajectory_id').agg(func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry',func.concat_ws(",",func.col("geometry")));