PySpark-使用GroupBy的条件创建列

问题描述

我需要使用第二个pyspark数据框为基于日期条件的每个ID创建一个指标。

指示器为1或0,表明传感器发生故障。该指标取决于具有第一个失败日期和最后一个失败日期的第二个数据框。如果故障记录在fail_df中,则main_df行在fail_df的第一个和最后一个记录的故障之间应该有一个1。如果传感器在main_df日期内未发生故障,则应将其存储为0。

main_df数据框

ID         |  Date      |Value 
-------------------------------------------------
P1         | 2016-10-01 |100
P1         | 2016-10-02 |200
P1         | 2016-12-16 |700
P1         | 2016-12-17 |800
P1         | 2016-12-18 |800
P2         | 2016-01-31 |700
P2         | 2016-02-01 |800
P2         | 2016-02-02 |900

故障列表数据框

ID         |  First Fail Date      | Last Fail Date
-----------------------------------------------------
P1         | 2016-10-01            |2016-10-02  |
P2         | 2016-01-31            |2016-02-01  |

所需的数据框

ID         |  Date      |Value  | Failure_Indicator     
-------------------------------------------------
P1         | 2016-10-01 |100    | 1
P1         | 2016-10-02 |200    | 1
P1         | 2016-12-16 |700    | 0
P1         | 2016-12-17 |800    | 0
P1         | 2016-12-18 |800    | 0
P2         | 2016-01-31 |700    | 1
P2         | 2016-02-01 |800    | 1
P2         | 2016-02-02 |900    | 0

我尝试过的方法: AttributeError:“ GroupedData”对象没有属性“ withColumn”

df = main_df.groupBy('ID').withColumn(
    'Failure_Indicator',F.when((fail_df.col("First fail Date") >= main_df.Date) & 
           (fail_df.col("Last fail Date") >= main_df.Date),1)
     .otherwise(0))

解决方法

df_1=spark.createDataFrame([("P1","2016-10-01",100),("P1","2016-10-03",200),("P3","2016-10-09",200)],["id","date","value"])

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
| P1|2016-10-01|  100|
| P1|2016-10-03|  200|
| P3|2016-10-09|  200|
+---+----------+-----+

df_2=spark.createDataFrame([("P1","2016-10-02")],"start_date","end_date"])

+---+----------+----------+
| id|start_date|  end_date|
+---+----------+----------+
| P1|2016-10-01|2016-10-02|
+---+----------+----------+


df_1.join(df_2,'id','left_outer') \
    .withColumn('failure_indicator',when((col("date") >= col("start_date")) & (col("date") <= col("end_date")),1).otherwise(0)) \
    .select('id','date','value','failure_indicator') \
    .show()

+---+----------+-----+-----------------+
| id|      date|value|failure_indicator|
+---+----------+-----+-----------------+
| P3|2016-10-09|  200|                0|
| P1|2016-10-01|  100|                1|
| P1|2016-10-03|  200|                0|
+---+----------+-----+-----------------+