并行读取数据时,Spark 不会将过滤器下推到 PostgreSQL 数据源,为下限和上限提供值

问题描述

我正在尝试并行读取 Postgresql 表中的数据。我使用时间戳列作为分区列并提供下限、上限和 numPartitions 的值。它正在创建多个查询以并行读取数据,但它没有将过滤器下推到 Postgresql 数据库。当我在数据帧上使用解释命令时,它在物理计划中的推送过滤器中没有任何内容。我也试过在加载方法之后应用过滤器子句,但它仍然没有按下过滤器。

选项 1:这里我没有使用过滤条件

 val df = spark.read
        .format("jdbc")
        .option("url",jdbcurl)
        .option("dbtable",query)
        .option("partitionColumn","transactionbegin")
        .option("numPartitions",12) 
        .option("driver","org.postgresql.Driver")
        .option("fetchsize",50000)  
        .option("user","user")
        .option("password","password")
        .option("lowerBound","2018-01-01 00:00:00")
        .option("upperBound","2018-12-31 23:59:00")
        .load

解释计划输出

== Physical Plan ==
*(1) Scan JDBCRelation((   SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] PushedFilters: [],ReadSchema: struct<columnnames>

现在,如果我确实在 df 上进行了解释,推送的过滤器中没有任何内容,但是我能够使用 pg_stat_activity 从 Postgresql 获取查询显示了 12 个具有 where 条件的不同查询。我在这里提供了一个查询

SELECT 1 FROM (   SELECT columnnames
FROM schema.Transaction ) a WHERE "transactionbegin" >= '2018-03-02 19:59:50' AND "transactionbegin" < '2018-04-02 05:59:45'

在这里有点困惑,它是过滤 Postgresql 中的记录还是按照解释计划在 spark 中执行此操作,您在推送的过滤器中没有任何内容,但基于生成查询,它看起来像它正在过滤 Postgresql 中的数据。

选项 2:使用过滤条件

val df = spark.read
        .format("jdbc")
        .option("url","2018-12-31 23:59:00")
        .load.filter(s"TransactionBegin between cast('2018-01-01 00:00:00' as TIMESTAMP) and cast('2018-12-31 23:59:00' as TIMESTAMP)")

解释上述数据框的计划

== Physical Plan ==
*(1) Scan JDBCRelation((   SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] 
PushedFilters: [*IsNotNull(transactionbegin),*GreaterThanorEqual(transactionbegin,2018-01-01 00:00:00.0),...,ReadSchema: struct<columnnames>

使用 pg_stat_activity 来自 Postgresql查询之一

SELECT 1 FROM (   SELECT columnnames
FROM schema.Transaction ) a 
WHERE (("transactionbegin" IS NOT NULL) AND ("transactionbegin" >= '2018-01-01 00:00:00.0') 
AND ("transactionbegin" <= '2018-12-31 23:59:00.0')) AND 
("transactionbegin" >= '2018-06-02 01:59:35' AND "transactionbegin" < '2018-07-02 11:59:30')

我想理解的是,为什么在提供分区列和下限和上限时,它没有将过滤器推送到数据库,而是在通过将值转换为时间戳来应用显式过滤器后,它会向下推过滤器。框架也不应该足够聪明,将我们传递的值视为下限和上限,以将其视为时间戳列的范围。

如果您有大量数据需要在过滤条件之后读取,那么最有效的处理方法是什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...