在将数据帧合并到特定大小后执行聚合

问题描述

我有一个像这样的 pyspark 数据框:(在这个例子中我有 20 条记录)

+-----------------------+---------+
|TIME_STAMP             |RESULT   |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0     |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0     |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0     |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0     |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0     |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9  |
|2020-08-31 00:21:42.546|80.0     |
|2020-08-31 00:26:01.334|80.0     |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+

我已按 TIME_STAMP 对其进行排序,并希望以 5 为一组对数据框进行分类,并在 mean 列上为每个组执行聚合 (RESULT)。因此,前 5 条记录将组成一个组,接下来的 5 条记录将组成 4 个组。

预期输出

bin     mean
5   16802.7174
10  16798.8162
15  22374.829
20  16802.8264

此处,bin 列来自记录 1-5mean 列是这 5 条记录的平均值,依此类推。

在我的研究中,似乎我可能不得不使用 monotonically_increasing_id() pyspark 函数,我试图避免这种情况,因为我有非常大的数据集并且可能会导致 OOM。

有没有一种方法可以实现这一点,而无需将整个数据集collect 发送给驱动程序?

作为一个附加的问题,在上面的例子中,记录总数(20)可以被5整除。但是说我有19条记录,有没有办法让3组5条记录和4条记录在最后一组?

解决方法

  1. 首先通过使用 row_number() 为每一行分配一个行号 (按时间戳排序)。无需分区。
  2. 接下来,通过地板将行号分箱((row_number - 1)/5)。
  3. 最后它变成了一个微不足道的组

您可以按原样运行并轻松适应您的数据的示例 SQL:

SELECT floor((id - 1)/5),avg(value)
FROM   (SELECT row_number() OVER (ORDER BY value) AS id,value
        FROM   (SELECT Explode(Array(10,20,30,40,50,60,70,80,90,100,110,120,130,140,150,160,170,180,190,200,210)) AS value) a)
GROUP  BY 1