pyspark 更改数据捕获实现

问题描述

我有一个基表,用于保存实际数据。下面是表结构

id 名称 地址 年龄 日期
A1 {"fname": "Alex","lname": "Bhatt"} {"lane": "Mac Street","flat": ["24","26","27","29"]} 56 20201128
A2 {"fname": "Bob","lname": "Natarajan"} {"lane": "Royd Street","flat": ["22","23","27"],"pin": "123514"} 53 20201123
A1 {"fname": "Alex","lname": "Bhattacharya"} {"lane": "Mac Street","29"]} 56 20201228
A2 {"fname": "Bob","24","pin": "123514"} 53 20201228

在上表中,A1 和 A2 的数据有所变化。此更改的数据摘要由另一个表捕获并提供。表结构如下所述。

id changed_field 日期
A1 name.lname 20201228
A2 address.flat[1] 20201228

从上面的 2 个表中,我必须准备最终表,其中将捕获更改数据的详细信息。以下是预期的表格。

id changed_field new_value 新日期 old_value olddate
A1 name.lname Bhattacharya 20201228 Bhatt 20201128
A2 address.flat[1] 24 20201228 23 20201123

我尝试过使用 spark sql 函数 get_json_object() 但它不起作用。任何建议都会非常有帮助

解决方法

我认为您需要创建另一个 json 列才能使用 get_json_object... 请参阅下面的答案。

import pyspark.sql.functions as F

result = df1.select(
    'id','date',F.to_json(
        F.struct(
            F.from_json('name','fname string,lname string').alias('name'),F.from_json('address','lane string,flat array<string>,pin string').alias('address')
        )
    ).alias('jsoncol')
).join(
    df2.withColumnRenamed('date','date2'),'id'
).withColumn(
    'new_value',F.expr("get_json_object(jsoncol,'$.' || changed_field)")
).groupBy('id','changed_field').agg(
    F.array_sort(
        F.collect_list(
            F.array('date','new_value')
        )
    ).alias('values')
).select(
    'id','changed_field',F.col('values')[1][1].alias('new_value'),F.col('values')[1][0].alias('newdate'),F.col('values')[0][1].alias('old_value'),F.col('values')[0][0].alias('olddate')
)
result.show(truncate=False)
+---+---------------+------------+--------+---------+--------+
|id |changed_field  |new_value   |newdate |old_value|olddate |
+---+---------------+------------+--------+---------+--------+
|A1 |name.lname     |Bhattacharya|20201228|Bhatt    |20201128|
|A2 |address.flat[1]|24          |20201228|23       |20201123|
+---+---------------+------------+--------+---------+--------+