使用pyspark从平面记录创建段数组

问题描述

我有一个人口稀少的表格,其中包含用于不同用户ID的各个细分的值。我需要创建一个仅包含unique_id和相关段标题的数组

请注意,这只是一个指示性数据集。我有几百个这样的细分。

------------------------------------------------
| user_id   | seg1 | seg2 | seg3 | seg4 | seg5 |
------------------------------------------------
| 100       |   M  |  null|   25 |  null|  30  |
| 200       |  null|  null|   43 |  null|  250 |
| 300       |   F  |  3000|  null|  74  |  null|
------------------------------------------------

我希望输出

-------------------------------
| user_id| segment_array      |
-------------------------------
| 100    | [seg1,seg3,seg5] |
| 200    | [seg3,seg5]       |
| 300    | [seg1,seg2,seg4] |
-------------------------------

pyspark-sql的pyspark中是否有任何功能可以实现此目的?

感谢您的帮助!

解决方法

我找不到直接的方法,但是您可以做到。

cols= df.columns[1:]

r = df.withColumn('array',array(*[when(col(c).isNotNull(),lit(c)).otherwise('notmatch') for c in cols])) \
  .withColumn('array',array_remove('array','notmatch'))
r.show()
+-------+----+----+----+----+----+------------------+
|user_id|seg1|seg2|seg3|seg4|seg5|             array|
+-------+----+----+----+----+----+------------------+
|    100|   M|null|  25|null|  30|[seg1,seg3,seg5]|
|    200|null|null|  43|null| 250|      [seg3,seg5]|
|    300|   F|3000|null|  74|null|[seg1,seg2,seg4]|
+-------+----+----+----+----+----+------------------+
,

不确定这是最好的方法,但是我会这样攻击:

有一个collect_set函数,它将始终在您汇总的值列表中为您提供唯一的值。

对以下各段进行并集:

df_seg_1 = df.select(
  'user_id',fn.when(
    col('seg1').isNotNull(),lit('seg1)
  ).alias('segment')
)
# repeat for all segments

df = df_seg_1.union(df_seg_2).union(...)

df.groupBy('user_id').agg(collect_list('segment'))