问题描述
我使用 pyspark,还有 DStream(一些用户交互)。 当新一批数据到达时,我想选择数据的一个子集(并使用它来拟合 ML 模型)并将未选择值的子集保存到另一个数据帧。 然后,当下一批到达时,我想将上一步中未选择的数据帧与到达的数据帧连接起来并计算转换/分组/等。
但是,有一个问题:我只能调用 created_df.join(saved_df) 一次(因为 Streaming API 在计算之前构造了 DAG),现在我不知道如何更新 saved_df 并在新的时候加入它批次到货!
该计划相当庞大且晦涩难懂,所以有一个说明:
- 我们现在定义了 dag 与 received.join(saved),saved = []
- 第一批到达:到达=[1,2,3];保存=[]。 (开始时保存为空)
- 我们选择了 [1,2],并为下一步保存了 3,所以 saved=[3]
- 新批次到达:到达=[4,5,6],保存=[3]
- 然而,将调用 join(saved),但在构建 DAG 时保存为 [],因此 join 结果为 [4,6],而不是我想要的 [4,6,3]。
我的问题是:在 spark DStream 中保存先前批次的值并在未来批次到达时在下一个计算中使用它们的正确方法是什么?
目前我有 2 个解决方法,但我不认为这些是解决我的问题的正确方法:
- 使用一些列名(在我的例子中为“user_id”)调用到达.groupby(),并使用相同的“user_id”将每个组加入到保存的组中,然后使用这个加入的df
- 编写每次新批次到达时都会产生saved_df的DStream,然后我们可以调用DStream.join(savedDStream)
一些代码示例:
stream_reader = spark.readStream
data_stream = stream_reader.text("some_path_to_dir")
data_batch_df = df_from_stream() # some function to convert RDD to DF
# creating new DF which'll contain saved data
saved_df = spark.createDataFrame(spark.sparkContext.emptyRDD())
joined_df = data_batch_df.join(saved_df)
chosen_interacts,new_saved_df = _split_chosen_and_unchosen_data(joined_df) # returns two dataframes
# HERE I NEED TO SAVE new_saved_df AND USE IT IN JOIN WHEN NEXT BATCH ARRIVES
# BUT I CAN'T,BECAUSE data_batch_df.join(saved_df) already has link to old version
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)