问题描述
我有一个像这样的 pyspark 数据框:(在这个例子中我有 20 条记录)
+-----------------------+---------+
|TIME_STAMP |RESULT |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0 |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0 |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0 |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0 |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0 |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9 |
|2020-08-31 00:21:42.546|80.0 |
|2020-08-31 00:26:01.334|80.0 |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+
我已按 TIME_STAMP
对其进行排序,并希望以 5 为一组对数据框进行分类,并在 mean
列上为每个组执行聚合 (RESULT
)。因此,前 5 条记录将组成一个组,接下来的 5 条记录将组成 4 个组。
预期输出:
bin mean
5 16802.7174
10 16798.8162
15 22374.829
20 16802.8264
此处,bin
列来自记录 1-5
,mean
列是这 5 条记录的平均值,依此类推。
在我的研究中,似乎我可能不得不使用 monotonically_increasing_id()
pyspark 函数,我试图避免这种情况,因为我有非常大的数据集并且可能会导致 OOM。
有没有一种方法可以实现这一点,而无需将整个数据集collect
发送给驱动程序?
作为一个附加的问题,在上面的例子中,记录总数(20)可以被5整除。但是说我有19条记录,有没有办法让3组5条记录和4条记录在最后一组?
解决方法
- 首先通过使用 row_number() 为每一行分配一个行号 (按时间戳排序)。无需分区。
- 接下来,通过地板将行号分箱((row_number - 1)/5)。
- 最后它变成了一个微不足道的组
您可以按原样运行并轻松适应您的数据的示例 SQL:
SELECT floor((id - 1)/5),avg(value)
FROM (SELECT row_number() OVER (ORDER BY value) AS id,value
FROM (SELECT Explode(Array(10,20,30,40,50,60,70,80,90,100,110,120,130,140,150,160,170,180,190,200,210)) AS value) a)
GROUP BY 1