Pyspark组通过并合并多个不同的列值

问题描述

尝试提取A列和B列(以下)的不同值的最新日期的记录

enter image description here

理想的结果:

enter image description here

当前解决方案:

from pyspark.sql import functions as f
test = df.groupBy(df['A'],df['B']).agg(f.first(df['C']),f.first(df['D']),f.max(df['E']))

寻找漏洞或建议以优化上述内容吗?

解决方法

Window 功能与 max 列上的 date 一起使用,并将其用于 filter

from pyspark.sql import functions as F
from pyspark.sql.window import Window


w=Window().partitionBy("A","B")

df1.withColumn("max",F.max(F.to_date("E","yyyyMMdd")).over(w))\
  .filter(F.to_date(F.col("E"),"yyyyMMdd")==F.col("max")).drop("max").show()

如果 E(date) 列为 not of StringType ,请使用以下命令:

w=Window().partitionBy("A","B")

df.withColumn("max",F.max(F.to_date(F.col("E").cast('string'),"yyyyMMdd")).over(w))\
  .filter(F.to_date(F.col("E").cast('string'),"yyyyMMdd")==F.col("max")).drop("max").show()

输出:

#+---+---+----+---+--------+
#|  A|  B|   C|  D|       E|
#+---+---+----+---+--------+
#| 12|ERP|7500|  D|20200330|
#| 12|ERF|4500|  D|20200430|
#+---+---+----+---+--------+
,

您可以通过降序在E列上排序,然后使用 row_number 函数仅提取最新数据。

df.show()
#+---+---+----+---+--------+
#|  A|  B|   C|  D|       E|
#+---+---+----+---+--------+
#| 12|ERP|1000|  M|20200130|
#| 12|ERP|2000|  M|20200228|
#| 12|ERP|7500|  D|20200330|
#| 12|ERF|4500|  D|20200430|
#| 12|ERF|4000|  L|20200228|
#| 12|ERF|3400|  L|20200330|
#+---+---+----+---+--------+
from pyspark.sql.functions import *
from pyspark.sql import *

w=Window.partitionBy("A","B").orderBy(col("Z").desc())

df.withColumn("z",to_date(col("E"),"yyyyMMdd")).\
withColumn("rn",row_number().over(w)).\
filter(col("rn") == 1).\
drop(*['z','rn']).\
show()
#+---+---+----+---+--------+
#|  A|  B|   C|  D|       E|
#+---+---+----+---+--------+
#| 12|ERP|7500|  D|20200330|
#| 12|ERF|4500|  D|20200430|
#+---+---+----+---+--------+

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...