PySpark :广播在最近的日期时间/ unix上连接两个数据集

问题描述

我正在使用PySpark,快要放弃我的问题了。 我有两个数据集:一个非常非常大的数据集(A组)和一个很小的数据集(B组)。 它们的形式为:

Data set A:
variable   | timestampA
---------------------------------
x          | 2015-01-01 09:29:21
y          | 2015-01-01 12:01:57


Data set B:
different @R_213_4045@ion | timestampB
-------------------------------------------
info a                | 2015-01-01 09:30:00
info b                | 2015-01-01 09:30:00
info a                | 2015-01-01 12:00:00
info b                | 2015-01-01 12:00:00

A有很多行,其中每一行都有不同的时间戳。 B每隔几分钟就有一个时间戳。这里的主要问题是,两个数据集中都没有匹配的确切时间戳。

我的目标是在最近的时间戳上加入数据集。由于我想以特定方式加入,因此出现了另一个问题。 对于A中的每个条目,我想在复制A中的条目时映射最接近的时间戳的全部信息。因此,结果应类似于:

Final data set
variable   | timestampA          | @R_213_4045@ion     | timestampB
--------------------------------------------------------------------------
x          | 2015-01-01 09:29:21 | info a          | 2015-01-01 09:30:00
x          | 2015-01-01 09:29:21 | info b          | 2015-01-01 09:30:00
y          | 2015-01-01 12:01:57 | info a          | 2015-01-01 12:00:00
y          | 2015-01-01 12:01:57 | info b          | 2015-01-01 12:00:00

我对PySpark非常陌生(还有stackoverflow)。我认为我可能需要使用窗口函数和/或广播联接,但是我真的没有起点,希望对您有所帮助。谢谢!

解决方法

您可以使用broadcast来避免改组。

如果正确理解,您在set_B中有一些确定的时间间隔所导致的时间戳?如果是这样,您可以执行以下操作:

from pyspark.sql import functions as F

# assuming 5 minutes is your interval in set_B
interval = 'INTERVAL {} SECONDS'.format(5 * 60 / 2)

res = set_A.join(F.broadcast(set_B),(set_A['timestampA'] > (set_B['timestampB'] - F.expr(interval))) & (set_A['timestampA'] <= (set_B['timestampB'] + F.expr(interval))))

输出:

+--------+-------------------+------+-------------------+
|variable|         timestampA|  info|         timestampB|
+--------+-------------------+------+-------------------+
|       x|2015-01-01 09:29:21|info a|2015-01-01 09:30:00|
|       x|2015-01-01 09:29:21|info b|2015-01-01 09:30:00|
|       y|2015-01-01 12:01:57|info a|2015-01-01 12:00:00|
|       y|2015-01-01 12:01:57|info b|2015-01-01 12:00:00|
+--------+-------------------+------+-------------------+

如果您尚未确定间隔,则只需交叉联接,然后找到min(timestampA - timestampB)间隔就可以解决问题。您可以使用窗口函数和row_number函数,如下所示:

w = Window.partitionBy('variable','info').orderBy(F.abs(F.col('timestampA').cast('int') - F.col('timestampB').cast('int')))

res = res.withColumn('rn',F.row_number().over(w)).filter('rn = 1').drop('rn')