问题描述
使用的包:
- pyspark 2.4.5
- psycopg2
- 熊猫
在 postgres 数据库中,我有一些参考数据可用于连接从 Kafka 消息派生的流数据帧。计划是仅在使用 psycopg2 通过 INSERT 查询对其进行更改时才更新参考数据。
我创建了一个处理程序模块,其中包含一个 setter 和 getter,为其获取数据库的初始状态
def ref_data_stream_reader(self):
get_ref_data_stream_df: DataFrame = Warehouse.read_stream(self.spark,os.getenv('SPARK_INPUT_TOPIC_REF_DATA'))
ref_df: DataFrame = get_ref_data_stream_df.select(
from_json('value',ref_data_schema)
.alias('ref_data'),)\
.select('ref_data.*')
ref_df \
.writeStream \
.outputMode('update') \
.option('truncate','false') \
.foreachBatch(lambda ref_df,batch_id: self._update_intermediate_layer(ref_df)) \
.start()
def _update_intermediate_layer(self,ref_df: DataFrame):
ref_data_pandas_df_raw: pd.DataFrame = ref_df.toPandas()
if len(ref_data_pandas_df_raw.index) > 0:
#Replace NaN values
ref_data_df: pd.DataFrame = ref_data_pandas_df_raw.replace({pd.np.nan: None})
# Convert all columns to lower case
ref_data_df.columns = map(str.lower,ref_data_df.columns)
try:
# Get Connection from Pool
connection_pool = self.conn_pool.get_reference_connection_pool()
connection = connection_pool.getconn()
# Set AutoCommit to False,required for transactional statements
connection.autocommit = False
# Create cursor
cursor = connection.cursor()
query,values = PostgresHelper.get_insert_query(ref_data_df,'schema.ref_table')
cursor.executemany(query,values)
connection.commit()
self.get_data_from_reference()
except Exception as err:
logger.error(err)
if connection is not None:
connection.rollback()
finally:
cursor.close()
connection_pool.putconn(connection)
def get_data_from_reference(self):
try:
# Get Connection from Pool
connection_pool = self.conn_pool.get_reference_connection_pool()
connection = connection_pool.getconn()
# Set AutoCommit to False,required for transactional statements
connection.autocommit = False
# Create cursor
cursor = connection.cursor()
cursor.execute('SELECT * FROM schema.ref_table')
ref_df = cursor.fetchall()
self.set_ref_data_state(self.spark.createDataFrame(ref_df,ref_schema))
def set_ref_data_state(self,input_ref_data_state: DataFrame):
self.ref_data_state = input_ref_data_state
流数据是使用 readStream
操作从 Kafka 主题获取的(这包含在 Warehouse.read_stream()
方法中)
get_streaming_df: DataFrame = Warehouse.read_stream(self.spark,os.getenv('SPARK_INPUT_TOPIC_STREAM'))
streaming_df: DataFrame = get_streaming_df.select(
from_json('value',streaming_schema)
.alias('data'),col('timestamp').alias('data_timestamp'))\
.select('data.*','data_timestamp')\
return streaming_df
在实例化ref数据对象并启动Kafka readStream后,将此对象传递给执行连接的方法
transaction_process_manager.stream_computation(ref_data_handler,streaming_df)
def stream_computation(self,ref_data_handler,streaming_df):
ref_data_df = ref_data_handler.get_ref_data_state()
# Perform joins between ref_data_df and streaming_df
我发现 ref_data_handler 的初始状态用于所有连接,即使参考层中的数据通过来自 SPARK_INPUT_TOPIC_REF_DATA
主题的消息更改,初始状态仍然使用.
我可以使用什么实现来确保 Postgres 数据库表的当前状态用于所有连接,而无需为每个单独的连接与流数据帧往返于数据库?如果每 10 个事务中发生 1 个对参考层的更新,我应该只需要执行一次 INSERT 和 SELECT,而不是对所有 10 个事务执行 SELECT。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)