在pyspark中,如何在groupby之后在一列中而不是在另一列中找到值设置减法?

问题描述

我有一个 pyspark 数据框 df 的记录,每条记录都有 idgroup,并标记是否发生了两个事件 (event1,event2)。我想找到每个组中的 id 数量,即:

  1. 这两件事都发生在他们身上,
  2. 他们有过 event2 但没有发生过 event1。

在这里提取一个简单的例子:

df:
|  id | event1 | event2 | group
| 001 |      1 |      0 |     A
| 001 |      1 |      0 |     A    
| 001 |      1 |      1 |     A  
| 002 |      0 |      1 |     A  
| 003 |      1 |      0 |     A  
| 003 |      1 |      1 |     A  
| ... |    ... |    ... |     B
...  

在上面的 df 中,对于 group = A,有 2 个 ID 有 event1:(001,003),有 3 个 ID 有 event2:(001,002,003)。因此,例如,event2 但不是 event1 中的 id 数为 1。

我希望得到这样的东西。

group | event2_not_1 | event1_and_2 |
    A |            1 |            2 |
    B |          ... |          ... |

到目前为止,我已经尝试收集为每个事件出现的一组 id,然后在 new_df 中单独执行设置操作。但我觉得这很笨拙。例如,

df_new = (
  df.withColumn('event1_id',when(col('event1') == 1,col('id')))
    .withColumn('event2_id',when(col('event2') == 1,col('id')))
    .groupby('group').agg(collect_set('event1_id').alias('has_event1'),collect_set('event2_id').alias('has_event2'))
)

如何在 pyspark 中优雅地实现这一点?

解决方法

使用 groupby 两次。

df.groupBy("group","id").agg(f.max("event1").alias("event1"),f.max("event2").alias("event2")) \
  .groupBy("group").agg(f.sum(f.expr("if(event2 = 1 and event1 = 0,1,0)")).alias("event2_not_1"),f.sum(f.expr("if(event1 = 1 and event2 = 1,0)")).alias("event1_and_2"))

+-----+------------+------------+
|group|event2_not_1|event1_and_2|
+-----+------------+------------+
|A    |1           |2           |
+-----+------------+------------+