PySpark 使用聚合和分组依据

问题描述

有人可以使用聚合函数和 groupby 函数帮助我使用 pyspark 吗?我已经制作了我的数据框,并应用了过滤器和选择来获取我想要的数据。但是,我现在无法正确聚合。

目前,我的代码输出如下内容

+----------+-----------+--------------+---------------+----------+---------+
|l_orderkey|o_orderdate|o_shippriority|l_extendedprice|l_discount|      rev|
+----------+-----------+--------------+---------------+----------+---------+
|     53634| 1995-02-22|             0|       20517.44|      0.08|18876.045|
|    265539| 1995-01-25|             0|       70423.08|      0.01| 69718.85|
|    331590| 1994-12-10|             0|       46692.75|      0.03| 45291.97|
|    331590| 1994-12-10|             0|        37235.1|       0.1| 33511.59|
|    420545| 1995-03-05|             0|        75542.1|      0.04|72520.414|
|    420545| 1995-03-05|             0|         1062.0|      0.07|987.66003|
|    420545| 1995-03-05|             0|        9729.45|       0.1| 8756.505|
|    420545| 1995-03-05|             0|        15655.6|      0.04|15029.375|
|    420545| 1995-03-05|             0|         3121.3|      0.03|3027.6611|
|    420545| 1995-03-05|             0|        71723.0|      0.03| 69571.31|
|    488928| 1995-02-15|             0|        1692.77|      0.01|1675.8423|
|    488928| 1995-02-15|             0|       22017.84|      0.01|21797.662|
|    488928| 1995-02-15|             0|       57100.42|      0.04|54816.402|
|    488928| 1995-02-15|             0|        3807.76|      0.05| 3617.372|
|    488928| 1995-02-15|             0|       73332.52|      0.01|72599.195|
|    510754| 1994-12-21|             0|       41171.78|      0.09| 37466.32|
|    512422| 1994-12-26|             0|       87251.56|      0.07| 81143.95|
|    677761| 1994-12-26|             0|       60123.34|       0.0| 60123.34|
|    956646| 1995-03-07|             0|       61853.68|      0.05|58760.996|
|   1218886| 1995-02-13|             0|        24844.0|      0.01| 24595.56|
+----------+-----------+--------------+---------------+----------+---------+

我希望通过以下方式应用组:l_orderkey 并将 Rev 聚合为总和。

这是我最近的尝试,'t' 是数据框,F 是来自 pyspark.sql "from pyspark.sql import functions as F"函数

(t .groupby(t.l_orderkey,t.o_orderdate,t.o_shippriority) 
 .agg(F.collect_set(sum(t.rev)),F.collect_set(t.l_orderkey)) .show())

有人可以帮助我知道我是否在正确的轨道上吗?我不断收到“列不可迭代”

解决方法

total_rev = t.groupby(t.l_orderkey).agg(F.sum(t.rev).alias('total_rev'))

# print /show the top results
total_rev.show()

会给你一个带有 l_orderkey,total_rev 列的新 df,其中 total_rev 将存储 rev 的总和

您在尝试删除重复项时使用 collect_set

您还得到 Column is not iterable,因为您使用的是内置的 Python 函数 sum 而不是 spark 函数 F.sum

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...