在火花结构的 Delta 流的情况下的下推过滤器

问题描述

我有一个用例,我们需要将开源 Delta 表流式传输到多个查询中,并在分区列之一上进行过滤。 例如,。 给定在年份列上分区的增量表。

Streaming query 1
spark.readStream.format("delta").load("/tmp/delta-table/").
where("year= 2013")

Streaming query 2
spark.readStream.format("delta").load("/tmp/delta-table/").
where("year= 2014")

实物图在流媒体播放后显示过滤器。

> == Physical Plan == Filter (isnotnull(year#431) AND (year#431 = 2013))
> +- Streamingrelation delta,[]

我的问题是下推谓词是否适用于 Delta 中的流式查询? 我们可以只从 Delta 流式传输特定分区吗?

解决方法

如果该列已经分区,则只扫描需要的分区。

让我们创建分区和非分区增量表并执行结构化流。

分区增量表流式传输:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
    
//sample dataframe
val df = Seq((1,2020),(2,2021),(3,(4,(5,(6,(7,2019),(8,(9,2018),(10,2020)).toDF("id","year")
    
//partionBy year column and save as delta table
df.write.format("delta").partitionBy("year").save("delta-stream")
    
//streaming delta table
spark.readStream.format("delta").load("delta-stream")
.where('year===2020)
.writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:注意partitionFilters

enter image description here

非分区增量表流式传输:

df.write.format("delta").save("delta-stream")

spark.readStream.format("delta").load("delta-stream")
    .where('year===2020)
    .writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:注意pushFilters

enter image description here