无法在Pyspark上将一列拆分为更多列

问题描述

我无法在PySpark中将数据框的列拆分为更多列:

我有一个列表列表,我想将其转换为一个数据框,每个值都放在一列中。

我尝试过的事情:

我从此列表创建了一个数据框:

[['COL-4560','COL-9655','NWG-0610','D81-3754'],['DLL-7760','NAT-9885','PED-0550','MAR-0004','LLL-5554']]

使用此代码

from pyspark.sql import Row
R = Row('col1','col2')

# use enumerate to add the ID column
df_from_list = spark.createDataFrame([R(i,x) for i,x in enumerate(recs_list)])

我得到的结果是:

+----+--------------------+
|col1|                col2|
+----+--------------------+
|   0|[COL-4560,COL-96...|
|   1|[DLL-7760,NAT-98...|
+----+--------------------+

我想用逗号将值分成几列,所以我尝试了:

from pyspark.sql import functions as F

df2 = df_from_list.select('col1',F.split('col2',',').alias('col2'))

# If you don't kNow the number of columns:
df_sizes = df2.select(F.size('col2').alias('col2'))
df_max = df_sizes.agg(F.max('col2'))
nb_columns = df_max.collect()[0][0]

df_result = df2.select('col1',*[df2['col2'][i] for i in range(nb_columns)])
df_result.show()

但是在此行df2 = df_from_list.select('col1',').alias('col2'))上出现错误

AnalysisException: cannot resolve 'split(`col2`,-1)' due to data type mismatch: argument 1 requires string type,however,'`col2`' is of array<string> type.;;

我理想的最终输出是这样的:

+----------+----------+----------+----------+----------+
|  SKU     |  REC_01  | REC_02   | REC_03   | REC_04   |
+----------+----------+----------+----------+----------+
| COL-4560 | COL-9655 | NWG-0610 | D81-3754 | null     |
| DLL-7760 | NAT-9885 | PED-0550 | MAR-0004 | LLL-5554 |
+---------------------+----------+----------+----------+

有些行可能有四个值,但有些行或多或少,我不知道最终数据帧将具有的确切列数。

有人知道发生了什么吗?提前非常感谢您。

解决方法

数据框df_from_list col2 列已经是 array 类型,因此无需拆分 (由于 split stringtype 一起使用,这里我们有 arraytype )。

以下是适合您的步骤。

recs_list=[['COL-4560','COL-9655','NWG-0610','D81-3754'],['DLL-7760','NAT-9885','PED-0550','MAR-0004','LLL-5554']]

from pyspark.sql import Row
R = Row('col1','col2')

# use enumerate to add the ID column
df_from_list = spark.createDataFrame([R(i,x) for i,x in enumerate(recs_list)])

from pyspark.sql import functions as F

df2 = df_from_list

# If you don't know the number of columns:
df_sizes = df2.select(F.size('col2').alias('col2'))
df_max = df_sizes.agg(F.max('col2'))
nb_columns = df_max.collect()[0][0]

cols=['SKU','REC_01','REC_02','REC_03','REC_04']
df_result = df2.select(*[df2['col2'][i] for i in range(nb_columns)]).toDF(*cols)
df_result.show()
#+--------+--------+--------+--------+--------+
#|     SKU|  REC_01|  REC_02|  REC_03|  REC_04|
#+--------+--------+--------+--------+--------+
#|COL-4560|COL-9655|NWG-0610|D81-3754|    null|
#|DLL-7760|NAT-9885|PED-0550|MAR-0004|LLL-5554|
#+--------+--------+--------+--------+--------+