如何刷新从数据库获取的数据以加入流数据?

问题描述

使用的包:

  • pyspark 2.4.5
  • psycopg2
  • 熊猫

在 postgres 数据库中,我有一些参考数据可用于连接从 Kafka 消息派生的流数据帧。计划是仅在使用 psycopg2 通过 INSERT 查询对其进行更改时才更新参考数据。

我创建了一个处理程序模块,其中包含一个 setter 和 getter,为其获取数据库的初始状态

    def ref_data_stream_reader(self):
        get_ref_data_stream_df: DataFrame = Warehouse.read_stream(self.spark,os.getenv('SPARK_INPUT_TOPIC_REF_DATA'))

        ref_df: DataFrame = get_ref_data_stream_df.select(
            from_json('value',ref_data_schema)
                .alias('ref_data'),)\
            .select('ref_data.*')

        ref_df \
            .writeStream \
            .outputMode('update') \
            .option('truncate','false') \
            .foreachBatch(lambda ref_df,batch_id: self._update_intermediate_layer(ref_df)) \
            .start()

    def _update_intermediate_layer(self,ref_df: DataFrame):
        ref_data_pandas_df_raw: pd.DataFrame = ref_df.toPandas()

        if len(ref_data_pandas_df_raw.index) > 0:
            #Replace NaN values
            ref_data_df: pd.DataFrame = ref_data_pandas_df_raw.replace({pd.np.nan: None})

            # Convert all columns to lower case
            ref_data_df.columns = map(str.lower,ref_data_df.columns)

            try:
                # Get Connection from Pool
                connection_pool = self.conn_pool.get_reference_connection_pool()
                connection = connection_pool.getconn()

                # Set AutoCommit to False,required for transactional statements
                connection.autocommit = False
                # Create cursor
                cursor = connection.cursor()

                query,values = PostgresHelper.get_insert_query(ref_data_df,'schema.ref_table')

                cursor.executemany(query,values)
                connection.commit()

                self.get_data_from_reference()

            except Exception as err:
                logger.error(err)
                if connection is not None:
                    connection.rollback()

            finally:
                cursor.close()
                connection_pool.putconn(connection)

   def get_data_from_reference(self):
        try:
            # Get Connection from Pool
            connection_pool = self.conn_pool.get_reference_connection_pool()
            connection = connection_pool.getconn()

            # Set AutoCommit to False,required for transactional statements
            connection.autocommit = False
            # Create cursor
            cursor = connection.cursor()

            cursor.execute('SELECT * FROM schema.ref_table')
            ref_df = cursor.fetchall()

            self.set_ref_data_state(self.spark.createDataFrame(ref_df,ref_schema))

    def set_ref_data_state(self,input_ref_data_state: DataFrame):
        self.ref_data_state = input_ref_data_state

流数据是使用 readStream 操作从 Kafka 主题获取的(这包含在 Warehouse.read_stream() 方法中)

get_streaming_df: DataFrame = Warehouse.read_stream(self.spark,os.getenv('SPARK_INPUT_TOPIC_STREAM'))

streaming_df: DataFrame = get_streaming_df.select(
    from_json('value',streaming_schema)
    .alias('data'),col('timestamp').alias('data_timestamp'))\
    .select('data.*','data_timestamp')\

return streaming_df

在实例化ref数据对象并启动Kafka readStream后,将此对象传递给执行连接的方法

transaction_process_manager.stream_computation(ref_data_handler,streaming_df)
    def stream_computation(self,ref_data_handler,streaming_df):
        ref_data_df = ref_data_handler.get_ref_data_state()
        # Perform joins between ref_data_df and streaming_df

我发现 ref_data_handler 的初始状态用于所有连接,即使参考层中的数据通过来自 SPARK_INPUT_TOPIC_REF_DATA 主题的消息更改,初始状态仍然使用.

我可以使用什么实现来确保 Postgres 数据库表的当前状态用于所有连接,而无需为每个单独的连接与流数据帧往返于数据库?如果每 10 个事务中发生 1 个对参考层的更新,我应该只需要执行一次 INSERT 和 SELECT,而不是对所有 10 个事务执行 SELECT。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)