问题描述
我正在尝试从pyspark升级到postgresql。 NiFi正在使用executesparkinteractive处理器提交pyspark脚本。这就是我正在尝试的-
- 通过使用文件中的键查询表来查找公用记录并找到公用记录
- 常用记录保存到数据框
- 使用公共记录数据框中的键将删除查询发射到表中
- 通过在文件和公共记录数据帧之间进行内部联接来准备新文件(我必须在新文件中保留公共记录df中的2列,因此我选择文件中的所有字段(两列除外),并选择公共中的两列记录df。
- 使用标准jdbc追加将新文件插入数据库 这是我的代码-
def load_data(query):
jdbcdf = spark.read.format("jdbc") \
.option("url",url) \
.option("dbtable",query) \
.option("user",user) \
.option("password",password) \
.option("driver",driver) \
.load()
return jdbcdf
def writetoDatabase(dataframe,tableName):
dataframe.write.format('jdbc').options(
stringtype='unspecified',url= url,driver=driver,dbtable=tableName,user=user,password=password).mode('append').save()
def delete_records(table,key_list,param):
connection = None
try:
connection = psycopg2.connect(host=host,database=db,password=password,port = port)
cursor = connection.cursor()
delete_query = "Delete from " +table+ " where "+param+" in "+ str(tuple(key_list))
cursor.execute(delete_query,"test")
connection.commit()
except (Exception,psycopg2.DatabaseError) as error :
if connection is not None:
connection.rollback()
finally:
if connection is not None:
cursor.close()
connection.close()
input_df = input file (Read thru pyspark)
def common_data(input_df):
query = """(select column_key,column_b,column_c from customer_table)"""
table_df = load_data(str(query))
columns_to_drop = [column_b,column_c]
reduced_df = input_df.drop(*columns_to_drop)
common_record_df = (table_df.alias('table_df').join(reduced_df.alias('reduced_df'),on = table_df['column_key'] == reduced_df['column_key'],how = 'inner')
.select('reduced_df.*','table_df.column_b','table_df.column_c'))
return common_record_df
common_record_df = common_data(input_df)
key = common_record_df.select('column_key').collect()
key_list = [i.column_key for i in key]
delete_records("customer_table","column_key")
Subtracted_df = input_df.join(common_record_df,["column_key"],"left_anti")
New_file_df = Subtracted_df.unionAll(common_record_df.select(*Subtracted_df.columns))
writetoDatabase(New_file_df,customer_table)
我看到非常奇怪的行为。看起来所有数据框都具有基于与数据库的实时连接的数据。我正在观察的是下面的删除记录行之前的普通record_df
delete_records("customer_table","column_key")
有记录。但是,在删除操作被触发后,common_record_df变为空白,就好像它是根据客户表中的新记录进行了更新一样(因为在删除操作被触发后,文件和表之间没有公共记录)。代码中的所有其他数据框都发生了相同的情况。我进行了更多调查,以确认代码没有两次执行。看起来代码中的所有数据框在delete语句之后都发生了更改(即使它们都是在delete语句之前创建的)。我认为代码是按顺序执行的,然后在删除语句被触发后,如何在删除语句之前创建的数据帧被更新。我无能为力。有没有人观察到这发生的原因。我已经花了一个星期的时间,无法解决问题。预先感谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)