加入 pyspark 中多列的通用合并

问题描述

我必须合并许多 spark DataFrame。合并后,我想在多个具有相同名称的列之间执行合并。

我能够在此 question 之后创建一个最小的示例。

但是,我需要一段更通用的代码支持:一组要合并的变量(在示例 join_keys = set(('id')) 中)和多个连接键(在示例 pyspark 中)。>

df1 = spark.createDataFrame([ ( 1,None,"aa"),( 2,"a",None ),( 3,"b",None),( 4,"h",],"id int,var1 string,var2 string",) df2 = spark.createDataFrame([ ( 1,"f","Ba"),"bb" ),) df1 = df1.alias("df1") df2 = df2.alias("df2") df3 = df1.join(df2,df1.id == df2.id,how='left').withColumn("var1_",coalesce("df1.var1","df2.var1")).drop("var1").withColumnRenamed("var1_","var1").withColumn("var2_",coalesce("df1.var2","df2.var2")).drop("var2").withColumnRenamed("var2_","var2") 中是否有更简洁(更通用)的方法来获得此结果?

PhoneAuthProvider.provider().verifyPhoneNumber(phoneNumber,uiDelegate: nil){( verificationID,error) inif error != nil {print(error!)}else { print("verification id :”,verificationID)}}

解决方法

我们可以通过将列作为列表传递给连接方法而不是编写连接条件来避免重复列,参考这个link。 但是这里有一些不需要加入条件的常见列。我们可以使用 for 循环来概括您的代码。

spark = SparkSession.builder.master("local[*]").getOrCreate()

df1 =  spark.createDataFrame([
        ( 1,None,"aa"),( 2,"a",None ),( 3,"b",None),( 4,"h",],"id int,var1 string,var2 string",)

df2 =  spark.createDataFrame([
        ( 1,"f","Ba"),"bb" ),)

df1 = df1.alias("df1")
df2 = df2.alias("df2")

key_columns = ["id"]
# Get common columns between 2 dataframes excluding columns-
# -which are being used in joining conditions
other_common_columns = set(df1.columns).intersection(set(df2.columns))\
.difference(set(key_columns))

outputDF = df1.join(df2,key_columns,how='left')

for i in other_common_columns:
    outputDF = outputDF.withColumn(f"{i}_",coalesce(f"df1.{i}",f"df2.{i}"))\
.drop(i).withColumnRenamed(f"{i}_",i)

outputDF.show()

+---+----+----+
| id|var2|var1|
+---+----+----+
|  1|  aa|   f|
|  3|null|   b|
|  4|null|   h|
|  2|  bb|   a|
+---+----+----+

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...