问题描述
尝试提取A列和B列(以下)的不同值的最新日期的记录
理想的结果:
当前解决方案:
from pyspark.sql import functions as f
test = df.groupBy(df['A'],df['B']).agg(f.first(df['C']),f.first(df['D']),f.max(df['E']))
寻找漏洞或建议以优化上述内容吗?
解决方法
将 Window
功能与 max
列上的 date
一起使用,并将其用于 filter
。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("A","B")
df1.withColumn("max",F.max(F.to_date("E","yyyyMMdd")).over(w))\
.filter(F.to_date(F.col("E"),"yyyyMMdd")==F.col("max")).drop("max").show()
如果 E(date)
列为 not of StringType
,请使用以下命令:
w=Window().partitionBy("A","B")
df.withColumn("max",F.max(F.to_date(F.col("E").cast('string'),"yyyyMMdd")).over(w))\
.filter(F.to_date(F.col("E").cast('string'),"yyyyMMdd")==F.col("max")).drop("max").show()
输出:
#+---+---+----+---+--------+
#| A| B| C| D| E|
#+---+---+----+---+--------+
#| 12|ERP|7500| D|20200330|
#| 12|ERF|4500| D|20200430|
#+---+---+----+---+--------+
,
您可以通过降序在E列上排序,然后使用 row_number
函数仅提取最新数据。
df.show()
#+---+---+----+---+--------+
#| A| B| C| D| E|
#+---+---+----+---+--------+
#| 12|ERP|1000| M|20200130|
#| 12|ERP|2000| M|20200228|
#| 12|ERP|7500| D|20200330|
#| 12|ERF|4500| D|20200430|
#| 12|ERF|4000| L|20200228|
#| 12|ERF|3400| L|20200330|
#+---+---+----+---+--------+
from pyspark.sql.functions import *
from pyspark.sql import *
w=Window.partitionBy("A","B").orderBy(col("Z").desc())
df.withColumn("z",to_date(col("E"),"yyyyMMdd")).\
withColumn("rn",row_number().over(w)).\
filter(col("rn") == 1).\
drop(*['z','rn']).\
show()
#+---+---+----+---+--------+
#| A| B| C| D| E|
#+---+---+----+---+--------+
#| 12|ERP|7500| D|20200330|
#| 12|ERF|4500| D|20200430|
#+---+---+----+---+--------+