python – 将pyspark数据帧与另一个数据帧进行比较

我有2个数据帧来比较它们具有相同的列数,并且比较结果应该具有不匹配的字段以及值和ID.

数据帧一

+-----+---+--------+
| name| id|    City|
+-----+---+--------+
|  Sam|  3| Toronto|
| BALU| 11|     YYY|
|CLAIR|  7|Montreal|
|HELEN| 10|  London|
|HELEN| 16|  Ottawa|
+-----+---+--------+

数据帧二

+-------------+-----------+-------------+
|Expected_name|Expected_id|Expected_City|
+-------------+-----------+-------------+
|          SAM|          3|      Toronto|
|         BALU|         11|          YYY|
|        CLARE|          7|     Montreal|
|        HELEN|         10|        Londn|
|        HELEN|         15|       Ottawa|
+-------------+-----------+-------------+

预期产出

+---+------------+--------------+-----+
| ID|Actual_value|Expected_value|Field|
+---+------------+--------------+-----+
|  7|       CLAIR|         CLARE| name|
|  3|         Sam|           SAM| name|
| 10|      London|         Londn| City|
+---+------------+--------------+-----+

创建示例数据

from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
from pyspark.sql import SparkSession

sc = SparkContext()
sql_context = SQLContext(sc)

spark = SparkSession.builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR") # log only on fails

df_Actual = sql_context.createDataFrame(
    [("Sam",3,'Toronto'),("BALU",11,'YYY'),("CLAIR",7,'Montreal'),("HELEN",10,'London'),16,'Ottawa')],["name","id","City"]
)

df_Expected = sql_context.createDataFrame(
     [("SAM",("CLARE",'Londn'),15,["Expected_name","Expected_id","Expected_City"]
)

为Result创建空数据框

field = [
    StructField("ID",StringType(),True),StructField("Actual_value",StructField("Expected_value",StructField("Field",True)
]

schema = StructType(field)
Df_Result = sql_context.createDataFrame(sc.emptyRDD(),schema)

加入预期和实际的id

df_cobined = df_Actual.join(df_Expected,(df_Actual.id == df_Expected.Expected_id))

col_names=df_Actual.schema.names

遍历每列以查找不匹配

for col_name in col_names:

    #Filter for column values not matching
    df_comp= df_cobined.filter(col(col_name)!=col("Expected_"+col_name ))\
        .select(col('id'),col(col_name),col("Expected_"+col_name ))

    #Add not matching column name
    df_comp = df_comp.withColumn("Field",lit(col_name))

    #Add to final result
    Df_Result = Df_Result.union(df_comp)
Df_Result.show()

此代码按预期工作.但是,在实际情况中,我有更多列和数百万行进行比较.使用此代码,完成比较需要更多时间.有没有更好的方法来提高性能并获得相同的结果?

最佳答案
避免进行联合的一种方法如下:

>创建要比较的列列表:to_compare
>接下来选择id列并使用pyspark.sql.functions.when比较列.对于那些不匹配的人,使用3个字段构建一个结构数组:to_compare中每列的(Actual_value,Expected_value,Field)
>爆炸临时数组列并删除空值
>最后选择id并使用col.*将结构中的值扩展为列.

码:

StructType用于存储不匹配的字段.

import pyspark.sql.functions as f

# these are the fields you want to compare
to_compare = [c for c in df_Actual.columns if c != "id"]

df_new = df_cobined.select(
        "id",f.array([
            f.when(
                f.col(c) != f.col("Expected_"+c),f.struct(
                    f.col(c).alias("Actual_value"),f.col("Expected_"+c).alias("Expected_value"),f.lit(c).alias("Field")
                )
            ).alias(c)
            for c in to_compare
        ]).alias("temp")
    )\
    .select("id",f.explode("temp"))\
    .dropna()\
    .select("id","col.*")
df_new.show()
#+---+------------+--------------+-----+
#| id|Actual_value|Expected_value|Field|
#+---+------------+--------------+-----+
#|  7|       CLAIR|         CLARE| name|
#| 10|      London|         Londn| City|
#|  3|         Sam|           SAM| name|
#+---+------------+--------------+-----+

相关文章

Python中的函数(二) 在上一篇文章中提到了Python中函数的定...
Python中的字符串 可能大多数人在学习C语言的时候,最先接触...
Python 面向对象编程(一) 虽然Python是解释性语言,但是它...
Python面向对象编程(二) 在前面一篇文章中谈到了类的基本定...
Python中的函数(一) 接触过C语言的朋友对函数这个词肯定非...
在windows下如何快速搭建web.py开发框架 用Python进行web开发...