Pyspark:在从pyspark插入Posgresql时发现奇怪的行为

问题描述

我正在尝试从pyspark升级到postgresql。 NiFi正在使用executesparkinteractive处理器提交pyspark脚本。这就是我正在尝试的-

  1. 通过使用文件中的键查询表来查找公用记录并找到公用记录
  2. 常用记录保存到数据框
  3. 使用公共记录数据框中的键将删除查询发射到表中
  4. 通过在文件和公共记录数据帧之间进行内部联接来准备新文件(我必须在新文件中保留公共记录df中的2列,因此我选择文件中的所有字段(两列除外),并选择公共中的两列记录df。
  5. 使用标准jdbc追加将新文件插入数据库 这是我的代码-
def load_data(query):
    jdbcdf = spark.read.format("jdbc") \
        .option("url",url) \
        .option("dbtable",query) \
        .option("user",user) \
        .option("password",password) \
        .option("driver",driver) \
        .load()
    return jdbcdf

def writetoDatabase(dataframe,tableName):
     dataframe.write.format('jdbc').options(
     stringtype='unspecified',url= url,driver=driver,dbtable=tableName,user=user,password=password).mode('append').save()

def delete_records(table,key_list,param):
    connection = None
    try: 
        connection = psycopg2.connect(host=host,database=db,password=password,port = port)
        cursor = connection.cursor()
        delete_query = "Delete from " +table+ " where "+param+" in "+ str(tuple(key_list))
        cursor.execute(delete_query,"test")
        connection.commit()
    except (Exception,psycopg2.DatabaseError) as error :
            if connection is not None:
                connection.rollback()
    finally:
        if connection is not None:
            cursor.close()
            connection.close()

input_df = input file (Read thru pyspark)

def common_data(input_df):

    query = """(select column_key,column_b,column_c from customer_table)"""

    table_df = load_data(str(query))

    columns_to_drop = [column_b,column_c]

    reduced_df = input_df.drop(*columns_to_drop)

    common_record_df = (table_df.alias('table_df').join(reduced_df.alias('reduced_df'),on = table_df['column_key'] == reduced_df['column_key'],how = 'inner')
                               .select('reduced_df.*','table_df.column_b','table_df.column_c'))
    return common_record_df

common_record_df = common_data(input_df)

key = common_record_df.select('column_key').collect()
key_list = [i.column_key for i in key]
delete_records("customer_table","column_key")

Subtracted_df =  input_df.join(common_record_df,["column_key"],"left_anti")
New_file_df = Subtracted_df.unionAll(common_record_df.select(*Subtracted_df.columns))

writetoDatabase(New_file_df,customer_table)

我看到非常奇怪的行为。看起来所有数据框都具有基于与数据库的实时连接的数据。我正在观察的是下面的删除记录行之前的普通record_df

delete_records("customer_table","column_key")

有记录。但是,在删除操作被触发后,common_record_df变为空白,就好像它是根据客户表中的新记录进行了更新一样(因为在删除操作被触发后,文件和表之间没有公共记录)。代码中的所有其他数据框都发生了相同的情况。我进行了更多调查,以确认代码没有两次执行。看起来代码中的所有数据框在delete语句之后都发生了更改(即使它们都是在delete语句之前创建的)。我认为代码是按顺序执行的,然后在删除语句被触发后,如何在删除语句之前创建的数据帧被更新。我无能为力。有没有人观察到这发生的原因。我已经花了一个星期的时间,无法解决问题。预先感谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)