Problem description
I'm using Structured Streaming with PySpark and trying to merge two data streams into one with a left outer join, so that I retrieve all the data from both streams.
For example, I have the following data model:
# Imports and schemas
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, TimestampType
)

test_df1_schema = StructType([
    StructField("item1", StringType(), True),
    StructField("item2", IntegerType(), True),
    StructField("target_id", LongType(), True),
    StructField("df1_timestamp", TimestampType(), True),
])
test_df2_schema = StructType([
    StructField("item1", StringType(), True),
    StructField("item2", IntegerType(), True),
    StructField("target_id", LongType(), True),
    StructField("df2_timestamp", TimestampType(), True),
])
# Initialize tables
test_df1 = spark.createDataFrame([
        ("BlaBla1", 126, 111111, datetime.now()),
    ], test_df1_schema) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/tables/test_df1")
test_df2 = spark.createDataFrame([
        ("BlaBla1", 126, 999999, datetime.now()),
    ], test_df2_schema) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/tables/test_df2")
The two tables look like this:
+-------+-----+---------+--------------------------+
|item1  |item2|target_id|df1_timestamp             |
+-------+-----+---------+--------------------------+
|BlaBla1|126  |111111   |2020-09-03 05:54:55.103165|
+-------+-----+---------+--------------------------+

+-------+-----+---------+--------------------------+
|item1  |item2|target_id|df2_timestamp             |
+-------+-----+---------+--------------------------+
|BlaBla1|126  |999999   |2020-09-03 05:55:02.848808|
+-------+-----+---------+--------------------------+
From there, I create two streams to be merged with a left outer join so that I get the data from both sides:
# Read the tables as streams
test_df1_stream = spark.readStream.format("delta").load("/data/tables/test_df1") \
    .selectExpr(
        "target_id AS df1_target_id",
        "df1_timestamp AS df1_timestamp",
        "item1 AS df1_item1",
        "item2 AS df1_item2"
    ) \
    .withWatermark("df1_timestamp", "30 minutes")

test_df2_stream = spark.readStream.format("delta").load("/data/tables/test_df2") \
    .selectExpr(
        "target_id AS df2_target_id",
        "df2_timestamp AS df2_timestamp",
        "item1 AS df2_item1",
        "item2 AS df2_item2"
    ) \
    .withWatermark("df2_timestamp", "30 minutes")
# Join the streams and write the result
test_df_join_stream = test_df1_stream \
    .join(
        test_df2_stream,
        F.expr("""
            df1_item1 = df2_item1 AND
            df1_item2 = df2_item2 AND
            df2_timestamp >= df1_timestamp AND
            df2_timestamp <= df1_timestamp + interval 1 hour
        """),
        how="leftOuter"
    ) \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/data/tables/test_df_join_stream/_checkpoints/streaming-agg") \
    .queryName("test_df_join_stream") \
    .start("/data/tables/test_df_join_stream")
The result looks like this:
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|df1_target_id|df1_timestamp |df1_item1|df1_item2|df2_target_id|df2_timestamp |df2_item1|df2_item2|
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|111111 |2020-09-03 06:23:33.651641|BlaBla1 |126 |999999 |2020-09-03 06:23:46.3197|BlaBla1 |126 |
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
That's not bad, but what I actually want is something like this:
+--------------------------+--------+------+----------+
|timestamp |item1 |item2 |target_id |
+--------------------------+--------+------+----------+
|2020-09-03 06:23:33.651641|BlaBla1 |126 |111111 |
|2020-09-03 06:23:46.3197 |BlaBla1 |126 |999999 |
+--------------------------+--------+------+----------+
That is, the two streams merged with item1, item2, and target_id acting as the keys for both streams. Is there a good way to do this?
Thanks for your help!