从 Pyspark 中另一列的值构建一列

问题描述

我有一张如下表。

+----------+---------------------+-----------------+----------+--------------------------+--------------------------+-----------------+-----------------------+------------------+------------------------+
|cust_pr_id|cust_pr_name         |Now_prcs_status  |pr_join_dt|installation_due          |installation_completed    |seg_purchase_due |seg_purchase_completed |wire_in_line_due  |wire_in_line_completed  |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+-----------------+-----------------------+------------------+------------------------+
|9822647220|Jonathan RM Berlin   |installation     |20200202  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|7166582305|Paola RM Berlin      |seg purchase     |20200903  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|9964201263|Roy RM Poland        |installation     |20201023  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|7288402221|Katerina RM Mia      |wire in line     |20201110  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|8424182826|Smidge RM Siberia    |seg purchase     |20200902  |2019-07-15 08:00:00.000000|2019-07-10 09:11:30.599000|                 |                       |                  |                        |
|4445859610|Donna RM Brazil      |seg purchase     |20200903  |2019-07-15 08:00:00.000000|2019-07-10 09:11:30.599000|                 |                       |                  |                        |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+-----------------+-----------------------+------------------+------------------------+

从这个数据我想建立一个数据集如下。 在这里,如果“Now_prcs_status”字段的值是“installation”,那么我需要将“installation_due”的值生成为“curr_prcs_due”,将“installation_completed”的值生成为“curr_prcs_completed”。 同样,如果“Now_prcs_status”的值是“seg purchase”,我需要将“seg_purchase_due”的值生成为“curr_prcs_due”,将“seg_purchase_completed”的值生成为“curr_prcs_completed”。 当“cust_pr_name”的值为“wire in line”时相同,我需要将其到期值和已完成值分别填充为“curr_prcs_due”和“seg_purchase_completed”。

+----------+---------------------+-----------------+----------+--------------------------+--------------------------+
|cust_pr_id|cust_pr_name         |Now_prcs_status  |pr_join_dt|curr_prcs_due             |curr_prcs_completed       |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+
|9822647220|Jonathan RM Berlin   |installation     |20200202  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7166582305|Paola RM Berlin      |seg purchase     |20200903  |1562745600000             |1562761216526             |
|9964201263|Roy RM Poland        |installation     |20201023  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7288402221|Katerina RM Mia      |wire in line     |20201110  |                          |                          |
|8424182826|Smidge RM Siberia    |seg purchase     |20200902  |                          |                          |
|4445859610|Donna RM Brazil      |seg purchase     |20200903  |                          |                          |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+

以上是期望值。

我不想使用 sql case 语句,因为在我的实际数据集中,cust_pr_name 共有 105 个不同的值,而且我不想最终编写 105 个 case 语句。

有人可以帮助我通过 pyspark 或 hive 实现这一目标..

谢谢!

解决方法

一些列表理解应该可以完成这项工作:

import pyspark.sql.functions as F

prcs = [c[:-4] for c in df.columns[4::2]]

df2 = df.select(
    *df.columns[:4],F.coalesce(*[
        F.when(
            F.col('now_prcs_status') == p.replace('_',' '),F.col(p + '_due')
        ) 
        for p in prcs
    ]).alias('curr_prcs_due'),F.col(p + '_completed')
        ) 
        for p in prcs
    ]).alias('curr_prcs_completed')
)

df2.show(truncate=False)
+----------+------------------+---------------+----------+--------------------------+--------------------------+
|cust_pr_id|cust_pr_name      |now_prcs_status|pr_join_dt|curr_prcs_due             |curr_prcs_completed       |
+----------+------------------+---------------+----------+--------------------------+--------------------------+
|9822647220|Jonathan RM Berlin|installation   |20200202  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7166582305|Paola RM Berlin   |seg purchase   |20200903  |1562745600000             |1562761216526             |
|9964201263|Roy RM Poland     |installation   |20201023  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7288402221|Katerina RM Mia   |wire in line   |20201110  |null                      |null                      |
|8424182826|Smidge RM Siberia |seg purchase   |20200902  |null                      |null                      |
|4445859610|Donna RM Brazil   |seg purchase   |20200903  |null                      |null                      |
+----------+------------------+---------------+----------+--------------------------+--------------------------+