PySpark-取序列1和0的第一个值

问题描述

我需要获取数据框中的第一个indicator并为每组ID创建一个新的指标。会有很长的0序列,但是1序列中的第一个需要有一个称为first_indicator的行。

dataframe=spark.createDataFrame([("B2","2019-11-19 12:07:38",1),("B2","2019-11-19 12:24:25","2019-11-19 12:37:58",0),"2019-11-19 12:55:08","2019-11-19 13:07:28","2019-11-19 13:20:28",("F9","2020-02-02 13:06:36","2020-02-02 13:21:37","2020-02-02 13:36:38","2020-02-02 13:45:32","2020-02-02 14:06:32","2020-02-02 14:24:31",1)],["id","date_time","indicator"]).show()

+---+-------------------+---------+
| id|          date_time|indicator|
+---+-------------------+---------+
| B2|2019-11-19 12:07:38|        1|
| B2|2019-11-19 12:24:25|        1|
| B2|2019-11-19 12:37:58|        0|
| B2|2019-11-19 12:55:08|        1|
| B2|2019-11-19 13:07:28|        1|
| B2|2019-11-19 13:20:28|        0|
| F9|2020-02-02 13:06:36|        0|
| F9|2020-02-02 13:21:37|        1|
| F9|2020-02-02 13:36:38|        1|
| F9|2020-02-02 13:45:32|        0|
| F9|2020-02-02 14:06:32|        1|
| F9|2020-02-02 14:24:31|        1|
+---+-------------------+---------+

所需的数据框:

+---+-------------------+---------+---------------+
| id|          date_time|indicator|first_indicator|
+---+-------------------+---------+---------------+
| B2|2019-11-19 12:07:38|        1|              1|
| B2|2019-11-19 12:24:25|        1|              0|
| B2|2019-11-19 12:37:58|        0|              0|
| B2|2019-11-19 12:55:08|        1|              1|
| B2|2019-11-19 13:07:28|        1|              0|
| B2|2019-11-19 13:20:28|        0|              0|
| F9|2020-02-02 13:06:36|        0|              0|
| F9|2020-02-02 13:21:37|        1|              1|
| F9|2020-02-02 13:36:38|        1|              0|
| F9|2020-02-02 13:45:32|        0|              0|
| F9|2020-02-02 14:06:32|        1|              1|
| F9|2020-02-02 14:24:31|        1|              0|
+---+-------------------+---------+---------------+

解决方法

您可以使用窗口对数据框进行分区和排序,然后使用滞后函数比较前一个值为0和当前值为1。

w = Window.partitionBy('id').orderBy('date_time')

df.withColumn('target',((lag('indicator',1,0).over(w) == 0) & (col('indicator') == 1)).cast('int')).show()

+---+-------------------+---------+------+
| id|          date_time|indicator|target|
+---+-------------------+---------+------+
| B2|2019-11-19 12:07:38|        1|     1|
| B2|2019-11-19 12:24:25|        1|     0|
| B2|2019-11-19 12:37:58|        0|     0|
| B2|2019-11-19 12:55:08|        1|     1|
| B2|2019-11-19 13:07:28|        1|     0|
| B2|2019-11-19 13:20:28|        0|     0|
| F9|2020-02-02 13:06:36|        0|     0|
| F9|2020-02-02 13:21:37|        1|     1|
| F9|2020-02-02 13:36:38|        1|     0|
| F9|2020-02-02 13:45:32|        0|     0|
| F9|2020-02-02 14:06:32|        1|     1|
| F9|2020-02-02 14:24:31|        1|     0|
+---+-------------------+---------+------+
,

我建议您按“ id”分组并在列表中收集“ date_time”和“ indicator”,所以您会遇到类似的事情:

+---+---------------------------------------------------------+
| id|                           array                         |
+---+---------------------------------------------------------+
| B2|[(2019-11-19 12:07:38,1),(2019-11-19 12:24:25,1) ... ]|
| F9|[(2020-02-02 13:06:36,0),(2020-02-02 13:21:37,0) ... ]|
+---+---------------------------------------------------------+

接下来,您可以构建自己的UDF,并返回第一个指示符的记录。在此UDF中,您无需处理数据帧,因此要考虑的算法更加“自然”。