问题描述
我有一个巨大的数据集,其中包含row_id,因此我知道row_id单调递增,并且数据分区由该row_id排序。
现在我想使用lag()函数移动某些列:
window = sql.Window.partitionBy().orderBy('row_id')
df = df.withColumn('shifted_my_value',F.lag(df.my_value).over(window))
但是,pyspark并不知道该数据已经由row_id排序,并且想要分析/重新组合整个数据集(尽管我知道这将是一个小问题)。
有没有办法告诉我,我的数据已经按row_id排序了,它应该只移动my_values中的值,而无需尝试洗牌。
解决方法
一个选项可能只是不按窗口中的一行排序-
window = sql.Window.partitionBy().orderBy('select 1')
df = df.withColumn('shifted_my_value',F.lag(df.my_value).over(window))
我不确定pyspark是否会像这样或那样工作-
window = sql.Window.partitionBy().orderBy('(select 1)')
df = df.withColumn('shifted_my_value',F.lag(df.my_value).over(window))
在SQL中,它可以那样工作-
df.registerTempTable('df')
spark.sql('select LAG(df.my_value) OVER (order by (select 1)) FROM df' )