整个数据帧是否没有使用 spark 中的缓存或持久方法缓存?

问题描述

我是 Spark 的新手,我正在尝试了解 Spark 的持久性。

当我调用 somedatframe.cache() 时,整个数据帧是否没有按原样缓存?例如,如果某个数据帧是使用 2 个表的连接制作的。源表会被缓存还是最终的数据帧被缓存? 是否有某种机制让 spark 决定要缓存数据帧中的内容 这是我面临的问题..

我必须使用表创建 2 个数据框:- test(hive table) 和 src_data(temp view)。我还保留了 2 个数据帧,并对它们调用一个操作以保存在内存中代码:-

val new_data= spark.sql("select * from src_data where id not in (select distinct id from test)")
   new_data.persist()
new_data.collect
   new_data.createOrReplaceTempView("new_data")
 
val unchanged_data= spark.sql("select * from test where id not in (select id from changed_data)")
   unchanged_data.persist()
unchanged_data.collect
   unchanged_data.createTempView("unchanged_data")

我还在两个数据帧上运行 show() 方法,我得到了预期的结果。 为了查看数据是否保留下来,我截断了测试表并再次在两个数据帧上运行 show() 方法。但是我只得到相同的结果(在截断表测试之前)'new_data' df 而我得到另一个空的结果集......

我不明白为什么会发生这种情况。我还可以看到,在截断之前,我在 Spark Web ui 的“存储”中有 2 个 rdds,在截断并运行 show() 之后,我只看到了一个

如果有帮助,这里是 toDebugString..

unchanged_data.rdd.toDebugString
res2: String =
(4) MapPartitionsRDD[48] at rdd at <console>:29 []
 |  sqlExecutionRDD[47] at rdd at <console>:29 []
 |  MapPartitionsRDD[46] at rdd at <console>:29 []
 |  MapPartitionsRDD[45] at rdd at <console>:29 []
 |  MapPartitionsRDD[44] at rdd at <console>:29 []
 |  broadcastnestedLoopJoin buildright,LeftAnti,((id#197 = id#49) OR isnull((id#197 = id#49)))
:- Scan hive default.test [id#197,value#198,description#199],HiveTableRelation `default`.`test`,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,[id#197,description#199]
+- broadcastExchange IdentitybroadcastMode,[id=#114]
   +- InMemoryTableScan [id#49]
         +- InMemoryRelation [id#49,value#50,description#51],StorageLevel(disk,memory,deserialized,1 replicas)
             ...

scala>    new_data.rdd.toDebugString
res3: String =
(2) MapPartitionsRDD[53] at rdd at <console>:29 []
 |  sqlExecutionRDD[52] at rdd at <console>:29 []
 |  MapPartitionsRDD[51] at rdd at <console>:29 []
 |  MapPartitionsRDD[50] at rdd at <console>:29 []
 |  MapPartitionsRDD[49] at rdd at <console>:29 []
 |  broadcastnestedLoopJoin buildright,((id#49 = id#126) OR isnull((id#49 = id#126)))
:- LocalTableScan [id#49,description#51]
+- broadcastExchange IdentitybroadcastMode,[id=#83]
   +- *(2) HashAggregate(keys=[id#126],functions=[],output=[id#126])
      +- Exchange hashpartitioning(id#126,200),true,[id=#79]
         +- *(1) HashAggregate(keys=[id#126],output=[id#126])
            +- Scan hive default.test [id#126],org....

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)