问题描述
我想加入两个带有条件的pyspark数据框,并添加一个新列。
df1 = spark.createDataFrame(
[(2010,1,'rdc','bdvs'),(2010,'yybp'),(2007,6,'utw','itcs'),'tbsw')
],("year","month","u_id","p_id"))
df2 = spark.createDataFrame(
[(2010,'itcs')
],"p_id"))
df1
year month u_id p_id
2010 1 rdc bdvs
2010 1 rdc yybp
2007 6 utw ircs
2007 6 utw tbsw
df2
year month u_id p_id
2010 1 rdc bdvs
2007 6 utw ircs
我需要的新df:
year month u_id p_id is_true
2010 1 rdc bdvs 1
2010 1 rdc yybp 0
2007 6 utw ircs 1
2007 6 utw tbsw 0
import pyspark.sql.functions as F
t =df1.join(df2,(df1.year==df2.year) & (df1.month==df2.month) & (df1.u_id==df2.u_id),how='left').withColumn('is_true',F.when(df1.p_id==df2.p_id,F.lit(1)).otherWise(F.lit(0)))
我遇到错误:
TypeError: 'Column' object is not callable
我想念什么吗?我尝试根据某些条件将常量添加为新的列值。
谢谢
解决方法
将otherWise
更改为 otherwise
。
Example:
t =df1.alias("df1").join(df2.alias("df2"),(df1.year==df2.year) & (df1.month==df2.month) & (df1.u_id==df2.u_id),how='left').\
withColumn('is_true',F.when(df1.p_id == df2.p_id,F.lit(1)).otherwise(F.lit(0))).select("df1.*","is_true")
t.show()
#+----+-----+----+----+-------+
#|year|month|u_id|p_id|is_true|
#+----+-----+----+----+-------+
#|2007| 6| utw|itcs| 1|
#|2007| 6| utw|tbsw| 0|
#|2010| 1| rdc|bdvs| 1|
#|2010| 1| rdc|yybp| 0|
#+----+-----+----+----+-------+
不使用when statement
的另一种方法是使用 left_semi,left_anti
。
from pyspark.sql.functions import *
columns=df1.columns
df1.\
join(df2,columns,'left_anti').\
withColumn("is_true",lit(1)).\
unionAll(df1.\
join(df2,'left_semi').\
withColumn("is_true",lit(0))).\
show()
#+----+-----+----+----+-------+
#|year|month|u_id|p_id|is_true|
#+----+-----+----+----+-------+
#|2010| 1| rdc|yybp| 1|
#|2007| 6| utw|tbsw| 1|
#|2007| 6| utw|itcs| 0|
#|2010| 1| rdc|bdvs| 0|
#+----+-----+----+----+-------+