Problem description
Is there an efficient way to upsample/resample data that arrives at roughly 13-15 minute intervals onto a regular 15-minute grid? I have multiple ids
and 200M+ rows.
dataframe = spark.createDataFrame(
    [("J1", "2019-12-29 12:07:38", 100),
     ("J1", "2019-12-29 12:24:25", 200),
     ("J1", "2019-12-29 12:37:58", 100),
     ("J8", "2020-09-09 13:06:36", 300),
     ("J8", "2020-09-09 13:21:37", 200),
     ("J8", "2020-09-09 13:36:38", 400)],
    ["id", "date_time", "some_value"])
dataframe.show()
+---+-------------------+----------+
| id|          date_time|some_value|
+---+-------------------+----------+
| J1|2019-12-29 12:07:38| 100|
| J1|2019-12-29 12:24:25| 200|
| J1|2019-12-29 12:37:58| 100|
| J8|2020-09-09 13:06:36| 300|
| J8|2020-09-09 13:21:37| 200|
| J8|2020-09-09 13:36:38| 400|
+---+-------------------+----------+
Desired dataframe:
+---+-------------------+----------+
| id|          date_time|some_value|
+---+-------------------+----------+
| J1|2019-12-29 12:15:00| 100|
| J1|2019-12-29 12:30:00| 200|
| J1|2019-12-29 12:45:00| 100|
| J8|2020-09-09 13:00:00| 300|
| J8|2020-09-09 13:15:00| 200|
| J8|2020-09-09 13:30:00| 400|
+---+-------------------+----------+
Solution
There is a window function: it buckets timestamps into fixed-length intervals and produces both a start and an end for each bucket. You may need to apply additional logic to pick whichever boundary is closest.
from pyspark.sql import functions as F
dataframe.withColumn("date_time", F.window("date_time", "15 minutes")["end"]).show()
+---+-------------------+----------+
| id| date_time|some_value|
+---+-------------------+----------+
| J1|2019-12-29 12:15:00| 100|
| J1|2019-12-29 12:30:00| 200|
| J1|2019-12-29 12:45:00| 100|
| J8|2020-09-09 13:15:00| 300|
| J8|2020-09-09 13:30:00| 200|
| J8|2020-09-09 13:45:00| 400|
+---+-------------------+----------+
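Note that taking the window's end always rounds up, which is why the J8 rows land on 13:15/13:30/13:45 instead of the desired 13:00/13:15/13:30. The "pick the closest boundary" step can be illustrated in plain Python (the same arithmetic could be expressed in Spark with unix_timestamp and a conditional, but this sketch is just to show the rounding rule; the function name is mine, not from the question):

```python
from datetime import datetime, timedelta

def round_to_nearest_15min(dt: datetime) -> datetime:
    """Round dt to the closest 15-minute boundary (ties round up)."""
    # Seconds elapsed since the previous 15-minute boundary.
    secs_past = (dt.minute % 15) * 60 + dt.second
    # Floor dt down to that boundary.
    floor = dt - timedelta(seconds=secs_past, microseconds=dt.microsecond)
    # If we are past the midpoint (7.5 minutes), round up instead.
    if secs_past * 2 >= 15 * 60:
        return floor + timedelta(minutes=15)
    return floor

print(round_to_nearest_15min(datetime(2020, 9, 9, 13, 6, 36)))   # 2020-09-09 13:00:00
print(round_to_nearest_15min(datetime(2019, 12, 29, 12, 7, 38)))  # 2019-12-29 12:15:00
```

Applied to the sample data, this reproduces the desired dataframe exactly: rows less than 7.5 minutes past a boundary snap down, the rest snap up.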