使用合并的列pyspark将两个数据流连接到单个表

问题描述

我将结构化流处理与PySpark结合使用,试图将两个数据流合并成一个带有左外部联接的数据流,以从这两个流中检索所有数据。

例如,我有以下数据模型:

# Schemas
test_df1_schema = StructType([
    StructField("item1",StringType(),True),StructField("item2",IntegerType(),StructField("target_id",LongType(),StructField("df1_timestamp",TimestampType(),])

test_df2_schema = StructType([
    StructField("item1",StructField("df2_timestamp",])

# Initialize tables
test_df1 = spark.createDataFrame([
    ("BlaBla1",126,111111,datetime.now()),],test_df1_schema) \
    .write \
    .format("delta") \
    .mode('overwrite') \
    .save("/data/tables/test_df1")

test_df2 = spark.createDataFrame([
    ("BlaBla1",999999,test_df2_schema) \
    .write \
    .format("delta") \
    .mode('overwrite') \
    .save("/data/tables/test_df2")

这两个表如下所示:

+-------+------+---------+--------------------------+
|item1  |item2 |target_id|df1_timestamp             |
+-------+----------------+--------------------------+
|BlaBla1|126   |111111   |2020-09-03 05:54:55.103165|
+-------+------+---------+--------------------------+

+-------+------+---------+--------------------------+
|item1  |item2 |target_id|df2_timestamp             |
+-------+----------------+--------------------------+
|BlaBla1|126   |999999   |2020-09-03 05:55:02.848808|
+-------+------+---------+--------------------------+

从那里,我创建两个要使用左外部联接合并的流,以从两侧获取数据:

# Read and Join tables
test_df1_stream = spark.readStream.format('delta').load('/data/tables/test_df1') \
    .selectExpr( 
        "target_id AS df1_target_id","df1_timestamp AS df1_timestamp","item1 AS df1_item1","item2 AS df1_item2"
    ) \
    .withWatermark( "df1_timestamp","30 minutes" ) \

test_df2_stream = spark.readStream.format('delta').load('/data/tables/test_df2') \
    .selectExpr( 
        "target_id AS df2_target_id","df2_timestamp AS df2_timestamp","item1 AS df2_item1","item2 AS df2_item2"
    ) \
    .withWatermark( "df2_timestamp","30 minutes" ) \

test_df_join_stream = test_df1_stream \
    .join(
        test_df2_stream,F.expr("""
            df1_item1 = df2_item1 AND
            df1_item2 = df2_item2 AND
            df2_timestamp >= df1_timestamp AND
            df2_timestamp <= df1_timestamp + interval 1 hour
        """),how='leftOuter'
    ) \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation","/data/tables/test_df_join_stream/_checkpoints/streaming-agg") \
    .queryName("test_df_join_stream") \
    .start("/data/tables/test_df_join_stream")

结果如下:

+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|df1_target_id|df1_timestamp             |df1_item1|df1_item2|df2_target_id|df2_timestamp           |df2_item1|df2_item2|
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|111111       |2020-09-03 06:23:33.651641|BlaBla1  |126      |999999       |2020-09-03 06:23:46.3197|BlaBla1  |126      |
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+

这还不错,但是我实际上想要的是这样的东西:

+--------------------------+--------+------+----------+
|timestamp                 |item1   |item2 |target_id |
+--------------------------+--------+------+----------+
|2020-09-03 06:23:33.651641|BlaBla1 |126   |111111    |   
|2020-09-03 06:23:46.3197  |BlaBla1 |126   |999999    |
+--------------------------+--------+------+----------+

两个流将基于item1item2target_id作为两个流的键合并。有什么好方法吗?

谢谢您的帮助!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...