Pyspark列不可迭代

问题描述

我有一个与此类似的df:

old_df = sqlContext.createDataFrame(
 [   ('375',20),('265',('052',('111',None),],['old_col','example_new_col_val'])

我需要通过对照列表检查旧列的值来创建新列。我是Pyspark的新手,不理解我的错误消息。这是我尝试过的:

from pyspark.sql import functions as F

my_list = ['375','012','013','014','015','016']
expr = F.when(F.col("old_col").isin(my_list),F.lit(20)).otherwise(None).alias("new_col")

new_df = old_df.select("*",*expr)   

我的错误消息:TypeError: Column is not iterable

解决方法

摆脱*中的*expr-expr是一列,不应进行迭代/解包。

new_df = old_df.select("*",expr)
,

定义my_list时,请尝试使用:

my_list = list(['375','012','013','014','015','016'].toPandas())

其余代码保持不变。

,

您需要在此处使用 withColumn() 函数,以便为现有数据框创建新的 column

df = df.withColumn("new_col",F.when(F.col("old_col").isin(my_list),F.lit("20")).otherwise(F.lit(None)))